Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/01/25 00:22:01 UTC

svn commit: r1235548 [2/8] - in /hadoop/common/branches/branch-1: ./ src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/io/ src/mapred/org/apache/hadoop/mapreduce/ src/mapred/org/apache/hadoop/mapreduce/lib/db/ src/mapred/org/apache/hadoop/map...

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A RecordReader that reads records from a SQL table,
+ * using data-driven WHERE clause splits.
+ * Emits LongWritables containing the record number as
+ * key and DBWritables as value.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class DataDrivenDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {
+
+  private static final Log LOG = LogFactory.getLog(DataDrivenDBRecordReader.class);
+
+  private String dbProductName; // database manufacturer string.
+
+  /**
+   * @param split The InputSplit to read data for
+   * @throws SQLException if a database access error occurs
+   */
+  public DataDrivenDBRecordReader(DBInputFormat.DBInputSplit split,
+      Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
+      String cond, String [] fields, String table, String dbProduct)
+      throws SQLException {
+    super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
+    this.dbProductName = dbProduct;
+  }
+
+  /** Returns the query for selecting the records;
+   * subclasses can override this for custom behaviour. */
+  @SuppressWarnings("unchecked")
+  protected String getSelectQuery() {
+    StringBuilder query = new StringBuilder();
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit dataSplit =
+        (DataDrivenDBInputFormat.DataDrivenDBInputSplit) getSplit();
+    DBConfiguration dbConf = getDBConf();
+    String [] fieldNames = getFieldNames();
+    String tableName = getTableName();
+    String conditions = getConditions();
+
+    // Build the WHERE clauses associated with the data split first.
+    // We need them in both branches of this function.
+    StringBuilder conditionClauses = new StringBuilder();
+    conditionClauses.append("( ").append(dataSplit.getLowerClause());
+    conditionClauses.append(" ) AND ( ").append(dataSplit.getUpperClause());
+    conditionClauses.append(" )");
+
+    if (dbConf.getInputQuery() == null) {
+      // We need to generate the entire query.
+      query.append("SELECT ");
+
+      for (int i = 0; i < fieldNames.length; i++) {
+        query.append(fieldNames[i]);
+        if (i != fieldNames.length - 1) {
+          query.append(", ");
+        }
+      }
+
+      query.append(" FROM ").append(tableName);
+      if (!dbProductName.startsWith("ORACLE")) {
+        // Seems to be necessary for hsqldb? Oracle explicitly does *not*
+        // use this clause.
+        query.append(" AS ").append(tableName);
+      }
+      query.append(" WHERE ");
+      if (conditions != null && conditions.length() > 0) {
+        // Put the user's conditions first.
+        query.append("( ").append(conditions).append(" ) AND ");
+      }
+
+      // Now append the conditions associated with our split.
+      query.append(conditionClauses.toString());
+
+    } else {
+      // User provided the query. We replace the special token with our WHERE clause.
+      String inputQuery = dbConf.getInputQuery();
+      if (inputQuery.indexOf(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN) == -1) {
+        LOG.error("Could not find the clause substitution token "
+            + DataDrivenDBInputFormat.SUBSTITUTE_TOKEN + " in the query: ["
+            + inputQuery + "]. Parallel splits may not work correctly.");
+      }
+
+      query.append(inputQuery.replace(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN,
+          conditionClauses.toString()));
+    }
+
+    LOG.debug("Using query: " + query.toString());
+
+    return query.toString();
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java
------------------------------------------------------------------------------
    svn:eol-style = native
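
getSelectQuery() above has two branches: with no user query, it generates
SELECT <fields> FROM <table> [AS <table>] WHERE ( <user conditions> ) AND
( <lower bound> ) AND ( <upper bound> ); with a user-supplied query, it splices
the split's bounds in at DataDrivenDBInputFormat.SUBSTITUTE_TOKEN (the
"$CONDITIONS" marker). A minimal sketch of that substitution, using a
hypothetical table and split:

    public class SubstituteTokenDemo {
      public static void main(String[] args) {
        String token = "$CONDITIONS"; // DataDrivenDBInputFormat.SUBSTITUTE_TOKEN
        // Hypothetical user-supplied input query.
        String inputQuery = "SELECT id, name FROM employees WHERE " + token;
        // WHERE clauses built from one split's bounds, as in the code above.
        String conditionClauses = "( id >= 0 ) AND ( id < 500 )";
        // Prints: SELECT id, name FROM employees WHERE ( id >= 0 ) AND ( id < 500 )
        System.out.println(inputQuery.replace(token, conditionClauses));
      }
    }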

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Implement DBSplitter over date/time values.
+ * Make use of logic from IntegerSplitter, since date/time are just longs
+ * in Java.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class DateSplitter extends IntegerSplitter {
+
+  private static final Log LOG = LogFactory.getLog(DateSplitter.class);
+
+  public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
+      throws SQLException {
+
+    long minVal;
+    long maxVal;
+
+    int sqlDataType = results.getMetaData().getColumnType(1);
+    minVal = resultSetColToLong(results, 1, sqlDataType);
+    maxVal = resultSetColToLong(results, 2, sqlDataType);
+
+    String lowClausePrefix = colName + " >= ";
+    String highClausePrefix = colName + " < ";
+
+    int numSplits = conf.getInt("mapred.map.tasks", 1);
+    if (numSplits < 1) {
+      numSplits = 1;
+    }
+
+    if (minVal == Long.MIN_VALUE && maxVal == Long.MIN_VALUE) {
+      // The range of acceptable dates is NULL to NULL. Just create a single split.
+      List<InputSplit> splits = new ArrayList<InputSplit>();
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+      return splits;
+    }
+
+    // Gather the split point integers
+    List<Long> splitPoints = split(numSplits, minVal, maxVal);
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    // Turn the split points into a set of intervals.
+    long start = splitPoints.get(0);
+    Date startDate = longToDate(start, sqlDataType);
+    if (sqlDataType == Types.TIMESTAMP) {
+      // The lower bound's nanos value needs to match the actual lower-bound nanos.
+      try {
+        ((java.sql.Timestamp) startDate).setNanos(results.getTimestamp(1).getNanos());
+      } catch (NullPointerException npe) {
+        // If the lower bound was NULL, we'll get an NPE; just ignore it and don't set nanos.
+      }
+    }
+
+    for (int i = 1; i < splitPoints.size(); i++) {
+      long end = splitPoints.get(i);
+      Date endDate = longToDate(end, sqlDataType);
+
+      if (i == splitPoints.size() - 1) {
+        if (sqlDataType == Types.TIMESTAMP) {
+          // The upper bound's nanos value needs to match the actual upper-bound nanos.
+          try {
+            ((java.sql.Timestamp) endDate).setNanos(results.getTimestamp(2).getNanos());
+          } catch (NullPointerException npe) {
+            // If the upper bound was NULL, we'll get an NPE; just ignore it and don't set nanos.
+          }
+        }
+        // This is the last one; use a closed interval.
+        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+            lowClausePrefix + dateToString(startDate),
+            colName + " <= " + dateToString(endDate)));
+      } else {
+        // Normal open-interval case.
+        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+            lowClausePrefix + dateToString(startDate),
+            highClausePrefix + dateToString(endDate)));
+      }
+
+      start = end;
+      startDate = endDate;
+    }
+
+    if (minVal == Long.MIN_VALUE || maxVal == Long.MIN_VALUE) {
+      // Add an extra split to handle the null case that we saw.
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+    }
+
+    return splits;
+  }
+
+  /** Retrieve the value from the column in a type-appropriate manner and return
+      its value as milliseconds since the epoch. If the column is NULL, return
+      Long.MIN_VALUE. This will cause a special split to be generated for the
+      NULL case, but may also cause poorly-balanced splits if most of the actual
+      dates fall well after the epoch.
+    */
+  private long resultSetColToLong(ResultSet rs, int colNum, int sqlDataType) throws SQLException {
+    try {
+      switch (sqlDataType) {
+      case Types.DATE:
+        return rs.getDate(colNum).getTime();
+      case Types.TIME:
+        return rs.getTime(colNum).getTime();
+      case Types.TIMESTAMP:
+        return rs.getTimestamp(colNum).getTime();
+      default:
+        throw new SQLException("Not a date-type field");
+      }
+    } catch (NullPointerException npe) {
+      // null column. return minimum long value.
+      LOG.warn("Encountered a NULL date in the split column. Splits may be poorly balanced.");
+      return Long.MIN_VALUE;
+    }
+  }
+
+  /** Convert the long-valued timestamp into the appropriate SQL date type. */
+  private Date longToDate(long val, int sqlDataType) {
+    switch (sqlDataType) {
+    case Types.DATE:
+      return new java.sql.Date(val);
+    case Types.TIME:
+      return new java.sql.Time(val);
+    case Types.TIMESTAMP:
+      return new java.sql.Timestamp(val);
+    default: // Shouldn't ever hit this case.
+      return null;
+    }
+  }
+
+  /**
+   * Given a Date 'd', format it as a string for use in a SQL date
+   * comparison operation.
+   * @param d the date to format.
+   * @return the string representing this date in SQL with any appropriate
+   * quotation characters, etc.
+   */
+  protected String dateToString(Date d) {
+    return "'" + d.toString() + "'";
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java
------------------------------------------------------------------------------
    svn:eol-style = native
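
Because DateSplitter converts the min/max values to milliseconds since the
epoch, reuses IntegerSplitter.split() on those longs, and then maps each
boundary back through longToDate()/dateToString(), the emitted clauses are
ordinary quoted date literals. A minimal sketch of that boundary round trip,
assuming a hypothetical TIMESTAMP column "ts" and date range:

    import java.sql.Timestamp;

    public class DateBoundaryDemo {
      public static void main(String[] args) {
        // Hypothetical bounds, as milliseconds since the epoch.
        long minVal = Timestamp.valueOf("2012-01-01 00:00:00").getTime();
        long maxVal = Timestamp.valueOf("2012-01-03 00:00:00").getTime();
        long mid = minVal + (maxVal - minVal) / 2; // one interior boundary
        // With two splits, the clauses would pair up roughly as:
        //   ts >= '2012-01-01 00:00:00.0' AND ts < '2012-01-02 00:00:00.0'
        //   ts >= '2012-01-02 00:00:00.0' AND ts <= '2012-01-03 00:00:00.0'
        for (long boundary : new long[] { minVal, mid, maxVal }) {
          System.out.println("'" + new Timestamp(boundary) + "'");
        }
      }
    }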

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Implement DBSplitter over floating-point values.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class FloatSplitter implements DBSplitter {
+
+  private static final Log LOG = LogFactory.getLog(FloatSplitter.class);
+
+  private static final double MIN_INCREMENT = 10000 * Double.MIN_VALUE;
+
+  public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
+      throws SQLException {
+
+    LOG.warn("Generating splits for a floating-point index column. Due to the");
+    LOG.warn("imprecise representation of floating-point values in Java, this");
+    LOG.warn("may result in an incomplete import.");
+    LOG.warn("You are strongly encouraged to choose an integral split column.");
+
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    if (results.getString(1) == null && results.getString(2) == null) {
+      // Range is null to null. Return a null split accordingly.
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+      return splits;
+    }
+
+    double minVal = results.getDouble(1);
+    double maxVal = results.getDouble(2);
+
+    // Use this as a hint. May need an extra task if the size doesn't
+    // divide cleanly.
+    int numSplits = conf.getInt("mapred.map.tasks", 1);
+    double splitSize = (maxVal - minVal) / (double) numSplits;
+
+    if (splitSize < MIN_INCREMENT) {
+      splitSize = MIN_INCREMENT;
+    }
+
+    String lowClausePrefix = colName + " >= ";
+    String highClausePrefix = colName + " < ";
+
+    double curLower = minVal;
+    double curUpper = curLower + splitSize;
+
+    while (curUpper < maxVal) {
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          lowClausePrefix + Double.toString(curLower),
+          highClausePrefix + Double.toString(curUpper)));
+
+      curLower = curUpper;
+      curUpper += splitSize;
+    }
+
+    // Catch any overage and create the closed interval for the last split.
+    if (curLower <= maxVal || splits.size() == 1) {
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          lowClausePrefix + Double.toString(curLower),
+          colName + " <= " + Double.toString(maxVal)));
+    }
+
+    if (results.getString(1) == null || results.getString(2) == null) {
+      // At least one extrema is null; add a null split.
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+    }
+
+    return splits;
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java
------------------------------------------------------------------------------
    svn:eol-style = native
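
The loop above steps curLower/curUpper by splitSize (floored at MIN_INCREMENT),
and the final split is closed at maxVal, starting from the last boundary the
loop emitted. A minimal sketch of the intervals this produces, for a
hypothetical column and range:

    public class FloatIntervalDemo {
      public static void main(String[] args) {
        double minVal = 0.0, maxVal = 10.0; // hypothetical extrema
        int numSplits = 4;                  // hypothetical mapred.map.tasks
        double splitSize = (maxVal - minVal) / (double) numSplits; // 2.5

        double curLower = minVal;
        double curUpper = curLower + splitSize;
        while (curUpper < maxVal) {
          // Open intervals: [0.0, 2.5), [2.5, 5.0), [5.0, 7.5)
          System.out.println("col >= " + curLower + " AND col < " + curUpper);
          curLower = curUpper;
          curUpper += splitSize;
        }
        // Closed interval for the last split: [7.5, 10.0]
        System.out.println("col >= " + curLower + " AND col <= " + maxVal);
      }
    }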

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Implement DBSplitter over integer values.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class IntegerSplitter implements DBSplitter {
+  public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
+      throws SQLException {
+
+    long minVal = results.getLong(1);
+    long maxVal = results.getLong(2);
+
+    String lowClausePrefix = colName + " >= ";
+    String highClausePrefix = colName + " < ";
+
+    int numSplits = conf.getInt("mapred.map.tasks", 1);
+    if (numSplits < 1) {
+      numSplits = 1;
+    }
+
+    if (results.getString(1) == null && results.getString(2) == null) {
+      // Range is null to null. Return a null split accordingly.
+      List<InputSplit> splits = new ArrayList<InputSplit>();
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+      return splits;
+    }
+
+    // Get all the split points together.
+    List<Long> splitPoints = split(numSplits, minVal, maxVal);
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    // Turn the split points into a set of intervals.
+    long start = splitPoints.get(0);
+    for (int i = 1; i < splitPoints.size(); i++) {
+      long end = splitPoints.get(i);
+
+      if (i == splitPoints.size() - 1) {
+        // This is the last one; use a closed interval.
+        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+            lowClausePrefix + Long.toString(start),
+            colName + " <= " + Long.toString(end)));
+      } else {
+        // Normal open-interval case.
+        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+            lowClausePrefix + Long.toString(start),
+            highClausePrefix + Long.toString(end)));
+      }
+
+      start = end;
+    }
+
+    if (results.getString(1) == null || results.getString(2) == null) {
+      // At least one extrema is null; add a null split.
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+    }
+
+    return splits;
+  }
+
+  /**
+   * Returns a list of longs one element longer than the list of input splits.
+   * This represents the boundaries between input splits.
+   * All splits are open on the top end, except the last one.
+   *
+   * So the list [0, 5, 8, 12, 18] would represent splits capturing the intervals:
+   *
+   * [0, 5)
+   * [5, 8)
+   * [8, 12)
+   * [12, 18] note the closed interval for the last split.
+   */
+  List<Long> split(long numSplits, long minVal, long maxVal)
+      throws SQLException {
+
+    List<Long> splits = new ArrayList<Long>();
+
+    // Use numSplits as a hint. May need an extra task if the size doesn't
+    // divide cleanly.
+
+    long splitSize = (maxVal - minVal) / numSplits;
+    if (splitSize < 1) {
+      splitSize = 1;
+    }
+
+    long curVal = minVal;
+
+    while (curVal <= maxVal) {
+      splits.add(curVal);
+      curVal += splitSize;
+    }
+
+    if (splits.get(splits.size() - 1) != maxVal || splits.size() == 1) {
+      // We didn't end on the maxVal. Add that to the end of the list.
+      splits.add(maxVal);
+    }
+
+    return splits;
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java
------------------------------------------------------------------------------
    svn:eol-style = native
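
Since splitSize is computed by integer division, the boundary list can contain
one more interval than requested when the range does not divide evenly; the
closing splits.add(maxVal) guarantees the final boundary is exact. A minimal
sketch of the same boundary logic over a hypothetical range:

    import java.util.ArrayList;
    import java.util.List;

    public class IntegerBoundaryDemo {
      // Mirrors IntegerSplitter.split(long, long, long) above.
      static List<Long> boundaries(long numSplits, long minVal, long maxVal) {
        List<Long> splits = new ArrayList<Long>();
        long splitSize = Math.max(1, (maxVal - minVal) / numSplits);
        for (long curVal = minVal; curVal <= maxVal; curVal += splitSize) {
          splits.add(curVal);
        }
        if (splits.get(splits.size() - 1) != maxVal || splits.size() == 1) {
          splits.add(maxVal); // didn't end on maxVal; append it
        }
        return splits;
      }

      public static void main(String[] args) {
        // numSplits = 4 over [0, 18]: splitSize = 4, so the boundaries are
        // [0, 4, 8, 12, 16, 18]: five intervals for four requested splits.
        System.out.println(boundaries(4, 0, 18));
      }
    }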

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A RecordReader that reads records from a MySQL table.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MySQLDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {
+
+  public MySQLDBRecordReader(DBInputFormat.DBInputSplit split, 
+      Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
+      String cond, String [] fields, String table) throws SQLException {
+    super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
+  }
+
+  // Execute statements for MySQL in unbuffered (streaming) mode.
+  protected ResultSet executeQuery(String query) throws SQLException {
+    statement = getConnection().prepareStatement(query,
+      ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time.
+    return statement.executeQuery();
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java
------------------------------------------------------------------------------
    svn:eol-style = native
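
The setFetchSize(Integer.MIN_VALUE) call above is the MySQL Connector/J
convention for streaming results row-at-a-time instead of buffering the entire
ResultSet in the client; Connector/J only honors it on TYPE_FORWARD_ONLY,
CONCUR_READ_ONLY statements, which is why both flags are passed. The same
pattern is repeated in MySQLDataDrivenDBRecordReader below. A standalone sketch
of the idiom, with hypothetical connection details and table:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    public class MySqlStreamingDemo {
      public static void main(String[] args) throws SQLException {
        // Hypothetical URL, credentials, and table.
        Connection conn = DriverManager.getConnection(
            "jdbc:mysql://localhost/testdb", "user", "password");
        PreparedStatement stmt = conn.prepareStatement(
            "SELECT id, name FROM employees",
            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        stmt.setFetchSize(Integer.MIN_VALUE); // stream rows one at a time
        ResultSet rs = stmt.executeQuery();
        while (rs.next()) {
          System.out.println(rs.getLong(1) + "\t" + rs.getString(2));
        }
        rs.close();
        stmt.close();
        conn.close();
      }
    }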

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A RecordReader that reads records from a MySQL table via DataDrivenDBRecordReader.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MySQLDataDrivenDBRecordReader<T extends DBWritable>
+    extends DataDrivenDBRecordReader<T> {
+
+  public MySQLDataDrivenDBRecordReader(DBInputFormat.DBInputSplit split,
+      Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
+      String cond, String [] fields, String table) throws SQLException {
+    super(split, inputClass, conf, conn, dbConfig, cond, fields, table, "MYSQL");
+  }
+
+  // Execute statements for MySQL in unbuffered (streaming) mode.
+  protected ResultSet executeQuery(String query) throws SQLException {
+    statement = getConnection().prepareStatement(query,
+      ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time.
+    return statement.executeQuery();
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A RecordReader that reads records from an Oracle SQL table.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class OracleDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {
+
+  /** Configuration key to set to a timezone string. */
+  public static final String SESSION_TIMEZONE_KEY = "oracle.sessionTimeZone";
+
+  private static final Log LOG = LogFactory.getLog(OracleDBRecordReader.class);
+
+  public OracleDBRecordReader(DBInputFormat.DBInputSplit split, 
+      Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
+      String cond, String [] fields, String table) throws SQLException {
+    super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
+    setSessionTimeZone(conf, conn);
+  }
+
+  /** Returns the query for selecting the records from an Oracle DB. */
+  protected String getSelectQuery() {
+    StringBuilder query = new StringBuilder();
+    DBConfiguration dbConf = getDBConf();
+    String conditions = getConditions();
+    String tableName = getTableName();
+    String [] fieldNames = getFieldNames();
+
+    // Oracle-specific codepath to use rownum instead of LIMIT/OFFSET.
+    if (dbConf.getInputQuery() == null) {
+      query.append("SELECT ");
+  
+      for (int i = 0; i < fieldNames.length; i++) {
+        query.append(fieldNames[i]);
+        if (i != fieldNames.length - 1) {
+          query.append(", ");
+        }
+      }
+  
+      query.append(" FROM ").append(tableName);
+      if (conditions != null && conditions.length() > 0)
+        query.append(" WHERE ").append(conditions);
+      String orderBy = dbConf.getInputOrderBy();
+      if (orderBy != null && orderBy.length() > 0) {
+        query.append(" ORDER BY ").append(orderBy);
+      }
+    } else {
+      //PREBUILT QUERY
+      query.append(dbConf.getInputQuery());
+    }
+        
+    try {
+      DBInputFormat.DBInputSplit split = getSplit();
+      if (split.getLength() > 0 && split.getStart() > 0){
+        String querystring = query.toString();
+
+        query = new StringBuilder();
+        query.append("SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( ");
+        query.append(querystring);
+        query.append(" ) a WHERE rownum <= ").append(split.getStart());
+        query.append(" + ").append(split.getLength());
+        query.append(" ) WHERE dbif_rno >= ").append(split.getStart());
+      }
+    } catch (IOException ex) {
+      // ignore, will not throw.
+    }
+
+    return query.toString();
+  }
+
+  /**
+   * Set the session time zone.
+   * @param conf The current configuration.
+   * We read the 'oracle.sessionTimeZone' property from here.
+   * @param conn The connection to alter the timezone properties of.
+   */
+  public static void setSessionTimeZone(Configuration conf,
+      Connection conn) throws SQLException {
+    // need to use reflection to call the method setSessionTimeZone on
+    // the OracleConnection class because oracle specific java libraries are
+    // not accessible in this context.
+    Method method;
+    try {
+      method = conn.getClass().getMethod(
+              "setSessionTimeZone", new Class [] {String.class});
+    } catch (Exception ex) {
+      LOG.error("Could not find method setSessionTimeZone in " + conn.getClass().getName(), ex);
+      // rethrow SQLException
+      throw new SQLException(ex);
+    }
+
+    // Need to set the time zone in order for Java
+    // to correctly access the column "TIMESTAMP WITH LOCAL TIME ZONE".
+    // We can't easily get the correct Oracle-specific timezone string
+    // from Java; just let the user set the timezone in a property.
+    String clientTimeZone = conf.get(SESSION_TIMEZONE_KEY, "GMT");
+    try {
+      method.setAccessible(true);
+      method.invoke(conn, clientTimeZone);
+      LOG.info("Time zone has been set to " + clientTimeZone);
+    } catch (Exception ex) {
+      LOG.warn("Time zone " + clientTimeZone +
+               " could not be set on Oracle database.");
+      LOG.warn("Setting default time zone: GMT");
+      try {
+        // "GMT" timezone is guaranteed to exist.
+        method.invoke(conn, "GMT");
+      } catch (Exception ex2) {
+        LOG.error("Could not set time zone for oracle connection", ex2);
+        // rethrow SQLException
+        throw new SQLException(ex);
+      }
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java
------------------------------------------------------------------------------
    svn:eol-style = native
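
Since Oracle (at the time of this code) has no LIMIT/OFFSET, getSelectQuery()
above pages through the table by wrapping the base query in a nested ROWNUM
select. A minimal sketch of the wrapping, with a hypothetical inner query and
split bounds:

    public class OracleRownumDemo {
      public static void main(String[] args) {
        String inner = "SELECT id, name FROM employees"; // hypothetical
        long start = 1000, length = 500;                 // hypothetical split
        // Same shape as the query built above.
        String paged = "SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( "
            + inner + " ) a WHERE rownum <= " + start + " + " + length
            + " ) WHERE dbif_rno >= " + start;
        // Prints: SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( SELECT id,
        //   name FROM employees ) a WHERE rownum <= 1000 + 500 )
        //   WHERE dbif_rno >= 1000
        System.out.println(paged);
      }
    }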

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An InputFormat that reads input data from a SQL table in an Oracle db.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class OracleDataDrivenDBInputFormat<T extends DBWritable>
+    extends DataDrivenDBInputFormat<T> implements Configurable {
+
+  /**
+   * @return the DBSplitter implementation to use to divide the table/query into InputSplits.
+   */
+  @Override
+  protected DBSplitter getSplitter(int sqlDataType) {
+    switch (sqlDataType) {
+    case Types.DATE:
+    case Types.TIME:
+    case Types.TIMESTAMP:
+      return new OracleDateSplitter();
+
+    default:
+      return super.getSplitter(sqlDataType);
+    }
+  }
+
+  @Override
+  protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
+      Configuration conf) throws IOException {
+
+    DBConfiguration dbConf = getDBConf();
+    @SuppressWarnings("unchecked")
+    Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
+
+    try {
+      // Use Oracle-specific db reader
+      return new OracleDataDrivenDBRecordReader<T>(split, inputClass,
+          conf, getConnection(), dbConf, dbConf.getInputConditions(),
+          dbConf.getInputFieldNames(), dbConf.getInputTableName());
+    } catch (SQLException ex) {
+      throw new IOException(ex.getMessage());
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBRecordReader.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBRecordReader.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBRecordReader.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A RecordReader that reads records from an Oracle table via DataDrivenDBRecordReader.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class OracleDataDrivenDBRecordReader<T extends DBWritable>
+    extends DataDrivenDBRecordReader<T> {
+
+  public OracleDataDrivenDBRecordReader(DBInputFormat.DBInputSplit split,
+      Class<T> inputClass, Configuration conf, Connection conn,
+      DBConfiguration dbConfig, String cond, String [] fields,
+      String table) throws SQLException {
+
+    super(split, inputClass, conf, conn, dbConfig, cond, fields, table,
+        "ORACLE");
+
+    // Must initialize the tz used by the connection for Oracle.
+    OracleDBRecordReader.setSessionTimeZone(conf, conn);
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBRecordReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDateSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDateSplitter.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDateSplitter.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDateSplitter.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.util.Date;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Implement DBSplitter over date/time values returned by an Oracle db.
+ * Make use of logic from DateSplitter, since this just needs to use
+ * some Oracle-specific functions on the formatting end when generating
+ * InputSplits.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class OracleDateSplitter extends DateSplitter {
+
+  @SuppressWarnings("unchecked")
+  @Override
+  protected String dateToString(Date d) {
+    // Oracle Date objects are always actually Timestamps.
+    return "TO_TIMESTAMP('" + d.toString() + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDateSplitter.java
------------------------------------------------------------------------------
    svn:eol-style = native
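
Where the base DateSplitter emits a bare quoted literal, the override above
wraps the literal in TO_TIMESTAMP so Oracle parses the bound explicitly as a
timestamp. A minimal sketch of the clause formatting (the column name it would
be attached to is hypothetical):

    public class OracleDateClauseDemo {
      public static void main(String[] args) {
        java.sql.Timestamp t = java.sql.Timestamp.valueOf("2012-01-24 23:21:58");
        // Prints: TO_TIMESTAMP('2012-01-24 23:21:58.0', 'YYYY-MM-DD HH24:MI:SS.FF')
        System.out.println(
            "TO_TIMESTAMP('" + t + "', 'YYYY-MM-DD HH24:MI:SS.FF')");
      }
    }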

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Implement DBSplitter over text strings.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TextSplitter extends BigDecimalSplitter {
+
+  private static final Log LOG = LogFactory.getLog(TextSplitter.class);
+
+  /**
+   * This method needs to determine the splits between two user-provided strings.
+   * In the case where the user's strings are 'A' and 'Z', this is not hard; we 
+   * could create two splits from ['A', 'M') and ['M', 'Z'], 26 splits for strings
+   * beginning with each letter, etc.
+   *
+   * If a user has provided us with the strings "Ham" and "Haze", however, we need
+   * to create splits that differ in the third letter.
+   *
+   * The algorithm used is as follows:
+   * Since there are 2**16 unicode characters, we interpret characters as digits in
+   * base 65536. Given a string 's' containing characters s_0, s_1 .. s_n, we interpret
+   * the string as the number: 0.s_0 s_1 s_2.. s_n in base 65536. Having mapped the
+   * low and high strings into floating-point values, we then use the BigDecimalSplitter
+   * to establish the even split points, then map the resulting floating point values
+   * back into strings.
+   */
+  public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
+      throws SQLException {
+
+    LOG.warn("Generating splits for a textual index column.");
+    LOG.warn("If your database sorts in a case-insensitive order, "
+        + "this may result in a partial import or duplicate records.");
+    LOG.warn("You are strongly encouraged to choose an integral split column.");
+
+    String minString = results.getString(1);
+    String maxString = results.getString(2);
+
+    boolean minIsNull = false;
+
+    // If the min value is null, switch it to an empty string instead for purposes
+    // of interpolation. Then add [null, null] as a special case split.
+    if (null == minString) {
+      minString = "";
+      minIsNull = true;
+    }
+
+    if (null == maxString) {
+      // If the max string is null, then the min string has to be null too.
+      // Just return a special split for this case.
+      List<InputSplit> splits = new ArrayList<InputSplit>();
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+      return splits;
+    }
+
+    // Use this as a hint. May need an extra task if the size doesn't
+    // divide cleanly.
+    int numSplits = conf.getInt("mapred.map.tasks", 1);
+
+    String lowClausePrefix = colName + " >= '";
+    String highClausePrefix = colName + " < '";
+
+    // If there is a common prefix between minString and maxString, establish it
+    // and pull it out of minString and maxString.
+    int maxPrefixLen = Math.min(minString.length(), maxString.length());
+    int sharedLen;
+    for (sharedLen = 0; sharedLen < maxPrefixLen; sharedLen++) {
+      char c1 = minString.charAt(sharedLen);
+      char c2 = maxString.charAt(sharedLen);
+      if (c1 != c2) {
+        break;
+      }
+    }
+
+    // The common prefix has length 'sharedLen'. Extract it from both.
+    String commonPrefix = minString.substring(0, sharedLen);
+    minString = minString.substring(sharedLen);
+    maxString = maxString.substring(sharedLen);
+
+    List<String> splitStrings = split(numSplits, minString, maxString, commonPrefix);
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    // Convert the list of split point strings into an actual set of InputSplits.
+    String start = splitStrings.get(0);
+    for (int i = 1; i < splitStrings.size(); i++) {
+      String end = splitStrings.get(i);
+
+      if (i == splitStrings.size() - 1) {
+        // This is the last one; use a closed interval.
+        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+            lowClausePrefix + start + "'", colName + " <= '" + end + "'"));
+      } else {
+        // Normal open-interval case.
+        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+            lowClausePrefix + start + "'", highClausePrefix + end + "'"));
+      }
+    }
+
+    if (minIsNull) {
+      // Add the special null split at the end.
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+    }
+
+    return splits;
+  }
+
+  List<String> split(int numSplits, String minString, String maxString, String commonPrefix)
+      throws SQLException {
+
+    BigDecimal minVal = stringToBigDecimal(minString);
+    BigDecimal maxVal = stringToBigDecimal(maxString);
+
+    List<BigDecimal> splitPoints = split(new BigDecimal(numSplits), minVal, maxVal);
+    List<String> splitStrings = new ArrayList<String>();
+
+    // Convert the BigDecimal splitPoints into their string representations.
+    for (BigDecimal bd : splitPoints) {
+      splitStrings.add(commonPrefix + bigDecimalToString(bd));
+    }
+
+    // Make sure that our user-specified boundaries are the first and last entries
+    // in the array.
+    if (splitStrings.size() == 0 || !splitStrings.get(0).equals(commonPrefix + minString)) {
+      splitStrings.add(0, commonPrefix + minString);
+    }
+    if (splitStrings.size() == 1
+        || !splitStrings.get(splitStrings.size() - 1).equals(commonPrefix + maxString)) {
+      splitStrings.add(commonPrefix + maxString);
+    }
+
+    return splitStrings;
+  }
+
+  private final static BigDecimal ONE_PLACE = new BigDecimal(65536);
+
+  // Maximum number of characters to convert. This is to prevent rounding errors
+  // or repeating fractions near the very bottom from getting out of control. Note
+  // that this still gives us a huge number of possible splits.
+  private final static int MAX_CHARS = 8;
+
+  /**
+   * Return a BigDecimal representation of string 'str' suitable for use
+   * in a numerically-sorting order.
+   */
+  BigDecimal stringToBigDecimal(String str) {
+    BigDecimal result = BigDecimal.ZERO;
+    BigDecimal curPlace = ONE_PLACE; // start with 1/65536 to compute the first digit.
+
+    int len = Math.min(str.length(), MAX_CHARS);
+
+    for (int i = 0; i < len; i++) {
+      int codePoint = str.codePointAt(i);
+      result = result.add(tryDivide(new BigDecimal(codePoint), curPlace));
+      // advance to the next less significant place. e.g., 1/(65536^2) for the second char.
+      curPlace = curPlace.multiply(ONE_PLACE);
+    }
+
+    return result;
+  }
+
+  /**
+   * Return the string encoded in a BigDecimal.
+   * Repeatedly multiply the input value by 65536; the integer portion after such a multiplication
+   * represents a single character in base 65536. Convert that back into a char and create a
+   * string out of these until we have no data left.
+   */
+  String bigDecimalToString(BigDecimal bd) {
+    BigDecimal cur = bd.stripTrailingZeros();
+    StringBuilder sb = new StringBuilder();
+
+    for (int numConverted = 0; numConverted < MAX_CHARS; numConverted++) {
+      cur = cur.multiply(ONE_PLACE);
+      int curCodePoint = cur.intValue();
+      if (0 == curCodePoint) {
+        break;
+      }
+
+      cur = cur.subtract(new BigDecimal(curCodePoint));
+      sb.append(Character.toChars(curCodePoint));
+    }
+
+    return sb.toString();
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java
------------------------------------------------------------------------------
    svn:eol-style = native
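
To make string splits computable, stringToBigDecimal() above maps a string to a
fraction in [0, 1) by treating each character as one base-65536 digit, and
bigDecimalToString() inverts the mapping. A minimal sketch of the forward
mapping; note that tryDivide(), defined in BigDecimalSplitter, is replaced here
by a fixed-scale divide for self-containment:

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    public class TextMappingDemo {
      private static final BigDecimal ONE_PLACE = new BigDecimal(65536);
      private static final int MAX_CHARS = 8;

      static BigDecimal stringToBigDecimal(String str) {
        BigDecimal result = BigDecimal.ZERO;
        BigDecimal curPlace = ONE_PLACE; // 1/65536 place for the first digit
        int len = Math.min(str.length(), MAX_CHARS);
        for (int i = 0; i < len; i++) {
          result = result.add(new BigDecimal(str.codePointAt(i))
              .divide(curPlace, 40, RoundingMode.HALF_UP));
          curPlace = curPlace.multiply(ONE_PLACE);
        }
        return result;
      }

      public static void main(String[] args) {
        // 'A' has code point 65, so "A" maps to 65/65536 = 0.00099182128906250...
        System.out.println(stringToBigDecimal("A"));
        // "Ham" < "Haze" lexicographically, and the mapping preserves order,
        // so midpoints between the two values decode to boundary strings.
        System.out.println(stringToBigDecimal("Ham")
            .compareTo(stringToBigDecimal("Haze")) < 0); // true
      }
    }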

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.fieldsel;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This class implements the field selection logic shared by the field
+ * selection mapper and reducer, which perform field selections in a manner
+ * similar to unix cut. The input data is treated as fields separated by a
+ * user specified separator (the default value is "\t"). The user can
+ * specify a list of fields that form the map output keys, and a list of
+ * fields that form the map output values. If the input format is
+ * TextInputFormat, the mapper ignores the key passed to the map function,
+ * and the fields are taken from the value only. Otherwise, the fields are
+ * the union of those from the key and those from the value.
+ * 
+ * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
+ * 
+ * The map output field list spec is under attribute 
+ * "mapreduce.fieldsel.map.output.key.value.fields.spec".
+ * The value is expected to be of the form "keyFieldsSpec:valueFieldsSpec".
+ * Each of keyFieldsSpec and valueFieldsSpec is a comma (,) separated list
+ * of field specs: fieldSpec,fieldSpec,fieldSpec ... Each field spec can be
+ * a simple number (e.g. 5) naming a specific field, a range (like 2-5)
+ * naming a range of fields, or an open range (like 3-) naming all the
+ * fields from field 3 onward. An open range field spec applies to value
+ * fields only; it has no effect on the key fields.
+ * 
+ * Here is an example: "4,3,0,1:6,5,1-3,7-". It selects fields 4,3,0 and 1
+ * for keys, and fields 6,5,1,2,3 plus every field from 7 on for values.
+ * 
+ * The reduce output field list spec is under attribute 
+ * "mapreduce.fieldsel.reduce.output.key.value.fields.spec".
+ * 
+ * The reducer extracts output key/value pairs in a similar manner, except that
+ * the key is never ignored.
+ * 
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class FieldSelectionHelper {
+
+  public static Text emptyText = new Text("");
+  public static final String DATA_FIELD_SEPERATOR = 
+    "mapreduce.fieldsel.data.field.separator";
+  public static final String MAP_OUTPUT_KEY_VALUE_SPEC = 
+    "mapreduce.fieldsel.map.output.key.value.fields.spec";
+  public static final String REDUCE_OUTPUT_KEY_VALUE_SPEC = 
+    "mapreduce.fieldsel.reduce.output.key.value.fields.spec";
+
+
+  /**
+   * Extract the actual field numbers from the given field specs.
+   * If a field spec is of the form "n-" (like 3-), then n will be the
+   * return value; otherwise, -1 will be returned.
+   * @param fieldListSpec an array of field specs
+   * @param fieldList a list populated with the field numbers extracted
+   *        from the specs
+   * @return n if some field spec is of the form "n-", -1 otherwise
+   */
+  private static int extractFields(String[] fieldListSpec,
+      List<Integer> fieldList) {
+    int allFieldsFrom = -1;
+    int i = 0;
+    int j = 0;
+    int pos = -1;
+    String fieldSpec = null;
+    for (i = 0; i < fieldListSpec.length; i++) {
+      fieldSpec = fieldListSpec[i];
+      if (fieldSpec.length() == 0) {
+        continue;
+      }
+      pos = fieldSpec.indexOf('-');
+      if (pos < 0) {
+        Integer fn = Integer.valueOf(fieldSpec);
+        fieldList.add(fn);
+      } else {
+        String start = fieldSpec.substring(0, pos);
+        String end = fieldSpec.substring(pos + 1);
+        if (start.length() == 0) {
+          start = "0";
+        }
+        if (end.length() == 0) {
+          allFieldsFrom = Integer.parseInt(start);
+          continue;
+        }
+        int startPos = Integer.parseInt(start);
+        int endPos = Integer.parseInt(end);
+        for (j = startPos; j <= endPos; j++) {
+          fieldList.add(j);
+        }
+      }
+    }
+    return allFieldsFrom;
+  }
+
+  private static String selectFields(String[] fields, List<Integer> fieldList,
+      int allFieldsFrom, String separator) {
+    String retv = null;
+    int i = 0;
+    StringBuffer sb = null;
+    if (fieldList != null && fieldList.size() > 0) {
+      if (sb == null) {
+        sb = new StringBuffer();
+      }
+      for (Integer index : fieldList) {
+        if (index < fields.length) {
+          sb.append(fields[index]);
+        }
+        sb.append(separator);
+      }
+    }
+    if (allFieldsFrom >= 0) {
+      if (sb == null) {
+        sb = new StringBuffer();
+      }
+      for (i = allFieldsFrom; i < fields.length; i++) {
+        sb.append(fields[i]).append(separator);
+      }
+    }
+    if (sb != null) {
+      retv = sb.toString();
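+      // Drop the separator appended after the last field.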
+      if (retv.length() > 0) {
+        retv = retv.substring(0, retv.length() - 1);
+      }
+    }
+    return retv;
+  }
+  
+  public static int parseOutputKeyValueSpec(String keyValueSpec,
+      List<Integer> keyFieldList, List<Integer> valueFieldList) {
+    String[] keyValSpecs = keyValueSpec.split(":", -1);
+    
+    String[] keySpec = keyValSpecs[0].split(",");
+    
+    String[] valSpec = new String[0];
+    if (keyValSpecs.length > 1) {
+      valSpec = keyValSpecs[1].split(",");
+    }
+
+    FieldSelectionHelper.extractFields(keySpec, keyFieldList);
+    return FieldSelectionHelper.extractFields(valSpec, valueFieldList);
+  }
+
+  public static String specToString(String fieldSeparator, String keyValueSpec,
+      int allValueFieldsFrom, List<Integer> keyFieldList,
+      List<Integer> valueFieldList) {
+    StringBuffer sb = new StringBuffer();
+    sb.append("fieldSeparator: ").append(fieldSeparator).append("\n");
+
+    sb.append("keyValueSpec: ").append(keyValueSpec).append("\n");
+    sb.append("allValueFieldsFrom: ").append(allValueFieldsFrom);
+    sb.append("\n");
+    sb.append("keyFieldList.length: ").append(keyFieldList.size());
+    sb.append("\n");
+    for (Integer field : keyFieldList) {
+      sb.append("\t").append(field).append("\n");
+    }
+    sb.append("valueFieldList.length: ").append(valueFieldList.size());
+    sb.append("\n");
+    for (Integer field : valueFieldList) {
+      sb.append("\t").append(field).append("\n");
+    }
+    return sb.toString();
+  }
+
+  private Text key = null;
+  private Text value = null;
+  
+  public FieldSelectionHelper() {
+  }
+
+  public FieldSelectionHelper(Text key, Text val) {
+    this.key = key;
+    this.value = val;
+  }
+  
+  public Text getKey() {
+    return key;
+  }
+ 
+  public Text getValue() {
+    return value;
+  }
+
+  public void extractOutputKeyValue(String key, String val,
+      String fieldSep, List<Integer> keyFieldList, List<Integer> valFieldList,
+      int allValueFieldsFrom, boolean ignoreKey, boolean isMap) {
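+    // When the key participates, fold it into the value string so its
+    // fields can be selected by position; callers that need a separator
+    // between key and value fields include it in 'key' (as
+    // FieldSelectionReducer does).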
+    if (!ignoreKey) {
+      val = key + val;
+    }
+    String[] fields = val.split(fieldSep);
+    
+    String newKey = selectFields(fields, keyFieldList, -1, fieldSep);
+    String newVal = selectFields(fields, valFieldList, allValueFieldsFrom,
+      fieldSep);
+    if (isMap && newKey == null) {
+      newKey = newVal;
+      newVal = null;
+    }
+    
+    if (newKey != null) {
+      this.key = new Text(newKey);
+    }
+    if (newVal != null) {
+      this.value = new Text(newVal);
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native
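
To make the spec grammar concrete, here is a small illustrative snippet
(the class name SpecParseDemo is hypothetical) that exercises the public
parseOutputKeyValueSpec helper on the example spec from the javadoc:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionHelper;

    public class SpecParseDemo {
      public static void main(String[] args) {
        List<Integer> keyFields = new ArrayList<Integer>();
        List<Integer> valueFields = new ArrayList<Integer>();

        // "4,3,0,1" selects the key fields; "6,5,1-3,7-" selects the value
        // fields, where "1-3" expands to 1,2,3 and "7-" is the open range.
        int allValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
            "4,3,0,1:6,5,1-3,7-", keyFields, valueFields);

        System.out.println(keyFields);          // [4, 3, 0, 1]
        System.out.println(valueFields);        // [6, 5, 1, 2, 3]
        System.out.println(allValueFieldsFrom); // 7 (open range start)
      }
    }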

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.fieldsel;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+/**
+ * This class implements a mapper class that can be used to perform
+ * field selections in a manner similar to unix cut. The input data is treated
+ * as fields separated by a user specified separator (the default value is
+ * "\t"). The user can specify a list of fields that form the map output keys,
+ * and a list of fields that form the map output values. If the input format
+ * is TextInputFormat, the mapper ignores the key passed to the map function,
+ * and the fields are taken from the value only. Otherwise, the fields are
+ * the union of those from the key and those from the value.
+ * 
+ * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
+ * 
+ * The map output field list spec is under attribute 
+ * "mapreduce.fieldsel.map.output.key.value.fields.spec". 
+ * The value is expected to be of the form
+ * "keyFieldsSpec:valueFieldsSpec". Each of keyFieldsSpec and valueFieldsSpec
+ * is a comma (,) separated list of field specs: fieldSpec,fieldSpec ...
+ * Each field spec can be a simple number (e.g. 5) naming a specific field,
+ * a range (like 2-5) naming a range of fields, or an open range (like 3-)
+ * naming all the fields from field 3 onward. An open range field spec
+ * applies to value fields only; it has no effect on the key fields.
+ * 
+ * Here is an example: "4,3,0,1:6,5,1-3,7-". It selects fields 4,3,0 and 1
+ * for keys, and fields 6,5,1,2,3 plus every field from 7 on for values.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class FieldSelectionMapper<K, V>
+    extends Mapper<K, V, Text, Text> {
+
+  private String mapOutputKeyValueSpec;
+
+  private boolean ignoreInputKey;
+
+  private String fieldSeparator = "\t";
+
+  private List<Integer> mapOutputKeyFieldList = new ArrayList<Integer>();
+
+  private List<Integer> mapOutputValueFieldList = new ArrayList<Integer>();
+
+  private int allMapValueFieldsFrom = -1;
+
+  public static final Log LOG = LogFactory.getLog("FieldSelectionMapReduce");
+
+  public void setup(Context context) 
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    this.fieldSeparator = 
+      conf.get(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "\t");
+    this.mapOutputKeyValueSpec = 
+      conf.get(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "0-:");
+    try {
+      this.ignoreInputKey = TextInputFormat.class.getCanonicalName().equals(
+        context.getInputFormatClass().getCanonicalName());
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Input format class not found", e);
+    }
+    allMapValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
+      mapOutputKeyValueSpec, mapOutputKeyFieldList, mapOutputValueFieldList);
+    LOG.info(FieldSelectionHelper.specToString(fieldSeparator,
+      mapOutputKeyValueSpec, allMapValueFieldsFrom, mapOutputKeyFieldList,
+      mapOutputValueFieldList) + "\nignoreInputKey:" + ignoreInputKey);
+  }
+
+  /**
+   * Select the configured key and value fields from the input record and
+   * write the resulting pair to the output.
+   */
+  public void map(K key, V val, Context context) 
+      throws IOException, InterruptedException {
+    FieldSelectionHelper helper = new FieldSelectionHelper(
+      FieldSelectionHelper.emptyText, FieldSelectionHelper.emptyText);
+    helper.extractOutputKeyValue(key.toString(), val.toString(),
+      fieldSeparator, mapOutputKeyFieldList, mapOutputValueFieldList,
+      allMapValueFieldsFrom, ignoreInputKey, true);
+    context.write(helper.getKey(), helper.getValue());
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java
------------------------------------------------------------------------------
    svn:eol-style = native
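
A sketch of how a job might wire this mapper up, assuming comma-separated
input; the driver class, paths, and the "1:2-" spec are illustrative, not
part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionHelper;
    import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionMapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class FieldSelDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Comma-separated records: field 1 becomes the key, fields 2 and
        // beyond become the value (the open range applies to values only).
        conf.set(FieldSelectionHelper.DATA_FIELD_SEPERATOR, ",");
        conf.set(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "1:2-");

        Job job = new Job(conf, "fieldsel demo");
        job.setJarByClass(FieldSelDriver.class);
        job.setMapperClass(FieldSelectionMapper.class);
        job.setNumReduceTasks(0);              // map-only pass, like cut
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }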

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.fieldsel;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * This class implements a reducer class that can be used to perform field
+ * selections in a manner similar to unix cut. 
+ * 
+ * The input data is treated as fields separated by a user specified
+ * separator (the default value is "\t"). The user can specify a list of
+ * fields that form the reduce output keys, and a list of fields that form
+ * the reduce output values. The fields are the union of those from the key
+ * and those from the value.
+ * 
+ * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
+ * 
+ * The reduce output field list spec is under attribute 
+ * "mapreduce.fieldsel.reduce.output.key.value.fields.spec". 
+ * The value is expected to be of the form
+ * "keyFieldsSpec:valueFieldsSpec". Each of keyFieldsSpec and valueFieldsSpec
+ * is a comma (,) separated list of field specs: fieldSpec,fieldSpec ...
+ * Each field spec can be a simple number (e.g. 5) naming a specific field,
+ * a range (like 2-5) naming a range of fields, or an open range (like 3-)
+ * naming all the fields from field 3 onward. An open range field spec
+ * applies to value fields only; it has no effect on the key fields.
+ * 
+ * Here is an example: "4,3,0,1:6,5,1-3,7-". It selects fields 4,3,0 and 1
+ * for keys, and fields 6,5,1,2,3 plus every field from 7 on for values.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class FieldSelectionReducer<K, V>
+    extends Reducer<Text, Text, Text, Text> {
+
+  private String fieldSeparator = "\t";
+
+  private String reduceOutputKeyValueSpec;
+
+  private List<Integer> reduceOutputKeyFieldList = new ArrayList<Integer>();
+
+  private List<Integer> reduceOutputValueFieldList = new ArrayList<Integer>();
+
+  private int allReduceValueFieldsFrom = -1;
+
+  public static final Log LOG = LogFactory.getLog("FieldSelectionMapReduce");
+
+  public void setup(Context context) 
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    
+    this.fieldSeparator = 
+      conf.get(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "\t");
+    
+    this.reduceOutputKeyValueSpec = 
+      conf.get(FieldSelectionHelper.REDUCE_OUTPUT_KEY_VALUE_SPEC, "0-:");
+    
+    allReduceValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
+      reduceOutputKeyValueSpec, reduceOutputKeyFieldList,
+      reduceOutputValueFieldList);
+
+    LOG.info(FieldSelectionHelper.specToString(fieldSeparator,
+      reduceOutputKeyValueSpec, allReduceValueFieldsFrom,
+      reduceOutputKeyFieldList, reduceOutputValueFieldList));
+  }
+
+  public void reduce(Text key, Iterable<Text> values, Context context)
+      throws IOException, InterruptedException {
+    String keyStr = key.toString() + this.fieldSeparator;
+    
+    for (Text val : values) {
+      FieldSelectionHelper helper = new FieldSelectionHelper();
+      helper.extractOutputKeyValue(keyStr, val.toString(),
+        fieldSeparator, reduceOutputKeyFieldList,
+        reduceOutputValueFieldList, allReduceValueFieldsFrom, false, false);
+      context.write(helper.getKey(), helper.getValue());
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java
------------------------------------------------------------------------------
    svn:eol-style = native
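
To trace what a single reduce(...) iteration does, a minimal illustrative
snippet (the class name ReduceSelectDemo is hypothetical) that drives
FieldSelectionHelper the same way the reducer above does; note the reducer
passes the grouping key with the separator already appended:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionHelper;

    public class ReduceSelectDemo {
      public static void main(String[] args) {
        List<Integer> keyFields = new ArrayList<Integer>();
        List<Integer> valueFields = new ArrayList<Integer>();
        // Key from field 0; value from every field from 1 on.
        int allValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
            "0:1-", keyFields, valueFields);

        // One reduce() iteration for key "k" and value "v1,v2":
        // ignoreKey=false and isMap=false, as in FieldSelectionReducer.
        FieldSelectionHelper helper = new FieldSelectionHelper();
        helper.extractOutputKeyValue("k" + ",", "v1,v2", ",",
            keyFields, valueFields, allValueFieldsFrom, false, false);

        System.out.println(helper.getKey());   // k
        System.out.println(helper.getValue()); // v1,v2
      }
    }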