You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2009/07/29 06:38:41 UTC

svn commit: r798778 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/lib/ src/java/org/apache/hadoop/mapreduce/lib/fieldsel/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapreduce/ src/test/mapred/org/ap...

Author: sharad
Date: Wed Jul 29 04:38:40 2009
New Revision: 798778

URL: http://svn.apache.org/viewvc?rev=798778&view=rev
Log:
MAPREDUCE-373. Change org.apache.hadoop.mapred.lib.FieldSelectionMapReduce to use new mapreduce API. Contributed by Amareshwari Sriramadasu.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=798778&r1=798777&r2=798778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Jul 29 04:38:40 2009
@@ -147,6 +147,9 @@
     MAPREDUCE-369. Change org.apache.hadoop.mapred.lib.MultipleInputs to 
     use new api. (Amareshwari Sriramadasu via sharad)
 
+    MAPREDUCE-373. Change org.apache.hadoop.mapred.lib.FieldSelectionMapReduce
+    to use new api. (Amareshwari Sriramadasu via sharad)
+
   BUG FIXES
     MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy.
     (Aaron Kimball via matei)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java?rev=798778&r1=798777&r2=798778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java Wed Jul 29 04:38:40 2009
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +32,7 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.fieldsel.*;
 
 /**
  * This class implements a mapper/reducer class that can be used to perform
@@ -59,8 +61,10 @@
  * 
  * The reducer extracts output key/value pairs in a similar manner, except that
  * the key is never ignored.
- * 
+ * @deprecated Use {@link FieldSelectionMapper} and 
+ * {@link FieldSelectionReducer} instead
  */
+@Deprecated
 public class FieldSelectionMapReduce<K, V>
     implements Mapper<K, V, Text, Text>, Reducer<Text, Text, Text, Text> {
 
@@ -70,21 +74,20 @@
 
   private String fieldSeparator = "\t";
 
-  private int[] mapOutputKeyFieldList = null;
+  private List<Integer> mapOutputKeyFieldList = new ArrayList<Integer>();
 
-  private int[] mapOutputValueFieldList = null;
+  private List<Integer> mapOutputValueFieldList = new ArrayList<Integer>();
 
   private int allMapValueFieldsFrom = -1;
 
   private String reduceOutputKeyValueSpec;
 
-  private int[] reduceOutputKeyFieldList = null;
+  private List<Integer> reduceOutputKeyFieldList = new ArrayList<Integer>();
 
-  private int[] reduceOutputValueFieldList = null;
+  private List<Integer> reduceOutputValueFieldList = new ArrayList<Integer>();
 
   private int allReduceValueFieldsFrom = -1;
 
-  private static Text emptyText = new Text("");
 
   public static final Log LOG = LogFactory.getLog("FieldSelectionMapReduce");
 
@@ -106,25 +109,25 @@
     int i = 0;
 
     sb.append("mapOutputKeyFieldList.length: ").append(
-        mapOutputKeyFieldList.length).append("\n");
-    for (i = 0; i < mapOutputKeyFieldList.length; i++) {
-      sb.append("\t").append(mapOutputKeyFieldList[i]).append("\n");
+        mapOutputKeyFieldList.size()).append("\n");
+    for (i = 0; i < mapOutputKeyFieldList.size(); i++) {
+      sb.append("\t").append(mapOutputKeyFieldList.get(i)).append("\n");
     }
     sb.append("mapOutputValueFieldList.length: ").append(
-        mapOutputValueFieldList.length).append("\n");
-    for (i = 0; i < mapOutputValueFieldList.length; i++) {
-      sb.append("\t").append(mapOutputValueFieldList[i]).append("\n");
+        mapOutputValueFieldList.size()).append("\n");
+    for (i = 0; i < mapOutputValueFieldList.size(); i++) {
+      sb.append("\t").append(mapOutputValueFieldList.get(i)).append("\n");
     }
 
     sb.append("reduceOutputKeyFieldList.length: ").append(
-        reduceOutputKeyFieldList.length).append("\n");
-    for (i = 0; i < reduceOutputKeyFieldList.length; i++) {
-      sb.append("\t").append(reduceOutputKeyFieldList[i]).append("\n");
+        reduceOutputKeyFieldList.size()).append("\n");
+    for (i = 0; i < reduceOutputKeyFieldList.size(); i++) {
+      sb.append("\t").append(reduceOutputKeyFieldList.get(i)).append("\n");
     }
     sb.append("reduceOutputValueFieldList.length: ").append(
-        reduceOutputValueFieldList.length).append("\n");
-    for (i = 0; i < reduceOutputValueFieldList.length; i++) {
-      sb.append("\t").append(reduceOutputValueFieldList[i]).append("\n");
+        reduceOutputValueFieldList.size()).append("\n");
+    for (i = 0; i < reduceOutputValueFieldList.size(); i++) {
+      sb.append("\t").append(reduceOutputValueFieldList.get(i)).append("\n");
     }
     return sb.toString();
   }
