Posted to commits@sqoop.apache.org by bl...@apache.org on 2011/10/28 18:50:41 UTC

svn commit: r1190441 [1/3] - in /incubator/sqoop/trunk/src: java/com/cloudera/sqoop/mapreduce/db/ java/org/apache/sqoop/mapreduce/ java/org/apache/sqoop/mapreduce/db/ test/com/cloudera/sqoop/mapreduce/db/ test/org/apache/sqoop/ test/org/apache/sqoop/ma...

Author: blee
Date: Fri Oct 28 16:50:39 2011
New Revision: 1190441

URL: http://svn.apache.org/viewvc?rev=1190441&view=rev
Log:
SQOOP-377 Migrate mapreduce.db package to new name space

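Every file below follows the same shim pattern: the com.cloudera.sqoop class drops its implementation and becomes a thin, deprecated subclass (or sub-interface) of its org.apache.sqoop counterpart, so code compiled against the old names keeps working. A minimal sketch of the shim, with SomeSplitter standing in for any of the migrated classes (the name is illustrative, not a class in this commit):

    package com.cloudera.sqoop.mapreduce.db;

    /**
     * @deprecated use org.apache.sqoop.mapreduce.db.SomeSplitter instead.
     * @see org.apache.sqoop.mapreduce.db.SomeSplitter
     */
    public class SomeSplitter
        extends org.apache.sqoop.mapreduce.db.SomeSplitter {
    }

Classes that expose constants or static helpers, such as DBConfiguration, additionally redeclare each one as an alias pointing at the new namespace, as the diffs below show.
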
Added:
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/BigDecimalSplitter.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/BooleanSplitter.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBOutputFormat.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBSplitter.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBInputFormat.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DateSplitter.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/FloatSplitter.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/IntegerSplitter.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDBRecordReader.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDataDrivenDBInputFormat.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDataDrivenDBRecordReader.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDateSplitter.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/TextSplitter.java   (with props)
    incubator/sqoop/trunk/src/test/org/apache/sqoop/
    incubator/sqoop/trunk/src/test/org/apache/sqoop/mapreduce/
    incubator/sqoop/trunk/src/test/org/apache/sqoop/mapreduce/db/
    incubator/sqoop/trunk/src/test/org/apache/sqoop/mapreduce/db/TestIntegerSplitter.java   (with props)
    incubator/sqoop/trunk/src/test/org/apache/sqoop/mapreduce/db/TestTextSplitter.java   (with props)
Modified:
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BigDecimalSplitter.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BooleanSplitter.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBConfiguration.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBOutputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBSplitter.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBRecordReader.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DateSplitter.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/FloatSplitter.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/IntegerSplitter.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDBRecordReader.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBInputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBRecordReader.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDateSplitter.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/TextSplitter.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/mapreduce/db/TestIntegerSplitter.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/mapreduce/db/TestTextSplitter.java

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BigDecimalSplitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BigDecimalSplitter.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BigDecimalSplitter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BigDecimalSplitter.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,137 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package com.cloudera.sqoop.mapreduce.db;
 
-import java.math.BigDecimal;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import com.cloudera.sqoop.config.ConfigurationHelper;
 
 /**
  * Implement DBSplitter over BigDecimal values.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.BigDecimalSplitter instead.
+ * @see org.apache.sqoop.mapreduce.db.BigDecimalSplitter
  */
-public class BigDecimalSplitter implements DBSplitter {
-  private static final Log LOG = LogFactory.getLog(BigDecimalSplitter.class);
-
-  public List<InputSplit> split(Configuration conf, ResultSet results,
-      String colName) throws SQLException {
-
-    BigDecimal minVal = results.getBigDecimal(1);
-    BigDecimal maxVal = results.getBigDecimal(2);
-
-    String lowClausePrefix = colName + " >= ";
-    String highClausePrefix = colName + " < ";
-
-    BigDecimal numSplits = new BigDecimal(
-        ConfigurationHelper.getConfNumMaps(conf));
-
-    if (minVal == null && maxVal == null) {
-      // Range is null to null. Return a null split accordingly.
-      List<InputSplit> splits = new ArrayList<InputSplit>();
-      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-          colName + " IS NULL", colName + " IS NULL"));
-      return splits;
-    }
-
-    if (minVal == null || maxVal == null) {
-      // Don't know what is a reasonable min/max value for interpolation. Fail.
-      LOG.error("Cannot find a range for NUMERIC or DECIMAL "
-          + "fields with one end NULL.");
-      return null;
-    }
-
-    // Get all the split points together.
-    List<BigDecimal> splitPoints = split(numSplits, minVal, maxVal);
-    List<InputSplit> splits = new ArrayList<InputSplit>();
-
-    // Turn the split points into a set of intervals.
-    BigDecimal start = splitPoints.get(0);
-    for (int i = 1; i < splitPoints.size(); i++) {
-      BigDecimal end = splitPoints.get(i);
-
-      if (i == splitPoints.size() - 1) {
-        // This is the last one; use a closed interval.
-        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-            lowClausePrefix + start.toString(),
-            colName + " <= " + end.toString()));
-      } else {
-        // Normal open-interval case.
-        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-            lowClausePrefix + start.toString(),
-            highClausePrefix + end.toString()));
-      }
-
-      start = end;
-    }
-
-    return splits;
-  }
-
-  private static final BigDecimal MIN_INCREMENT =
-      new BigDecimal(10000 * Double.MIN_VALUE);
-
-  /**
-   * Divide numerator by denominator. If impossible in exact mode, use rounding.
-   */
-  protected BigDecimal tryDivide(BigDecimal numerator, BigDecimal denominator) {
-    try {
-      return numerator.divide(denominator);
-    } catch (ArithmeticException ae) {
-      return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP);
-    }
-  }
-
-  /**
-   * Returns a list of BigDecimals one element longer than the list of input
-   * splits.  This represents the boundaries between input splits.  All splits
-   * are open on the top end, except the last one.
-   *
-   * So the list [0, 5, 8, 12, 18] would represent splits capturing the
-   * intervals:
-   *
-   * [0, 5)
-   * [5, 8)
-   * [8, 12)
-   * [12, 18] note the closed interval for the last split.
-   */
-  List<BigDecimal> split(BigDecimal numSplits, BigDecimal minVal,
-      BigDecimal maxVal) throws SQLException {
-
-    List<BigDecimal> splits = new ArrayList<BigDecimal>();
-
-    // Use numSplits as a hint. May need an extra task if the size doesn't
-    // divide cleanly.
-
-    BigDecimal splitSize = tryDivide(maxVal.subtract(minVal), (numSplits));
-    if (splitSize.compareTo(MIN_INCREMENT) < 0) {
-      splitSize = MIN_INCREMENT;
-      LOG.warn("Set BigDecimal splitSize to MIN_INCREMENT");
-    }
-
-    BigDecimal curVal = minVal;
-
-    while (curVal.compareTo(maxVal) <= 0) {
-      splits.add(curVal);
-      curVal = curVal.add(splitSize);
-    }
-
-    if (splits.get(splits.size() - 1).compareTo(maxVal) != 0
-        || splits.size() == 1) {
-      // We didn't end on the maxVal. Add that to the end of the list.
-      splits.add(maxVal);
-    }
+public class BigDecimalSplitter
+  extends org.apache.sqoop.mapreduce.db.BigDecimalSplitter {
 
-    return splits;
-  }
 }

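The split(BigDecimal, BigDecimal, BigDecimal) method deleted above returns boundary points rather than splits: one more element than the number of intervals, with every interval half-open except the last. A self-contained sketch of that boundary arithmetic, assuming the behavior documented in the removed javadoc (names are illustrative; this is not the removed code verbatim):

    import java.math.BigDecimal;
    import java.math.RoundingMode;
    import java.util.ArrayList;
    import java.util.List;

    public class SplitBoundaryDemo {
      // Mirrors the removed MIN_INCREMENT guard against a zero-sized step.
      private static final BigDecimal MIN_INCREMENT =
          new BigDecimal(10000 * Double.MIN_VALUE);

      // For min=0, max=18, numSplits=4 this returns [0, 4.5, 9, 13.5, 18],
      // i.e. the intervals [0, 4.5) [4.5, 9) [9, 13.5) [13.5, 18].
      static List<BigDecimal> boundaries(BigDecimal numSplits,
          BigDecimal min, BigDecimal max) {
        BigDecimal splitSize;
        try {
          splitSize = max.subtract(min).divide(numSplits);  // exact division
        } catch (ArithmeticException ae) {
          // Non-terminating decimal expansion; divide with rounding instead.
          splitSize = max.subtract(min).divide(numSplits, RoundingMode.HALF_UP);
        }
        if (splitSize.compareTo(MIN_INCREMENT) < 0) {
          splitSize = MIN_INCREMENT;
        }
        List<BigDecimal> out = new ArrayList<BigDecimal>();
        for (BigDecimal cur = min; cur.compareTo(max) <= 0;
            cur = cur.add(splitSize)) {
          out.add(cur);
        }
        if (out.get(out.size() - 1).compareTo(max) != 0) {
          out.add(max);  // did not land exactly on max; close the last interval
        }
        return out;
      }
    }
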
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BooleanSplitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BooleanSplitter.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BooleanSplitter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BooleanSplitter.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,53 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package com.cloudera.sqoop.mapreduce.db;
 
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
 
 /**
  * Implement DBSplitter over boolean values.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.BooleanSplitter instead.
+ * @see org.apache.sqoop.mapreduce.db.BooleanSplitter
  */
-public class BooleanSplitter implements DBSplitter {
-  public List<InputSplit> split(Configuration conf, ResultSet results,
-      String colName) throws SQLException {
-
-    List<InputSplit> splits = new ArrayList<InputSplit>();
-
-    if (results.getString(1) == null && results.getString(2) == null) {
-      // Range is null to null. Return a null split accordingly.
-      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-          colName + " IS NULL", colName + " IS NULL"));
-      return splits;
-    }
-
-    boolean minVal = results.getBoolean(1);
-    boolean maxVal = results.getBoolean(2);
-
-    // Use one or two splits.
-    if (!minVal) {
-      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-          colName + " = FALSE", colName + " = FALSE"));
-    }
-
-    if (maxVal) {
-      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-          colName + " = TRUE", colName + " = TRUE"));
-    }
-
-    if (results.getString(1) == null || results.getString(2) == null) {
-      // Include a null value.
-      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-          colName + " IS NULL", colName + " IS NULL"));
-    }
+public class BooleanSplitter
+  extends org.apache.sqoop.mapreduce.db.BooleanSplitter {
 
-    return splits;
-  }
 }

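For intuition, the logic deleted above emits at most three single-value splits, each with identical lower and upper bound clauses. For a nullable BOOLEAN column named active (the name is illustrative) whose observed range is false..true, the generated bound clauses would be:

    active = FALSE
    active = TRUE
    active IS NULL    -- emitted only when a boundary value was NULL
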
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBConfiguration.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBConfiguration.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBConfiguration.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,17 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package com.cloudera.sqoop.mapreduce.db;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-
-import com.cloudera.sqoop.mapreduce.db.DBInputFormat.NullDBWritable;
 
 /**
  * A container for configuration property names for jobs with DB input/output.
@@ -42,67 +32,76 @@ import com.cloudera.sqoop.mapreduce.db.D
  * @see DBInputFormat#setInput(Job, Class, String, String)
  * @see DBInputFormat#setInput(Job, Class, String, String, String, String...)
  * @see DBOutputFormat#setOutput(Job, String, String...)
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.DBConfiguration instead.
+ * @see org.apache.sqoop.mapreduce.db.DBConfiguration
  */
-public class DBConfiguration {
+public class DBConfiguration
+  extends org.apache.sqoop.mapreduce.db.DBConfiguration {
 
   /** The JDBC Driver class name. */
   public static final String DRIVER_CLASS_PROPERTY =
-    "mapreduce.jdbc.driver.class";
+      org.apache.sqoop.mapreduce.db.DBConfiguration.DRIVER_CLASS_PROPERTY;
 
   /** JDBC Database access URL. */
-  public static final String URL_PROPERTY = "mapreduce.jdbc.url";
+  public static final String URL_PROPERTY =
+      org.apache.sqoop.mapreduce.db.DBConfiguration.URL_PROPERTY;
 
   /** User name to access the database. */
-  public static final String USERNAME_PROPERTY = "mapreduce.jdbc.username";
+  public static final String USERNAME_PROPERTY =
+      org.apache.sqoop.mapreduce.db.DBConfiguration.USERNAME_PROPERTY;
 
   /** Password to access the database. */
-  public static final String PASSWORD_PROPERTY = "mapreduce.jdbc.password";
+  public static final String PASSWORD_PROPERTY =
+      org.apache.sqoop.mapreduce.db.DBConfiguration.PASSWORD_PROPERTY;
 
   /** Fetch size. */
-  public static final String FETCH_SIZE = "mapreduce.jdbc.fetchsize";
+  public static final String FETCH_SIZE =
+      org.apache.sqoop.mapreduce.db.DBConfiguration.FETCH_SIZE;
 
   /** Input table name. */
   public static final String INPUT_TABLE_NAME_PROPERTY =
-    "mapreduce.jdbc.input.table.name";
+      org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_TABLE_NAME_PROPERTY;
 
   /** Field names in the Input table. */
   public static final String INPUT_FIELD_NAMES_PROPERTY =
-    "mapreduce.jdbc.input.field.names";
+      org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_FIELD_NAMES_PROPERTY;
 
   /** WHERE clause in the input SELECT statement. */
   public static final String INPUT_CONDITIONS_PROPERTY =
-    "mapreduce.jdbc.input.conditions";
+      org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_CONDITIONS_PROPERTY;
 
   /** ORDER BY clause in the input SELECT statement. */
   public static final String INPUT_ORDER_BY_PROPERTY =
-    "mapreduce.jdbc.input.orderby";
+      org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_ORDER_BY_PROPERTY;
 
   /** Whole input query, excluding LIMIT...OFFSET. */
-  public static final String INPUT_QUERY = "mapreduce.jdbc.input.query";
+  public static final String INPUT_QUERY =
+      org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_QUERY;
 
   /** Input query to get the count of records. */
   public static final String INPUT_COUNT_QUERY =
-    "mapreduce.jdbc.input.count.query";
+      org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_COUNT_QUERY;
 
   /** Input query to get the max and min values of the jdbc.input.query. */
   public static final String INPUT_BOUNDING_QUERY =
-      "mapred.jdbc.input.bounding.query";
+      org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_BOUNDING_QUERY;
 
   /** Class name implementing DBWritable which will hold input tuples. */
   public static final String INPUT_CLASS_PROPERTY =
-    "mapreduce.jdbc.input.class";
+      org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_CLASS_PROPERTY;
 
   /** Output table name. */
   public static final String OUTPUT_TABLE_NAME_PROPERTY =
-    "mapreduce.jdbc.output.table.name";
+      org.apache.sqoop.mapreduce.db.DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY;
 
   /** Field names in the Output table. */
   public static final String OUTPUT_FIELD_NAMES_PROPERTY =
-    "mapreduce.jdbc.output.field.names";
+      org.apache.sqoop.mapreduce.db.DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY;
 
   /** Number of fields in the Output table. */
   public static final String OUTPUT_FIELD_COUNT_PROPERTY =
-    "mapreduce.jdbc.output.field.count";
+      org.apache.sqoop.mapreduce.db.DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY;
 
   /**
    * Sets the DB access related fields in the {@link Configuration}.
@@ -116,17 +115,8 @@ public class DBConfiguration {
   public static void configureDB(Configuration conf, String driverClass,
       String dbUrl, String userName, String passwd, Integer fetchSize) {
 
-    conf.set(DRIVER_CLASS_PROPERTY, driverClass);
-    conf.set(URL_PROPERTY, dbUrl);
-    if (userName != null) {
-      conf.set(USERNAME_PROPERTY, userName);
-    }
-    if (passwd != null) {
-      conf.set(PASSWORD_PROPERTY, passwd);
-    }
-    if (fetchSize != null) {
-      conf.setInt(FETCH_SIZE, fetchSize);
-    }
+    org.apache.sqoop.mapreduce.db.DBConfiguration.configureDB(
+        conf, driverClass, dbUrl, userName, passwd, fetchSize);
   }
 
   /**
@@ -138,7 +128,8 @@ public class DBConfiguration {
    */
   public static void configureDB(Configuration job, String driverClass,
       String dbUrl, Integer fetchSize) {
-    configureDB(job, driverClass, dbUrl, null, null, fetchSize);
+    org.apache.sqoop.mapreduce.db.DBConfiguration.configureDB(job, driverClass,
+        dbUrl, fetchSize);
   }
 
   /**
@@ -151,7 +142,8 @@ public class DBConfiguration {
    */
   public static void configureDB(Configuration conf, String driverClass,
       String dbUrl, String userName, String passwd) {
-    configureDB(conf, driverClass, dbUrl, userName, passwd, null);
+    org.apache.sqoop.mapreduce.db.DBConfiguration.configureDB(conf, driverClass,
+        dbUrl, userName, passwd);
   }
 
   /**
@@ -162,151 +154,12 @@ public class DBConfiguration {
    */
   public static void configureDB(Configuration job, String driverClass,
       String dbUrl) {
-    configureDB(job, driverClass, dbUrl, null);
+    org.apache.sqoop.mapreduce.db.DBConfiguration.configureDB(job, driverClass,
+        dbUrl);
   }
 
-  private Configuration conf;
-
   public DBConfiguration(Configuration job) {
-    this.conf = job;
-  }
-
-  /** Returns a connection object to the DB.
-   * @throws ClassNotFoundException
-   * @throws SQLException */
-  public Connection getConnection()
-      throws ClassNotFoundException, SQLException {
-
-    Class.forName(conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
-
-    if(conf.get(DBConfiguration.USERNAME_PROPERTY) == null) {
-      return DriverManager.getConnection(
-               conf.get(DBConfiguration.URL_PROPERTY));
-    } else {
-      return DriverManager.getConnection(
-          conf.get(DBConfiguration.URL_PROPERTY),
-          conf.get(DBConfiguration.USERNAME_PROPERTY),
-          conf.get(DBConfiguration.PASSWORD_PROPERTY));
-    }
-  }
-
-  public Configuration getConf() {
-    return conf;
-  }
-
-  public Integer getFetchSize() {
-    if (conf.get(DBConfiguration.FETCH_SIZE) == null) {
-      return null;
-    }
-    return conf.getInt(DBConfiguration.FETCH_SIZE, 0);
-  }
-
-  public void setFetchSize(Integer fetchSize) {
-    if (fetchSize != null) {
-      conf.setInt(DBConfiguration.FETCH_SIZE, fetchSize);
-    } else {
-      conf.set(FETCH_SIZE, null);
-    }
-  }
-  public String getInputTableName() {
-    return conf.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY);
-  }
-
-  public void setInputTableName(String tableName) {
-    conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
-  }
-
-  public String[] getInputFieldNames() {
-    return conf.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
-  }
-
-  public void setInputFieldNames(String... fieldNames) {
-    conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
-  }
-
-  public String getInputConditions() {
-    return conf.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY);
-  }
-
-  public void setInputConditions(String conditions) {
-    if (conditions != null && conditions.length() > 0) {
-      conf.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions);
-    }
-  }
-
-  public String getInputOrderBy() {
-    return conf.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY);
-  }
-
-  public void setInputOrderBy(String orderby) {
-    if(orderby != null && orderby.length() >0) {
-      conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby);
-    }
+    super(job);
   }
-
-  public String getInputQuery() {
-    return conf.get(DBConfiguration.INPUT_QUERY);
-  }
-
-  public void setInputQuery(String query) {
-    if(query != null && query.length() >0) {
-      conf.set(DBConfiguration.INPUT_QUERY, query);
-    }
-  }
-
-  public String getInputCountQuery() {
-    return conf.get(DBConfiguration.INPUT_COUNT_QUERY);
-  }
-
-  public void setInputCountQuery(String query) {
-    if(query != null && query.length() > 0) {
-      conf.set(DBConfiguration.INPUT_COUNT_QUERY, query);
-    }
-  }
-
-  public void setInputBoundingQuery(String query) {
-    if (query != null && query.length() > 0) {
-      conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query);
-    }
-  }
-
-  public String getInputBoundingQuery() {
-    return conf.get(DBConfiguration.INPUT_BOUNDING_QUERY);
-  }
-
-  public Class<?> getInputClass() {
-    return conf.getClass(DBConfiguration.INPUT_CLASS_PROPERTY,
-                         NullDBWritable.class);
-  }
-
-  public void setInputClass(Class<? extends DBWritable> inputClass) {
-    conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass,
-                  DBWritable.class);
-  }
-
-  public String getOutputTableName() {
-    return conf.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY);
-  }
-
-  public void setOutputTableName(String tableName) {
-    conf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
-  }
-
-  public String[] getOutputFieldNames() {
-    return conf.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY);
-  }
-
-  public void setOutputFieldNames(String... fieldNames) {
-    conf.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames);
-  }
-
-  public void setOutputFieldCount(int fieldCount) {
-    conf.setInt(DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY, fieldCount);
-  }
-
-  public int getOutputFieldCount() {
-    return conf.getInt(OUTPUT_FIELD_COUNT_PROPERTY, 0);
-  }
-
 }
 

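The configureDB overloads above write the JDBC properties into the job configuration once; a DBConfiguration instance then reads them back during the job. A usage sketch, assuming these methods behave as the delegations suggest (driver, URL, credentials, table, and field names are placeholders):

    import org.apache.hadoop.conf.Configuration;

    import com.cloudera.sqoop.mapreduce.db.DBConfiguration;

    public class ConfigureDBDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Placeholder driver, URL, and credentials.
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
            "jdbc:mysql://localhost/testdb", "sqoop", "secret");

        DBConfiguration dbConf = new DBConfiguration(conf);
        dbConf.setInputTableName("employees");      // placeholder table
        dbConf.setInputFieldNames("id", "name");    // placeholder fields
      }
    }
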
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,34 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package com.cloudera.sqoop.mapreduce.db;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-
-import com.cloudera.sqoop.config.ConfigurationHelper;
 
 /**
  * An InputFormat that reads input data from an SQL table.
@@ -54,38 +27,38 @@ import com.cloudera.sqoop.config.Configu
  *
  * The SQL query and input class can be set using one of the two
  * setInput methods.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.DBInputFormat instead.
+ * @see org.apache.sqoop.mapreduce.db.DBInputFormat
  */
 public class DBInputFormat<T extends DBWritable>
-    extends InputFormat<LongWritable, T> implements Configurable {
-
-  private String dbProductName = "DEFAULT";
+    extends org.apache.sqoop.mapreduce.db.DBInputFormat<T> {
 
   /**
    * A Class that does nothing, implementing DBWritable.
+   * @deprecated use org.apache.sqoop.mapreduce.db.DBInputFormat.NullDBWritable
+   *   instead.
+   * @see org.apache.sqoop.mapreduce.db.DBInputFormat.NullDBWritable
    */
-  public static class NullDBWritable implements DBWritable, Writable {
-    @Override
-    public void readFields(DataInput in) throws IOException { }
-    @Override
-    public void readFields(ResultSet arg0) throws SQLException { }
-    @Override
-    public void write(DataOutput out) throws IOException { }
-    @Override
-    public void write(PreparedStatement arg0) throws SQLException { }
+  public static class NullDBWritable
+    extends org.apache.sqoop.mapreduce.db.DBInputFormat.NullDBWritable {
   }
 
   /**
    * An InputSplit that spans a set of rows.
+   *
+   * @deprecated use org.apache.sqoop.mapreduce.db.DBInputFormat.DBInputSplit
+   *   instead.
+   * @see org.apache.sqoop.mapreduce.db.DBInputFormat.DBInputSplit
    */
-  public static class DBInputSplit extends InputSplit implements Writable {
-
-    private long end = 0;
-    private long start = 0;
+  public static class DBInputSplit extends
+    org.apache.sqoop.mapreduce.db.DBInputFormat.DBInputSplit {
 
     /**
      * Default Constructor.
      */
     public DBInputSplit() {
+      super();
     }
 
     /**
@@ -94,266 +67,7 @@ public class DBInputFormat<T extends DBW
      * @param end the index of the last row to select
      */
     public DBInputSplit(long start, long end) {
-      this.start = start;
-      this.end = end;
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public String[] getLocations() throws IOException {
-      // TODO Add a layer to enable SQL "sharding" and support locality
-      return new String[] {};
-    }
-
-    /**
-     * @return The index of the first row to select
-     */
-    public long getStart() {
-      return start;
-    }
-
-    /**
-     * @return The index of the last row to select
-     */
-    public long getEnd() {
-      return end;
-    }
-
-    /**
-     * @return The total row count in this split
-     */
-    public long getLength() throws IOException {
-      return end - start;
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public void readFields(DataInput input) throws IOException {
-      start = input.readLong();
-      end = input.readLong();
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public void write(DataOutput output) throws IOException {
-      output.writeLong(start);
-      output.writeLong(end);
-    }
-  }
-
-  private String conditions;
-
-  private Connection connection;
-
-  private String tableName;
-
-  private String[] fieldNames;
-
-  private DBConfiguration dbConf;
-
-  @Override
-  /** {@inheritDoc} */
-  public void setConf(Configuration conf) {
-
-    dbConf = new DBConfiguration(conf);
-
-    try {
-      getConnection();
-
-      DatabaseMetaData dbMeta = connection.getMetaData();
-      this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase();
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-
-    tableName = dbConf.getInputTableName();
-    fieldNames = dbConf.getInputFieldNames();
-    conditions = dbConf.getInputConditions();
-  }
-
-  public Configuration getConf() {
-    return dbConf.getConf();
-  }
-
-  public DBConfiguration getDBConf() {
-    return dbConf;
-  }
-
-  public Connection getConnection() {
-    try {
-      if (null == this.connection) {
-        // The connection was closed; reinstantiate it.
-        this.connection = dbConf.getConnection();
-        this.connection.setAutoCommit(false);
-        this.connection.setTransactionIsolation(
-            Connection.TRANSACTION_READ_COMMITTED);
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+      super(start, end);
     }
-    return connection;
-  }
-
-  public String getDBProductName() {
-    return dbProductName;
-  }
-
-  protected RecordReader<LongWritable, T> createDBRecordReader(
-      DBInputSplit split, Configuration conf) throws IOException {
-
-    @SuppressWarnings("unchecked")
-    Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
-    try {
-      // use database product name to determine appropriate record reader.
-      if (dbProductName.startsWith("ORACLE")) {
-        // use Oracle-specific db reader.
-        return new OracleDBRecordReader<T>(split, inputClass,
-            conf, getConnection(), getDBConf(), conditions, fieldNames,
-            tableName);
-      } else {
-        // Generic reader.
-        return new DBRecordReader<T>(split, inputClass,
-            conf, getConnection(), getDBConf(), conditions, fieldNames,
-            tableName);
-      }
-    } catch (SQLException ex) {
-      throw new IOException(ex);
-    }
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public RecordReader<LongWritable, T> createRecordReader(InputSplit split,
-      TaskAttemptContext context) throws IOException, InterruptedException {
-
-    return createDBRecordReader((DBInputSplit) split,
-        context.getConfiguration());
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public List<InputSplit> getSplits(JobContext job) throws IOException {
-
-    ResultSet results = null;
-    Statement statement = null;
-    try {
-      statement = connection.createStatement();
-
-      results = statement.executeQuery(getCountQuery());
-      results.next();
-
-      long count = results.getLong(1);
-      int chunks = ConfigurationHelper.getJobNumMaps(job);
-      long chunkSize = (count / chunks);
-
-      results.close();
-      statement.close();
-
-      List<InputSplit> splits = new ArrayList<InputSplit>();
-
-      // Split the rows into n-number of chunks and adjust the last chunk
-      // accordingly
-      for (int i = 0; i < chunks; i++) {
-        DBInputSplit split;
-
-        if ((i + 1) == chunks) {
-          split = new DBInputSplit(i * chunkSize, count);
-        } else {
-          split = new DBInputSplit(i * chunkSize, (i * chunkSize)
-              + chunkSize);
-        }
-
-        splits.add(split);
-      }
-
-      connection.commit();
-      return splits;
-    } catch (SQLException e) {
-      throw new IOException("Got SQLException", e);
-    } finally {
-      try {
-        if (results != null) { results.close(); }
-      } catch (SQLException e1) { /* ignored */ }
-      try {
-        if (statement != null) { statement.close(); }
-      } catch (SQLException e1) { /* ignored */ }
-
-      closeConnection();
-    }
-  }
-
-  /** Returns the query for getting the total number of rows,
-   * subclasses can override this for custom behaviour.*/
-  protected String getCountQuery() {
-
-    if(dbConf.getInputCountQuery() != null) {
-      return dbConf.getInputCountQuery();
-    }
-
-    StringBuilder query = new StringBuilder();
-    query.append("SELECT COUNT(*) FROM " + tableName);
-
-    if (conditions != null && conditions.length() > 0) {
-      query.append(" WHERE " + conditions);
-    }
-    return query.toString();
-  }
-
-  /**
-   * Initializes the map-part of the job with the appropriate input settings.
-   *
-   * @param job The map-reduce job
-   * @param inputClass the class object implementing DBWritable, which is the
-   * Java object holding tuple fields.
-   * @param tableName The table to read data from
-   * @param conditions The condition which to select data with,
-   * eg. '(updated &gt; 20070101 AND length &gt; 0)'
-   * @param orderBy the fieldNames in the orderBy clause.
-   * @param fieldNames The field names in the table
-   * @see #setInput(Job, Class, String, String)
-   */
-  public static void setInput(Job job,
-      Class<? extends DBWritable> inputClass,
-      String tableName, String conditions,
-      String orderBy, String... fieldNames) {
-    job.setInputFormatClass(DBInputFormat.class);
-    DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
-    dbConf.setInputClass(inputClass);
-    dbConf.setInputTableName(tableName);
-    dbConf.setInputFieldNames(fieldNames);
-    dbConf.setInputConditions(conditions);
-    dbConf.setInputOrderBy(orderBy);
-  }
-
-  /**
-   * Initializes the map-part of the job with the appropriate input settings.
-   *
-   * @param job The map-reduce job
-   * @param inputClass the class object implementing DBWritable, which is the
-   * Java object holding tuple fields.
-   * @param inputQuery the input query to select fields. Example :
-   * "SELECT f1, f2, f3 FROM Mytable ORDER BY f1"
-   * @param inputCountQuery the input query that returns
-   * the number of records in the table.
-   * Example : "SELECT COUNT(f1) FROM Mytable"
-   * @see #setInput(Job, Class, String, String, String, String...)
-   */
-  public static void setInput(Job job,
-      Class<? extends DBWritable> inputClass,
-      String inputQuery, String inputCountQuery) {
-    job.setInputFormatClass(DBInputFormat.class);
-    DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
-    dbConf.setInputClass(inputClass);
-    dbConf.setInputQuery(inputQuery);
-    dbConf.setInputCountQuery(inputCountQuery);
-  }
-
-  protected void closeConnection() {
-    try {
-      if (null != this.connection) {
-        this.connection.close();
-        this.connection = null;
-      }
-    } catch (SQLException sqlE) { /* ignore exception on close. */ }
   }
 }

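Both setInput overloads described in the removed javadoc remain reachable through the shim, assuming the statics were carried over to the new parent class as the migration pattern implies. A sketch of the query-based form, reusing the javadoc's own example queries (job construction is abbreviated and uses the pre-YARN Job constructor current at this time):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    import com.cloudera.sqoop.mapreduce.db.DBInputFormat;

    public class DBInputDemo {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration());
        DBInputFormat.setInput(job, DBInputFormat.NullDBWritable.class,
            "SELECT f1, f2, f3 FROM Mytable ORDER BY f1",
            "SELECT COUNT(f1) FROM Mytable");
      }
    }
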
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBOutputFormat.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBOutputFormat.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,28 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package com.cloudera.sqoop.mapreduce.db;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.StringUtils;
-
-import com.cloudera.sqoop.config.ConfigurationHelper;
 
 /**
  * An OutputFormat that sends the reduce output to a SQL table.
@@ -47,191 +31,29 @@ import com.cloudera.sqoop.config.Configu
  * key has a type extending DBWritable. Returned {@link RecordWriter}
  * writes <b>only the key</b> to the database with a batch SQL query.
  *
+ * @deprecated use org.apache.sqoop.mapreduce.db.DBOutputFormat instead.
+ * @see org.apache.sqoop.mapreduce.db.DBOutputFormat
  */
 public class DBOutputFormat<K extends DBWritable, V>
-    extends OutputFormat<K, V> {
-
-  private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
-  public void checkOutputSpecs(JobContext context)
-      throws IOException, InterruptedException {}
-
-  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
-                                   context);
-  }
+    extends org.apache.sqoop.mapreduce.db.DBOutputFormat<K, V> {
 
   /**
    * A RecordWriter that writes the reduce output to a SQL table.
-   */
-  public class DBRecordWriter
-      extends RecordWriter<K, V> {
-
-    private Connection connection;
-    private PreparedStatement statement;
-
-    public DBRecordWriter() throws SQLException {
-    }
-
-    public DBRecordWriter(Connection connection
-        , PreparedStatement statement) throws SQLException {
-      this.connection = connection;
-      this.statement = statement;
-      this.connection.setAutoCommit(false);
-    }
-
-    public Connection getConnection() {
-      return connection;
-    }
-
-    public PreparedStatement getStatement() {
-      return statement;
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public void close(TaskAttemptContext context) throws IOException {
-      try {
-        statement.executeBatch();
-        connection.commit();
-      } catch (SQLException e) {
-        try {
-          connection.rollback();
-        } catch (SQLException ex) {
-          LOG.warn(StringUtils.stringifyException(ex));
-        }
-        throw new IOException(e);
-      } finally {
-        try {
-          statement.close();
-          connection.close();
-        } catch (SQLException ex) {
-          LOG.error("Unable to close connection", ex);
-        }
-      }
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public void write(K key, V value) throws IOException {
-      try {
-        key.write(statement);
-        statement.addBatch();
-      } catch (SQLException e) {
-        LOG.error("Exception encountered", e);
-      }
-    }
-  }
-
-  /**
-   * Constructs the query used as the prepared statement to insert data.
    *
-   * @param table
-   *          the table to insert into
-   * @param fieldNames
-   *          the fields to insert into. If field names are unknown, supply an
-   *          array of nulls.
+   * @deprecated use
+   *   org.apache.sqoop.mapreduce.db.DBOutputFormat.DBRecordWriter instead.
+   * @see org.apache.sqoop.mapreduce.db.DBOutputFormat.DBRecordWriter
    */
-  public String constructQuery(String table, String[] fieldNames) {
-    if(fieldNames == null) {
-      throw new IllegalArgumentException("Field names may not be null");
-    }
+  public static class DBRecordWriter<K extends DBWritable, V> extends
+    org.apache.sqoop.mapreduce.db.DBOutputFormat.DBRecordWriter<K, V> {
 
-    StringBuilder query = new StringBuilder();
-    query.append("INSERT INTO ").append(table);
-
-    if (fieldNames.length > 0 && fieldNames[0] != null) {
-      query.append(" (");
-      for (int i = 0; i < fieldNames.length; i++) {
-        query.append(fieldNames[i]);
-        if (i != fieldNames.length - 1) {
-          query.append(",");
-        }
-      }
-      query.append(")");
-    }
-    query.append(" VALUES (");
-
-    for (int i = 0; i < fieldNames.length; i++) {
-      query.append("?");
-      if(i != fieldNames.length - 1) {
-        query.append(",");
-      }
-    }
-    query.append(");");
-
-    return query.toString();
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
-      throws IOException {
-    DBConfiguration dbConf = new DBConfiguration(context.getConfiguration());
-    String tableName = dbConf.getOutputTableName();
-    String[] fieldNames = dbConf.getOutputFieldNames();
-
-    if(fieldNames == null) {
-      fieldNames = new String[dbConf.getOutputFieldCount()];
-    }
-
-    try {
-      Connection connection = dbConf.getConnection();
-      PreparedStatement statement = null;
-
-      statement = connection.prepareStatement(
-                    constructQuery(tableName, fieldNames));
-      return new DBRecordWriter(connection, statement);
-    } catch (Exception ex) {
-      throw new IOException(ex);
+    public DBRecordWriter() throws SQLException {
+      super();
     }
-  }
 
-  /**
-   * Initializes the reduce-part of the job with
-   * the appropriate output settings.
-   *
-   * @param job The job
-   * @param tableName The table to insert data into
-   * @param fieldNames The field names in the table.
-   */
-  public static void setOutput(Job job, String tableName,
-      String... fieldNames) throws IOException {
-    if(fieldNames.length > 0 && fieldNames[0] != null) {
-      DBConfiguration dbConf = setOutput(job, tableName);
-      dbConf.setOutputFieldNames(fieldNames);
-    } else {
-      if (fieldNames.length > 0) {
-        setOutput(job, tableName, fieldNames.length);
-      } else {
-        throw new IllegalArgumentException(
-            "Field names must be greater than 0");
-      }
+    public DBRecordWriter(Connection connection,
+        PreparedStatement statement) throws SQLException {
+      super(connection, statement);
     }
   }
-
-  /**
-   * Initializes the reduce-part of the job
-   * with the appropriate output settings.
-   *
-   * @param job The job
-   * @param tableName The table to insert data into
-   * @param fieldCount the number of fields in the table.
-   */
-  public static void setOutput(Job job, String tableName,
-      int fieldCount) throws IOException {
-    DBConfiguration dbConf = setOutput(job, tableName);
-    dbConf.setOutputFieldCount(fieldCount);
-  }
-
-  private static DBConfiguration setOutput(Job job,
-      String tableName) throws IOException {
-    job.setOutputFormatClass(DBOutputFormat.class);
-    ConfigurationHelper.setJobReduceSpeculativeExecution(job, false);
-
-    DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
-
-    dbConf.setOutputTableName(tableName);
-    return dbConf;
-  }
 }

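The deleted constructQuery builds the parameterized INSERT statement that DBRecordWriter batches. A small sketch of its output, assuming the method stays public on the new parent class that the shim extends (table and field names are placeholders):

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;

    import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;

    public class ConstructQueryDemo {
      public static void main(String[] args) {
        DBOutputFormat<DBWritable, NullWritable> fmt =
            new DBOutputFormat<DBWritable, NullWritable>();
        // Prints: INSERT INTO employees (id,name) VALUES (?,?);
        System.out.println(
            fmt.constructQuery("employees", new String[] {"id", "name"}));
      }
    }
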
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,63 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package com.cloudera.sqoop.mapreduce.db;
 
-import java.io.IOException;
 import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Arrays;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.conf.Configuration;
-
-import com.cloudera.sqoop.util.LoggingUtils;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 
 /**
  * A RecordReader that reads records from a SQL table.
  * Emits LongWritables containing the record number as
  * key and DBWritables as value.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.DBRecordReader instead.
+ * @see org.apache.sqoop.mapreduce.db.DBRecordReader
  */
 public class DBRecordReader<T extends DBWritable> extends
-    RecordReader<LongWritable, T> {
-
-  private static final Log LOG = LogFactory.getLog(DBRecordReader.class);
-
-  private ResultSet results = null;
-
-  private Class<T> inputClass;
-
-  private Configuration conf;
-
-  private DBInputFormat.DBInputSplit split;
-
-  private long pos = 0;
-
-  private LongWritable key = null;
-
-  private T value = null;
-
-  private Connection connection;
-
-  protected PreparedStatement statement;
-
-  private DBConfiguration dbConf;
-
-  private String conditions;
-
-  private String [] fieldNames;
-
-  private String tableName;
+    org.apache.sqoop.mapreduce.db.DBRecordReader<T> {
 
   /**
    * @param split The InputSplit to read data for
@@ -85,222 +44,7 @@ public class DBRecordReader<T extends DB
       Class<T> inputClass, Configuration conf, Connection conn,
       DBConfiguration dbConfig, String cond, String [] fields, String table)
       throws SQLException {
-    this.inputClass = inputClass;
-    this.split = split;
-    this.conf = conf;
-    this.connection = conn;
-    this.dbConf = dbConfig;
-    this.conditions = cond;
-    if (fields != null) {
-      this.fieldNames = Arrays.copyOf(fields, fields.length);
-    }
-    this.tableName = table;
+    super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
   }
   // CHECKSTYLE:ON
-
-  protected ResultSet executeQuery(String query) throws SQLException {
-    this.statement = connection.prepareStatement(query,
-        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-
-    Integer fetchSize = dbConf.getFetchSize();
-    if (fetchSize != null) {
-      LOG.debug("Using fetchSize for next query: " + fetchSize);
-      statement.setFetchSize(fetchSize);
-    }
-
-    LOG.debug("Executing query: " + query);
-    return statement.executeQuery();
-  }
-
-  /** Returns the query for selecting the records,
-   * subclasses can override this for custom behaviour.*/
-  protected String getSelectQuery() {
-    StringBuilder query = new StringBuilder();
-
-    // Default codepath for MySQL, HSQLDB, etc.
-    // Relies on LIMIT/OFFSET for splits.
-    if(dbConf.getInputQuery() == null) {
-      query.append("SELECT ");
-
-      for (int i = 0; i < fieldNames.length; i++) {
-        query.append(fieldNames[i]);
-        if (i != fieldNames.length -1) {
-          query.append(", ");
-        }
-      }
-
-      query.append(" FROM ").append(tableName);
-      query.append(" AS ").append(tableName); //in hsqldb this is necessary
-      if (conditions != null && conditions.length() > 0) {
-        query.append(" WHERE (").append(conditions).append(")");
-      }
-
-      String orderBy = dbConf.getInputOrderBy();
-      if (orderBy != null && orderBy.length() > 0) {
-        query.append(" ORDER BY ").append(orderBy);
-      }
-    } else {
-      //PREBUILT QUERY
-      query.append(dbConf.getInputQuery());
-    }
-
-    try {
-      query.append(" LIMIT ").append(split.getLength());
-      query.append(" OFFSET ").append(split.getStart());
-    } catch (IOException ex) {
-      // Ignore, will not throw.
-    }
-
-    return query.toString();
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public void close() throws IOException {
-    try {
-      if (null != results) {
-        results.close();
-      }
-      if (null != statement) {
-        statement.close();
-      }
-      if (null != connection) {
-        connection.commit();
-        connection.close();
-      }
-    } catch (SQLException e) {
-      throw new IOException(e);
-    }
-  }
-
-  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    //do nothing
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public LongWritable getCurrentKey() {
-    return key;
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public T getCurrentValue() {
-    return value;
-  }
-
-  /**
-   * @deprecated
-   */
-  @Deprecated
-  public T createValue() {
-    return ReflectionUtils.newInstance(inputClass, conf);
-  }
-
-  /**
-   * @deprecated
-   */
-  @Deprecated
-  public long getPos() throws IOException {
-    return pos;
-  }
-
-  /**
-   * @deprecated Use {@link #nextKeyValue()}
-   */
-  @Deprecated
-  public boolean next(LongWritable k, T v) throws IOException {
-    this.key = k;
-    this.value = v;
-    return nextKeyValue();
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public float getProgress() throws IOException {
-    return pos / (float)split.getLength();
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public boolean nextKeyValue() throws IOException {
-    try {
-      if (key == null) {
-        key = new LongWritable();
-      }
-      if (value == null) {
-        value = createValue();
-      }
-      if (null == this.results) {
-        // First time into this method, run the query.
-        this.results = executeQuery(getSelectQuery());
-      }
-      if (!results.next()) {
-        return false;
-      }
-
-      // Set the key field value as the output key value
-      key.set(pos + split.getStart());
-
-      value.readFields(results);
-
-      pos++;
-    } catch (SQLException e) {
-      LoggingUtils.logAll(LOG, e);
-      throw new IOException("SQLException in nextKeyValue", e);
-    }
-    return true;
-  }
-
-  /**
-   * @return true if nextKeyValue() would return false.
-   */
-  protected boolean isDone() {
-    try {
-      return this.results != null
-          && (results.isLast() || results.isAfterLast());
-    } catch (SQLException sqlE) {
-      return true;
-    }
-  }
-
-  protected DBInputFormat.DBInputSplit getSplit() {
-    return split;
-  }
-
-  protected String [] getFieldNames() {
-    return fieldNames;
-  }
-
-  protected String getTableName() {
-    return tableName;
-  }
-
-  protected String getConditions() {
-    return conditions;
-  }
-
-  protected DBConfiguration getDBConf() {
-    return dbConf;
-  }
-
-  protected Connection getConnection() {
-    return connection;
-  }
-
-  protected PreparedStatement getStatement() {
-    return statement;
-  }
-
-  protected void setStatement(PreparedStatement stmt) {
-    this.statement = stmt;
-  }
-
-  /**
-   * @return the configuration. Allows subclasses to access the configuration
-   */
-  protected Configuration getConf(){
-    return conf;
-  }
 }

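The deleted getSelectQuery drives the default LIMIT/OFFSET split strategy. For a split covering rows [100, 200) of a table employees with fields id and name, and no prebuilt input query (all names illustrative), the generated statement would look like:

    SELECT id, name FROM employees AS employees LIMIT 100 OFFSET 100

The self-alias is there for HSQLDB's benefit, per the removed inline comment, and the LIMIT value is the split length (end - start).
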
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBSplitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBSplitter.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBSplitter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBSplitter.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,15 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package com.cloudera.sqoop.mapreduce.db;
 
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
 
 /**
  * DBSplitter will generate DBInputSplits to use with DataDrivenDBInputFormat.
@@ -34,13 +25,10 @@ import org.apache.hadoop.mapreduce.Input
  * on the data-type of the column, this requires different behavior.
  * DBSplitter implementations should perform this for a data type or family
  * of data types.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.DBSplitter instead.
+ * @see org.apache.sqoop.mapreduce.db.DBSplitter
  */
-public interface DBSplitter {
-  /**
-   * Given a ResultSet containing one record (and already advanced to that
-   * record) with two columns (a low value, and a high value, both of the same
-   * type), determine a set of splits that span the given values.
-   */
-  List<InputSplit> split(Configuration conf, ResultSet results, String colName)
-      throws SQLException;
+public interface DBSplitter extends org.apache.sqoop.mapreduce.db.DBSplitter {
+
 }

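The contract deleted from this interface now lives on org.apache.sqoop.mapreduce.db.DBSplitter: given a ResultSet already positioned on a single row holding a low and a high value of the same type, produce InputSplits spanning that range. A deliberately trivial implementation sketch (the class is illustrative, not part of this commit):

    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.InputSplit;

    import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;

    /** Degenerate splitter: one split covering the whole [min, max] range. */
    public class WholeRangeSplitter
        implements org.apache.sqoop.mapreduce.db.DBSplitter {
      public List<InputSplit> split(Configuration conf, ResultSet results,
          String colName) throws SQLException {
        long min = results.getLong(1);  // low bound in column 1
        long max = results.getLong(2);  // high bound in column 2
        List<InputSplit> splits = new ArrayList<InputSplit>();
        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
            colName + " >= " + min, colName + " <= " + max));
        return splits;
      }
    }
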
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,67 +15,48 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package com.cloudera.sqoop.mapreduce.db;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-
-import com.cloudera.sqoop.config.ConfigurationHelper;
 
 /**
  * An InputFormat that reads input data from an SQL table.
  * Operates like DBInputFormat, but instead of using LIMIT and OFFSET to
  * demarcate splits, it tries to generate WHERE clauses which separate the
  * data into roughly equivalent shards.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat instead.
+ * @see org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat
  */
 public class DataDrivenDBInputFormat<T extends DBWritable>
-    extends DBInputFormat<T> implements Configurable {
-
-  private static final Log LOG =
-      LogFactory.getLog(DataDrivenDBInputFormat.class);
+    extends org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat<T> {
 
   /**
    * If users are providing their own query, the following string is expected
    * to appear in the WHERE clause, which will be substituted with a pair of
    * conditions on the input to allow input splits to parallelise the import.
    */
-  public static final String SUBSTITUTE_TOKEN = "$CONDITIONS";
+  public static final String SUBSTITUTE_TOKEN =
+      org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.SUBSTITUTE_TOKEN;
 
   /**
   * An InputSplit that spans a set of rows.
+   *
+   * @deprecated use org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.
+   *   DataDrivenDBInputSplit instead.
+   * @see org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.
+   *   DataDrivenDBInputSplit
    */
-  public static class DataDrivenDBInputSplit
-      extends DBInputFormat.DBInputSplit {
-
-    private String lowerBoundClause;
-    private String upperBoundClause;
+  public static class DataDrivenDBInputSplit extends
+  org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit {
 
     /**
      * Default Constructor.
      */
     public DataDrivenDBInputSplit() {
+      super();
     }
 
     /**
@@ -88,189 +67,10 @@ public class DataDrivenDBInputFormat<T e
      * on the 'upper' end.
      */
     public DataDrivenDBInputSplit(final String lower, final String upper) {
-      this.lowerBoundClause = lower;
-      this.upperBoundClause = upper;
-    }
-
-
-    /**
-     * @return The total row count in this split.
-     */
-    public long getLength() throws IOException {
-      return 0; // unfortunately, we don't know this.
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public void readFields(DataInput input) throws IOException {
-      this.lowerBoundClause = Text.readString(input);
-      this.upperBoundClause = Text.readString(input);
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public void write(DataOutput output) throws IOException {
-      Text.writeString(output, this.lowerBoundClause);
-      Text.writeString(output, this.upperBoundClause);
-    }
-
-    public String getLowerClause() {
-      return lowerBoundClause;
-    }
-
-    public String getUpperClause() {
-      return upperBoundClause;
+      super(lower, upper);
     }
   }
 
-  /**
-   * @return the DBSplitter implementation to use to divide the table/query
-   * into InputSplits.
-   */
-  protected DBSplitter getSplitter(int sqlDataType) {
-    switch (sqlDataType) {
-    case Types.NUMERIC:
-    case Types.DECIMAL:
-      return new BigDecimalSplitter();
-
-    case Types.BIT:
-    case Types.BOOLEAN:
-      return new BooleanSplitter();
-
-    case Types.INTEGER:
-    case Types.TINYINT:
-    case Types.SMALLINT:
-    case Types.BIGINT:
-      return new IntegerSplitter();
-
-    case Types.REAL:
-    case Types.FLOAT:
-    case Types.DOUBLE:
-      return new FloatSplitter();
-
-    case Types.CHAR:
-    case Types.VARCHAR:
-    case Types.LONGVARCHAR:
-      return new TextSplitter();
-
-    case Types.DATE:
-    case Types.TIME:
-    case Types.TIMESTAMP:
-      return new DateSplitter();
-
-    default:
-      // TODO: Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB,
-      // BLOB, ARRAY, STRUCT, REF, DATALINK, and JAVA_OBJECT.
-      return null;
-    }
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public List<InputSplit> getSplits(JobContext job) throws IOException {
-
-    int targetNumTasks = ConfigurationHelper.getJobNumMaps(job);
-    String boundaryQuery = getDBConf().getInputBoundingQuery();
-
-    // If the user has not forced us to use a boundary query, and we don't
-    // need one because there is only one mapper, return a single split that
-    // separates nothing. This can be considerably more efficient for a large
-    // table with no index.
-    if (1 == targetNumTasks
-            && (boundaryQuery == null || boundaryQuery.isEmpty())) {
-      List<InputSplit> singletonSplit = new ArrayList<InputSplit>();
-      singletonSplit.add(new DataDrivenDBInputSplit("1=1", "1=1"));
-      return singletonSplit;
-    }
-
-    ResultSet results = null;
-    Statement statement = null;
-    Connection connection = getConnection();
-    try {
-      statement = connection.createStatement();
-
-      String query = getBoundingValsQuery();
-      LOG.info("BoundingValsQuery: " + query);
-
-      results = statement.executeQuery(query);
-      results.next();
-
-      // Based on the type of the results, use a different mechanism
-      // for interpolating split points (i.e., numeric splits, text splits,
-      // dates, etc.)
-      int sqlDataType = results.getMetaData().getColumnType(1);
-      boolean isSigned = results.getMetaData().isSigned(1);
-
-      // MySQL has an unsigned integer which we need to allocate space for
-      if (sqlDataType == Types.INTEGER && !isSigned){
-          sqlDataType = Types.BIGINT;
-      }
-
-      DBSplitter splitter = getSplitter(sqlDataType);
-      if (null == splitter) {
-        throw new IOException("Unknown SQL data type: " + sqlDataType);
-      }
-
-      return splitter.split(job.getConfiguration(), results,
-          getDBConf().getInputOrderBy());
-    } catch (SQLException e) {
-      throw new IOException(e);
-    } finally {
-      // More-or-less ignore SQL exceptions here, but log in case we need it.
-      try {
-        if (null != results) {
-          results.close();
-        }
-      } catch (SQLException se) {
-        LOG.debug("SQLException closing resultset: " + se.toString());
-      }
-
-      try {
-        if (null != statement) {
-          statement.close();
-        }
-      } catch (SQLException se) {
-        LOG.debug("SQLException closing statement: " + se.toString());
-      }
-
-      try {
-        connection.commit();
-        closeConnection();
-      } catch (SQLException se) {
-        LOG.debug("SQLException committing split transaction: "
-            + se.toString());
-      }
-    }
-  }
-
-  /**
-   * @return a query which returns the minimum and maximum values for
-   * the order-by column.
-   *
-   * The min value should be in the first column, and the
-   * max value should be in the second column of the results.
-   */
-  protected String getBoundingValsQuery() {
-    // If the user has provided a query, use that instead.
-    String userQuery = getDBConf().getInputBoundingQuery();
-    if (null != userQuery) {
-      return userQuery;
-    }
-
-    // Auto-generate one based on the table name we've been provided with.
-    StringBuilder query = new StringBuilder();
-
-    String splitCol = getDBConf().getInputOrderBy();
-    query.append("SELECT MIN(").append(splitCol).append("), ");
-    query.append("MAX(").append(splitCol).append(") FROM ");
-    query.append(getDBConf().getInputTableName());
-    String conditions = getDBConf().getInputConditions();
-    if (null != conditions) {
-      query.append(" WHERE ( " + conditions + " )");
-    }
-
-    return query.toString();
-  }
 
   /** Set the user-defined bounding query to use with a user-defined query.
       This *must* include the substring "$CONDITIONS"
@@ -282,35 +82,8 @@ public class DataDrivenDBInputFormat<T e
       inside each split.
     */
   public static void setBoundingQuery(Configuration conf, String query) {
-    if (null != query) {
-      // If the user is setting a query, warn if it doesn't allow conditions.
-      if (query.indexOf(SUBSTITUTE_TOKEN) == -1) {
-        LOG.warn("Could not find " + SUBSTITUTE_TOKEN + " token in query: "
-            + query + "; splits may not partition data.");
-      }
-    }
-
-    conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query);
-  }
-
-  protected RecordReader<LongWritable, T> createDBRecordReader(
-      DBInputSplit split, Configuration conf) throws IOException {
-
-    DBConfiguration dbConf = getDBConf();
-    @SuppressWarnings("unchecked")
-    Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
-    String dbProductName = getDBProductName();
-
-    LOG.debug("Creating db record reader for db product: " + dbProductName);
-
-    try {
-      return new DataDrivenDBRecordReader<T>(split, inputClass,
-          conf, getConnection(), dbConf, dbConf.getInputConditions(),
-          dbConf.getInputFieldNames(), dbConf.getInputTableName(),
-          dbProductName);
-    } catch (SQLException ex) {
-      throw new IOException(ex);
-    }
+    org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.setBoundingQuery(
+        conf, query);
   }
 
   // Configuration methods override superclass to ensure that the proper
@@ -324,9 +97,8 @@ public class DataDrivenDBInputFormat<T e
       Class<? extends DBWritable> inputClass,
       String tableName, String conditions,
       String splitBy, String... fieldNames) {
-    DBInputFormat.setInput(job, inputClass, tableName, conditions,
-        splitBy, fieldNames);
-    job.setInputFormatClass(DataDrivenDBInputFormat.class);
+    org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.setInput(
+        job, inputClass, tableName, conditions, splitBy, fieldNames);
   }
 
   /** setInput() takes a custom query and a separate "bounding query" to use
@@ -335,9 +107,7 @@ public class DataDrivenDBInputFormat<T e
   public static void setInput(Job job,
       Class<? extends DBWritable> inputClass,
       String inputQuery, String inputBoundingQuery) {
-    DBInputFormat.setInput(job, inputClass, inputQuery, "");
-    job.getConfiguration().set(DBConfiguration.INPUT_BOUNDING_QUERY,
-        inputBoundingQuery);
-    job.setInputFormatClass(DataDrivenDBInputFormat.class);
+    org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.setInput(
+        job, inputClass, inputQuery, inputBoundingQuery);
   }
 }
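
The free-form-query overload of setInput() retained above pairs an import
query, which must embed the $CONDITIONS substitute token, with a bounding
query returning the minimum and maximum of the split column. A hedged usage
sketch follows; the employees table, its columns, and the MyRecord DBWritable
implementation are hypothetical, and new Job(conf) reflects the Hadoop API of
this era.

    Configuration conf = new Configuration();
    Job job = new Job(conf);
    // Each split replaces $CONDITIONS with a "( lower ) AND ( upper )"
    // pair of bound clauses produced by the configured DBSplitter.
    org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.setInput(job,
        MyRecord.class,
        "SELECT id, name FROM employees WHERE $CONDITIONS",
        "SELECT MIN(id), MAX(id) FROM employees");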

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBRecordReader.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBRecordReader.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBRecordReader.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,15 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package com.cloudera.sqoop.mapreduce.db;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 
@@ -34,14 +28,13 @@ import org.apache.hadoop.mapreduce.lib.d
  * using data-driven WHERE clause splits.
  * Emits LongWritables containing the record number as
  * key and DBWritables as value.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.DataDrivenDBRecordReader
+ *   instead.
+ * @see org.apache.sqoop.mapreduce.db.DataDrivenDBRecordReader
  */
 public class DataDrivenDBRecordReader<T extends DBWritable>
-    extends DBRecordReader<T> {
-
-  private static final Log LOG =
-      LogFactory.getLog(DataDrivenDBRecordReader.class);
-
-  private String dbProductName; // database manufacturer string.
+    extends org.apache.sqoop.mapreduce.db.DataDrivenDBRecordReader<T> {
 
   // CHECKSTYLE:OFF
   // TODO(aaron): Refactor constructor to use fewer arguments.
@@ -53,77 +46,8 @@ public class DataDrivenDBRecordReader<T 
       Class<T> inputClass, Configuration conf, Connection conn,
       DBConfiguration dbConfig, String cond, String [] fields, String table,
       String dbProduct) throws SQLException {
-    super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
-    this.dbProductName = dbProduct;
+    super(split, inputClass, conf, conn, dbConfig,
+        cond, fields, table, dbProduct);
   }
   // CHECKSTYLE:ON
-
-  @Override
-  /** {@inheritDoc} */
-  public float getProgress() throws IOException {
-    return isDone() ? 1.0f : 0.0f;
-  }
-
-  /** Returns the query for selecting the records,
-   * subclasses can override this for custom behaviour.*/
-  protected String getSelectQuery() {
-    StringBuilder query = new StringBuilder();
-    DataDrivenDBInputFormat.DataDrivenDBInputSplit dataSplit =
-        (DataDrivenDBInputFormat.DataDrivenDBInputSplit) getSplit();
-    DBConfiguration dbConf = getDBConf();
-    String [] fieldNames = getFieldNames();
-    String tableName = getTableName();
-    String conditions = getConditions();
-
-    // Build the WHERE clauses associated with the data split first.
-    // We need them in both branches of this function.
-    StringBuilder conditionClauses = new StringBuilder();
-    conditionClauses.append("( ").append(dataSplit.getLowerClause());
-    conditionClauses.append(" ) AND ( ").append(dataSplit.getUpperClause());
-    conditionClauses.append(" )");
-
-    if(dbConf.getInputQuery() == null) {
-      // We need to generate the entire query.
-      query.append("SELECT ");
-
-      for (int i = 0; i < fieldNames.length; i++) {
-        query.append(fieldNames[i]);
-        if (i != fieldNames.length -1) {
-          query.append(", ");
-        }
-      }
-
-      query.append(" FROM ").append(tableName);
-      if (!dbProductName.startsWith("ORACLE")) {
-        // Seems to be necessary for hsqldb? Oracle explicitly does *not*
-        // use this clause.
-        query.append(" AS ").append(tableName);
-      }
-      query.append(" WHERE ");
-      if (conditions != null && conditions.length() > 0) {
-        // Put the user's conditions first.
-        query.append("( ").append(conditions).append(" ) AND ");
-      }
-
-      // Now append the conditions associated with our split.
-      query.append(conditionClauses.toString());
-
-    } else {
-      // User provided the query. We replace the special token with
-      // our WHERE clause.
-      String inputQuery = dbConf.getInputQuery();
-      if (inputQuery.indexOf(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN) == -1) {
-        LOG.error("Could not find the clause substitution token "
-            + DataDrivenDBInputFormat.SUBSTITUTE_TOKEN + " in the query: ["
-            + inputQuery + "]. Parallel splits may not work correctly.");
-      }
-
-      query.append(inputQuery.replace(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN,
-          conditionClauses.toString()));
-    }
-
-    LOG.debug("Using query: " + query.toString());
-
-    return query.toString();
-  }
 }
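
To make the removed getSelectQuery() logic concrete: for a non-Oracle
product, a hypothetical table employees with fields id and name, no user
conditions, and a split whose bounds are "id >= 0" and "id < 5", the reader
would issue roughly

    SELECT id, name FROM employees AS employees WHERE ( id >= 0 ) AND ( id < 5 )

Oracle products skip the "AS employees" alias, and a user-supplied input
query instead has its $CONDITIONS token replaced with the same parenthesized
bound pair.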

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DateSplitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DateSplitter.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DateSplitter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DateSplitter.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,168 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package com.cloudera.sqoop.mapreduce.db;
 
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import com.cloudera.sqoop.config.ConfigurationHelper;
 
 /**
  * Implement DBSplitter over date/time values.
  * Make use of logic from IntegerSplitter, since date/time are just longs
  * in Java.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.DateSplitter instead.
+ * @see org.apache.sqoop.mapreduce.db.DateSplitter
  */
-public class DateSplitter extends IntegerSplitter {
-
-  private static final Log LOG = LogFactory.getLog(DateSplitter.class);
-
-  public List<InputSplit> split(Configuration conf, ResultSet results,
-      String colName) throws SQLException {
-
-    long minVal;
-    long maxVal;
-
-    int sqlDataType = results.getMetaData().getColumnType(1);
-    minVal = resultSetColToLong(results, 1, sqlDataType);
-    maxVal = resultSetColToLong(results, 2, sqlDataType);
-
-    String lowClausePrefix = colName + " >= ";
-    String highClausePrefix = colName + " < ";
-
-    int numSplits = ConfigurationHelper.getConfNumMaps(conf);
-    if (numSplits < 1) {
-      numSplits = 1;
-    }
-
-    if (minVal == Long.MIN_VALUE && maxVal == Long.MIN_VALUE) {
-      // The range of acceptable dates is NULL to NULL. Just create a single
-      // split.
-      List<InputSplit> splits = new ArrayList<InputSplit>();
-      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-          colName + " IS NULL", colName + " IS NULL"));
-      return splits;
-    }
-
-    // Gather the split point integers
-    List<Long> splitPoints = split(numSplits, minVal, maxVal);
-    List<InputSplit> splits = new ArrayList<InputSplit>();
-
-    // Turn the split points into a set of intervals.
-    long start = splitPoints.get(0);
-    Date startDate = longToDate(start, sqlDataType);
-    if (sqlDataType == Types.TIMESTAMP) {
-      // The lower bound's nanos value needs to match the actual lower-bound
-      // nanos.
-      try {
-        ((java.sql.Timestamp) startDate).setNanos(
-            results.getTimestamp(1).getNanos());
-      } catch (NullPointerException npe) {
-        // If the lower bound was NULL, we'll get an NPE; just ignore it and
-        // don't set nanos.
-      }
-    }
-
-    for (int i = 1; i < splitPoints.size(); i++) {
-      long end = splitPoints.get(i);
-      Date endDate = longToDate(end, sqlDataType);
-
-      if (i == splitPoints.size() - 1) {
-        if (sqlDataType == Types.TIMESTAMP) {
-          // The upper bound's nanos value needs to match the actual
-          // upper-bound nanos.
-          try {
-            ((java.sql.Timestamp) endDate).setNanos(
-                results.getTimestamp(2).getNanos());
-          } catch (NullPointerException npe) {
-            // If the upper bound was NULL, we'll get an NPE; just ignore it
-            // and don't set nanos.
-          }
-        }
-        // This is the last one; use a closed interval.
-        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-            lowClausePrefix + dateToString(startDate),
-            colName + " <= " + dateToString(endDate)));
-      } else {
-        // Normal open-interval case.
-        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-            lowClausePrefix + dateToString(startDate),
-            highClausePrefix + dateToString(endDate)));
-      }
-
-      start = end;
-      startDate = endDate;
-    }
-
-    if (minVal == Long.MIN_VALUE || maxVal == Long.MIN_VALUE) {
-      // Add an extra split to handle the null case that we saw.
-      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-          colName + " IS NULL", colName + " IS NULL"));
-    }
-
-    return splits;
-  }
-
-  /**
-      Retrieve the value from the column in a type-appropriate manner and
-      return its timestamp since the epoch. If the column is null, then return
-      Long.MIN_VALUE.  This will cause a special split to be generated for the
-      NULL case, but may also cause poorly-balanced splits if most of the
-      actual dates are positive time since the epoch, etc.
-    */
-  private long resultSetColToLong(ResultSet rs, int colNum, int sqlDataType)
-      throws SQLException {
-    try {
-      switch (sqlDataType) {
-      case Types.DATE:
-        return rs.getDate(colNum).getTime();
-      case Types.TIME:
-        return rs.getTime(colNum).getTime();
-      case Types.TIMESTAMP:
-        return rs.getTimestamp(colNum).getTime();
-      default:
-        throw new SQLException("Not a date-type field");
-      }
-    } catch (NullPointerException npe) {
-      // null column. return minimum long value.
-      LOG.warn("Encountered a NULL date in the split column. "
-          + "Splits may be poorly balanced.");
-      return Long.MIN_VALUE;
-    }
-  }
-
-  /**  Parse the long-valued timestamp into the appropriate SQL date type. */
-  private Date longToDate(long val, int sqlDataType) {
-    switch (sqlDataType) {
-    case Types.DATE:
-      return new java.sql.Date(val);
-    case Types.TIME:
-      return new java.sql.Time(val);
-    case Types.TIMESTAMP:
-      return new java.sql.Timestamp(val);
-    default: // Shouldn't ever hit this case.
-      return null;
-    }
-  }
+public class DateSplitter extends org.apache.sqoop.mapreduce.db.DateSplitter {
 
-  /**
-   * Given a Date 'd', format it as a string for use in a SQL date
-   * comparison operation.
-   * @param d the date to format.
-   * @return the string representing this date in SQL with any appropriate
-   * quotation characters, etc.
-   */
-  protected String dateToString(Date d) {
-    return "'" + d.toString() + "'";
-  }
 }
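
A worked illustration of the removed logic, with hypothetical values: for a
TIMESTAMP split column named created, dateToString() single-quotes each
boundary, so two splits over the range 2011-01-01 to 2011-01-03 come out
roughly as

    created >= '2011-01-01 00:00:00.0' AND created < '2011-01-02 00:00:00.0'
    created >= '2011-01-02 00:00:00.0' AND created <= '2011-01-03 00:00:00.0'

with the last interval closed, and an extra "created IS NULL" split appended
whenever either extremum was NULL.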

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/FloatSplitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/FloatSplitter.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/FloatSplitter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/FloatSplitter.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,88 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package com.cloudera.sqoop.mapreduce.db;
 
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import com.cloudera.sqoop.config.ConfigurationHelper;
 
 /**
  * Implement DBSplitter over floating-point values.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.FloatSplitter instead.
+ * @see org.apache.sqoop.mapreduce.db.FloatSplitter
  */
-public class FloatSplitter implements DBSplitter {
-
-  private static final Log LOG = LogFactory.getLog(FloatSplitter.class);
-
-  private static final double MIN_INCREMENT = 10000 * Double.MIN_VALUE;
-
-  public List<InputSplit> split(Configuration conf, ResultSet results,
-      String colName) throws SQLException {
-
-    LOG.warn("Generating splits for a floating-point index column. Due to the");
-    LOG.warn("imprecise representation of floating-point values in Java, this");
-    LOG.warn("may result in an incomplete import.");
-    LOG.warn("You are strongly encouraged to choose an integral split column.");
-
-    List<InputSplit> splits = new ArrayList<InputSplit>();
-
-    if (results.getString(1) == null && results.getString(2) == null) {
-      // Range is null to null. Return a null split accordingly.
-      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-          colName + " IS NULL", colName + " IS NULL"));
-      return splits;
-    }
-
-    double minVal = results.getDouble(1);
-    double maxVal = results.getDouble(2);
-
-    // Use this as a hint. May need an extra task if the size doesn't
-    // divide cleanly.
-    int numSplits = ConfigurationHelper.getConfNumMaps(conf);
-    double splitSize = (maxVal - minVal) / (double) numSplits;
-
-    if (splitSize < MIN_INCREMENT) {
-      splitSize = MIN_INCREMENT;
-    }
-
-    String lowClausePrefix = colName + " >= ";
-    String highClausePrefix = colName + " < ";
-
-    double curLower = minVal;
-    double curUpper = curLower + splitSize;
-
-    while (curUpper < maxVal) {
-      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-          lowClausePrefix + Double.toString(curLower),
-          highClausePrefix + Double.toString(curUpper)));
-
-      curLower = curUpper;
-      curUpper += splitSize;
-    }
-
-    // Catch any overage and create the closed interval for the last split.
-    if (curLower <= maxVal || splits.size() == 1) {
-      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-          lowClausePrefix + Double.toString(curUpper),
-          colName + " <= " + Double.toString(maxVal)));
-    }
-
-    if (results.getString(1) == null || results.getString(2) == null) {
-      // At least one extrema is null; add a null split.
-      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-          colName + " IS NULL", colName + " IS NULL"));
-    }
+public class FloatSplitter
+  extends org.apache.sqoop.mapreduce.db.FloatSplitter {
 
-    return splits;
-  }
 }
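
A worked example of the removed arithmetic, assuming a hypothetical column
score with MIN 0.0, MAX 10.0, and four map tasks: splitSize is
(10.0 - 0.0) / 4 = 2.5, so the loop emits the open intervals

    score >= 0.0 AND score < 2.5
    score >= 2.5 AND score < 5.0
    score >= 5.0 AND score < 7.5

and a final clause closes the range at score <= 10.0. The MIN_INCREMENT
floor only matters for near-degenerate ranges; the warnings above explain
why an integral split column is preferred.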

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/IntegerSplitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/IntegerSplitter.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/IntegerSplitter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/IntegerSplitter.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,133 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package com.cloudera.sqoop.mapreduce.db;
 
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import com.cloudera.sqoop.config.ConfigurationHelper;
 
 /**
  * Implement DBSplitter over integer values.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.IntegerSplitter instead.
+ * @see org.apache.sqoop.mapreduce.db.IntegerSplitter
  */
-public class IntegerSplitter implements DBSplitter {
+public class IntegerSplitter
+  extends org.apache.sqoop.mapreduce.db.IntegerSplitter {
   public static final Log LOG =
-    LogFactory.getLog(IntegerSplitter.class.getName());
-
-  public List<InputSplit> split(Configuration conf, ResultSet results,
-      String colName) throws SQLException {
-
-    long minVal = results.getLong(1);
-    long maxVal = results.getLong(2);
-
-    String lowClausePrefix = colName + " >= ";
-    String highClausePrefix = colName + " < ";
-
-    int numSplits = ConfigurationHelper.getConfNumMaps(conf);
-    if (numSplits < 1) {
-      numSplits = 1;
-    }
-
-    if (results.getString(1) == null && results.getString(2) == null) {
-      // Range is null to null. Return a null split accordingly.
-      List<InputSplit> splits = new ArrayList<InputSplit>();
-      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-          colName + " IS NULL", colName + " IS NULL"));
-      return splits;
-    }
-
-    // Get all the split points together.
-    List<Long> splitPoints = split(numSplits, minVal, maxVal);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("Splits: [%,28d to %,28d] into %d parts",
-          minVal, maxVal, numSplits));
-      for (int i = 0; i < splitPoints.size(); i++) {
-        LOG.debug(String.format("%,28d", splitPoints.get(i)));
-      }
-    }
-    List<InputSplit> splits = new ArrayList<InputSplit>();
-
-    // Turn the split points into a set of intervals.
-    long start = splitPoints.get(0);
-    for (int i = 1; i < splitPoints.size(); i++) {
-      long end = splitPoints.get(i);
-
-      if (i == splitPoints.size() - 1) {
-        // This is the last one; use a closed interval.
-        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-            lowClausePrefix + Long.toString(start),
-            colName + " <= " + Long.toString(end)));
-      } else {
-        // Normal open-interval case.
-        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-            lowClausePrefix + Long.toString(start),
-            highClausePrefix + Long.toString(end)));
-      }
-
-      start = end;
-    }
-
-    if (results.getString(1) == null || results.getString(2) == null) {
-      // At least one extrema is null; add a null split.
-      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-          colName + " IS NULL", colName + " IS NULL"));
-    }
-
-    return splits;
-  }
-
-  /**
-   * Returns a list of longs one element longer than the list of input splits.
-   * This represents the boundaries between input splits.
-   * All splits are open on the top end, except the last one.
-   *
-   * So the list [0, 5, 8, 12, 18] would represent splits capturing the
-   * intervals:
-   *
-   * [0, 5)
-   * [5, 8)
-   * [8, 12)
-   * [12, 18] note the closed interval for the last split.
-   */
-  List<Long> split(long numSplits, long minVal, long maxVal)
-      throws SQLException {
-
-    List<Long> splits = new ArrayList<Long>();
-
-    // We take the min-max interval and divide by the numSplits and also
-    // calculate a remainder.  Because of integer division rules, numSplits *
-    // splitSize + minVal will always be <= maxVal.  We then use the remainder
-    // and add 1 if the current split index is less than the remainder.
-    // This is guaranteed to add up to remainder and not surpass the value.
-    long splitSize = (maxVal - minVal) / numSplits;
-    long remainder = (maxVal - minVal) % numSplits;
-    long curVal = minVal;
-
-    // This will honor numSplits as long as split size > 0.  If split size is
-    // 0, it will have remainder splits.
-    for (int i = 0; i <= numSplits; i++) {
-      splits.add(curVal);
-      if (curVal >= maxVal) {
-        break;
-      }
-      curVal += splitSize;
-      curVal += (i < remainder) ? 1 : 0;
-    }
-
-    if (splits.size() == 1) {
-      // make a valid singleton split
-      splits.add(maxVal);
-    }
-
-    return splits;
-  }
+      org.apache.sqoop.mapreduce.db.IntegerSplitter.LOG;
 }
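
A worked example of the removed split() arithmetic, for a hypothetical
column id with MIN 0, MAX 10, and three map tasks: splitSize is
(10 - 0) / 3 = 3 with remainder 1, so the boundary list is [0, 4, 7, 10]
(the first interval absorbs the remainder), which becomes

    id >= 0 AND id < 4
    id >= 4 AND id < 7
    id >= 7 AND id <= 10

with the last interval closed, exactly as the [0, 5, 8, 12, 18] example in
the removed javadoc describes.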