Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/01/25 00:22:01 UTC
svn commit: r1235548 [2/8] - in /hadoop/common/branches/branch-1: ./
src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/io/
src/mapred/org/apache/hadoop/mapreduce/
src/mapred/org/apache/hadoop/mapreduce/lib/db/
src/mapred/org/apache/hadoop/map...
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A RecordReader that reads records from a SQL table,
+ * using data-driven WHERE clause splits.
+ * Emits LongWritables containing the record number as
+ * key and DBWritables as value.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class DataDrivenDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {
+
+ private static final Log LOG = LogFactory.getLog(DataDrivenDBRecordReader.class);
+
+ private String dbProductName; // database manufacturer string.
+
+ /**
+ * @param split The InputSplit to read data for
+ * @throws SQLException
+ */
+ public DataDrivenDBRecordReader(DBInputFormat.DBInputSplit split,
+ Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
+ String cond, String [] fields, String table, String dbProduct)
+ throws SQLException {
+ super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
+ this.dbProductName = dbProduct;
+ }
+
+ /** Returns the query for selecting the records;
+ * subclasses can override this for custom behaviour. */
+ @SuppressWarnings("unchecked")
+ protected String getSelectQuery() {
+ StringBuilder query = new StringBuilder();
+ DataDrivenDBInputFormat.DataDrivenDBInputSplit dataSplit =
+ (DataDrivenDBInputFormat.DataDrivenDBInputSplit) getSplit();
+ DBConfiguration dbConf = getDBConf();
+ String [] fieldNames = getFieldNames();
+ String tableName = getTableName();
+ String conditions = getConditions();
+
+ // Build the WHERE clauses associated with the data split first.
+ // We need them in both branches of this function.
+ StringBuilder conditionClauses = new StringBuilder();
+ conditionClauses.append("( ").append(dataSplit.getLowerClause());
+ conditionClauses.append(" ) AND ( ").append(dataSplit.getUpperClause());
+ conditionClauses.append(" )");
+
+ if(dbConf.getInputQuery() == null) {
+ // We need to generate the entire query.
+ query.append("SELECT ");
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ query.append(fieldNames[i]);
+ if (i != fieldNames.length -1) {
+ query.append(", ");
+ }
+ }
+
+ query.append(" FROM ").append(tableName);
+ if (!dbProductName.startsWith("ORACLE")) {
+ // Seems to be necessary for hsqldb? Oracle explicitly does *not*
+ // use this clause.
+ query.append(" AS ").append(tableName);
+ }
+ query.append(" WHERE ");
+ if (conditions != null && conditions.length() > 0) {
+ // Put the user's conditions first.
+ query.append("( ").append(conditions).append(" ) AND ");
+ }
+
+ // Now append the conditions associated with our split.
+ query.append(conditionClauses.toString());
+
+ } else {
+ // User provided the query. We replace the special token with our WHERE clause.
+ String inputQuery = dbConf.getInputQuery();
+ if (inputQuery.indexOf(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN) == -1) {
+ LOG.error("Could not find the clause substitution token "
+ + DataDrivenDBInputFormat.SUBSTITUTE_TOKEN + " in the query: ["
+ + inputQuery + "]. Parallel splits may not work correctly.");
+ }
+
+ query.append(inputQuery.replace(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN,
+ conditionClauses.toString()));
+ }
+
+ LOG.debug("Using query: " + query.toString());
+
+ return query.toString();
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java
------------------------------------------------------------------------------
svn:eol-style = native
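
For readers skimming the diff: below is a standalone sketch (not part of the commit) of the two branches in getSelectQuery() above. The table, columns, and bound clauses are invented, and TOKEN stands in for DataDrivenDBInputFormat.SUBSTITUTE_TOKEN.

    // Sketch of DataDrivenDBRecordReader.getSelectQuery()'s two branches.
    public class QuerySketch {
      static final String TOKEN = "$CONDITIONS"; // plays the role of SUBSTITUTE_TOKEN

      static String buildQuery(String inputQuery, String userConditions,
          String lowerClause, String upperClause) {
        String splitClauses = "( " + lowerClause + " ) AND ( " + upperClause + " )";
        if (inputQuery == null) {
          // Generated-query branch: SELECT fields FROM table WHERE (user) AND (split).
          // "AS emp" mirrors the non-Oracle table alias appended above.
          StringBuilder q = new StringBuilder("SELECT id, name FROM emp AS emp WHERE ");
          if (userConditions != null && userConditions.length() > 0) {
            q.append("( ").append(userConditions).append(" ) AND ");
          }
          return q.append(splitClauses).toString();
        }
        // User-query branch: splice the split clauses in at the substitution token.
        return inputQuery.replace(TOKEN, splitClauses);
      }

      public static void main(String[] args) {
        System.out.println(buildQuery(null, "salary > 0", "id >= 0", "id < 500"));
        System.out.println(buildQuery(
            "SELECT id, name FROM emp WHERE " + TOKEN, null, "id >= 500", "id <= 999"));
      }
    }
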
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Implement DBSplitter over date/time values.
+ * Make use of logic from IntegerSplitter, since date/time are just longs
+ * in Java.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class DateSplitter extends IntegerSplitter {
+
+ private static final Log LOG = LogFactory.getLog(DateSplitter.class);
+
+ public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
+ throws SQLException {
+
+ long minVal;
+ long maxVal;
+
+ int sqlDataType = results.getMetaData().getColumnType(1);
+ minVal = resultSetColToLong(results, 1, sqlDataType);
+ maxVal = resultSetColToLong(results, 2, sqlDataType);
+
+ String lowClausePrefix = colName + " >= ";
+ String highClausePrefix = colName + " < ";
+
+ int numSplits = conf.getInt("mapred.map.tasks", 1);
+ if (numSplits < 1) {
+ numSplits = 1;
+ }
+
+ if (minVal == Long.MIN_VALUE && maxVal == Long.MIN_VALUE) {
+ // The range of acceptable dates is NULL to NULL. Just create a single split.
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+ colName + " IS NULL", colName + " IS NULL"));
+ return splits;
+ }
+
+ // Gather the split point integers
+ List<Long> splitPoints = split(numSplits, minVal, maxVal);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+
+ // Turn the split points into a set of intervals.
+ long start = splitPoints.get(0);
+ Date startDate = longToDate(start, sqlDataType);
+ if (sqlDataType == Types.TIMESTAMP) {
+ // The lower bound's nanos value needs to match the actual lower-bound nanos.
+ try {
+ ((java.sql.Timestamp) startDate).setNanos(results.getTimestamp(1).getNanos());
+ } catch (NullPointerException npe) {
+ // If the lower bound was NULL, we'll get an NPE; just ignore it and don't set nanos.
+ }
+ }
+
+ for (int i = 1; i < splitPoints.size(); i++) {
+ long end = splitPoints.get(i);
+ Date endDate = longToDate(end, sqlDataType);
+
+ if (i == splitPoints.size() - 1) {
+ if (sqlDataType == Types.TIMESTAMP) {
+ // The upper bound's nanos value needs to match the actual upper-bound nanos.
+ try {
+ ((java.sql.Timestamp) endDate).setNanos(results.getTimestamp(2).getNanos());
+ } catch (NullPointerException npe) {
+ // If the upper bound was NULL, we'll get an NPE; just ignore it and don't set nanos.
+ }
+ }
+ // This is the last one; use a closed interval.
+ splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+ lowClausePrefix + dateToString(startDate),
+ colName + " <= " + dateToString(endDate)));
+ } else {
+ // Normal open-interval case.
+ splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+ lowClausePrefix + dateToString(startDate),
+ highClausePrefix + dateToString(endDate)));
+ }
+
+ start = end;
+ startDate = endDate;
+ }
+
+ if (minVal == Long.MIN_VALUE || maxVal == Long.MIN_VALUE) {
+ // Add an extra split to handle the null case that we saw.
+ splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+ colName + " IS NULL", colName + " IS NULL"));
+ }
+
+ return splits;
+ }
+
+ /** Retrieve the value from the column in a type-appropriate manner and return
+ it as milliseconds since the epoch. If the column is null, then return
+ Long.MIN_VALUE. This will cause a special split to be generated for the NULL
+ case, but may also skew the split sizes, since Long.MIN_VALUE lies far below
+ any realistic date value.
+ */
+ private long resultSetColToLong(ResultSet rs, int colNum, int sqlDataType) throws SQLException {
+ try {
+ switch (sqlDataType) {
+ case Types.DATE:
+ return rs.getDate(colNum).getTime();
+ case Types.TIME:
+ return rs.getTime(colNum).getTime();
+ case Types.TIMESTAMP:
+ return rs.getTimestamp(colNum).getTime();
+ default:
+ throw new SQLException("Not a date-type field");
+ }
+ } catch (NullPointerException npe) {
+ // null column. return minimum long value.
+ LOG.warn("Encountered a NULL date in the split column. Splits may be poorly balanced.");
+ return Long.MIN_VALUE;
+ }
+ }
+
+ /** Parse the long-valued timestamp into the appropriate SQL date type. */
+ private Date longToDate(long val, int sqlDataType) {
+ switch (sqlDataType) {
+ case Types.DATE:
+ return new java.sql.Date(val);
+ case Types.TIME:
+ return new java.sql.Time(val);
+ case Types.TIMESTAMP:
+ return new java.sql.Timestamp(val);
+ default: // Shouldn't ever hit this case.
+ return null;
+ }
+ }
+
+ /**
+ * Given a Date 'd', format it as a string for use in a SQL date
+ * comparison operation.
+ * @param d the date to format.
+ * @return the string representing this date in SQL with any appropriate
+ * quotation characters, etc.
+ */
+ protected String dateToString(Date d) {
+ return "'" + d.toString() + "'";
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java
------------------------------------------------------------------------------
svn:eol-style = native
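
The net effect of the code above is a list of range predicates over the date column, with the last interval closed. A rough standalone illustration follows; the column name and date range are invented, and the boundary arithmetic mirrors IntegerSplitter.

    import java.sql.Date;
    import java.util.ArrayList;
    import java.util.List;

    public class DateSplitSketch {
      public static void main(String[] args) {
        long min = Date.valueOf("2011-01-01").getTime();
        long max = Date.valueOf("2011-12-31").getTime();
        long numSplits = 4;
        long splitSize = (max - min) / numSplits;
        List<Long> points = new ArrayList<Long>();
        for (long v = min; v <= max; v += splitSize) {
          points.add(v);
        }
        if (points.get(points.size() - 1) != max) {
          points.add(max); // didn't land exactly on max; close the list with it
        }
        for (int i = 1; i < points.size(); i++) {
          // The last interval is closed so the maximum date is not dropped.
          String op = (i == points.size() - 1) ? " <= " : " < ";
          System.out.println("hire_date >= '" + new Date(points.get(i - 1))
              + "' AND hire_date" + op + "'" + new Date(points.get(i)) + "'");
        }
      }
    }
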
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Implement DBSplitter over floating-point values.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class FloatSplitter implements DBSplitter {
+
+ private static final Log LOG = LogFactory.getLog(FloatSplitter.class);
+
+ private static final double MIN_INCREMENT = 10000 * Double.MIN_VALUE;
+
+ public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
+ throws SQLException {
+
+ LOG.warn("Generating splits for a floating-point index column. Due to the");
+ LOG.warn("imprecise representation of floating-point values in Java, this");
+ LOG.warn("may result in an incomplete import.");
+ LOG.warn("You are strongly encouraged to choose an integral split column.");
+
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+
+ if (results.getString(1) == null && results.getString(2) == null) {
+ // Range is null to null. Return a null split accordingly.
+ splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+ colName + " IS NULL", colName + " IS NULL"));
+ return splits;
+ }
+
+ double minVal = results.getDouble(1);
+ double maxVal = results.getDouble(2);
+
+ // Use this as a hint. May need an extra task if the size doesn't
+ // divide cleanly.
+ int numSplits = conf.getInt("mapred.map.tasks", 1);
+ double splitSize = (maxVal - minVal) / (double) numSplits;
+
+ if (splitSize < MIN_INCREMENT) {
+ splitSize = MIN_INCREMENT;
+ }
+
+ String lowClausePrefix = colName + " >= ";
+ String highClausePrefix = colName + " < ";
+
+ double curLower = minVal;
+ double curUpper = curLower + splitSize;
+
+ while (curUpper < maxVal) {
+ splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+ lowClausePrefix + Double.toString(curLower),
+ highClausePrefix + Double.toString(curUpper)));
+
+ curLower = curUpper;
+ curUpper += splitSize;
+ }
+
+ // Catch any overage and create the closed interval for the last split.
+ if (curLower <= maxVal || splits.size() == 1) {
+ splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+ // Use curLower here: curUpper has already overshot maxVal, so starting the
+ // final interval at curUpper would drop the rows in [curLower, maxVal].
+ lowClausePrefix + Double.toString(curLower),
+ colName + " <= " + Double.toString(maxVal)));
+ }
+
+ if (results.getString(1) == null || results.getString(2) == null) {
+ // At least one extrema is null; add a null split.
+ splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+ colName + " IS NULL", colName + " IS NULL"));
+ }
+
+ return splits;
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java
------------------------------------------------------------------------------
svn:eol-style = native
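
A standalone trace of the floating-point interval generation above, under invented bounds; the remainder past the last even boundary is swept into a final closed interval. (The guard condition on the last split is elided here, since curLower cannot exceed maxVal in this sketch.)

    public class FloatSplitSketch {
      public static void main(String[] args) {
        double minVal = 0.0;
        double maxVal = 10.0;
        int numSplits = 3;
        double splitSize = (maxVal - minVal) / numSplits;

        double curLower = minVal;
        double curUpper = curLower + splitSize;
        while (curUpper < maxVal) {
          System.out.println("score >= " + curLower + " AND score < " + curUpper);
          curLower = curUpper;
          curUpper += splitSize;
        }
        // Whatever is left, up to and including maxVal, goes into one closed interval.
        System.out.println("score >= " + curLower + " AND score <= " + maxVal);
      }
    }
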
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Implement DBSplitter over integer values.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class IntegerSplitter implements DBSplitter {
+ public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
+ throws SQLException {
+
+ long minVal = results.getLong(1);
+ long maxVal = results.getLong(2);
+
+ String lowClausePrefix = colName + " >= ";
+ String highClausePrefix = colName + " < ";
+
+ int numSplits = conf.getInt("mapred.map.tasks", 1);
+ if (numSplits < 1) {
+ numSplits = 1;
+ }
+
+ if (results.getString(1) == null && results.getString(2) == null) {
+ // Range is null to null. Return a null split accordingly.
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+ colName + " IS NULL", colName + " IS NULL"));
+ return splits;
+ }
+
+ // Get all the split points together.
+ List<Long> splitPoints = split(numSplits, minVal, maxVal);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+
+ // Turn the split points into a set of intervals.
+ long start = splitPoints.get(0);
+ for (int i = 1; i < splitPoints.size(); i++) {
+ long end = splitPoints.get(i);
+
+ if (i == splitPoints.size() - 1) {
+ // This is the last one; use a closed interval.
+ splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+ lowClausePrefix + Long.toString(start),
+ colName + " <= " + Long.toString(end)));
+ } else {
+ // Normal open-interval case.
+ splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+ lowClausePrefix + Long.toString(start),
+ highClausePrefix + Long.toString(end)));
+ }
+
+ start = end;
+ }
+
+ if (results.getString(1) == null || results.getString(2) == null) {
+ // At least one extrema is null; add a null split.
+ splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+ colName + " IS NULL", colName + " IS NULL"));
+ }
+
+ return splits;
+ }
+
+ /**
+ * Returns a list of longs one element longer than the list of input splits.
+ * This represents the boundaries between input splits.
+ * All splits are open on the top end, except the last one.
+ *
+ * So the list [0, 5, 8, 12, 18] would represent splits capturing the intervals:
+ *
+ * [0, 5)
+ * [5, 8)
+ * [8, 12)
+ * [12, 18] note the closed interval for the last split.
+ */
+ List<Long> split(long numSplits, long minVal, long maxVal)
+ throws SQLException {
+
+ List<Long> splits = new ArrayList<Long>();
+
+ // Use numSplits as a hint. May need an extra task if the size doesn't
+ // divide cleanly.
+
+ long splitSize = (maxVal - minVal) / numSplits;
+ if (splitSize < 1) {
+ splitSize = 1;
+ }
+
+ long curVal = minVal;
+
+ while (curVal <= maxVal) {
+ splits.add(curVal);
+ curVal += splitSize;
+ }
+
+ if (splits.get(splits.size() - 1) != maxVal || splits.size() == 1) {
+ // We didn't end on the maxVal. Add that to the end of the list.
+ splits.add(maxVal);
+ }
+
+ return splits;
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java
------------------------------------------------------------------------------
svn:eol-style = native
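
The boundary-list contract documented above is easy to check in isolation. This sketch reproduces the same arithmetic (the javadoc's [0, 5, 8, 12, 18] merely illustrates interval semantics; actual boundaries come out evenly spaced, as below):

    import java.util.ArrayList;
    import java.util.List;

    public class IntegerSplitSketch {
      // Mirrors IntegerSplitter.split(long, long, long) above.
      static List<Long> split(long numSplits, long minVal, long maxVal) {
        List<Long> splits = new ArrayList<Long>();
        long splitSize = (maxVal - minVal) / numSplits;
        if (splitSize < 1) {
          splitSize = 1;
        }
        for (long curVal = minVal; curVal <= maxVal; curVal += splitSize) {
          splits.add(curVal);
        }
        if (splits.get(splits.size() - 1) != maxVal || splits.size() == 1) {
          splits.add(maxVal); // didn't end on maxVal; add it to close the list
        }
        return splits;
      }

      public static void main(String[] args) {
        // Four requested splits over [0, 18] -> splitSize 4 -> [0, 4, 8, 12, 16, 18]:
        // five intervals rather than four, since 18 doesn't divide evenly.
        System.out.println(split(4, 0, 18));
      }
    }
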
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A RecordReader that reads records from a MySQL table.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MySQLDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {
+
+ public MySQLDBRecordReader(DBInputFormat.DBInputSplit split,
+ Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
+ String cond, String [] fields, String table) throws SQLException {
+ super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
+ }
+
+ // Execute statements for MySQL in unbuffered mode.
+ protected ResultSet executeQuery(String query) throws SQLException {
+ statement = getConnection().prepareStatement(query,
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time.
+ return statement.executeQuery();
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java
------------------------------------------------------------------------------
svn:eol-style = native
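
The fetch-size setting above is the MySQL Connector/J idiom for streaming a result set row by row instead of buffering it all client-side. A minimal standalone sketch, with placeholder URL, table, and credentials:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    public class MySQLStreamingSketch {
      public static void main(String[] args) throws Exception {
        // Class.forName("com.mysql.jdbc.Driver"); // may be needed with older drivers
        Connection conn = DriverManager.getConnection(
            "jdbc:mysql://localhost/testdb", "user", "pass"); // placeholders
        PreparedStatement stmt = conn.prepareStatement(
            "SELECT id FROM big_table",
            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        // Connector/J treats Integer.MIN_VALUE as "stream one row at a time".
        stmt.setFetchSize(Integer.MIN_VALUE);
        ResultSet rs = stmt.executeQuery();
        while (rs.next()) {
          System.out.println(rs.getLong(1));
        }
        rs.close();
        stmt.close();
        conn.close();
      }
    }
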
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A RecordReader that reads records from a MySQL table via a DataDrivenDBRecordReader.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MySQLDataDrivenDBRecordReader<T extends DBWritable>
+ extends DataDrivenDBRecordReader<T> {
+
+ public MySQLDataDrivenDBRecordReader(DBInputFormat.DBInputSplit split,
+ Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
+ String cond, String [] fields, String table) throws SQLException {
+ super(split, inputClass, conf, conn, dbConfig, cond, fields, table, "MYSQL");
+ }
+
+ // Execute statements for MySQL in unbuffered mode.
+ protected ResultSet executeQuery(String query) throws SQLException {
+ statement = getConnection().prepareStatement(query,
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time.
+ return statement.executeQuery();
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A RecordReader that reads records from an Oracle SQL table.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class OracleDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {
+
+ /** Configuration key to set to a timezone string. */
+ public static final String SESSION_TIMEZONE_KEY = "oracle.sessionTimeZone";
+
+ private static final Log LOG = LogFactory.getLog(OracleDBRecordReader.class);
+
+ public OracleDBRecordReader(DBInputFormat.DBInputSplit split,
+ Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
+ String cond, String [] fields, String table) throws SQLException {
+ super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
+ setSessionTimeZone(conf, conn);
+ }
+
+ /** Returns the query for selecting the records from an Oracle DB. */
+ protected String getSelectQuery() {
+ StringBuilder query = new StringBuilder();
+ DBConfiguration dbConf = getDBConf();
+ String conditions = getConditions();
+ String tableName = getTableName();
+ String [] fieldNames = getFieldNames();
+
+ // Oracle-specific codepath to use rownum instead of LIMIT/OFFSET.
+ if(dbConf.getInputQuery() == null) {
+ query.append("SELECT ");
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ query.append(fieldNames[i]);
+ if (i != fieldNames.length -1) {
+ query.append(", ");
+ }
+ }
+
+ query.append(" FROM ").append(tableName);
+ if (conditions != null && conditions.length() > 0)
+ query.append(" WHERE ").append(conditions);
+ String orderBy = dbConf.getInputOrderBy();
+ if (orderBy != null && orderBy.length() > 0) {
+ query.append(" ORDER BY ").append(orderBy);
+ }
+ } else {
+ // Use the user's prebuilt query as-is.
+ query.append(dbConf.getInputQuery());
+ }
+
+ try {
+ DBInputFormat.DBInputSplit split = getSplit();
+ if (split.getLength() > 0 && split.getStart() > 0){
+ String querystring = query.toString();
+
+ query = new StringBuilder();
+ query.append("SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( ");
+ query.append(querystring);
+ query.append(" ) a WHERE rownum <= ").append(split.getStart());
+ query.append(" + ").append(split.getLength());
+ query.append(" ) WHERE dbif_rno >= ").append(split.getStart());
+ }
+ } catch (IOException ex) {
+ // getLength()/getStart() declare IOException but will not actually throw here.
+ }
+
+ return query.toString();
+ }
+
+ /**
+ * Set the session time zone.
+ * @param conf The current configuration.
+ * We read the 'oracle.sessionTimeZone' property from here.
+ * @param conn The connection to alter the timezone properties of.
+ */
+ public static void setSessionTimeZone(Configuration conf,
+ Connection conn) throws SQLException {
+ // need to use reflection to call the method setSessionTimeZone on
+ // the OracleConnection class because oracle specific java libraries are
+ // not accessible in this context.
+ Method method;
+ try {
+ method = conn.getClass().getMethod(
+ "setSessionTimeZone", new Class [] {String.class});
+ } catch (Exception ex) {
+ LOG.error("Could not find method setSessionTimeZone in " + conn.getClass().getName(), ex);
+ // rethrow SQLException
+ throw new SQLException(ex);
+ }
+
+ // Need to set the time zone in order for Java
+ // to correctly access the column "TIMESTAMP WITH LOCAL TIME ZONE".
+ // We can't easily get the correct Oracle-specific timezone string
+ // from Java; just let the user set the timezone in a property.
+ String clientTimeZone = conf.get(SESSION_TIMEZONE_KEY, "GMT");
+ try {
+ method.setAccessible(true);
+ method.invoke(conn, clientTimeZone);
+ LOG.info("Time zone has been set to " + clientTimeZone);
+ } catch (Exception ex) {
+ LOG.warn("Time zone " + clientTimeZone +
+ " could not be set on Oracle database.");
+ LOG.warn("Setting default time zone: GMT");
+ try {
+ // "GMT" timezone is guaranteed to exist.
+ method.invoke(conn, "GMT");
+ } catch (Exception ex2) {
+ LOG.error("Could not set time zone for oracle connection", ex2);
+ // rethrow SQLException
+ throw new SQLException(ex);
+ }
+ }
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java
------------------------------------------------------------------------------
svn:eol-style = native
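
For a split with start 100 and length 50, the ROWNUM wrapper built above comes out as shown below. This sketch just reproduces the string assembly with an invented inner query:

    public class OracleRownumSketch {
      public static void main(String[] args) {
        String inner = "SELECT id, name FROM emp ORDER BY id"; // invented inner query
        long start = 100;
        long length = 50;
        // Same shape as OracleDBRecordReader.getSelectQuery() builds for a split.
        String paged = "SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( " + inner
            + " ) a WHERE rownum <= " + start + " + " + length
            + " ) WHERE dbif_rno >= " + start;
        System.out.println(paged);
      }
    }
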
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An InputFormat that reads input data from an SQL table in an Oracle db.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class OracleDataDrivenDBInputFormat<T extends DBWritable>
+ extends DataDrivenDBInputFormat<T> implements Configurable {
+
+ /**
+ * @return the DBSplitter implementation to use to divide the table/query into InputSplits.
+ */
+ @Override
+ protected DBSplitter getSplitter(int sqlDataType) {
+ switch (sqlDataType) {
+ case Types.DATE:
+ case Types.TIME:
+ case Types.TIMESTAMP:
+ return new OracleDateSplitter();
+
+ default:
+ return super.getSplitter(sqlDataType);
+ }
+ }
+
+ @Override
+ protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
+ Configuration conf) throws IOException {
+
+ DBConfiguration dbConf = getDBConf();
+ @SuppressWarnings("unchecked")
+ Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
+
+ try {
+ // Use Oracle-specific db reader
+ return new OracleDataDrivenDBRecordReader<T>(split, inputClass,
+ conf, getConnection(), dbConf, dbConf.getInputConditions(),
+ dbConf.getInputFieldNames(), dbConf.getInputTableName());
+ } catch (SQLException ex) {
+ throw new IOException(ex.getMessage());
+ }
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java
------------------------------------------------------------------------------
svn:eol-style = native
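
A hypothetical job wiring for this input format, assuming a minimal DBWritable value class; the driver name, JDBC URL, credentials, table, and column names are all placeholders, not part of the commit:

    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
    import org.apache.hadoop.mapreduce.lib.db.OracleDataDrivenDBInputFormat;

    public class OracleImportSketch {
      /** Minimal value class; a real one would carry all the selected columns. */
      public static class EmpRecord implements DBWritable {
        long id;
        public void readFields(ResultSet rs) throws SQLException {
          id = rs.getLong("EMP_ID");
        }
        public void write(PreparedStatement ps) throws SQLException {
          ps.setLong(1, id);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf, "oracle.jdbc.OracleDriver",
            "jdbc:oracle:thin:@//dbhost:1521/ORCL", "scott", "tiger"); // placeholders
        Job job = new Job(conf, "oracle-import");
        // Table, extra WHERE conditions (none here), split-by column, then fields.
        DataDrivenDBInputFormat.setInput(job, EmpRecord.class,
            "EMPLOYEES", null, "EMP_ID", "EMP_ID", "NAME", "HIRE_DATE");
        // Install the Oracle-specific variant last, in case setInput() installed
        // the generic DataDrivenDBInputFormat.
        job.setInputFormatClass(OracleDataDrivenDBInputFormat.class);
        // ... configure mapper/output classes and submit as usual.
      }
    }
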
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBRecordReader.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBRecordReader.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBRecordReader.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A RecordReader that reads records from an Oracle table via a DataDrivenDBRecordReader.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class OracleDataDrivenDBRecordReader<T extends DBWritable>
+ extends DataDrivenDBRecordReader<T> {
+
+ public OracleDataDrivenDBRecordReader(DBInputFormat.DBInputSplit split,
+ Class<T> inputClass, Configuration conf, Connection conn,
+ DBConfiguration dbConfig, String cond, String [] fields,
+ String table) throws SQLException {
+
+ super(split, inputClass, conf, conn, dbConfig, cond, fields, table,
+ "ORACLE");
+
+ // Must initialize the tz used by the connection for Oracle.
+ OracleDBRecordReader.setSessionTimeZone(conf, conn);
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBRecordReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDateSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDateSplitter.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDateSplitter.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDateSplitter.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.util.Date;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Implement DBSplitter over date/time values returned by an Oracle db.
+ * Make use of logic from DateSplitter, since this just needs to use
+ * some Oracle-specific functions on the formatting end when generating
+ * InputSplits.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class OracleDateSplitter extends DateSplitter {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected String dateToString(Date d) {
+ // Oracle Date objects are always actually Timestamps
+ return "TO_TIMESTAMP('" + d.toString() + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/OracleDateSplitter.java
------------------------------------------------------------------------------
svn:eol-style = native
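
So a bound of, say, 2011-06-15 12:30:00.5 would be rendered on the Oracle side roughly as below; a toy illustration of dateToString() above:

    import java.sql.Timestamp;

    public class OracleDateLiteralSketch {
      public static void main(String[] args) {
        Timestamp ts = Timestamp.valueOf("2011-06-15 12:30:00.5");
        // Same format string as OracleDateSplitter.dateToString(); FF consumes
        // the fractional seconds that Timestamp.toString() emits.
        System.out.println("TO_TIMESTAMP('" + ts + "', 'YYYY-MM-DD HH24:MI:SS.FF')");
      }
    }
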
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Implement DBSplitter over text strings.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TextSplitter extends BigDecimalSplitter {
+
+ private static final Log LOG = LogFactory.getLog(TextSplitter.class);
+
+ /**
+ * This method needs to determine the splits between two user-provided strings.
+ * In the case where the user's strings are 'A' and 'Z', this is not hard; we
+ * could create two splits from ['A', 'M') and ['M', 'Z'], 26 splits for strings
+ * beginning with each letter, etc.
+ *
+ * If a user has provided us with the strings "Ham" and "Haze", however, we need
+ * to create splits that differ in the third letter.
+ *
+ * The algorithm used is as follows:
+ * Since there are 2**16 Unicode characters, we interpret characters as digits in
+ * base 65536. Given a string 's' containing characters s_0, s_1 .. s_n, we interpret
+ * the string as the number: 0.s_0 s_1 s_2.. s_n in base 65536. Having mapped the
+ * low and high strings into floating-point values, we then use the BigDecimalSplitter
+ * to establish the even split points, then map the resulting floating point values
+ * back into strings.
+ */
+ public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
+ throws SQLException {
+
+ LOG.warn("Generating splits for a textual index column.");
+ LOG.warn("If your database sorts in a case-insensitive order, "
+ + "this may result in a partial import or duplicate records.");
+ LOG.warn("You are strongly encouraged to choose an integral split column.");
+
+ String minString = results.getString(1);
+ String maxString = results.getString(2);
+
+ boolean minIsNull = false;
+
+ // If the min value is null, switch it to an empty string instead for purposes
+ // of interpolation. Then add [null, null] as a special case split.
+ if (null == minString) {
+ minString = "";
+ minIsNull = true;
+ }
+
+ if (null == maxString) {
+ // If the max string is null, then the min string has to be null too.
+ // Just return a special split for this case.
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+ colName + " IS NULL", colName + " IS NULL"));
+ return splits;
+ }
+
+ // Use this as a hint. May need an extra task if the size doesn't
+ // divide cleanly.
+ int numSplits = conf.getInt("mapred.map.tasks", 1);
+
+ String lowClausePrefix = colName + " >= '";
+ String highClausePrefix = colName + " < '";
+
+ // If there is a common prefix between minString and maxString, establish it
+ // and pull it out of minString and maxString.
+ int maxPrefixLen = Math.min(minString.length(), maxString.length());
+ int sharedLen;
+ for (sharedLen = 0; sharedLen < maxPrefixLen; sharedLen++) {
+ char c1 = minString.charAt(sharedLen);
+ char c2 = maxString.charAt(sharedLen);
+ if (c1 != c2) {
+ break;
+ }
+ }
+
+ // The common prefix has length 'sharedLen'. Extract it from both.
+ String commonPrefix = minString.substring(0, sharedLen);
+ minString = minString.substring(sharedLen);
+ maxString = maxString.substring(sharedLen);
+
+ List<String> splitStrings = split(numSplits, minString, maxString, commonPrefix);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+
+ // Convert the list of split point strings into an actual set of InputSplits.
+ String start = splitStrings.get(0);
+ for (int i = 1; i < splitStrings.size(); i++) {
+ String end = splitStrings.get(i);
+
+ if (i == splitStrings.size() - 1) {
+ // This is the last one; use a closed interval.
+ splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+ lowClausePrefix + start + "'", colName + " <= '" + end + "'"));
+ } else {
+ // Normal open-interval case.
+ splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+ lowClausePrefix + start + "'", highClausePrefix + end + "'"));
+ }
+ }
+
+ if (minIsNull) {
+ // Add the special null split at the end.
+ splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+ colName + " IS NULL", colName + " IS NULL"));
+ }
+
+ return splits;
+ }
+
+ List<String> split(int numSplits, String minString, String maxString, String commonPrefix)
+ throws SQLException {
+
+ BigDecimal minVal = stringToBigDecimal(minString);
+ BigDecimal maxVal = stringToBigDecimal(maxString);
+
+ List<BigDecimal> splitPoints = split(new BigDecimal(numSplits), minVal, maxVal);
+ List<String> splitStrings = new ArrayList<String>();
+
+ // Convert the BigDecimal splitPoints into their string representations.
+ for (BigDecimal bd : splitPoints) {
+ splitStrings.add(commonPrefix + bigDecimalToString(bd));
+ }
+
+ // Make sure that our user-specified boundaries are the first and last entries
+ // in the array.
+ if (splitStrings.size() == 0 || !splitStrings.get(0).equals(commonPrefix + minString)) {
+ splitStrings.add(0, commonPrefix + minString);
+ }
+ if (splitStrings.size() == 1
+ || !splitStrings.get(splitStrings.size() - 1).equals(commonPrefix + maxString)) {
+ splitStrings.add(commonPrefix + maxString);
+ }
+
+ return splitStrings;
+ }
+
+ private final static BigDecimal ONE_PLACE = new BigDecimal(65536);
+
+ // Maximum number of characters to convert. This is to prevent rounding errors
+ // or repeating fractions near the very bottom from getting out of control. Note
+ // that this still gives us a huge number of possible splits.
+ private final static int MAX_CHARS = 8;
+
+ /**
+ * Return a BigDecimal representation of string 'str' suitable for use
+ * in a numerically-sorting order.
+ */
+ BigDecimal stringToBigDecimal(String str) {
+ BigDecimal result = BigDecimal.ZERO;
+ BigDecimal curPlace = ONE_PLACE; // start with 1/65536 to compute the first digit.
+
+ int len = Math.min(str.length(), MAX_CHARS);
+
+ for (int i = 0; i < len; i++) {
+ int codePoint = str.codePointAt(i);
+ result = result.add(tryDivide(new BigDecimal(codePoint), curPlace));
+ // advance to the next less significant place. e.g., 1/(65536^2) for the second char.
+ curPlace = curPlace.multiply(ONE_PLACE);
+ }
+
+ return result;
+ }
+
+ /**
+ * Return the string encoded in a BigDecimal.
+ * Repeatedly multiply the input value by 65536; the integer portion after such a multiplication
+ * represents a single character in base 65536. Convert that back into a char and create a
+ * string out of these until we have no data left.
+ */
+ String bigDecimalToString(BigDecimal bd) {
+ BigDecimal cur = bd.stripTrailingZeros();
+ StringBuilder sb = new StringBuilder();
+
+ for (int numConverted = 0; numConverted < MAX_CHARS; numConverted++) {
+ cur = cur.multiply(ONE_PLACE);
+ int curCodePoint = cur.intValue();
+ if (0 == curCodePoint) {
+ break;
+ }
+
+ cur = cur.subtract(new BigDecimal(curCodePoint));
+ sb.append(Character.toChars(curCodePoint));
+ }
+
+ return sb.toString();
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java
------------------------------------------------------------------------------
svn:eol-style = native
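
The base-65536 mapping described in the class comment round-trips cleanly for short strings. Here is a self-contained version of the two conversions; the committed code divides via BigDecimalSplitter.tryDivide, for which a fixed 20-digit scale stands in here:

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    public class TextEncodingSketch {
      static final BigDecimal ONE_PLACE = new BigDecimal(65536);
      static final int MAX_CHARS = 8;

      static BigDecimal stringToBigDecimal(String str) {
        BigDecimal result = BigDecimal.ZERO;
        BigDecimal curPlace = ONE_PLACE; // 1/65536 place for the first character
        int len = Math.min(str.length(), MAX_CHARS);
        for (int i = 0; i < len; i++) {
          // Each character becomes one base-65536 "digit" after the decimal point.
          result = result.add(new BigDecimal(str.codePointAt(i))
              .divide(curPlace, 20, RoundingMode.HALF_UP));
          curPlace = curPlace.multiply(ONE_PLACE);
        }
        return result;
      }

      static String bigDecimalToString(BigDecimal bd) {
        BigDecimal cur = bd.stripTrailingZeros();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < MAX_CHARS; i++) {
          cur = cur.multiply(ONE_PLACE);
          int codePoint = cur.intValue(); // integer part is the next character
          if (codePoint == 0) {
            break;
          }
          cur = cur.subtract(new BigDecimal(codePoint));
          sb.append(Character.toChars(codePoint));
        }
        return sb.toString();
      }

      public static void main(String[] args) {
        // "Ham" maps below "Haze", preserving sort order, and decodes back intact.
        System.out.println(stringToBigDecimal("Ham"));
        System.out.println(stringToBigDecimal("Haze"));
        System.out.println(bigDecimalToString(stringToBigDecimal("Ham")));
      }
    }
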
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.fieldsel;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This class implements a mapper/reducer class that can be used to perform
+ * field selections in a manner similar to unix cut. The input data is treated
+ * as fields separated by a user specified separator (the default value is
+ * "\t"). The user can specify a list of fields that form the map output keys,
+ * and a list of fields that form the map output values. If the input format is
+ * TextInputFormat, the mapper will ignore the key to the map function, and the
+ * fields are taken from the value only. Otherwise, the fields are the union of those
+ * from the key and those from the value.
+ *
+ * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
+ *
+ * The map output field list spec is under attribute
+ * "mapreduce.fieldsel.map.output.key.value.fields.spec".
+ * The value is expected to be like "keyFieldsSpec:valueFieldsSpec"
+ * key/valueFieldsSpec are comma (,) separated field spec: fieldSpec,fieldSpec,fieldSpec ...
+ * Each field spec can be a simple number (e.g. 5) specifying a specific field, or a range
+ * (like 2-5) to specify a range of fields, or an open range (like 3-) specifying all
+ * the fields starting from field 3. Open range field specs apply to value fields only;
+ * they have no effect on the key fields.
+ *
+ * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys,
+ * and use fields 6,5,1,2,3,7 and above for values.
+ *
+ * The reduce output field list spec is under attribute
+ * "mapreduce.fieldsel.reduce.output.key.value.fields.spec".
+ *
+ * The reducer extracts output key/value pairs in a similar manner, except that
+ * the key is never ignored.
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class FieldSelectionHelper {
+
+ public static Text emptyText = new Text("");
+ public static final String DATA_FIELD_SEPERATOR =
+ "mapreduce.fieldsel.data.field.separator";
+ public static final String MAP_OUTPUT_KEY_VALUE_SPEC =
+ "mapreduce.fieldsel.map.output.key.value.fields.spec";
+ public static final String REDUCE_OUTPUT_KEY_VALUE_SPEC =
+ "mapreduce.fieldsel.reduce.output.key.value.fields.spec";
+
+
+ /**
+ * Extract the actual field numbers from the given field specs.
+ * If a field spec is in the form of "n-" (like 3-), then n will be the
+ * return value. Otherwise, -1 will be returned.
+ * @param fieldListSpec an array of field specs
+ * @param fieldList an array of field numbers extracted from the specs.
+ * @return number n if some field spec is in the form of "n-", -1 otherwise.
+ */
+ private static int extractFields(String[] fieldListSpec,
+ List<Integer> fieldList) {
+ int allFieldsFrom = -1;
+ int i = 0;
+ int j = 0;
+ int pos = -1;
+ String fieldSpec = null;
+ for (i = 0; i < fieldListSpec.length; i++) {
+ fieldSpec = fieldListSpec[i];
+ if (fieldSpec.length() == 0) {
+ continue;
+ }
+ pos = fieldSpec.indexOf('-');
+ if (pos < 0) {
+ Integer fn = Integer.valueOf(fieldSpec);
+ fieldList.add(fn);
+ } else {
+ String start = fieldSpec.substring(0, pos);
+ String end = fieldSpec.substring(pos + 1);
+ if (start.length() == 0) {
+ start = "0";
+ }
+ if (end.length() == 0) {
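+ // An open range spec like "n-" has an empty end; record n so that
+ // selectFields can append every field from n onward.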
+ allFieldsFrom = Integer.parseInt(start);
+ continue;
+ }
+ int startPos = Integer.parseInt(start);
+ int endPos = Integer.parseInt(end);
+ for (j = startPos; j <= endPos; j++) {
+ fieldList.add(j);
+ }
+ }
+ }
+ return allFieldsFrom;
+ }
+
+ private static String selectFields(String[] fields, List<Integer> fieldList,
+ int allFieldsFrom, String separator) {
+ String retv = null;
+ int i = 0;
+ StringBuffer sb = null;
+ if (fieldList != null && fieldList.size() > 0) {
+ sb = new StringBuffer();
+ for (Integer index : fieldList) {
+ if (index < fields.length) {
+ sb.append(fields[index]);
+ }
+ sb.append(separator);
+ }
+ }
+ if (allFieldsFrom >= 0) {
+ if (sb == null) {
+ sb = new StringBuffer();
+ }
+ for (i = allFieldsFrom; i < fields.length; i++) {
+ sb.append(fields[i]).append(separator);
+ }
+ }
+ if (sb != null) {
+ retv = sb.toString();
+ if (retv.length() > 0) {
+ retv = retv.substring(0, retv.length() - 1);
+ }
+ }
+ return retv;
+ }
+
+ public static int parseOutputKeyValueSpec(String keyValueSpec,
+ List<Integer> keyFieldList, List<Integer> valueFieldList) {
+ String[] keyValSpecs = keyValueSpec.split(":", -1);
+
+ String[] keySpec = keyValSpecs[0].split(",");
+
+ String[] valSpec = new String[0];
+ if (keyValSpecs.length > 1) {
+ valSpec = keyValSpecs[1].split(",");
+ }
+
+ FieldSelectionHelper.extractFields(keySpec, keyFieldList);
+ return FieldSelectionHelper.extractFields(valSpec, valueFieldList);
+ }
+
+ public static String specToString(String fieldSeparator, String keyValueSpec,
+ int allValueFieldsFrom, List<Integer> keyFieldList,
+ List<Integer> valueFieldList) {
+ StringBuffer sb = new StringBuffer();
+ sb.append("fieldSeparator: ").append(fieldSeparator).append("\n");
+
+ sb.append("keyValueSpec: ").append(keyValueSpec).append("\n");
+ sb.append("allValueFieldsFrom: ").append(allValueFieldsFrom);
+ sb.append("\n");
+ sb.append("keyFieldList.length: ").append(keyFieldList.size());
+ sb.append("\n");
+ for (Integer field : keyFieldList) {
+ sb.append("\t").append(field).append("\n");
+ }
+ sb.append("valueFieldList.length: ").append(valueFieldList.size());
+ sb.append("\n");
+ for (Integer field : valueFieldList) {
+ sb.append("\t").append(field).append("\n");
+ }
+ return sb.toString();
+ }
+
+ private Text key = null;
+ private Text value = null;
+
+ public FieldSelectionHelper() {
+ }
+
+ public FieldSelectionHelper(Text key, Text val) {
+ this.key = key;
+ this.value = val;
+ }
+
+ public Text getKey() {
+ return key;
+ }
+
+ public Text getValue() {
+ return value;
+ }
+
+ public void extractOutputKeyValue(String key, String val,
+ String fieldSep, List<Integer> keyFieldList, List<Integer> valFieldList,
+ int allValueFieldsFrom, boolean ignoreKey, boolean isMap) {
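+ // The key is prepended to the value verbatim; callers that want the key
+ // treated as a separate field must supply it with a trailing separator
+ // (as FieldSelectionReducer does).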
+ if (!ignoreKey) {
+ val = key + val;
+ }
+ String[] fields = val.split(fieldSep);
+
+ String newKey = selectFields(fields, keyFieldList, -1, fieldSep);
+ String newVal = selectFields(fields, valFieldList, allValueFieldsFrom,
+ fieldSep);
+ if (isMap && newKey == null) {
+ newKey = newVal;
+ newVal = null;
+ }
+
+ if (newKey != null) {
+ this.key = new Text(newKey);
+ }
+ if (newVal != null) {
+ this.value = new Text(newVal);
+ }
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java
------------------------------------------------------------------------------
svn:eol-style = native
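As a point of reference for the spec format documented in the class javadoc
above, here is a minimal standalone sketch (illustrative only, not part of
this commit; the class name FieldSpecDemo is hypothetical) showing what
parseOutputKeyValueSpec produces for the example spec "4,3,0,1:6,5,1-3,7-":

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionHelper;

    public class FieldSpecDemo {
      public static void main(String[] args) {
        List<Integer> keyFields = new ArrayList<Integer>();
        List<Integer> valueFields = new ArrayList<Integer>();
        // Fields 4,3,0,1 form the key; fields 6,5,1,2,3 plus everything
        // from field 7 onward form the value.
        int allValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
            "4,3,0,1:6,5,1-3,7-", keyFields, valueFields);
        System.out.println("key fields:   " + keyFields);   // [4, 3, 0, 1]
        System.out.println("value fields: " + valueFields); // [6, 5, 1, 2, 3]
        System.out.println("open range:   " + allValueFieldsFrom); // 7
      }
    }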
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.fieldsel;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+/**
+ * This class implements a mapper class that can be used to perform
+ * field selections in a manner similar to Unix cut. The input data is treated
+ * as fields separated by a user-specified separator (the default value is
+ * "\t"). The user can specify a list of fields that form the map output keys,
+ * and a list of fields that form the map output values. If the input format is
+ * TextInputFormat, the mapper ignores the key passed to the map function and
+ * takes the fields from the value only. Otherwise, the fields are the union of
+ * those from the key and those from the value.
+ *
+ * The field separator is configured via the attribute
+ * "mapreduce.fieldsel.data.field.separator".
+ *
+ * The map output field list spec is under attribute
+ * "mapreduce.fieldsel.map.output.key.value.fields.spec".
+ * The value is expected to be of the form "keyFieldsSpec:valueFieldsSpec".
+ * Both keyFieldsSpec and valueFieldsSpec are comma (,) separated lists of
+ * field specs: fieldSpec,fieldSpec,fieldSpec ... Each field spec can be a
+ * simple number (e.g. 5) specifying a specific field, a range (like 2-5)
+ * specifying a range of fields, or an open range (like 3-) specifying all
+ * the fields starting from field 3. An open range field spec applies to
+ * value fields only; it has no effect on the key fields.
+ *
+ * Here is an example: "4,3,0,1:6,5,1-3,7-". It selects fields
+ * 4,3,0 and 1 for the keys, and fields 6,5,1,2,3,7 and above for the values.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class FieldSelectionMapper<K, V>
+ extends Mapper<K, V, Text, Text> {
+
+ private String mapOutputKeyValueSpec;
+
+ private boolean ignoreInputKey;
+
+ private String fieldSeparator = "\t";
+
+ private List<Integer> mapOutputKeyFieldList = new ArrayList<Integer>();
+
+ private List<Integer> mapOutputValueFieldList = new ArrayList<Integer>();
+
+ private int allMapValueFieldsFrom = -1;
+
+ public static final Log LOG = LogFactory.getLog("FieldSelectionMapReduce");
+
+ public void setup(Context context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ this.fieldSeparator =
+ conf.get(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "\t");
+ this.mapOutputKeyValueSpec =
+ conf.get(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "0-:");
+ try {
+ this.ignoreInputKey = TextInputFormat.class.getCanonicalName().equals(
+ context.getInputFormatClass().getCanonicalName());
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Input format class not found", e);
+ }
+ allMapValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
+ mapOutputKeyValueSpec, mapOutputKeyFieldList, mapOutputValueFieldList);
+ LOG.info(FieldSelectionHelper.specToString(fieldSeparator,
+ mapOutputKeyValueSpec, allMapValueFieldsFrom, mapOutputKeyFieldList,
+ mapOutputValueFieldList) + "\nignoreInputKey:" + ignoreInputKey);
+ }
+
+ /**
+ * Performs field selection on the input key/value pair and writes the
+ * selected key/value pair to the output.
+ */
+ public void map(K key, V val, Context context)
+ throws IOException, InterruptedException {
+ FieldSelectionHelper helper = new FieldSelectionHelper(
+ FieldSelectionHelper.emptyText, FieldSelectionHelper.emptyText);
+ helper.extractOutputKeyValue(key.toString(), val.toString(),
+ fieldSeparator, mapOutputKeyFieldList, mapOutputValueFieldList,
+ allMapValueFieldsFrom, ignoreInputKey, true);
+ context.write(helper.getKey(), helper.getValue());
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java
------------------------------------------------------------------------------
svn:eol-style = native
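To see how the mapper above is meant to be wired into a job, here is a
minimal map-only driver sketch (illustrative only, not part of this commit;
FieldSelectionDemo and the input/output paths are hypothetical). It keys
each line on field 1 and emits field 0 plus everything from field 2 onward
as the value:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionHelper;
    import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionMapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class FieldSelectionDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "\t");
        conf.set(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "1:0,2-");
        Job job = new Job(conf, "fieldsel demo");
        job.setJarByClass(FieldSelectionDemo.class);
        job.setMapperClass(FieldSelectionMapper.class);
        job.setNumReduceTasks(0); // map-only for this sketch
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }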
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.fieldsel;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * This class implements a reducer class that can be used to perform field
+ * selections in a manner similar to Unix cut.
+ *
+ * The input data is treated as fields separated by a user-specified
+ * separator (the default value is "\t"). The user can specify a list of
+ * fields that form the reduce output keys, and a list of fields that form
+ * the reduce output values. The fields are the union of those from the key
+ * and those from the value.
+ *
+ * The field separator is configured via the attribute
+ * "mapreduce.fieldsel.data.field.separator".
+ *
+ * The reduce output field list spec is under attribute
+ * "mapreduce.fieldsel.reduce.output.key.value.fields.spec".
+ * The value is expected to be of the form "keyFieldsSpec:valueFieldsSpec".
+ * Both keyFieldsSpec and valueFieldsSpec are comma (,) separated lists of
+ * field specs: fieldSpec,fieldSpec,fieldSpec ... Each field spec can be a
+ * simple number (e.g. 5) specifying a specific field, a range (like 2-5)
+ * specifying a range of fields, or an open range (like 3-) specifying all
+ * the fields starting from field 3. An open range field spec applies to
+ * value fields only; it has no effect on the key fields.
+ *
+ * Here is an example: "4,3,0,1:6,5,1-3,7-". It selects fields
+ * 4,3,0 and 1 for the keys, and fields 6,5,1,2,3,7 and above for the values.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class FieldSelectionReducer<K, V>
+ extends Reducer<Text, Text, Text, Text> {
+
+ private String fieldSeparator = "\t";
+
+ private String reduceOutputKeyValueSpec;
+
+ private List<Integer> reduceOutputKeyFieldList = new ArrayList<Integer>();
+
+ private List<Integer> reduceOutputValueFieldList = new ArrayList<Integer>();
+
+ private int allReduceValueFieldsFrom = -1;
+
+ public static final Log LOG = LogFactory.getLog("FieldSelectionMapReduce");
+
+ public void setup(Context context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+
+ this.fieldSeparator =
+ conf.get(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "\t");
+
+ this.reduceOutputKeyValueSpec =
+ conf.get(FieldSelectionHelper.REDUCE_OUTPUT_KEY_VALUE_SPEC, "0-:");
+
+ allReduceValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
+ reduceOutputKeyValueSpec, reduceOutputKeyFieldList,
+ reduceOutputValueFieldList);
+
+ LOG.info(FieldSelectionHelper.specToString(fieldSeparator,
+ reduceOutputKeyValueSpec, allReduceValueFieldsFrom,
+ reduceOutputKeyFieldList, reduceOutputValueFieldList));
+ }
+
+ public void reduce(Text key, Iterable<Text> values, Context context)
+ throws IOException, InterruptedException {
+ String keyStr = key.toString() + this.fieldSeparator;
+
+ for (Text val : values) {
+ FieldSelectionHelper helper = new FieldSelectionHelper();
+ helper.extractOutputKeyValue(keyStr, val.toString(),
+ fieldSeparator, reduceOutputKeyFieldList,
+ reduceOutputValueFieldList, allReduceValueFieldsFrom, false, false);
+ context.write(helper.getKey(), helper.getValue());
+ }
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java
------------------------------------------------------------------------------
svn:eol-style = native
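Finally, the reduce-side selection can be exercised without running a job.
The sketch below (illustrative only, not part of this commit;
ReduceSelectionDemo and the sample record are hypothetical) mimics what
FieldSelectionReducer.reduce does: the incoming key, with a trailing
separator, is prepended to each value before selection, so field 0 is the
reduce key and fields 1 onward come from the value:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionHelper;

    public class ReduceSelectionDemo {
      public static void main(String[] args) {
        List<Integer> keyFields = new ArrayList<Integer>();
        List<Integer> valueFields = new ArrayList<Integer>();
        int allFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
            "0:1-", keyFields, valueFields);
        FieldSelectionHelper helper = new FieldSelectionHelper();
        // Key "alice" plus value "42\tNYC" becomes fields [alice, 42, NYC].
        helper.extractOutputKeyValue("alice" + "\t", "42\tNYC", "\t",
            keyFields, valueFields, allFrom, false, false);
        System.out.println(helper.getKey() + " -> " + helper.getValue());
        // prints: alice -> 42<TAB>NYC
      }
    }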