@@ -133,131 +136,23 @@
    * The identify function. Input key/value pair is written directly to output.
    */
   public void map(K key, V val,
-                  OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
-    String valStr = val.toString();
-    String[] inputValFields = valStr.split(this.fieldSeparator);
-    String[] inputKeyFields = null;
-    String[] fields = null;
-    if (this.ignoreInputKey) {
-      fields = inputValFields;
-    } else {
-      inputKeyFields = key.toString().split(this.fieldSeparator);
-      fields = new String[inputKeyFields.length + inputValFields.length];
-      int i = 0;
-      for (i = 0; i < inputKeyFields.length; i++) {
-        fields[i] = inputKeyFields[i];
-      }
-      for (i = 0; i < inputValFields.length; i++) {
-        fields[inputKeyFields.length + i] = inputValFields[i];
-      }
-    }
-    String newKey = selectFields(fields, mapOutputKeyFieldList, -1,
-        fieldSeparator);
-    String newVal = selectFields(fields, mapOutputValueFieldList,
-        allMapValueFieldsFrom, fieldSeparator);
-
-    if (newKey == null) {
-      newKey = newVal;
-      newVal = null;
-    }
-    Text newTextKey = emptyText;
-    if (newKey != null) {
-      newTextKey = new Text(newKey);
-    }
-    Text newTextVal = emptyText;
-    if (newTextVal != null) {
-      newTextVal = new Text(newVal);
-    }
-    output.collect(newTextKey, newTextVal);
-  }
-
-  /**
-   * Extract the actual field numbers from the given field specs.
-   * If a field spec is in the form of "n-" (like 3-), then n will be the 
-   * return value. Otherwise, -1 will be returned.  
-   * @param fieldListSpec an array of field specs
-   * @param fieldList an array of field numbers extracted from the specs.
-   * @return number n if some field spec is in the form of "n-", -1 otherwise.
-   */
-  private int extractFields(String[] fieldListSpec,
-                            ArrayList<Integer> fieldList) {
-    int allFieldsFrom = -1;
-    int i = 0;
-    int j = 0;
-    int pos = -1;
-    String fieldSpec = null;
-    for (i = 0; i < fieldListSpec.length; i++) {
-      fieldSpec = fieldListSpec[i];
-      if (fieldSpec.length() == 0) {
-        continue;
-      }
-      pos = fieldSpec.indexOf('-');
-      if (pos < 0) {
-        Integer fn = new Integer(fieldSpec);
-        fieldList.add(fn);
-      } else {
-        String start = fieldSpec.substring(0, pos);
-        String end = fieldSpec.substring(pos + 1);
-        if (start.length() == 0) {
-          start = "0";
-        }
-        if (end.length() == 0) {
-          allFieldsFrom = Integer.parseInt(start);
-          continue;
-        }
-        int startPos = Integer.parseInt(start);
-        int endPos = Integer.parseInt(end);
-        for (j = startPos; j <= endPos; j++) {
-          fieldList.add(j);
-        }
-      }
-    }
-    return allFieldsFrom;
+      OutputCollector<Text, Text> output, Reporter reporter) 
+      throws IOException {
+    FieldSelectionHelper helper = new FieldSelectionHelper(
+      FieldSelectionHelper.emptyText, FieldSelectionHelper.emptyText);
+    helper.extractOutputKeyValue(key.toString(), val.toString(),
+      fieldSeparator, mapOutputKeyFieldList, mapOutputValueFieldList,
+      allMapValueFieldsFrom, ignoreInputKey, true);
+    output.collect(helper.getKey(), helper.getValue());
   }
 
   private void parseOutputKeyValueSpec() {
-    String[] mapKeyValSpecs = mapOutputKeyValueSpec.split(":", -1);
-    String[] mapKeySpec = mapKeyValSpecs[0].split(",");
-    String[] mapValSpec = new String[0];
-    if (mapKeyValSpecs.length > 1) {
-      mapValSpec = mapKeyValSpecs[1].split(",");
-    }
-
-    int i = 0;
-    ArrayList<Integer> fieldList = new ArrayList<Integer>();
-    extractFields(mapKeySpec, fieldList);
-    this.mapOutputKeyFieldList = new int[fieldList.size()];
-    for (i = 0; i < fieldList.size(); i++) {
-      this.mapOutputKeyFieldList[i] = fieldList.get(i).intValue();
-    }
-
-    fieldList = new ArrayList<Integer>();
-    allMapValueFieldsFrom = extractFields(mapValSpec, fieldList);
-    this.mapOutputValueFieldList = new int[fieldList.size()];
-    for (i = 0; i < fieldList.size(); i++) {
-      this.mapOutputValueFieldList[i] = fieldList.get(i).intValue();
-    }
-
-    String[] reduceKeyValSpecs = reduceOutputKeyValueSpec.split(":", -1);
-    String[] reduceKeySpec = reduceKeyValSpecs[0].split(",");
-    String[] reduceValSpec = new String[0];
-    if (reduceKeyValSpecs.length > 1) {
-      reduceValSpec = reduceKeyValSpecs[1].split(",");
-    }
-
-    fieldList = new ArrayList<Integer>();
-    extractFields(reduceKeySpec, fieldList);
-    this.reduceOutputKeyFieldList = new int[fieldList.size()];
-    for (i = 0; i < fieldList.size(); i++) {
-      this.reduceOutputKeyFieldList[i] = fieldList.get(i).intValue();
-    }
-
-    fieldList = new ArrayList<Integer>();
-    allReduceValueFieldsFrom = extractFields(reduceValSpec, fieldList);
-    this.reduceOutputValueFieldList = new int[fieldList.size()];
-    for (i = 0; i < fieldList.size(); i++) {
-      this.reduceOutputValueFieldList[i] = fieldList.get(i).intValue();
-    }
+    allMapValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
+      mapOutputKeyValueSpec, mapOutputKeyFieldList, mapOutputValueFieldList);
+    
+    allReduceValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
+      reduceOutputKeyValueSpec, reduceOutputKeyFieldList,
+      reduceOutputValueFieldList);
   }
 
   public void configure(JobConf job) {
@@ -277,61 +172,16 @@
 
   }
 
