You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2009/07/29 06:38:41 UTC
svn commit: r798778 - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapred/lib/
src/java/org/apache/hadoop/mapreduce/lib/fieldsel/
src/test/mapred/org/apache/hadoop/mapred/
src/test/mapred/org/apache/hadoop/mapreduce/ src/test/mapred/org/ap...
Author: sharad
Date: Wed Jul 29 04:38:40 2009
New Revision: 798778
URL: http://svn.apache.org/viewvc?rev=798778&view=rev
Log:
MAPREDUCE-373. Change org.apache.hadoop.mapred.lib.FieldSelectionMapReduce to use new mapreduce API. Contributed by Amareshwari Sriramadasu.
Added:
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=798778&r1=798777&r2=798778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Jul 29 04:38:40 2009
@@ -147,6 +147,9 @@
MAPREDUCE-369. Change org.apache.hadoop.mapred.lib.MultipleInputs to
use new api. (Amareshwari Sriramadasu via sharad)
+ MAPREDUCE-373. Change org.apache.hadoop.mapred.lib.FieldSelectionMapReduce
+ to use new api. (Amareshwari Sriramadasu via sharad)
+
BUG FIXES
MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy.
(Aaron Kimball via matei)
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java?rev=798778&r1=798777&r2=798778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java Wed Jul 29 04:38:40 2009
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,6 +32,7 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.fieldsel.*;
/**
* This class implements a mapper/reducer class that can be used to perform
@@ -59,8 +61,10 @@
*
* The reducer extracts output key/value pairs in a similar manner, except that
* the key is never ignored.
- *
+ * @deprecated Use {@link FieldSelectionMapper} and
+ * {@link FieldSelectionReducer} instead
*/
+@Deprecated
public class FieldSelectionMapReduce<K, V>
implements Mapper<K, V, Text, Text>, Reducer<Text, Text, Text, Text> {
@@ -70,21 +74,20 @@
private String fieldSeparator = "\t";
- private int[] mapOutputKeyFieldList = null;
+ private List<Integer> mapOutputKeyFieldList = new ArrayList<Integer>();
- private int[] mapOutputValueFieldList = null;
+ private List<Integer> mapOutputValueFieldList = new ArrayList<Integer>();
private int allMapValueFieldsFrom = -1;
private String reduceOutputKeyValueSpec;
- private int[] reduceOutputKeyFieldList = null;
+ private List<Integer> reduceOutputKeyFieldList = new ArrayList<Integer>();
- private int[] reduceOutputValueFieldList = null;
+ private List<Integer> reduceOutputValueFieldList = new ArrayList<Integer>();
private int allReduceValueFieldsFrom = -1;
- private static Text emptyText = new Text("");
public static final Log LOG = LogFactory.getLog("FieldSelectionMapReduce");
@@ -106,25 +109,25 @@
int i = 0;
sb.append("mapOutputKeyFieldList.length: ").append(
- mapOutputKeyFieldList.length).append("\n");
- for (i = 0; i < mapOutputKeyFieldList.length; i++) {
- sb.append("\t").append(mapOutputKeyFieldList[i]).append("\n");
+ mapOutputKeyFieldList.size()).append("\n");
+ for (i = 0; i < mapOutputKeyFieldList.size(); i++) {
+ sb.append("\t").append(mapOutputKeyFieldList.get(i)).append("\n");
}
sb.append("mapOutputValueFieldList.length: ").append(
- mapOutputValueFieldList.length).append("\n");
- for (i = 0; i < mapOutputValueFieldList.length; i++) {
- sb.append("\t").append(mapOutputValueFieldList[i]).append("\n");
+ mapOutputValueFieldList.size()).append("\n");
+ for (i = 0; i < mapOutputValueFieldList.size(); i++) {
+ sb.append("\t").append(mapOutputValueFieldList.get(i)).append("\n");
}
sb.append("reduceOutputKeyFieldList.length: ").append(
- reduceOutputKeyFieldList.length).append("\n");
- for (i = 0; i < reduceOutputKeyFieldList.length; i++) {
- sb.append("\t").append(reduceOutputKeyFieldList[i]).append("\n");
+ reduceOutputKeyFieldList.size()).append("\n");
+ for (i = 0; i < reduceOutputKeyFieldList.size(); i++) {
+ sb.append("\t").append(reduceOutputKeyFieldList.get(i)).append("\n");
}
sb.append("reduceOutputValueFieldList.length: ").append(
- reduceOutputValueFieldList.length).append("\n");
- for (i = 0; i < reduceOutputValueFieldList.length; i++) {
- sb.append("\t").append(reduceOutputValueFieldList[i]).append("\n");
+ reduceOutputValueFieldList.size()).append("\n");
+ for (i = 0; i < reduceOutputValueFieldList.size(); i++) {
+ sb.append("\t").append(reduceOutputValueFieldList.get(i)).append("\n");
}
return sb.toString();
}
@@ -133,131 +136,23 @@
* The identify function. Input key/value pair is written directly to output.
*/
public void map(K key, V val,
- OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
- String valStr = val.toString();
- String[] inputValFields = valStr.split(this.fieldSeparator);
- String[] inputKeyFields = null;
- String[] fields = null;
- if (this.ignoreInputKey) {
- fields = inputValFields;
- } else {
- inputKeyFields = key.toString().split(this.fieldSeparator);
- fields = new String[inputKeyFields.length + inputValFields.length];
- int i = 0;
- for (i = 0; i < inputKeyFields.length; i++) {
- fields[i] = inputKeyFields[i];
- }
- for (i = 0; i < inputValFields.length; i++) {
- fields[inputKeyFields.length + i] = inputValFields[i];
- }
- }
- String newKey = selectFields(fields, mapOutputKeyFieldList, -1,
- fieldSeparator);
- String newVal = selectFields(fields, mapOutputValueFieldList,
- allMapValueFieldsFrom, fieldSeparator);
-
- if (newKey == null) {
- newKey = newVal;
- newVal = null;
- }
- Text newTextKey = emptyText;
- if (newKey != null) {
- newTextKey = new Text(newKey);
- }
- Text newTextVal = emptyText;
- if (newTextVal != null) {
- newTextVal = new Text(newVal);
- }
- output.collect(newTextKey, newTextVal);
- }
-
- /**
- * Extract the actual field numbers from the given field specs.
- * If a field spec is in the form of "n-" (like 3-), then n will be the
- * return value. Otherwise, -1 will be returned.
- * @param fieldListSpec an array of field specs
- * @param fieldList an array of field numbers extracted from the specs.
- * @return number n if some field spec is in the form of "n-", -1 otherwise.
- */
- private int extractFields(String[] fieldListSpec,
- ArrayList<Integer> fieldList) {
- int allFieldsFrom = -1;
- int i = 0;
- int j = 0;
- int pos = -1;
- String fieldSpec = null;
- for (i = 0; i < fieldListSpec.length; i++) {
- fieldSpec = fieldListSpec[i];
- if (fieldSpec.length() == 0) {
- continue;
- }
- pos = fieldSpec.indexOf('-');
- if (pos < 0) {
- Integer fn = new Integer(fieldSpec);
- fieldList.add(fn);
- } else {
- String start = fieldSpec.substring(0, pos);
- String end = fieldSpec.substring(pos + 1);
- if (start.length() == 0) {
- start = "0";
- }
- if (end.length() == 0) {
- allFieldsFrom = Integer.parseInt(start);
- continue;
- }
- int startPos = Integer.parseInt(start);
- int endPos = Integer.parseInt(end);
- for (j = startPos; j <= endPos; j++) {
- fieldList.add(j);
- }
- }
- }
- return allFieldsFrom;
+ OutputCollector<Text, Text> output, Reporter reporter)
+ throws IOException {
+ FieldSelectionHelper helper = new FieldSelectionHelper(
+ FieldSelectionHelper.emptyText, FieldSelectionHelper.emptyText);
+ helper.extractOutputKeyValue(key.toString(), val.toString(),
+ fieldSeparator, mapOutputKeyFieldList, mapOutputValueFieldList,
+ allMapValueFieldsFrom, ignoreInputKey, true);
+ output.collect(helper.getKey(), helper.getValue());
}
private void parseOutputKeyValueSpec() {
- String[] mapKeyValSpecs = mapOutputKeyValueSpec.split(":", -1);
- String[] mapKeySpec = mapKeyValSpecs[0].split(",");
- String[] mapValSpec = new String[0];
- if (mapKeyValSpecs.length > 1) {
- mapValSpec = mapKeyValSpecs[1].split(",");
- }
-
- int i = 0;
- ArrayList<Integer> fieldList = new ArrayList<Integer>();
- extractFields(mapKeySpec, fieldList);
- this.mapOutputKeyFieldList = new int[fieldList.size()];
- for (i = 0; i < fieldList.size(); i++) {
- this.mapOutputKeyFieldList[i] = fieldList.get(i).intValue();
- }
-
- fieldList = new ArrayList<Integer>();
- allMapValueFieldsFrom = extractFields(mapValSpec, fieldList);
- this.mapOutputValueFieldList = new int[fieldList.size()];
- for (i = 0; i < fieldList.size(); i++) {
- this.mapOutputValueFieldList[i] = fieldList.get(i).intValue();
- }
-
- String[] reduceKeyValSpecs = reduceOutputKeyValueSpec.split(":", -1);
- String[] reduceKeySpec = reduceKeyValSpecs[0].split(",");
- String[] reduceValSpec = new String[0];
- if (reduceKeyValSpecs.length > 1) {
- reduceValSpec = reduceKeyValSpecs[1].split(",");
- }
-
- fieldList = new ArrayList<Integer>();
- extractFields(reduceKeySpec, fieldList);
- this.reduceOutputKeyFieldList = new int[fieldList.size()];
- for (i = 0; i < fieldList.size(); i++) {
- this.reduceOutputKeyFieldList[i] = fieldList.get(i).intValue();
- }
-
- fieldList = new ArrayList<Integer>();
- allReduceValueFieldsFrom = extractFields(reduceValSpec, fieldList);
- this.reduceOutputValueFieldList = new int[fieldList.size()];
- for (i = 0; i < fieldList.size(); i++) {
- this.reduceOutputValueFieldList[i] = fieldList.get(i).intValue();
- }
+ allMapValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
+ mapOutputKeyValueSpec, mapOutputKeyFieldList, mapOutputValueFieldList);
+
+ allReduceValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
+ reduceOutputKeyValueSpec, reduceOutputKeyFieldList,
+ reduceOutputValueFieldList);
}
public void configure(JobConf job) {
@@ -277,61 +172,16 @@
}
- private static String selectFields(String[] fields, int[] fieldList,
- int allFieldsFrom, String separator) {
- String retv = null;
- int i = 0;
- StringBuffer sb = null;
- if (fieldList != null && fieldList.length > 0) {
- if (sb == null) {
- sb = new StringBuffer();
- }
- for (i = 0; i < fieldList.length; i++) {
- if (fieldList[i] < fields.length) {
- sb.append(fields[fieldList[i]]);
- }
- sb.append(separator);
- }
- }
- if (allFieldsFrom >= 0) {
- if (sb == null) {
- sb = new StringBuffer();
- }
- for (i = allFieldsFrom; i < fields.length; i++) {
- sb.append(fields[i]).append(separator);
- }
- }
- if (sb != null) {
- retv = sb.toString();
- if (retv.length() > 0) {
- retv = retv.substring(0, retv.length() - 1);
- }
- }
- return retv;
- }
-
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
-
String keyStr = key.toString() + this.fieldSeparator;
while (values.hasNext()) {
- String valStr = values.next().toString();
- valStr = keyStr + valStr;
- String[] fields = valStr.split(this.fieldSeparator);
- String newKey = selectFields(fields, reduceOutputKeyFieldList, -1,
- fieldSeparator);
- String newVal = selectFields(fields, reduceOutputValueFieldList,
- allReduceValueFieldsFrom, fieldSeparator);
- Text newTextKey = null;
- if (newKey != null) {
- newTextKey = new Text(newKey);
- }
- Text newTextVal = null;
- if (newVal != null) {
- newTextVal = new Text(newVal);
- }
- output.collect(newTextKey, newTextVal);
+ FieldSelectionHelper helper = new FieldSelectionHelper();
+ helper.extractOutputKeyValue(keyStr, values.next().toString(),
+ fieldSeparator, reduceOutputKeyFieldList,
+ reduceOutputValueFieldList, allReduceValueFieldsFrom, false, false);
+ output.collect(helper.getKey(), helper.getValue());
}
}
}
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java?rev=798778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java Wed Jul 29 04:38:40 2009
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.fieldsel;
+
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Helper shared by the field-selection mapper and reducer. It parses field
+ * specs and extracts output key/value pairs from separator-delimited
+ * records, in a manner similar to unix cut.
+ *
+ * The input data is treated as fields separated by a user specified
+ * separator (the default value is "\t"). The user can specify a list of
+ * fields that form the output keys, and a list of fields that form the
+ * output values. If the input format is TextInputFormat, the mapper ignores
+ * the input key and the fields come from the value only. Otherwise, the
+ * fields are the union of those from the key and those from the value.
+ *
+ * The field separator is under attribute "mapred.data.field.separator".
+ *
+ * The map output field list spec is under attribute
+ * "map.output.key.value.fields.spec". The value is expected to be like
+ * "keyFieldsSpec:valueFieldsSpec", where key/valueFieldsSpec are comma (,)
+ * separated field specs: fieldSpec,fieldSpec,fieldSpec ... Each field spec
+ * can be a simple number (e.g. 5) specifying a specific field, a range
+ * (like 2-5) specifying a range of fields, or an open range (like 3-)
+ * specifying all the fields starting from field 3. The open range field
+ * spec applies to value fields only; it has no effect on the key fields.
+ *
+ * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields
+ * 4,3,0 and 1 for keys, and use fields 6,5,1,2,3,7 and above for values.
+ *
+ * The reduce output field list spec is under attribute
+ * "reduce.output.key.value.fields.spec". The reducer extracts output
+ * key/value pairs in a similar manner, except that the key is never
+ * ignored.
+ */
+public class FieldSelectionHelper {
+
+  /**
+   * Shared empty {@link Text}, used by the mappers as the default output
+   * key/value. {@link Text} is mutable, so callers must never modify it.
+   */
+  public static final Text emptyText = new Text("");
+
+  /**
+   * Extracts the actual field numbers from the given field specs. A spec is
+   * either a single number ("5"), a closed range ("2-5"; an omitted start
+   * as in "-5" means 0) or an open range ("3-"). Empty specs are skipped.
+   *
+   * @param fieldListSpec an array of field specs
+   * @param fieldList receives the field numbers of the single-number and
+   *        closed-range specs, in spec order
+   * @return n if some field spec is in the form of "n-" (the last such
+   *         spec wins), -1 otherwise
+   * @throws NumberFormatException if a spec is not numeric
+   */
+  private static int extractFields(String[] fieldListSpec,
+      List<Integer> fieldList) {
+    int allFieldsFrom = -1;
+    for (String fieldSpec : fieldListSpec) {
+      if (fieldSpec.length() == 0) {
+        continue;
+      }
+      int pos = fieldSpec.indexOf('-');
+      if (pos < 0) {
+        fieldList.add(Integer.parseInt(fieldSpec));
+        continue;
+      }
+      String start = fieldSpec.substring(0, pos);
+      String end = fieldSpec.substring(pos + 1);
+      int startPos = (start.length() == 0) ? 0 : Integer.parseInt(start);
+      if (end.length() == 0) {
+        // open range "n-": every field from n onwards
+        allFieldsFrom = startPos;
+        continue;
+      }
+      int endPos = Integer.parseInt(end);
+      for (int j = startPos; j <= endPos; j++) {
+        fieldList.add(j);
+      }
+    }
+    return allFieldsFrom;
+  }
+
+  /**
+   * Joins the selected fields with the separator.
+   *
+   * @param fields the input fields
+   * @param fieldList indexes of the fields to select, in output order; an
+   *        index past the end of fields yields an empty field
+   * @param allFieldsFrom if non-negative, also select every field from this
+   *        index onwards
+   * @param separator the string placed between selected fields
+   * @return the joined fields, or null if fieldList is null or empty and
+   *         allFieldsFrom is negative
+   */
+  private static String selectFields(String[] fields, List<Integer> fieldList,
+      int allFieldsFrom, String separator) {
+    boolean hasFieldList = fieldList != null && !fieldList.isEmpty();
+    if (!hasFieldList && allFieldsFrom < 0) {
+      return null;
+    }
+    StringBuilder sb = new StringBuilder();
+    if (hasFieldList) {
+      for (Integer index : fieldList) {
+        if (index < fields.length) {
+          sb.append(fields[index]);
+        }
+        sb.append(separator);
+      }
+    }
+    if (allFieldsFrom >= 0) {
+      for (int i = allFieldsFrom; i < fields.length; i++) {
+        sb.append(fields[i]).append(separator);
+      }
+    }
+    if (sb.length() > 0) {
+      // every field is followed by a separator: drop the trailing one
+      // (all of it, so multi-character separators work too)
+      sb.setLength(sb.length() - separator.length());
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Parses a "keyFieldsSpec:valueFieldsSpec" spec. The key field numbers
+   * are appended to keyFieldList and the value field numbers to
+   * valueFieldList; neither list is cleared first. A spec without ':'
+   * selects no value fields.
+   *
+   * @param keyValueSpec the spec to parse
+   * @param keyFieldList receives the key field numbers
+   * @param valueFieldList receives the value field numbers
+   * @return n if the value spec contains an open range "n-", -1 otherwise
+   * @throws NumberFormatException if a field spec is not numeric
+   */
+  public static int parseOutputKeyValueSpec(String keyValueSpec,
+      List<Integer> keyFieldList, List<Integer> valueFieldList) {
+    String[] keyValSpecs = keyValueSpec.split(":", -1);
+    String[] keySpec = keyValSpecs[0].split(",");
+    String[] valSpec = new String[0];
+    if (keyValSpecs.length > 1) {
+      valSpec = keyValSpecs[1].split(",");
+    }
+    // an open range in the key spec is ignored: keys use listed fields only
+    extractFields(keySpec, keyFieldList);
+    return extractFields(valSpec, valueFieldList);
+  }
+
+  /**
+   * Renders a parsed spec as multi-line text, e.g. for logging.
+   */
+  public static String specToString(String fieldSeparator, String keyValueSpec,
+      int allValueFieldsFrom, List<Integer> keyFieldList,
+      List<Integer> valueFieldList) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("fieldSeparator: ").append(fieldSeparator).append("\n");
+    sb.append("keyValueSpec: ").append(keyValueSpec).append("\n");
+    sb.append("allValueFieldsFrom: ").append(allValueFieldsFrom).append("\n");
+    sb.append("keyFieldList.length: ").append(keyFieldList.size())
+        .append("\n");
+    for (Integer field : keyFieldList) {
+      sb.append("\t").append(field).append("\n");
+    }
+    sb.append("valueFieldList.length: ").append(valueFieldList.size())
+        .append("\n");
+    for (Integer field : valueFieldList) {
+      sb.append("\t").append(field).append("\n");
+    }
+    return sb.toString();
+  }
+
+  private Text key = null;
+  private Text value = null;
+
+  /** Creates a helper whose key and value stay null until extracted. */
+  public FieldSelectionHelper() {
+  }
+
+  /**
+   * Creates a helper whose key and value default to the given ones; each
+   * default is kept when extraction produces nothing for it.
+   */
+  public FieldSelectionHelper(Text key, Text val) {
+    this.key = key;
+    this.value = val;
+  }
+
+  /** @return the extracted key, or the default if none was extracted */
+  public Text getKey() {
+    return key;
+  }
+
+  /** @return the extracted value, or the default if none was extracted */
+  public Text getValue() {
+    return value;
+  }
+
+  /**
+   * Splits a record into fields and sets this helper's key and value to
+   * the selected fields.
+   *
+   * @param key the input key; its fields precede the value's
+   * @param val the input value
+   * @param fieldSep the field separator. NOTE: String.split treats it as a
+   *        regex while joining uses it literally, so it should not contain
+   *        regex metacharacters.
+   * @param keyFieldList indexes of the fields forming the output key
+   * @param valFieldList indexes of the fields forming the output value
+   * @param allValueFieldsFrom if non-negative, every field from this index
+   *        on is also appended to the output value
+   * @param ignoreKey if true, only the fields of val are used
+   * @param isMap if true and no key fields are selected, the selected value
+   *        fields become the key and the value keeps its default
+   */
+  public void extractOutputKeyValue(String key, String val,
+      String fieldSep, List<Integer> keyFieldList, List<Integer> valFieldList,
+      int allValueFieldsFrom, boolean ignoreKey, boolean isMap) {
+    if (!ignoreKey) {
+      // Some callers (the reducers) pass a key that already ends with the
+      // separator; others pass the raw key, whose last field would then be
+      // glued to the first field of the value without a separator.
+      val = key.endsWith(fieldSep) ? key + val : key + fieldSep + val;
+    }
+    String[] fields = val.split(fieldSep);
+    String newKey = selectFields(fields, keyFieldList, -1, fieldSep);
+    String newVal = selectFields(fields, valFieldList, allValueFieldsFrom,
+        fieldSep);
+    if (isMap && newKey == null) {
+      // no key fields selected: the map emits the selected fields as key
+      newKey = newVal;
+      newVal = null;
+    }
+    if (newKey != null) {
+      this.key = new Text(newKey);
+    }
+    if (newVal != null) {
+      this.value = new Text(newVal);
+    }
+  }
+}
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java?rev=798778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java Wed Jul 29 04:38:40 2009
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.fieldsel;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+/**
+ * This class implements a mapper class that can be used to perform
+ * field selections in a manner similar to unix cut. The input data is treated
+ * as fields separated by a user specified separator (the default value is
+ * "\t"). The user can specify a list of fields that form the map output keys,
+ * and a list of fields that form the map output values. If the input format is
+ * TextInputFormat, the mapper ignores the input key and the fields come from
+ * the value only. Otherwise, the fields are the union of those from the key
+ * and those from the value.
+ *
+ * The field separator is under attribute "mapred.data.field.separator".
+ *
+ * The map output field list spec is under attribute
+ * "map.output.key.value.fields.spec". The value is expected to be like
+ * "keyFieldsSpec:valueFieldsSpec", where key/valueFieldsSpec are comma (,)
+ * separated field specs: fieldSpec,fieldSpec,fieldSpec ... Each field spec
+ * can be a simple number (e.g. 5) specifying a specific field, a range
+ * (like 2-5) specifying a range of fields, or an open range (like 3-)
+ * specifying all the fields starting from field 3. The open range field
+ * spec applies to value fields only; it has no effect on the key fields.
+ * If no key fields are selected, the selected value fields are emitted as
+ * the key and the value is empty. The spec defaults to "0-:".
+ *
+ * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields
+ * 4,3,0 and 1 for keys, and use fields 6,5,1,2,3,7 and above for values.
+ */
+public class FieldSelectionMapper<K, V>
+    extends Mapper<K, V, Text, Text> {
+
+  private String mapOutputKeyValueSpec;
+
+  private boolean ignoreInputKey;
+
+  private String fieldSeparator = "\t";
+
+  private List<Integer> mapOutputKeyFieldList = new ArrayList<Integer>();
+
+  private List<Integer> mapOutputValueFieldList = new ArrayList<Integer>();
+
+  private int allMapValueFieldsFrom = -1;
+
+  public static final Log LOG = LogFactory.getLog("FieldSelectionMapReduce");
+
+  /**
+   * Reads the separator and the output spec from the job configuration and
+   * decides whether the input key is ignored: it is when the job's input
+   * format is TextInputFormat.
+   *
+   * @throws IOException if the input format class cannot be loaded
+   */
+  @Override
+  public void setup(Context context)
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    this.fieldSeparator = conf.get("mapred.data.field.separator", "\t");
+    this.mapOutputKeyValueSpec =
+        conf.get("map.output.key.value.fields.spec", "0-:");
+    try {
+      this.ignoreInputKey = TextInputFormat.class.getCanonicalName().equals(
+          context.getInputFormatClass().getCanonicalName());
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Input format class not found", e);
+    }
+    allMapValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
+        mapOutputKeyValueSpec, mapOutputKeyFieldList, mapOutputValueFieldList);
+    LOG.info(FieldSelectionHelper.specToString(fieldSeparator,
+        mapOutputKeyValueSpec, allMapValueFieldsFrom, mapOutputKeyFieldList,
+        mapOutputValueFieldList) + "\nignoreInputKey:" + ignoreInputKey);
+  }
+
+  /**
+   * Emits one record made of the selected fields of the input key/value
+   * pair (of the value only when the input key is ignored).
+   */
+  @Override
+  public void map(K key, V val, Context context)
+      throws IOException, InterruptedException {
+    FieldSelectionHelper helper = new FieldSelectionHelper(
+        FieldSelectionHelper.emptyText, FieldSelectionHelper.emptyText);
+    // Append the separator so the key's last field and the value's first
+    // field stay separate, just as the reducer does for its key.
+    helper.extractOutputKeyValue(key.toString() + fieldSeparator,
+        val.toString(), fieldSeparator, mapOutputKeyFieldList,
+        mapOutputValueFieldList, allMapValueFieldsFrom, ignoreInputKey, true);
+    context.write(helper.getKey(), helper.getValue());
+  }
+}
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java?rev=798778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java Wed Jul 29 04:38:40 2009
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.fieldsel;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * This class implements a reducer class that can be used to perform field
+ * selections in a manner similar to unix cut.
+ *
+ * The input data is treated as fields separated by a user specified
+ * separator (the default value is "\t"). The user can specify a list of
+ * fields that form the reduce output keys, and a list of fields that form
+ * the reduce output values. The fields are the union of those from the key
+ * and those from the value.
+ *
+ * The field separator is under attribute "mapred.data.field.separator".
+ *
+ * The reduce output field list spec is under attribute
+ * "reduce.output.key.value.fields.spec". The value is expected to be like
+ * "keyFieldsSpec:valueFieldsSpec", where key/valueFieldsSpec are comma (,)
+ * separated field specs: fieldSpec,fieldSpec,fieldSpec ... Each field spec
+ * can be a simple number (e.g. 5) specifying a specific field, a range
+ * (like 2-5) specifying a range of fields, or an open range (like 3-)
+ * specifying all the fields starting from field 3. The open range field
+ * spec applies to value fields only; it has no effect on the key fields.
+ * The spec defaults to "0-:". If no output key (or value) fields are
+ * selected, null is written as the output key (or value).
+ *
+ * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields
+ * 4,3,0 and 1 for keys, and use fields 6,5,1,2,3,7 and above for values.
+ *
+ * The type parameters K and V are unused: keys and values are always Text.
+ */
+public class FieldSelectionReducer<K, V>
+    extends Reducer<Text, Text, Text, Text> {
+
+  private String fieldSeparator = "\t";
+
+  private String reduceOutputKeyValueSpec;
+
+  private List<Integer> reduceOutputKeyFieldList = new ArrayList<Integer>();
+
+  private List<Integer> reduceOutputValueFieldList = new ArrayList<Integer>();
+
+  private int allReduceValueFieldsFrom = -1;
+
+  public static final Log LOG = LogFactory.getLog("FieldSelectionMapReduce");
+
+  /**
+   * Reads the separator and the output spec from the job configuration.
+   */
+  @Override
+  public void setup(Context context)
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    this.fieldSeparator = conf.get("mapred.data.field.separator", "\t");
+    this.reduceOutputKeyValueSpec =
+        conf.get("reduce.output.key.value.fields.spec", "0-:");
+    allReduceValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
+        reduceOutputKeyValueSpec, reduceOutputKeyFieldList,
+        reduceOutputValueFieldList);
+    LOG.info(FieldSelectionHelper.specToString(fieldSeparator,
+        reduceOutputKeyValueSpec, allReduceValueFieldsFrom,
+        reduceOutputKeyFieldList, reduceOutputValueFieldList));
+  }
+
+  /**
+   * For each value, prepends the key's fields to the value's fields and
+   * emits the selected fields as one output record.
+   */
+  @Override
+  public void reduce(Text key, Iterable<Text> values, Context context)
+      throws IOException, InterruptedException {
+    // the key's fields precede each value's fields
+    String keyStr = key.toString() + this.fieldSeparator;
+    for (Text val : values) {
+      // a fresh helper per value: output fields not selected stay null
+      FieldSelectionHelper helper = new FieldSelectionHelper();
+      helper.extractOutputKeyValue(keyStr, val.toString(),
+          fieldSeparator, reduceOutputKeyFieldList,
+          reduceOutputValueFieldList, allReduceValueFieldsFrom, false, false);
+      context.write(helper.getKey(), helper.getValue());
+    }
+  }
+}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java?rev=798778&r1=798777&r2=798778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java Wed Jul 29 04:38:40 2009
@@ -20,6 +20,8 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.lib.*;
+import org.apache.hadoop.mapreduce.lib.fieldsel.TestMRFieldSelection;
+
import junit.framework.TestCase;
import java.text.NumberFormat;
@@ -50,39 +52,9 @@
StringBuffer inputData = new StringBuffer();
StringBuffer expectedOutput = new StringBuffer();
+ TestMRFieldSelection.constructInputOutputData(inputData,
+ expectedOutput, numOfInputLines);
FSDataOutputStream fileOut = fs.create(new Path(INPUT_DIR, inputFile));
- for (int i = 0; i < numOfInputLines; i++) {
- inputData.append(idFormat.format(i));
- inputData.append("-").append(idFormat.format(i+1));
- inputData.append("-").append(idFormat.format(i+2));
- inputData.append("-").append(idFormat.format(i+3));
- inputData.append("-").append(idFormat.format(i+4));
- inputData.append("-").append(idFormat.format(i+5));
- inputData.append("-").append(idFormat.format(i+6));
- inputData.append("\n");
-
-
- expectedOutput.append(idFormat.format(i+3));
- expectedOutput.append("-" ).append (idFormat.format(i+2));
- expectedOutput.append("-" ).append (idFormat.format(i+1));
- expectedOutput.append("-" ).append (idFormat.format(i+5));
- expectedOutput.append("-" ).append (idFormat.format(i+6));
-
- expectedOutput.append("-" ).append (idFormat.format(i+6));
- expectedOutput.append("-" ).append (idFormat.format(i+5));
- expectedOutput.append("-" ).append (idFormat.format(i+1));
- expectedOutput.append("-" ).append (idFormat.format(i+2));
- expectedOutput.append("-" ).append (idFormat.format(i+3));
-
- expectedOutput.append("-" ).append (idFormat.format(i+0));
- expectedOutput.append("-" ).append (idFormat.format(i+1));
- expectedOutput.append("-" ).append (idFormat.format(i+2));
- expectedOutput.append("-" ).append (idFormat.format(i+3));
- expectedOutput.append("-" ).append (idFormat.format(i+4));
- expectedOutput.append("-" ).append (idFormat.format(i+5));
- expectedOutput.append("-" ).append (idFormat.format(i+6));
- expectedOutput.append("\n");
- }
fileOut.write(inputData.toString().getBytes("utf-8"));
fileOut.close();
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=798778&r1=798777&r2=798778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Wed Jul 29 04:38:40 2009
@@ -18,20 +18,26 @@
package org.apache.hadoop.mapreduce;
+import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.text.NumberFormat;
import java.util.Iterator;
import java.util.Random;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.OutputLogFilter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
@@ -42,6 +48,8 @@
* Utility methods used in various Job Control unit tests.
*/
public class MapReduceTestUtil {
+ public static final Log LOG =
+ LogFactory.getLog(MapReduceTestUtil.class.getName());
static private Random rand = new Random();
@@ -280,4 +288,27 @@
}
};
}
+
+  /**
+   * Concatenates the contents of all files under {@code outDir} that pass
+   * {@link OutputLogFilter} (presumably excluding the job's log files).
+   * Every line, including the last, is terminated with "\n".
+   *
+   * @param outDir the job output directory
+   * @param conf configuration used to resolve {@code outDir}'s file system
+   * @return the combined text of all accepted output files
+   * @throws IOException if listing, opening or reading a file fails
+   */
+  public static String readOutput(Path outDir, Configuration conf)
+      throws IOException {
+    FileSystem fs = outDir.getFileSystem(conf);
+    StringBuilder result = new StringBuilder();
+
+    Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+        new OutputLogFilter()));
+    for (Path outputFile : fileList) {
+      LOG.info("Path" + ": " + outputFile);
+      // Text output is UTF-8, so decode explicitly rather than with the
+      // platform default charset.
+      BufferedReader file = new BufferedReader(
+          new InputStreamReader(fs.open(outputFile), "UTF-8"));
+      try {
+        String line = file.readLine();
+        while (line != null) {
+          result.append(line).append("\n");
+          line = file.readLine();
+        }
+      } finally {
+        // Close even if reading fails, so the stream does not leak.
+        file.close();
+      }
+    }
+    return result.toString();
+  }
+
}
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java?rev=798778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java Wed Jul 29 04:38:40 2009
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.fieldsel;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+
+import junit.framework.TestCase;
+import java.text.NumberFormat;
+
+/**
+ * End-to-end test of {@link FieldSelectionMapper} and
+ * {@link FieldSelectionReducer}: it generates numbered, '-'-separated input
+ * lines, runs a single-map, single-reduce job, and compares the job output
+ * with the output predicted from the configured specs.
+ */
+public class TestMRFieldSelection extends TestCase {
+
+  /** Separator used both in the generated data and in the job configuration. */
+  private static final String FIELD_SEPARATOR = "-";
+
+  /** Number of fields in every generated input line. */
+  private static final int FIELDS_PER_LINE = 7;
+
+  /**
+   * Expected output, as offsets added to the line number i: input line i
+   * holds fields f0..f6 = i..i+6.
+   *
+   * The offsets follow from reading each spec as "keyFields:valueFields".
+   * The map spec "6,5,1-3:0-" gives key f6,f5,f1,f2,f3 and value f0..f6.
+   * The reduce spec ":4,3,2,1,0,0-" is applied to that key followed by that
+   * value, giving an empty key and the value listed here.
+   */
+  private static final int[] EXPECTED_FIELD_OFFSETS = {
+    3, 2, 1, 5, 6,         // reduce fields 4,3,2,1,0
+    6, 5, 1, 2, 3,         // reduce fields 0- : the map key ...
+    0, 1, 2, 3, 4, 5, 6    // ... followed by the map value
+  };
+
+  /** Formats each field as a zero-padded, ungrouped number (e.g. 7 -> "0007"). */
+  private static final NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setMinimumIntegerDigits(4);
+    idFormat.setGroupingUsed(false);
+  }
+
+  /** Root directory for this test's input and output. */
+  private static final Path testDir = new Path(
+      System.getProperty("test.build.data", "/tmp"), "field");
+
+  public void testFieldSelection() throws Exception {
+    launch();
+  }
+
+  /**
+   * Runs the field-selection job over generated input and asserts that its
+   * output matches the expected output. The input and output directories
+   * are removed afterwards, whether or not the job succeeds.
+   *
+   * @throws Exception if job setup or execution fails
+   */
+  public static void launch() throws Exception {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    int numOfInputLines = 10;
+
+    Path outDir = new Path(testDir, "output_for_field_selection_test");
+    Path inDir = new Path(testDir, "input_for_field_selection_test");
+
+    StringBuffer inputData = new StringBuffer();
+    StringBuffer expectedOutput = new StringBuffer();
+    constructInputOutputData(inputData, expectedOutput, numOfInputLines);
+
+    conf.set("mapred.data.field.separator", FIELD_SEPARATOR);
+    conf.set("map.output.key.value.fields.spec", "6,5,1-3:0-");
+    conf.set("reduce.output.key.value.fields.spec", ":4,3,2,1,0,0-");
+    try {
+      Job job = MapReduceTestUtil.createJob(conf, inDir, outDir,
+          1, 1, inputData.toString());
+      job.setMapperClass(FieldSelectionMapper.class);
+      job.setReducerClass(FieldSelectionReducer.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(Text.class);
+      job.setNumReduceTasks(1);
+
+      job.waitForCompletion(true);
+      assertTrue("Job Failed!", job.isSuccessful());
+
+      // Compare everything the job wrote with the predicted output.
+      String outdata = MapReduceTestUtil.readOutput(outDir, conf);
+      assertEquals("Outputs doesnt match.", expectedOutput.toString(), outdata);
+    } finally {
+      // Clean up in finally so a failed run does not leave state behind
+      // for the next run.
+      fs.delete(outDir, true);
+      fs.delete(inDir, true);
+    }
+  }
+
+  /**
+   * Appends {@code numOfInputLines} generated input lines to
+   * {@code inputData}, and the matching expected job output lines to
+   * {@code expectedOutput}. Both buffers are also printed to stdout.
+   *
+   * @param inputData receives lines of FIELDS_PER_LINE separated fields
+   * @param expectedOutput receives one expected output line per input line
+   * @param numOfInputLines number of lines to generate
+   */
+  public static void constructInputOutputData(StringBuffer inputData,
+      StringBuffer expectedOutput, int numOfInputLines) {
+    for (int i = 0; i < numOfInputLines; i++) {
+      for (int field = 0; field < FIELDS_PER_LINE; field++) {
+        if (field > 0) {
+          inputData.append(FIELD_SEPARATOR);
+        }
+        inputData.append(idFormat.format(i + field));
+      }
+      inputData.append("\n");
+
+      for (int j = 0; j < EXPECTED_FIELD_OFFSETS.length; j++) {
+        if (j > 0) {
+          expectedOutput.append(FIELD_SEPARATOR);
+        }
+        expectedOutput.append(idFormat.format(i + EXPECTED_FIELD_OFFSETS[j]));
+      }
+      expectedOutput.append("\n");
+    }
+    System.out.println("inputData:");
+    System.out.println(inputData.toString());
+    System.out.println("ExpectedData:");
+    System.out.println(expectedOutput.toString());
+  }
+
+  /**
+   * Runs the field-selection test directly, outside a JUnit runner.
+   */
+  public static void main(String[] argv) throws Exception {
+    launch();
+  }
+}