-  private static String selectFields(String[] fields, int[] fieldList,
-      int allFieldsFrom, String separator) {
-    String retv = null;
-    int i = 0;
-    StringBuffer sb = null;
-    if (fieldList != null && fieldList.length > 0) {
-      if (sb == null) {
-        sb = new StringBuffer();
-      }
-      for (i = 0; i < fieldList.length; i++) {
-        if (fieldList[i] < fields.length) {
-          sb.append(fields[fieldList[i]]);
-        }
-        sb.append(separator);
-      }
-    }
-    if (allFieldsFrom >= 0) {
-      if (sb == null) {
-        sb = new StringBuffer();
-      }
-      for (i = allFieldsFrom; i < fields.length; i++) {
-        sb.append(fields[i]).append(separator);
-      }
-    }
-    if (sb != null) {
-      retv = sb.toString();
-      if (retv.length() > 0) {
-        retv = retv.substring(0, retv.length() - 1);
-      }
-    }
-    return retv;
-  }
-
   public void reduce(Text key, Iterator<Text> values,
                      OutputCollector<Text, Text> output, Reporter reporter)
     throws IOException {
-
     String keyStr = key.toString() + this.fieldSeparator;
     while (values.hasNext()) {
-      String valStr = values.next().toString();
-      valStr = keyStr + valStr;
-      String[] fields = valStr.split(this.fieldSeparator);
-      String newKey = selectFields(fields, reduceOutputKeyFieldList, -1,
-          fieldSeparator);
-      String newVal = selectFields(fields, reduceOutputValueFieldList,
-          allReduceValueFieldsFrom, fieldSeparator);
-      Text newTextKey = null;
-      if (newKey != null) {
-        newTextKey = new Text(newKey);
-      }
-      Text newTextVal = null;
-      if (newVal != null) {
-        newTextVal = new Text(newVal);
-      }
-      output.collect(newTextKey, newTextVal);
+        FieldSelectionHelper helper = new FieldSelectionHelper();
+        helper.extractOutputKeyValue(keyStr, values.next().toString(),
+          fieldSeparator, reduceOutputKeyFieldList,
+          reduceOutputValueFieldList, allReduceValueFieldsFrom, false, false);
+      output.collect(helper.getKey(), helper.getValue());
     }
   }
 }

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java?rev=798778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java Wed Jul 29 04:38:40 2009
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.fieldsel;
+
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * This class implements the helper logic shared by the mapper/reducer classes that perform
+ * field selections in a manner similar to unix cut. The input data is treated
+ * as fields separated by a user specified separator (the default value is
+ * "\t"). The user can specify a list of fields that form the map output keys,
+ * and a list of fields that form the map output values. If the inputformat is
+ * TextInputFormat, the mapper will ignore the key to the map function, and the
+ * fields are from the value only. Otherwise, the fields are the union of those
+ * from the key and those from the value.
+ * 
+ * The field separator is under attribute "mapred.data.field.separator"
+ * 
+ * The map output field list spec is under attribute "map.output.key.value.fields.spec".
+ * The value is expected to be like "keyFieldsSpec:valueFieldsSpec".
+ * key/valueFieldsSpec are comma (,) separated field spec: fieldSpec,fieldSpec,fieldSpec ...
+ * Each field spec can be a simple number (e.g. 5) specifying a specific field, or a range
+ * (like 2-5) to specify a range of fields, or an open range (like 3-) specifying all 
+ * the fields starting from field 3. Open range field specs apply to value fields only.
+ * They have no effect on the key fields.
+ * 
+ * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys,
+ * and use fields 6,5,1,2,3,7 and above for values.
+ * 
+ * The reduce output field list spec is under attribute "reduce.output.key.value.fields.spec".
+ * 
+ * The reducer extracts output key/value pairs in a similar manner, except that
+ * the key is never ignored.
+ * 
+ */
+public class FieldSelectionHelper {
+
+  public static Text emptyText = new Text("");
+
+  /**
+   * Extract the actual field numbers from the given field specs.
+   * If a field spec is in the form of "n-" (like 3-), then n will be the 
+   * return value. Otherwise, -1 will be returned.  
+   * @param fieldListSpec an array of field specs
+   * @param fieldList an array of field numbers extracted from the specs.
+   * @return number n if some field spec is in the form of "n-", -1 otherwise.
+   */
+  private static int extractFields(String[] fieldListSpec,
+      List<Integer> fieldList) {
+    int allFieldsFrom = -1;
+    int i = 0;
+    int j = 0;
+    int pos = -1;
+    String fieldSpec = null;
+    for (i = 0; i < fieldListSpec.length; i++) {
+      fieldSpec = fieldListSpec[i];
+      if (fieldSpec.length() == 0) {
+        continue;
+      }
+      pos = fieldSpec.indexOf('-');
+      if (pos < 0) {
+        Integer fn = new Integer(fieldSpec);
+        fieldList.add(fn);
+      } else {
+        String start = fieldSpec.substring(0, pos);
+        String end = fieldSpec.substring(pos + 1);
+        if (start.length() == 0) {
+          start = "0";
+        }
+        if (end.length() == 0) {
+          allFieldsFrom = Integer.parseInt(start);
+          continue;
+        }
+        int startPos = Integer.parseInt(start);
+        int endPos = Integer.parseInt(end);
+        for (j = startPos; j <= endPos; j++) {
+          fieldList.add(j);
+        }
+      }
+    }
+    return allFieldsFrom;
+  }
+
+  private static String selectFields(String[] fields, List<Integer> fieldList,
+      int allFieldsFrom, String separator) {
+    String retv = null;
+    int i = 0;
+    StringBuffer sb = null;
+    if (fieldList != null && fieldList.size() > 0) {
+      if (sb == null) {
+        sb = new StringBuffer();
+      }
+      for (Integer index : fieldList) {
+        if (index < fields.length) {
+          sb.append(fields[index]);
+        }
+        sb.append(separator);
+      }
+    }
+    if (allFieldsFrom >= 0) {
+      if (sb == null) {
+        sb = new StringBuffer();
+      }
+      for (i = allFieldsFrom; i < fields.length; i++) {
+        sb.append(fields[i]).append(separator);
+      }
+    }
+    if (sb != null) {
+      retv = sb.toString();
+      if (retv.length() > 0) {
+        retv = retv.substring(0, retv.length() - 1);
+      }
+    }
+    return retv;
+  }
+  
+  public static int parseOutputKeyValueSpec(String keyValueSpec,
+      List<Integer> keyFieldList, List<Integer> valueFieldList) {
+    String[] keyValSpecs = keyValueSpec.split(":", -1);
+    
+    String[] keySpec = keyValSpecs[0].split(",");
+    
+    String[] valSpec = new String[0];
+    if (keyValSpecs.length > 1) {
+      valSpec = keyValSpecs[1].split(",");
+    }
+
+    FieldSelectionHelper.extractFields(keySpec, keyFieldList);
+    return FieldSelectionHelper.extractFields(valSpec, valueFieldList);
+  }
+
+  public static String specToString(String fieldSeparator, String keyValueSpec,
+      int allValueFieldsFrom, List<Integer> keyFieldList,
+      List<Integer> valueFieldList) {
+    StringBuffer sb = new StringBuffer();
+    sb.append("fieldSeparator: ").append(fieldSeparator).append("\n");
+
+    sb.append("keyValueSpec: ").append(keyValueSpec).append("\n");
+    sb.append("allValueFieldsFrom: ").append(allValueFieldsFrom);
+    sb.append("\n");
+    sb.append("keyFieldList.length: ").append(keyFieldList.size());
+    sb.append("\n");
+    for (Integer field : keyFieldList) {
+      sb.append("\t").append(field).append("\n");
+    }
+    sb.append("valueFieldList.length: ").append(valueFieldList.size());
+    sb.append("\n");
+    for (Integer field : valueFieldList) {
+      sb.append("\t").append(field).append("\n");
+    }
+    return sb.toString();
+  }
+
+  private Text key = null;
+  private Text value = null;
+  
+  public FieldSelectionHelper() {
+  }
+
+  public FieldSelectionHelper(Text key, Text val) {
+    this.key = key;
+    this.value = val;
+  }
+  
+  public Text getKey() {
+    return key;
+  }
+ 
+  public Text getValue() {
+    return value;
+  }
+
+  public void extractOutputKeyValue(String key, String val,
+      String fieldSep, List<Integer> keyFieldList, List<Integer> valFieldList,
+      int allValueFieldsFrom, boolean ignoreKey, boolean isMap) {
+    if (!ignoreKey) {
+      val = key + val;
+    }
+    String[] fields = val.split(fieldSep);
+    
+    String newKey = selectFields(fields, keyFieldList, -1, fieldSep);
+    String newVal = selectFields(fields, valFieldList, allValueFieldsFrom,
+      fieldSep);
+    if (isMap && newKey == null) {
+      newKey = newVal;
+      newVal = null;
+    }
+    
+    if (newKey != null) {
+      this.key = new Text(newKey);
+    }
+    if (newVal != null) {
+      this.value = new Text(newVal);
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java?rev=798778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java Wed Jul 29 04:38:40 2009
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.fieldsel;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+/**
+ * This class implements a mapper class that can be used to perform
+ * field selections in a manner similar to unix cut. The input data is treated
+ * as fields separated by a user specified separator (the default value is
+ * "\t"). The user can specify a list of fields that form the map output keys,
+ * and a list of fields that form the map output values. If the inputformat is
+ * TextInputFormat, the mapper will ignore the key to the map function, and the
+ * fields are from the value only. Otherwise, the fields are the union of those
+ * from the key and those from the value.
+ * 
+ * The field separator is under attribute "mapred.data.field.separator"
+ * 
+ * The map output field list spec is under attribute 
+ * "map.output.key.value.fields.spec". The value is expected to be like
+ * "keyFieldsSpec:valueFieldsSpec". key/valueFieldsSpec are comma (,) separated
+ * field spec: fieldSpec,fieldSpec,fieldSpec ... Each field spec can be a 
+ * simple number (e.g. 5) specifying a specific field, or a range (like 2-5)
+ * to specify a range of fields, or an open range (like 3-) specifying all 
+ * the fields starting from field 3. Open range field specs apply to value
+ * fields only. They have no effect on the key fields.
+ * 
+ * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields
+ * 4,3,0 and 1 for keys, and use fields 6,5,1,2,3,7 and above for values.
+ */
+public class FieldSelectionMapper<K, V>
+    extends Mapper<K, V, Text, Text> {
+
+  private String mapOutputKeyValueSpec;
+
+  private boolean ignoreInputKey;
+
+  private String fieldSeparator = "\t";
+
+  private List<Integer> mapOutputKeyFieldList = new ArrayList<Integer>();
+
+  private List<Integer> mapOutputValueFieldList = new ArrayList<Integer>();
+
+  private int allMapValueFieldsFrom = -1;
+
+  public static final Log LOG = LogFactory.getLog("FieldSelectionMapReduce");
+
+  public void setup(Context context) 
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    this.fieldSeparator = conf.get("mapred.data.field.separator", "\t");
+    this.mapOutputKeyValueSpec = 
+      conf.get("map.output.key.value.fields.spec", "0-:");
+    try {
+      this.ignoreInputKey = TextInputFormat.class.getCanonicalName().equals(
+        context.getInputFormatClass().getCanonicalName());
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Input format class not found", e);
+    }
+    allMapValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
+      mapOutputKeyValueSpec, mapOutputKeyFieldList, mapOutputValueFieldList);
+    LOG.info(FieldSelectionHelper.specToString(fieldSeparator,
+      mapOutputKeyValueSpec, allMapValueFieldsFrom, mapOutputKeyFieldList,
+      mapOutputValueFieldList) + "\nignoreInputKey:" + ignoreInputKey);
+  }
+
+  /**
+   * Selects the configured key and value fields from the input key/value pair and writes them to output.
+   */
+  public void map(K key, V val, Context context) 
+      throws IOException, InterruptedException {
+    FieldSelectionHelper helper = new FieldSelectionHelper(
+      FieldSelectionHelper.emptyText, FieldSelectionHelper.emptyText);
+    helper.extractOutputKeyValue(key.toString(), val.toString(),
+      fieldSeparator, mapOutputKeyFieldList, mapOutputValueFieldList,
+      allMapValueFieldsFrom, ignoreInputKey, true);
+    context.write(helper.getKey(), helper.getValue());
+  }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java?rev=798778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java Wed Jul 29 04:38:40 2009
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.fieldsel;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * This class implements a reducer class that can be used to perform field
+ * selections in a manner similar to unix cut. 
+ * 
+ * The input data is treated as fields separated by a user specified
+ * separator (the default value is "\t"). The user can specify a list of
+ * fields that form the reduce output keys, and a list of fields that form
+ * the reduce output values. The fields are the union of those from the key
+ * and those from the value.
+ * 
+ * The field separator is under attribute "mapred.data.field.separator"
+ * 
+ * The reduce output field list spec is under attribute 
+ * "reduce.output.key.value.fields.spec". The value is expected to be like
+ * "keyFieldsSpec:valueFieldsSpec". key/valueFieldsSpec are comma (,)
+ * separated field spec: fieldSpec,fieldSpec,fieldSpec ... Each field spec
+ * can be a simple number (e.g. 5) specifying a specific field, or a range
+ * (like 2-5) to specify a range of fields, or an open range (like 3-) 
+ * specifying all the fields starting from field 3. Open range field
+ * specs apply to value fields only. They have no effect on the key fields.
+ * 
+ * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields
+ * 4,3,0 and 1 for keys, and use fields 6,5,1,2,3,7 and above for values.
+ */
+public class FieldSelectionReducer<K, V>
+    extends Reducer<Text, Text, Text, Text> {
+
+  private String fieldSeparator = "\t";
+
+  private String reduceOutputKeyValueSpec;
+
+  private List<Integer> reduceOutputKeyFieldList = new ArrayList<Integer>();
+
+  private List<Integer> reduceOutputValueFieldList = new ArrayList<Integer>();
+
+  private int allReduceValueFieldsFrom = -1;
+
+  public static final Log LOG = LogFactory.getLog("FieldSelectionMapReduce");
+
+  public void setup(Context context) 
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    
+    this.fieldSeparator = conf.get("mapred.data.field.separator", "\t");
+    
+    this.reduceOutputKeyValueSpec = 
+      conf.get("reduce.output.key.value.fields.spec", "0-:");
+    
+    allReduceValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
+      reduceOutputKeyValueSpec, reduceOutputKeyFieldList,
+      reduceOutputValueFieldList);
+
+    LOG.info(FieldSelectionHelper.specToString(fieldSeparator,
+      reduceOutputKeyValueSpec, allReduceValueFieldsFrom,
+      reduceOutputKeyFieldList, reduceOutputValueFieldList));
+  }
+
+  public void reduce(Text key, Iterable<Text> values, Context context)
+      throws IOException, InterruptedException {
+    String keyStr = key.toString() + this.fieldSeparator;
+    
+    for (Text val : values) {
+      FieldSelectionHelper helper = new FieldSelectionHelper();
+      helper.extractOutputKeyValue(keyStr, val.toString(),
+        fieldSeparator, reduceOutputKeyFieldList,
+        reduceOutputValueFieldList, allReduceValueFieldsFrom, false, false);
+      context.write(helper.getKey(), helper.getValue());
+    }
+  }
+}

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java?rev=798778&r1=798777&r2=798778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java Wed Jul 29 04:38:40 2009
@@ -20,6 +20,8 @@
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.lib.*;
+import org.apache.hadoop.mapreduce.lib.fieldsel.TestMRFieldSelection;
+
 import junit.framework.TestCase;
 import java.text.NumberFormat;
 
@@ -50,39 +52,9 @@
     StringBuffer inputData = new StringBuffer();
     StringBuffer expectedOutput = new StringBuffer();
 
+    TestMRFieldSelection.constructInputOutputData(inputData,
+      expectedOutput, numOfInputLines);
     FSDataOutputStream fileOut = fs.create(new Path(INPUT_DIR, inputFile));
-    for (int i = 0; i < numOfInputLines; i++) {
-        inputData.append(idFormat.format(i));
-        inputData.append("-").append(idFormat.format(i+1));
-        inputData.append("-").append(idFormat.format(i+2));
-        inputData.append("-").append(idFormat.format(i+3));
-        inputData.append("-").append(idFormat.format(i+4));
-        inputData.append("-").append(idFormat.format(i+5));
-        inputData.append("-").append(idFormat.format(i+6));
-        inputData.append("\n");
-
-
-        expectedOutput.append(idFormat.format(i+3));
-        expectedOutput.append("-" ).append (idFormat.format(i+2));
-        expectedOutput.append("-" ).append (idFormat.format(i+1));
-        expectedOutput.append("-" ).append (idFormat.format(i+5));
-        expectedOutput.append("-" ).append (idFormat.format(i+6));
-
-        expectedOutput.append("-" ).append (idFormat.format(i+6));
-        expectedOutput.append("-" ).append (idFormat.format(i+5));
-        expectedOutput.append("-" ).append (idFormat.format(i+1));
-        expectedOutput.append("-" ).append (idFormat.format(i+2));
-        expectedOutput.append("-" ).append (idFormat.format(i+3));
-
-        expectedOutput.append("-" ).append (idFormat.format(i+0));
-        expectedOutput.append("-" ).append (idFormat.format(i+1));
-        expectedOutput.append("-" ).append (idFormat.format(i+2));
-        expectedOutput.append("-" ).append (idFormat.format(i+3));
-        expectedOutput.append("-" ).append (idFormat.format(i+4));
-        expectedOutput.append("-" ).append (idFormat.format(i+5));
-        expectedOutput.append("-" ).append (idFormat.format(i+6));
-        expectedOutput.append("\n");
-    }
     fileOut.write(inputData.toString().getBytes("utf-8"));
     fileOut.close();
 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=798778&r1=798777&r2=798778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Wed Jul 29 04:38:40 2009
@@ -18,20 +18,26 @@
 
 package org.apache.hadoop.mapreduce;
 
+import java.io.BufferedReader;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.text.NumberFormat;
 import java.util.Iterator;
 import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.OutputLogFilter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -42,6 +48,8 @@
  * Utility methods used in various Job Control unit tests.
  */
 public class MapReduceTestUtil {
+  public static final Log LOG = 
+    LogFactory.getLog(MapReduceTestUtil.class.getName());
 
   static private Random rand = new Random();
 
@@ -280,4 +288,27 @@
       }
     };
   }
+  
+  /** Joins, with "\n" line ends, the outDir files passing OutputLogFilter. */
+  public static String readOutput(Path outDir, Configuration conf) 
+      throws IOException {
+    FileSystem fs = outDir.getFileSystem(conf);
+    StringBuffer result = new StringBuffer();
+    Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+           new OutputLogFilter()));
+    for (Path outputFile : fileList) {
+      LOG.info("Path" + ": "+ outputFile);
+      BufferedReader file = 
+        new BufferedReader(new InputStreamReader(fs.open(outputFile)));
+      try {
+        for (String line; (line = file.readLine()) != null; ) {
+          result.append(line).append("\n");
+        }
+      } finally {
+        file.close(); // release the stream even when a read fails
+      }
+    }
+    return result.toString();
+  }
+
 }

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java?rev=798778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java Wed Jul 29 04:38:40 2009
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.fieldsel;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+
+import junit.framework.TestCase;
+import java.text.NumberFormat;
+
+/** Checks FieldSelectionMapper and FieldSelectionReducer on generated data. */
+public class TestMRFieldSelection extends TestCase {
+
+  /** Renders ids as at least four digits, without grouping separators. */
+  private static NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setGroupingUsed(false);
+    idFormat.setMinimumIntegerDigits(4);
+  }
+
+  /** Parent of the job's input and output directories. */
+  private static Path testDir = new Path(
+    System.getProperty("test.build.data", "/tmp"), "field");
+
+  /**
+   * Field offsets, relative to the line number, of one expected output line:
+   * 3,2,1,5,6 first, then 6,5,1,2,3, then all seven input fields in order.
+   */
+  private static final int[] EXPECTED_OFFSETS =
+    {3, 2, 1, 5, 6, 6, 5, 1, 2, 3, 0, 1, 2, 3, 4, 5, 6};
+
+  /** JUnit entry point; delegates to {@link #launch()}. */
+  public void testFieldSelection() throws Exception {
+    launch();
+  }
+
+  /**
+   * Runs one field-selection job over ten generated lines and asserts that
+   * it succeeds and that its output equals the expected text. The output
+   * directory is deleted afterwards, which is only reached if both hold.
+   */
+  public static void launch() throws Exception {
+    final int numOfInputLines = 10;
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path inDir = new Path(testDir, "input_for_field_selection_test");
+    Path outDir = new Path(testDir, "output_for_field_selection_test");
+
+    StringBuffer inputData = new StringBuffer();
+    StringBuffer expectedOutput = new StringBuffer();
+    constructInputOutputData(inputData, expectedOutput, numOfInputLines);
+
+    // Field separator and the map/reduce field-selection specs for this job.
+    conf.set("mapred.data.field.separator", "-");
+    conf.set("map.output.key.value.fields.spec", "6,5,1-3:0-");
+    conf.set("reduce.output.key.value.fields.spec", ":4,3,2,1,0,0-");
+
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir,
+      1, 1, inputData.toString());
+    job.setMapperClass(FieldSelectionMapper.class);
+    job.setReducerClass(FieldSelectionReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setNumReduceTasks(1);
+
+    job.waitForCompletion(true);
+    assertTrue("Job Failed!", job.isSuccessful());
+    // The text read back from the output directory must equal the expectation.
+    String outdata = MapReduceTestUtil.readOutput(outDir, conf);
+    assertEquals("Outputs doesnt match.",expectedOutput.toString(), outdata);
+    fs.delete(outDir, true);
+  }
+
+  /**
+   * Appends numOfInputLines lines to each buffer and prints both buffers.
+   * Input line i holds i..i+6 joined by "-"; the matching expected line holds
+   * the fields selected by {@code EXPECTED_OFFSETS}, also joined by "-".
+   */
+  public static void constructInputOutputData(StringBuffer inputData,
+      StringBuffer expectedOutput, int numOfInputLines) {
+    for (int line = 0; line < numOfInputLines; line++) {
+      for (int field = 0; field < 7; field++) {
+        inputData.append(field == 0 ? "" : "-");
+        inputData.append(idFormat.format(line + field));
+      }
+      inputData.append("\n");
+      String separator = "";
+      for (int offset : EXPECTED_OFFSETS) {
+        expectedOutput.append(separator);
+        expectedOutput.append(idFormat.format(line + offset));
+        separator = "-";
+      }
+      expectedOutput.append("\n");
+    }
+    System.out.println("inputData:");
+    System.out.println(inputData.toString());
+    System.out.println("ExpectedData:");
+    System.out.println(expectedOutput.toString());
+  }
+
+  /** Command-line entry point; runs {@link #launch()}. */
+  public static void main(String[] argv) throws Exception {
+    launch();
+  }
+}