You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by bl...@apache.org on 2011/10/28 18:50:41 UTC
svn commit: r1190441 [1/3] - in /incubator/sqoop/trunk/src:
java/com/cloudera/sqoop/mapreduce/db/ java/org/apache/sqoop/mapreduce/
java/org/apache/sqoop/mapreduce/db/ test/com/cloudera/sqoop/mapreduce/db/
test/org/apache/sqoop/ test/org/apache/sqoop/ma...
Author: blee
Date: Fri Oct 28 16:50:39 2011
New Revision: 1190441
URL: http://svn.apache.org/viewvc?rev=1190441&view=rev
Log:
SQOOP-377 Migrate mapreduce.db package to new name space
Added:
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/BigDecimalSplitter.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/BooleanSplitter.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBOutputFormat.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBSplitter.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBInputFormat.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DateSplitter.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/FloatSplitter.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/IntegerSplitter.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDBRecordReader.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDataDrivenDBInputFormat.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDataDrivenDBRecordReader.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDateSplitter.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/TextSplitter.java (with props)
incubator/sqoop/trunk/src/test/org/apache/sqoop/
incubator/sqoop/trunk/src/test/org/apache/sqoop/mapreduce/
incubator/sqoop/trunk/src/test/org/apache/sqoop/mapreduce/db/
incubator/sqoop/trunk/src/test/org/apache/sqoop/mapreduce/db/TestIntegerSplitter.java (with props)
incubator/sqoop/trunk/src/test/org/apache/sqoop/mapreduce/db/TestTextSplitter.java (with props)
Modified:
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BigDecimalSplitter.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BooleanSplitter.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBConfiguration.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBOutputFormat.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBSplitter.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBRecordReader.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DateSplitter.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/FloatSplitter.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/IntegerSplitter.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDBRecordReader.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBInputFormat.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBRecordReader.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDateSplitter.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/TextSplitter.java
incubator/sqoop/trunk/src/test/com/cloudera/sqoop/mapreduce/db/TestIntegerSplitter.java
incubator/sqoop/trunk/src/test/com/cloudera/sqoop/mapreduce/db/TestTextSplitter.java
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BigDecimalSplitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BigDecimalSplitter.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BigDecimalSplitter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BigDecimalSplitter.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,137 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package com.cloudera.sqoop.mapreduce.db;
-import java.math.BigDecimal;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import com.cloudera.sqoop.config.ConfigurationHelper;
/**
* Implement DBSplitter over BigDecimal values.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.BigDecimalSplitter instead.
+ * @see org.apache.sqoop.mapreduce.db.BigDecimalSplitter
*/
-public class BigDecimalSplitter implements DBSplitter {
- private static final Log LOG = LogFactory.getLog(BigDecimalSplitter.class);
-
- public List<InputSplit> split(Configuration conf, ResultSet results,
- String colName) throws SQLException {
-
- BigDecimal minVal = results.getBigDecimal(1);
- BigDecimal maxVal = results.getBigDecimal(2);
-
- String lowClausePrefix = colName + " >= ";
- String highClausePrefix = colName + " < ";
-
- BigDecimal numSplits = new BigDecimal(
- ConfigurationHelper.getConfNumMaps(conf));
-
- if (minVal == null && maxVal == null) {
- // Range is null to null. Return a null split accordingly.
- List<InputSplit> splits = new ArrayList<InputSplit>();
- splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
- colName + " IS NULL", colName + " IS NULL"));
- return splits;
- }
-
- if (minVal == null || maxVal == null) {
- // Don't know what is a reasonable min/max value for interpolation. Fail.
- LOG.error("Cannot find a range for NUMERIC or DECIMAL "
- + "fields with one end NULL.");
- return null;
- }
-
- // Get all the split points together.
- List<BigDecimal> splitPoints = split(numSplits, minVal, maxVal);
- List<InputSplit> splits = new ArrayList<InputSplit>();
-
- // Turn the split points into a set of intervals.
- BigDecimal start = splitPoints.get(0);
- for (int i = 1; i < splitPoints.size(); i++) {
- BigDecimal end = splitPoints.get(i);
-
- if (i == splitPoints.size() - 1) {
- // This is the last one; use a closed interval.
- splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
- lowClausePrefix + start.toString(),
- colName + " <= " + end.toString()));
- } else {
- // Normal open-interval case.
- splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
- lowClausePrefix + start.toString(),
- highClausePrefix + end.toString()));
- }
-
- start = end;
- }
-
- return splits;
- }
-
- private static final BigDecimal MIN_INCREMENT =
- new BigDecimal(10000 * Double.MIN_VALUE);
-
- /**
- * Divide numerator by denominator. If impossible in exact mode, use rounding.
- */
- protected BigDecimal tryDivide(BigDecimal numerator, BigDecimal denominator) {
- try {
- return numerator.divide(denominator);
- } catch (ArithmeticException ae) {
- return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP);
- }
- }
-
- /**
- * Returns a list of BigDecimals one element longer than the list of input
- * splits. This represents the boundaries between input splits. All splits
- * are open on the top end, except the last one.
- *
- * So the list [0, 5, 8, 12, 18] would represent splits capturing the
- * intervals:
- *
- * [0, 5)
- * [5, 8)
- * [8, 12)
- * [12, 18] note the closed interval for the last split.
- */
- List<BigDecimal> split(BigDecimal numSplits, BigDecimal minVal,
- BigDecimal maxVal) throws SQLException {
-
- List<BigDecimal> splits = new ArrayList<BigDecimal>();
-
- // Use numSplits as a hint. May need an extra task if the size doesn't
- // divide cleanly.
-
- BigDecimal splitSize = tryDivide(maxVal.subtract(minVal), (numSplits));
- if (splitSize.compareTo(MIN_INCREMENT) < 0) {
- splitSize = MIN_INCREMENT;
- LOG.warn("Set BigDecimal splitSize to MIN_INCREMENT");
- }
-
- BigDecimal curVal = minVal;
-
- while (curVal.compareTo(maxVal) <= 0) {
- splits.add(curVal);
- curVal = curVal.add(splitSize);
- }
-
- if (splits.get(splits.size() - 1).compareTo(maxVal) != 0
- || splits.size() == 1) {
- // We didn't end on the maxVal. Add that to the end of the list.
- splits.add(maxVal);
- }
+public class BigDecimalSplitter
+ extends org.apache.sqoop.mapreduce.db.BigDecimalSplitter {
- return splits;
- }
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BooleanSplitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BooleanSplitter.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BooleanSplitter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/BooleanSplitter.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,53 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package com.cloudera.sqoop.mapreduce.db;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
/**
* Implement DBSplitter over boolean values.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.BooleanSplitter instead.
+ * @see org.apache.sqoop.mapreduce.db.BooleanSplitter
*/
-public class BooleanSplitter implements DBSplitter {
- public List<InputSplit> split(Configuration conf, ResultSet results,
- String colName) throws SQLException {
-
- List<InputSplit> splits = new ArrayList<InputSplit>();
-
- if (results.getString(1) == null && results.getString(2) == null) {
- // Range is null to null. Return a null split accordingly.
- splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
- colName + " IS NULL", colName + " IS NULL"));
- return splits;
- }
-
- boolean minVal = results.getBoolean(1);
- boolean maxVal = results.getBoolean(2);
-
- // Use one or two splits.
- if (!minVal) {
- splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
- colName + " = FALSE", colName + " = FALSE"));
- }
-
- if (maxVal) {
- splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
- colName + " = TRUE", colName + " = TRUE"));
- }
-
- if (results.getString(1) == null || results.getString(2) == null) {
- // Include a null value.
- splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
- colName + " IS NULL", colName + " IS NULL"));
- }
+public class BooleanSplitter
+ extends org.apache.sqoop.mapreduce.db.BooleanSplitter {
- return splits;
- }
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBConfiguration.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBConfiguration.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBConfiguration.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,17 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package com.cloudera.sqoop.mapreduce.db;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-
-import com.cloudera.sqoop.mapreduce.db.DBInputFormat.NullDBWritable;
/**
* A container for configuration property names for jobs with DB input/output.
@@ -42,67 +32,76 @@ import com.cloudera.sqoop.mapreduce.db.D
* @see DBInputFormat#setInput(Job, Class, String, String)
* @see DBInputFormat#setInput(Job, Class, String, String, String, String...)
* @see DBOutputFormat#setOutput(Job, String, String...)
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.DBConfiguration instead.
+ * @see org.apache.sqoop.mapreduce.db.DBConfiguration
*/
-public class DBConfiguration {
+public class DBConfiguration
+ extends org.apache.sqoop.mapreduce.db.DBConfiguration {
/** The JDBC Driver class name. */
public static final String DRIVER_CLASS_PROPERTY =
- "mapreduce.jdbc.driver.class";
+ org.apache.sqoop.mapreduce.db.DBConfiguration.DRIVER_CLASS_PROPERTY;
/** JDBC Database access URL. */
- public static final String URL_PROPERTY = "mapreduce.jdbc.url";
+ public static final String URL_PROPERTY =
+ org.apache.sqoop.mapreduce.db.DBConfiguration.URL_PROPERTY;
/** User name to access the database. */
- public static final String USERNAME_PROPERTY = "mapreduce.jdbc.username";
+ public static final String USERNAME_PROPERTY =
+ org.apache.sqoop.mapreduce.db.DBConfiguration.USERNAME_PROPERTY;
/** Password to access the database. */
- public static final String PASSWORD_PROPERTY = "mapreduce.jdbc.password";
+ public static final String PASSWORD_PROPERTY =
+ org.apache.sqoop.mapreduce.db.DBConfiguration.PASSWORD_PROPERTY;
/** Fetch size. */
- public static final String FETCH_SIZE = "mapreduce.jdbc.fetchsize";
+ public static final String FETCH_SIZE =
+ org.apache.sqoop.mapreduce.db.DBConfiguration.FETCH_SIZE;
/** Input table name. */
public static final String INPUT_TABLE_NAME_PROPERTY =
- "mapreduce.jdbc.input.table.name";
+ org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_TABLE_NAME_PROPERTY;
/** Field names in the Input table. */
public static final String INPUT_FIELD_NAMES_PROPERTY =
- "mapreduce.jdbc.input.field.names";
+ org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_FIELD_NAMES_PROPERTY;
/** WHERE clause in the input SELECT statement. */
public static final String INPUT_CONDITIONS_PROPERTY =
- "mapreduce.jdbc.input.conditions";
+ org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_CONDITIONS_PROPERTY;
/** ORDER BY clause in the input SELECT statement. */
public static final String INPUT_ORDER_BY_PROPERTY =
- "mapreduce.jdbc.input.orderby";
+ org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_ORDER_BY_PROPERTY;
/** Whole input query, exluding LIMIT...OFFSET. */
- public static final String INPUT_QUERY = "mapreduce.jdbc.input.query";
+ public static final String INPUT_QUERY =
+ org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_QUERY;
/** Input query to get the count of records. */
public static final String INPUT_COUNT_QUERY =
- "mapreduce.jdbc.input.count.query";
+ org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_COUNT_QUERY;
/** Input query to get the max and min values of the jdbc.input.query. */
public static final String INPUT_BOUNDING_QUERY =
- "mapred.jdbc.input.bounding.query";
+ org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_BOUNDING_QUERY;
/** Class name implementing DBWritable which will hold input tuples. */
public static final String INPUT_CLASS_PROPERTY =
- "mapreduce.jdbc.input.class";
+ org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_CLASS_PROPERTY;
/** Output table name. */
public static final String OUTPUT_TABLE_NAME_PROPERTY =
- "mapreduce.jdbc.output.table.name";
+ org.apache.sqoop.mapreduce.db.DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY;
/** Field names in the Output table. */
public static final String OUTPUT_FIELD_NAMES_PROPERTY =
- "mapreduce.jdbc.output.field.names";
+ org.apache.sqoop.mapreduce.db.DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY;
/** Number of fields in the Output table. */
public static final String OUTPUT_FIELD_COUNT_PROPERTY =
- "mapreduce.jdbc.output.field.count";
+ org.apache.sqoop.mapreduce.db.DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY;
/**
* Sets the DB access related fields in the {@link Configuration}.
@@ -116,17 +115,8 @@ public class DBConfiguration {
public static void configureDB(Configuration conf, String driverClass,
String dbUrl, String userName, String passwd, Integer fetchSize) {
- conf.set(DRIVER_CLASS_PROPERTY, driverClass);
- conf.set(URL_PROPERTY, dbUrl);
- if (userName != null) {
- conf.set(USERNAME_PROPERTY, userName);
- }
- if (passwd != null) {
- conf.set(PASSWORD_PROPERTY, passwd);
- }
- if (fetchSize != null) {
- conf.setInt(FETCH_SIZE, fetchSize);
- }
+ org.apache.sqoop.mapreduce.db.DBConfiguration.configureDB(
+ conf, driverClass, dbUrl, userName, passwd, fetchSize);
}
/**
@@ -138,7 +128,8 @@ public class DBConfiguration {
*/
public static void configureDB(Configuration job, String driverClass,
String dbUrl, Integer fetchSize) {
- configureDB(job, driverClass, dbUrl, null, null, fetchSize);
+ org.apache.sqoop.mapreduce.db.DBConfiguration.configureDB(job, driverClass,
+ dbUrl, fetchSize);
}
/**
@@ -151,7 +142,8 @@ public class DBConfiguration {
*/
public static void configureDB(Configuration conf, String driverClass,
String dbUrl, String userName, String passwd) {
- configureDB(conf, driverClass, dbUrl, userName, passwd, null);
+ org.apache.sqoop.mapreduce.db.DBConfiguration.configureDB(conf, driverClass,
+ dbUrl, userName, passwd);
}
/**
@@ -162,151 +154,12 @@ public class DBConfiguration {
*/
public static void configureDB(Configuration job, String driverClass,
String dbUrl) {
- configureDB(job, driverClass, dbUrl, null);
+ org.apache.sqoop.mapreduce.db.DBConfiguration.configureDB(job, driverClass,
+ dbUrl);
}
- private Configuration conf;
-
public DBConfiguration(Configuration job) {
- this.conf = job;
- }
-
- /** Returns a connection object to the DB.
- * @throws ClassNotFoundException
- * @throws SQLException */
- public Connection getConnection()
- throws ClassNotFoundException, SQLException {
-
- Class.forName(conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
-
- if(conf.get(DBConfiguration.USERNAME_PROPERTY) == null) {
- return DriverManager.getConnection(
- conf.get(DBConfiguration.URL_PROPERTY));
- } else {
- return DriverManager.getConnection(
- conf.get(DBConfiguration.URL_PROPERTY),
- conf.get(DBConfiguration.USERNAME_PROPERTY),
- conf.get(DBConfiguration.PASSWORD_PROPERTY));
- }
- }
-
- public Configuration getConf() {
- return conf;
- }
-
- public Integer getFetchSize() {
- if (conf.get(DBConfiguration.FETCH_SIZE) == null) {
- return null;
- }
- return conf.getInt(DBConfiguration.FETCH_SIZE, 0);
- }
-
- public void setFetchSize(Integer fetchSize) {
- if (fetchSize != null) {
- conf.setInt(DBConfiguration.FETCH_SIZE, fetchSize);
- } else {
- conf.set(FETCH_SIZE, null);
- }
- }
- public String getInputTableName() {
- return conf.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY);
- }
-
- public void setInputTableName(String tableName) {
- conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
- }
-
- public String[] getInputFieldNames() {
- return conf.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
- }
-
- public void setInputFieldNames(String... fieldNames) {
- conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
- }
-
- public String getInputConditions() {
- return conf.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY);
- }
-
- public void setInputConditions(String conditions) {
- if (conditions != null && conditions.length() > 0) {
- conf.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions);
- }
- }
-
- public String getInputOrderBy() {
- return conf.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY);
- }
-
- public void setInputOrderBy(String orderby) {
- if(orderby != null && orderby.length() >0) {
- conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby);
- }
+ super(job);
}
-
- public String getInputQuery() {
- return conf.get(DBConfiguration.INPUT_QUERY);
- }
-
- public void setInputQuery(String query) {
- if(query != null && query.length() >0) {
- conf.set(DBConfiguration.INPUT_QUERY, query);
- }
- }
-
- public String getInputCountQuery() {
- return conf.get(DBConfiguration.INPUT_COUNT_QUERY);
- }
-
- public void setInputCountQuery(String query) {
- if(query != null && query.length() > 0) {
- conf.set(DBConfiguration.INPUT_COUNT_QUERY, query);
- }
- }
-
- public void setInputBoundingQuery(String query) {
- if (query != null && query.length() > 0) {
- conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query);
- }
- }
-
- public String getInputBoundingQuery() {
- return conf.get(DBConfiguration.INPUT_BOUNDING_QUERY);
- }
-
- public Class<?> getInputClass() {
- return conf.getClass(DBConfiguration.INPUT_CLASS_PROPERTY,
- NullDBWritable.class);
- }
-
- public void setInputClass(Class<? extends DBWritable> inputClass) {
- conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass,
- DBWritable.class);
- }
-
- public String getOutputTableName() {
- return conf.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY);
- }
-
- public void setOutputTableName(String tableName) {
- conf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
- }
-
- public String[] getOutputFieldNames() {
- return conf.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY);
- }
-
- public void setOutputFieldNames(String... fieldNames) {
- conf.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames);
- }
-
- public void setOutputFieldCount(int fieldCount) {
- conf.setInt(DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY, fieldCount);
- }
-
- public int getOutputFieldCount() {
- return conf.getInt(OUTPUT_FIELD_COUNT_PROPERTY, 0);
- }
-
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,34 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package com.cloudera.sqoop.mapreduce.db;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-
-import com.cloudera.sqoop.config.ConfigurationHelper;
/**
* A InputFormat that reads input data from an SQL table.
@@ -54,38 +27,38 @@ import com.cloudera.sqoop.config.Configu
*
* The SQL query, and input class can be using one of the two
* setInput methods.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.DBInputFormat instead.
+ * @see org.apache.sqoop.mapreduce.db.DBInputFormat
*/
public class DBInputFormat<T extends DBWritable>
- extends InputFormat<LongWritable, T> implements Configurable {
-
- private String dbProductName = "DEFAULT";
+ extends org.apache.sqoop.mapreduce.db.DBInputFormat<T> {
/**
* A Class that does nothing, implementing DBWritable.
+ * @deprecated use org.apache.sqoop.mapreduce.db.DBInputFormat.NullDBWritable
+ * instead.
+ * @see org.apache.sqoop.mapreduce.db.DBInputFormat.NullDBWritable
*/
- public static class NullDBWritable implements DBWritable, Writable {
- @Override
- public void readFields(DataInput in) throws IOException { }
- @Override
- public void readFields(ResultSet arg0) throws SQLException { }
- @Override
- public void write(DataOutput out) throws IOException { }
- @Override
- public void write(PreparedStatement arg0) throws SQLException { }
+ public static class NullDBWritable
+ extends org.apache.sqoop.mapreduce.db.DBInputFormat.NullDBWritable {
}
/**
* A InputSplit that spans a set of rows.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.DBInputFormat.DBInputSplit
+ * instead.
+ * @see org.apache.sqoop.mapreduce.db.DBInputFormat.DBInputSplit
*/
- public static class DBInputSplit extends InputSplit implements Writable {
-
- private long end = 0;
- private long start = 0;
+ public static class DBInputSplit extends
+ org.apache.sqoop.mapreduce.db.DBInputFormat.DBInputSplit {
/**
* Default Constructor.
*/
public DBInputSplit() {
+ super();
}
/**
@@ -94,266 +67,7 @@ public class DBInputFormat<T extends DBW
* @param end the index of the last row to select
*/
public DBInputSplit(long start, long end) {
- this.start = start;
- this.end = end;
- }
-
- @Override
- /** {@inheritDoc} */
- public String[] getLocations() throws IOException {
- // TODO Add a layer to enable SQL "sharding" and support locality
- return new String[] {};
- }
-
- /**
- * @return The index of the first row to select
- */
- public long getStart() {
- return start;
- }
-
- /**
- * @return The index of the last row to select
- */
- public long getEnd() {
- return end;
- }
-
- /**
- * @return The total row count in this split
- */
- public long getLength() throws IOException {
- return end - start;
- }
-
- @Override
- /** {@inheritDoc} */
- public void readFields(DataInput input) throws IOException {
- start = input.readLong();
- end = input.readLong();
- }
-
- @Override
- /** {@inheritDoc} */
- public void write(DataOutput output) throws IOException {
- output.writeLong(start);
- output.writeLong(end);
- }
- }
-
- private String conditions;
-
- private Connection connection;
-
- private String tableName;
-
- private String[] fieldNames;
-
- private DBConfiguration dbConf;
-
- @Override
- /** {@inheritDoc} */
- public void setConf(Configuration conf) {
-
- dbConf = new DBConfiguration(conf);
-
- try {
- getConnection();
-
- DatabaseMetaData dbMeta = connection.getMetaData();
- this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase();
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
-
- tableName = dbConf.getInputTableName();
- fieldNames = dbConf.getInputFieldNames();
- conditions = dbConf.getInputConditions();
- }
-
- public Configuration getConf() {
- return dbConf.getConf();
- }
-
- public DBConfiguration getDBConf() {
- return dbConf;
- }
-
- public Connection getConnection() {
- try {
- if (null == this.connection) {
- // The connection was closed; reinstantiate it.
- this.connection = dbConf.getConnection();
- this.connection.setAutoCommit(false);
- this.connection.setTransactionIsolation(
- Connection.TRANSACTION_READ_COMMITTED);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
+ super(start, end);
}
- return connection;
- }
-
- public String getDBProductName() {
- return dbProductName;
- }
-
- protected RecordReader<LongWritable, T> createDBRecordReader(
- DBInputSplit split, Configuration conf) throws IOException {
-
- @SuppressWarnings("unchecked")
- Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
- try {
- // use database product name to determine appropriate record reader.
- if (dbProductName.startsWith("ORACLE")) {
- // use Oracle-specific db reader.
- return new OracleDBRecordReader<T>(split, inputClass,
- conf, getConnection(), getDBConf(), conditions, fieldNames,
- tableName);
- } else {
- // Generic reader.
- return new DBRecordReader<T>(split, inputClass,
- conf, getConnection(), getDBConf(), conditions, fieldNames,
- tableName);
- }
- } catch (SQLException ex) {
- throw new IOException(ex);
- }
- }
-
- @Override
- /** {@inheritDoc} */
- public RecordReader<LongWritable, T> createRecordReader(InputSplit split,
- TaskAttemptContext context) throws IOException, InterruptedException {
-
- return createDBRecordReader((DBInputSplit) split,
- context.getConfiguration());
- }
-
- /** {@inheritDoc} */
- @Override
- public List<InputSplit> getSplits(JobContext job) throws IOException {
-
- ResultSet results = null;
- Statement statement = null;
- try {
- statement = connection.createStatement();
-
- results = statement.executeQuery(getCountQuery());
- results.next();
-
- long count = results.getLong(1);
- int chunks = ConfigurationHelper.getJobNumMaps(job);
- long chunkSize = (count / chunks);
-
- results.close();
- statement.close();
-
- List<InputSplit> splits = new ArrayList<InputSplit>();
-
- // Split the rows into n-number of chunks and adjust the last chunk
- // accordingly
- for (int i = 0; i < chunks; i++) {
- DBInputSplit split;
-
- if ((i + 1) == chunks) {
- split = new DBInputSplit(i * chunkSize, count);
- } else {
- split = new DBInputSplit(i * chunkSize, (i * chunkSize)
- + chunkSize);
- }
-
- splits.add(split);
- }
-
- connection.commit();
- return splits;
- } catch (SQLException e) {
- throw new IOException("Got SQLException", e);
- } finally {
- try {
- if (results != null) { results.close(); }
- } catch (SQLException e1) { /* ignored */ }
- try {
- if (statement != null) { statement.close(); }
- } catch (SQLException e1) { /* ignored */ }
-
- closeConnection();
- }
- }
-
- /** Returns the query for getting the total number of rows,
- * subclasses can override this for custom behaviour.*/
- protected String getCountQuery() {
-
- if(dbConf.getInputCountQuery() != null) {
- return dbConf.getInputCountQuery();
- }
-
- StringBuilder query = new StringBuilder();
- query.append("SELECT COUNT(*) FROM " + tableName);
-
- if (conditions != null && conditions.length() > 0) {
- query.append(" WHERE " + conditions);
- }
- return query.toString();
- }
-
- /**
- * Initializes the map-part of the job with the appropriate input settings.
- *
- * @param job The map-reduce job
- * @param inputClass the class object implementing DBWritable, which is the
- * Java object holding tuple fields.
- * @param tableName The table to read data from
- * @param conditions The condition which to select data with,
- * eg. '(updated > 20070101 AND length > 0)'
- * @param orderBy the fieldNames in the orderBy clause.
- * @param fieldNames The field names in the table
- * @see #setInput(Job, Class, String, String)
- */
- public static void setInput(Job job,
- Class<? extends DBWritable> inputClass,
- String tableName, String conditions,
- String orderBy, String... fieldNames) {
- job.setInputFormatClass(DBInputFormat.class);
- DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
- dbConf.setInputClass(inputClass);
- dbConf.setInputTableName(tableName);
- dbConf.setInputFieldNames(fieldNames);
- dbConf.setInputConditions(conditions);
- dbConf.setInputOrderBy(orderBy);
- }
-
- /**
- * Initializes the map-part of the job with the appropriate input settings.
- *
- * @param job The map-reduce job
- * @param inputClass the class object implementing DBWritable, which is the
- * Java object holding tuple fields.
- * @param inputQuery the input query to select fields. Example :
- * "SELECT f1, f2, f3 FROM Mytable ORDER BY f1"
- * @param inputCountQuery the input query that returns
- * the number of records in the table.
- * Example : "SELECT COUNT(f1) FROM Mytable"
- * @see #setInput(Job, Class, String, String, String, String...)
- */
- public static void setInput(Job job,
- Class<? extends DBWritable> inputClass,
- String inputQuery, String inputCountQuery) {
- job.setInputFormatClass(DBInputFormat.class);
- DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
- dbConf.setInputClass(inputClass);
- dbConf.setInputQuery(inputQuery);
- dbConf.setInputCountQuery(inputCountQuery);
- }
-
- protected void closeConnection() {
- try {
- if (null != this.connection) {
- this.connection.close();
- this.connection = null;
- }
- } catch (SQLException sqlE) { /* ignore exception on close. */ }
}
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBOutputFormat.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBOutputFormat.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,28 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package com.cloudera.sqoop.mapreduce.db;
-import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.StringUtils;
-
-import com.cloudera.sqoop.config.ConfigurationHelper;
/**
* A OutputFormat that sends the reduce output to a SQL table.
@@ -47,191 +31,29 @@ import com.cloudera.sqoop.config.Configu
* key has a type extending DBWritable. Returned {@link RecordWriter}
* writes <b>only the key</b> to the database with a batch SQL query.
*
+ * @deprecated use org.apache.sqoop.mapreduce.db.DBOutputFormat instead.
+ * @see org.apache.sqoop.mapreduce.db.DBOutputFormat
*/
public class DBOutputFormat<K extends DBWritable, V>
- extends OutputFormat<K, V> {
-
- private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
- public void checkOutputSpecs(JobContext context)
- throws IOException, InterruptedException {}
-
- public OutputCommitter getOutputCommitter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
- context);
- }
+ extends org.apache.sqoop.mapreduce.db.DBOutputFormat<K, V> {
/**
* A RecordWriter that writes the reduce output to a SQL table.
- */
- public class DBRecordWriter
- extends RecordWriter<K, V> {
-
- private Connection connection;
- private PreparedStatement statement;
-
- public DBRecordWriter() throws SQLException {
- }
-
- public DBRecordWriter(Connection connection
- , PreparedStatement statement) throws SQLException {
- this.connection = connection;
- this.statement = statement;
- this.connection.setAutoCommit(false);
- }
-
- public Connection getConnection() {
- return connection;
- }
-
- public PreparedStatement getStatement() {
- return statement;
- }
-
- @Override
- /** {@inheritDoc} */
- public void close(TaskAttemptContext context) throws IOException {
- try {
- statement.executeBatch();
- connection.commit();
- } catch (SQLException e) {
- try {
- connection.rollback();
- } catch (SQLException ex) {
- LOG.warn(StringUtils.stringifyException(ex));
- }
- throw new IOException(e);
- } finally {
- try {
- statement.close();
- connection.close();
- } catch (SQLException ex) {
- LOG.error("Unable to close connection", ex);
- }
- }
- }
-
- @Override
- /** {@inheritDoc} */
- public void write(K key, V value) throws IOException {
- try {
- key.write(statement);
- statement.addBatch();
- } catch (SQLException e) {
- LOG.error("Exception encountered", e);
- }
- }
- }
-
- /**
- * Constructs the query used as the prepared statement to insert data.
*
- * @param table
- * the table to insert into
- * @param fieldNames
- * the fields to insert into. If field names are unknown, supply an
- * array of nulls.
+ * @deprecated use
+ * org.apache.sqoop.mapreduce.db.DBOutputFormat.DBRecordWriter instead.
+ * @see org.apache.sqoop.mapreduce.db.DBOutputFormat.DBRecordWriter
*/
- public String constructQuery(String table, String[] fieldNames) {
- if(fieldNames == null) {
- throw new IllegalArgumentException("Field names may not be null");
- }
+ public static class DBRecordWriter<K extends DBWritable, V> extends
+ org.apache.sqoop.mapreduce.db.DBOutputFormat.DBRecordWriter<K, V> {
- StringBuilder query = new StringBuilder();
- query.append("INSERT INTO ").append(table);
-
- if (fieldNames.length > 0 && fieldNames[0] != null) {
- query.append(" (");
- for (int i = 0; i < fieldNames.length; i++) {
- query.append(fieldNames[i]);
- if (i != fieldNames.length - 1) {
- query.append(",");
- }
- }
- query.append(")");
- }
- query.append(" VALUES (");
-
- for (int i = 0; i < fieldNames.length; i++) {
- query.append("?");
- if(i != fieldNames.length - 1) {
- query.append(",");
- }
- }
- query.append(");");
-
- return query.toString();
- }
-
- @Override
- /** {@inheritDoc} */
- public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
- throws IOException {
- DBConfiguration dbConf = new DBConfiguration(context.getConfiguration());
- String tableName = dbConf.getOutputTableName();
- String[] fieldNames = dbConf.getOutputFieldNames();
-
- if(fieldNames == null) {
- fieldNames = new String[dbConf.getOutputFieldCount()];
- }
-
- try {
- Connection connection = dbConf.getConnection();
- PreparedStatement statement = null;
-
- statement = connection.prepareStatement(
- constructQuery(tableName, fieldNames));
- return new DBRecordWriter(connection, statement);
- } catch (Exception ex) {
- throw new IOException(ex);
+ public DBRecordWriter() throws SQLException {
+ super();
}
- }
- /**
- * Initializes the reduce-part of the job with
- * the appropriate output settings.
- *
- * @param job The job
- * @param tableName The table to insert data into
- * @param fieldNames The field names in the table.
- */
- public static void setOutput(Job job, String tableName,
- String... fieldNames) throws IOException {
- if(fieldNames.length > 0 && fieldNames[0] != null) {
- DBConfiguration dbConf = setOutput(job, tableName);
- dbConf.setOutputFieldNames(fieldNames);
- } else {
- if (fieldNames.length > 0) {
- setOutput(job, tableName, fieldNames.length);
- } else {
- throw new IllegalArgumentException(
- "Field names must be greater than 0");
- }
+ public DBRecordWriter(Connection connection,
+ PreparedStatement statement) throws SQLException {
+ super(connection, statement);
}
}
-
- /**
- * Initializes the reduce-part of the job
- * with the appropriate output settings.
- *
- * @param job The job
- * @param tableName The table to insert data into
- * @param fieldCount the number of fields in the table.
- */
- public static void setOutput(Job job, String tableName,
- int fieldCount) throws IOException {
- DBConfiguration dbConf = setOutput(job, tableName);
- dbConf.setOutputFieldCount(fieldCount);
- }
-
- private static DBConfiguration setOutput(Job job,
- String tableName) throws IOException {
- job.setOutputFormatClass(DBOutputFormat.class);
- ConfigurationHelper.setJobReduceSpeculativeExecution(job, false);
-
- DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
-
- dbConf.setOutputTableName(tableName);
- return dbConf;
- }
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,63 +15,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package com.cloudera.sqoop.mapreduce.db;
-import java.io.IOException;
import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.Arrays;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.conf.Configuration;
-
-import com.cloudera.sqoop.util.LoggingUtils;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
/**
* A RecordReader that reads records from a SQL table.
* Emits LongWritables containing the record number as
* key and DBWritables as value.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.DBRecordReader instead.
+ * @see org.apache.sqoop.mapreduce.db.DBRecordReader
*/
public class DBRecordReader<T extends DBWritable> extends
- RecordReader<LongWritable, T> {
-
- private static final Log LOG = LogFactory.getLog(DBRecordReader.class);
-
- private ResultSet results = null;
-
- private Class<T> inputClass;
-
- private Configuration conf;
-
- private DBInputFormat.DBInputSplit split;
-
- private long pos = 0;
-
- private LongWritable key = null;
-
- private T value = null;
-
- private Connection connection;
-
- protected PreparedStatement statement;
-
- private DBConfiguration dbConf;
-
- private String conditions;
-
- private String [] fieldNames;
-
- private String tableName;
+ org.apache.sqoop.mapreduce.db.DBRecordReader<T> {
/**
* @param split The InputSplit to read data for
@@ -85,222 +44,7 @@ public class DBRecordReader<T extends DB
Class<T> inputClass, Configuration conf, Connection conn,
DBConfiguration dbConfig, String cond, String [] fields, String table)
throws SQLException {
- this.inputClass = inputClass;
- this.split = split;
- this.conf = conf;
- this.connection = conn;
- this.dbConf = dbConfig;
- this.conditions = cond;
- if (fields != null) {
- this.fieldNames = Arrays.copyOf(fields, fields.length);
- }
- this.tableName = table;
+ super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
}
// CHECKSTYLE:ON
-
- protected ResultSet executeQuery(String query) throws SQLException {
- this.statement = connection.prepareStatement(query,
- ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-
- Integer fetchSize = dbConf.getFetchSize();
- if (fetchSize != null) {
- LOG.debug("Using fetchSize for next query: " + fetchSize);
- statement.setFetchSize(fetchSize);
- }
-
- LOG.debug("Executing query: " + query);
- return statement.executeQuery();
- }
-
- /** Returns the query for selecting the records,
- * subclasses can override this for custom behaviour.*/
- protected String getSelectQuery() {
- StringBuilder query = new StringBuilder();
-
- // Default codepath for MySQL, HSQLDB, etc.
- // Relies on LIMIT/OFFSET for splits.
- if(dbConf.getInputQuery() == null) {
- query.append("SELECT ");
-
- for (int i = 0; i < fieldNames.length; i++) {
- query.append(fieldNames[i]);
- if (i != fieldNames.length -1) {
- query.append(", ");
- }
- }
-
- query.append(" FROM ").append(tableName);
- query.append(" AS ").append(tableName); //in hsqldb this is necessary
- if (conditions != null && conditions.length() > 0) {
- query.append(" WHERE (").append(conditions).append(")");
- }
-
- String orderBy = dbConf.getInputOrderBy();
- if (orderBy != null && orderBy.length() > 0) {
- query.append(" ORDER BY ").append(orderBy);
- }
- } else {
- //PREBUILT QUERY
- query.append(dbConf.getInputQuery());
- }
-
- try {
- query.append(" LIMIT ").append(split.getLength());
- query.append(" OFFSET ").append(split.getStart());
- } catch (IOException ex) {
- // Ignore, will not throw.
- }
-
- return query.toString();
- }
-
- @Override
- /** {@inheritDoc} */
- public void close() throws IOException {
- try {
- if (null != results) {
- results.close();
- }
- if (null != statement) {
- statement.close();
- }
- if (null != connection) {
- connection.commit();
- connection.close();
- }
- } catch (SQLException e) {
- throw new IOException(e);
- }
- }
-
- public void initialize(InputSplit inputSplit, TaskAttemptContext context)
- throws IOException, InterruptedException {
- //do nothing
- }
-
- @Override
- /** {@inheritDoc} */
- public LongWritable getCurrentKey() {
- return key;
- }
-
- @Override
- /** {@inheritDoc} */
- public T getCurrentValue() {
- return value;
- }
-
- /**
- * @deprecated
- */
- @Deprecated
- public T createValue() {
- return ReflectionUtils.newInstance(inputClass, conf);
- }
-
- /**
- * @deprecated
- */
- @Deprecated
- public long getPos() throws IOException {
- return pos;
- }
-
- /**
- * @deprecated Use {@link #nextKeyValue()}
- */
- @Deprecated
- public boolean next(LongWritable k, T v) throws IOException {
- this.key = k;
- this.value = v;
- return nextKeyValue();
- }
-
- @Override
- /** {@inheritDoc} */
- public float getProgress() throws IOException {
- return pos / (float)split.getLength();
- }
-
- @Override
- /** {@inheritDoc} */
- public boolean nextKeyValue() throws IOException {
- try {
- if (key == null) {
- key = new LongWritable();
- }
- if (value == null) {
- value = createValue();
- }
- if (null == this.results) {
- // First time into this method, run the query.
- this.results = executeQuery(getSelectQuery());
- }
- if (!results.next()) {
- return false;
- }
-
- // Set the key field value as the output key value
- key.set(pos + split.getStart());
-
- value.readFields(results);
-
- pos++;
- } catch (SQLException e) {
- LoggingUtils.logAll(LOG, e);
- throw new IOException("SQLException in nextKeyValue", e);
- }
- return true;
- }
-
- /**
- * @return true if nextKeyValue() would return false.
- */
- protected boolean isDone() {
- try {
- return this.results != null
- && (results.isLast() || results.isAfterLast());
- } catch (SQLException sqlE) {
- return true;
- }
- }
-
- protected DBInputFormat.DBInputSplit getSplit() {
- return split;
- }
-
- protected String [] getFieldNames() {
- return fieldNames;
- }
-
- protected String getTableName() {
- return tableName;
- }
-
- protected String getConditions() {
- return conditions;
- }
-
- protected DBConfiguration getDBConf() {
- return dbConf;
- }
-
- protected Connection getConnection() {
- return connection;
- }
-
- protected PreparedStatement getStatement() {
- return statement;
- }
-
- protected void setStatement(PreparedStatement stmt) {
- this.statement = stmt;
- }
-
- /**
- * @return the configuration. Allows subclasses to access the configuration
- */
- protected Configuration getConf(){
- return conf;
- }
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBSplitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBSplitter.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBSplitter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBSplitter.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,15 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package com.cloudera.sqoop.mapreduce.db;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
/**
* DBSplitter will generate DBInputSplits to use with DataDrivenDBInputFormat.
@@ -34,13 +25,10 @@ import org.apache.hadoop.mapreduce.Input
* on the data-type of the column, this requires different behavior.
* DBSplitter implementations should perform this for a data type or family
* of data types.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.DBSplitter instead.
+ * @see org.apache.sqoop.mapreduce.db.DBSplitter
*/
-public interface DBSplitter {
- /**
- * Given a ResultSet containing one record (and already advanced to that
- * record) with two columns (a low value, and a high value, both of the same
- * type), determine a set of splits that span the given values.
- */
- List<InputSplit> split(Configuration conf, ResultSet results, String colName)
- throws SQLException;
+public interface DBSplitter extends org.apache.sqoop.mapreduce.db.DBSplitter {
+
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,67 +15,48 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package com.cloudera.sqoop.mapreduce.db;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-
-import com.cloudera.sqoop.config.ConfigurationHelper;
/**
* A InputFormat that reads input data from an SQL table.
* Operates like DBInputFormat, but instead of using LIMIT and OFFSET to
* demarcate splits, it tries to generate WHERE clauses which separate the
* data into roughly equivalent shards.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat instead
+ * @see org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat
*/
public class DataDrivenDBInputFormat<T extends DBWritable>
- extends DBInputFormat<T> implements Configurable {
-
- private static final Log LOG =
- LogFactory.getLog(DataDrivenDBInputFormat.class);
+ extends org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat<T> {
/**
* If users are providing their own query, the following string is expected
* to appear in the WHERE clause, which will be substituted with a pair of
* conditions on the input to allow input splits to parallelise the import.
*/
- public static final String SUBSTITUTE_TOKEN = "$CONDITIONS";
+ public static final String SUBSTITUTE_TOKEN =
+ org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.SUBSTITUTE_TOKEN;
/**
* A InputSplit that spans a set of rows.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.
+ * DataDrivenDBInputSplit instead.
+ * @see org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.
+ * DataDrivenDBInputSplit
*/
- public static class DataDrivenDBInputSplit
- extends DBInputFormat.DBInputSplit {
-
- private String lowerBoundClause;
- private String upperBoundClause;
+ public static class DataDrivenDBInputSplit extends
+ org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit {
/**
* Default Constructor.
*/
public DataDrivenDBInputSplit() {
+ super();
}
/**
@@ -88,189 +67,10 @@ public class DataDrivenDBInputFormat<T e
* on the 'upper' end.
*/
public DataDrivenDBInputSplit(final String lower, final String upper) {
- this.lowerBoundClause = lower;
- this.upperBoundClause = upper;
- }
-
-
- /**
- * @return The total row count in this split.
- */
- public long getLength() throws IOException {
- return 0; // unfortunately, we don't know this.
- }
-
- @Override
- /** {@inheritDoc} */
- public void readFields(DataInput input) throws IOException {
- this.lowerBoundClause = Text.readString(input);
- this.upperBoundClause = Text.readString(input);
- }
-
- @Override
- /** {@inheritDoc} */
- public void write(DataOutput output) throws IOException {
- Text.writeString(output, this.lowerBoundClause);
- Text.writeString(output, this.upperBoundClause);
- }
-
- public String getLowerClause() {
- return lowerBoundClause;
- }
-
- public String getUpperClause() {
- return upperBoundClause;
+ super(lower, upper);
}
}
- /**
- * @return the DBSplitter implementation to use to divide the table/query
- * into InputSplits.
- */
- protected DBSplitter getSplitter(int sqlDataType) {
- switch (sqlDataType) {
- case Types.NUMERIC:
- case Types.DECIMAL:
- return new BigDecimalSplitter();
-
- case Types.BIT:
- case Types.BOOLEAN:
- return new BooleanSplitter();
-
- case Types.INTEGER:
- case Types.TINYINT:
- case Types.SMALLINT:
- case Types.BIGINT:
- return new IntegerSplitter();
-
- case Types.REAL:
- case Types.FLOAT:
- case Types.DOUBLE:
- return new FloatSplitter();
-
- case Types.CHAR:
- case Types.VARCHAR:
- case Types.LONGVARCHAR:
- return new TextSplitter();
-
- case Types.DATE:
- case Types.TIME:
- case Types.TIMESTAMP:
- return new DateSplitter();
-
- default:
- // TODO: Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB,
- // BLOB, ARRAY, STRUCT, REF, DATALINK, and JAVA_OBJECT.
- return null;
- }
- }
-
- @Override
- /** {@inheritDoc} */
- public List<InputSplit> getSplits(JobContext job) throws IOException {
-
- int targetNumTasks = ConfigurationHelper.getJobNumMaps(job);
- String boundaryQuery = getDBConf().getInputBoundingQuery();
-
- // If user do not forced us to use his boundary query and we don't have to
- // bacause there is only one mapper we will return single split that
- // separates nothing. This can be considerably more optimal for a large
- // table with no index.
- if (1 == targetNumTasks
- && (boundaryQuery == null || boundaryQuery.isEmpty())) {
- List<InputSplit> singletonSplit = new ArrayList<InputSplit>();
- singletonSplit.add(new DataDrivenDBInputSplit("1=1", "1=1"));
- return singletonSplit;
- }
-
- ResultSet results = null;
- Statement statement = null;
- Connection connection = getConnection();
- try {
- statement = connection.createStatement();
-
- String query = getBoundingValsQuery();
- LOG.info("BoundingValsQuery: " + query);
-
- results = statement.executeQuery(query);
- results.next();
-
- // Based on the type of the results, use a different mechanism
- // for interpolating split points (i.e., numeric splits, text splits,
- // dates, etc.)
- int sqlDataType = results.getMetaData().getColumnType(1);
- boolean isSigned = results.getMetaData().isSigned(1);
-
- // MySQL has an unsigned integer which we need to allocate space for
- if (sqlDataType == Types.INTEGER && !isSigned){
- sqlDataType = Types.BIGINT;
- }
-
- DBSplitter splitter = getSplitter(sqlDataType);
- if (null == splitter) {
- throw new IOException("Unknown SQL data type: " + sqlDataType);
- }
-
- return splitter.split(job.getConfiguration(), results,
- getDBConf().getInputOrderBy());
- } catch (SQLException e) {
- throw new IOException(e);
- } finally {
- // More-or-less ignore SQL exceptions here, but log in case we need it.
- try {
- if (null != results) {
- results.close();
- }
- } catch (SQLException se) {
- LOG.debug("SQLException closing resultset: " + se.toString());
- }
-
- try {
- if (null != statement) {
- statement.close();
- }
- } catch (SQLException se) {
- LOG.debug("SQLException closing statement: " + se.toString());
- }
-
- try {
- connection.commit();
- closeConnection();
- } catch (SQLException se) {
- LOG.debug("SQLException committing split transaction: "
- + se.toString());
- }
- }
- }
-
- /**
- * @return a query which returns the minimum and maximum values for
- * the order-by column.
- *
- * The min value should be in the first column, and the
- * max value should be in the second column of the results.
- */
- protected String getBoundingValsQuery() {
- // If the user has provided a query, use that instead.
- String userQuery = getDBConf().getInputBoundingQuery();
- if (null != userQuery) {
- return userQuery;
- }
-
- // Auto-generate one based on the table name we've been provided with.
- StringBuilder query = new StringBuilder();
-
- String splitCol = getDBConf().getInputOrderBy();
- query.append("SELECT MIN(").append(splitCol).append("), ");
- query.append("MAX(").append(splitCol).append(") FROM ");
- query.append(getDBConf().getInputTableName());
- String conditions = getDBConf().getInputConditions();
- if (null != conditions) {
- query.append(" WHERE ( " + conditions + " )");
- }
-
- return query.toString();
- }
/** Set the user-defined bounding query to use with a user-defined query.
This *must* include the substring "$CONDITIONS"
@@ -282,35 +82,8 @@ public class DataDrivenDBInputFormat<T e
inside each split.
*/
public static void setBoundingQuery(Configuration conf, String query) {
- if (null != query) {
- // If the user's settng a query, warn if they don't allow conditions.
- if (query.indexOf(SUBSTITUTE_TOKEN) == -1) {
- LOG.warn("Could not find " + SUBSTITUTE_TOKEN + " token in query: "
- + query + "; splits may not partition data.");
- }
- }
-
- conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query);
- }
-
- protected RecordReader<LongWritable, T> createDBRecordReader(
- DBInputSplit split, Configuration conf) throws IOException {
-
- DBConfiguration dbConf = getDBConf();
- @SuppressWarnings("unchecked")
- Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
- String dbProductName = getDBProductName();
-
- LOG.debug("Creating db record reader for db product: " + dbProductName);
-
- try {
- return new DataDrivenDBRecordReader<T>(split, inputClass,
- conf, getConnection(), dbConf, dbConf.getInputConditions(),
- dbConf.getInputFieldNames(), dbConf.getInputTableName(),
- dbProductName);
- } catch (SQLException ex) {
- throw new IOException(ex);
- }
+ org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.setBoundingQuery(
+ conf, query);
}
// Configuration methods override superclass to ensure that the proper
@@ -324,9 +97,8 @@ public class DataDrivenDBInputFormat<T e
Class<? extends DBWritable> inputClass,
String tableName, String conditions,
String splitBy, String... fieldNames) {
- DBInputFormat.setInput(job, inputClass, tableName, conditions,
- splitBy, fieldNames);
- job.setInputFormatClass(DataDrivenDBInputFormat.class);
+ org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.setInput(
+ job, inputClass, tableName, conditions, splitBy, fieldNames);
}
/** setInput() takes a custom query and a separate "bounding query" to use
@@ -335,9 +107,7 @@ public class DataDrivenDBInputFormat<T e
public static void setInput(Job job,
Class<? extends DBWritable> inputClass,
String inputQuery, String inputBoundingQuery) {
- DBInputFormat.setInput(job, inputClass, inputQuery, "");
- job.getConfiguration().set(DBConfiguration.INPUT_BOUNDING_QUERY,
- inputBoundingQuery);
- job.setInputFormatClass(DataDrivenDBInputFormat.class);
+ org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.setInput(
+ job, inputClass, inputQuery, inputBoundingQuery);
}
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBRecordReader.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBRecordReader.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBRecordReader.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,15 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package com.cloudera.sqoop.mapreduce.db;
-import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
@@ -34,14 +28,13 @@ import org.apache.hadoop.mapreduce.lib.d
* using data-driven WHERE clause splits.
* Emits LongWritables containing the record number as
* key and DBWritables as value.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.DataDrivenDBRecordReader
+ * instead.
+ * @see org.apache.sqoop.mapreduce.db.DataDrivenDBRecordReader
*/
public class DataDrivenDBRecordReader<T extends DBWritable>
- extends DBRecordReader<T> {
-
- private static final Log LOG =
- LogFactory.getLog(DataDrivenDBRecordReader.class);
-
- private String dbProductName; // database manufacturer string.
+ extends org.apache.sqoop.mapreduce.db.DataDrivenDBRecordReader<T> {
// CHECKSTYLE:OFF
// TODO(aaron): Refactor constructor to use fewer arguments.
@@ -53,77 +46,8 @@ public class DataDrivenDBRecordReader<T
Class<T> inputClass, Configuration conf, Connection conn,
DBConfiguration dbConfig, String cond, String [] fields, String table,
String dbProduct) throws SQLException {
- super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
- this.dbProductName = dbProduct;
+ super(split, inputClass, conf, conn, dbConfig,
+ cond, fields, table, dbProduct);
}
// CHECKSTYLE:ON
-
- @Override
- /** {@inheritDoc} */
- public float getProgress() throws IOException {
- return isDone() ? 1.0f : 0.0f;
- }
-
- /** Returns the query for selecting the records,
- * subclasses can override this for custom behaviour.*/
- protected String getSelectQuery() {
- StringBuilder query = new StringBuilder();
- DataDrivenDBInputFormat.DataDrivenDBInputSplit dataSplit =
- (DataDrivenDBInputFormat.DataDrivenDBInputSplit) getSplit();
- DBConfiguration dbConf = getDBConf();
- String [] fieldNames = getFieldNames();
- String tableName = getTableName();
- String conditions = getConditions();
-
- // Build the WHERE clauses associated with the data split first.
- // We need them in both branches of this function.
- StringBuilder conditionClauses = new StringBuilder();
- conditionClauses.append("( ").append(dataSplit.getLowerClause());
- conditionClauses.append(" ) AND ( ").append(dataSplit.getUpperClause());
- conditionClauses.append(" )");
-
- if(dbConf.getInputQuery() == null) {
- // We need to generate the entire query.
- query.append("SELECT ");
-
- for (int i = 0; i < fieldNames.length; i++) {
- query.append(fieldNames[i]);
- if (i != fieldNames.length -1) {
- query.append(", ");
- }
- }
-
- query.append(" FROM ").append(tableName);
- if (!dbProductName.startsWith("ORACLE")) {
- // Seems to be necessary for hsqldb? Oracle explicitly does *not*
- // use this clause.
- query.append(" AS ").append(tableName);
- }
- query.append(" WHERE ");
- if (conditions != null && conditions.length() > 0) {
- // Put the user's conditions first.
- query.append("( ").append(conditions).append(" ) AND ");
- }
-
- // Now append the conditions associated with our split.
- query.append(conditionClauses.toString());
-
- } else {
- // User provided the query. We replace the special token with
- // our WHERE clause.
- String inputQuery = dbConf.getInputQuery();
- if (inputQuery.indexOf(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN) == -1) {
- LOG.error("Could not find the clause substitution token "
- + DataDrivenDBInputFormat.SUBSTITUTE_TOKEN + " in the query: ["
- + inputQuery + "]. Parallel splits may not work correctly.");
- }
-
- query.append(inputQuery.replace(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN,
- conditionClauses.toString()));
- }
-
- LOG.debug("Using query: " + query.toString());
-
- return query.toString();
- }
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DateSplitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DateSplitter.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DateSplitter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DateSplitter.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,168 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package com.cloudera.sqoop.mapreduce.db;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import com.cloudera.sqoop.config.ConfigurationHelper;
/**
* Implement DBSplitter over date/time values.
* Make use of logic from IntegerSplitter, since date/time are just longs
* in Java.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.DateSplitter instead.
+ * @see org.apache.sqoop.mapreduce.db.DateSplitter
*/
-public class DateSplitter extends IntegerSplitter {
-
- private static final Log LOG = LogFactory.getLog(DateSplitter.class);
-
- public List<InputSplit> split(Configuration conf, ResultSet results,
- String colName) throws SQLException {
-
- long minVal;
- long maxVal;
-
- int sqlDataType = results.getMetaData().getColumnType(1);
- minVal = resultSetColToLong(results, 1, sqlDataType);
- maxVal = resultSetColToLong(results, 2, sqlDataType);
-
- String lowClausePrefix = colName + " >= ";
- String highClausePrefix = colName + " < ";
-
- int numSplits = ConfigurationHelper.getConfNumMaps(conf);
- if (numSplits < 1) {
- numSplits = 1;
- }
-
- if (minVal == Long.MIN_VALUE && maxVal == Long.MIN_VALUE) {
- // The range of acceptable dates is NULL to NULL. Just create a single
- // split.
- List<InputSplit> splits = new ArrayList<InputSplit>();
- splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
- colName + " IS NULL", colName + " IS NULL"));
- return splits;
- }
-
- // Gather the split point integers
- List<Long> splitPoints = split(numSplits, minVal, maxVal);
- List<InputSplit> splits = new ArrayList<InputSplit>();
-
- // Turn the split points into a set of intervals.
- long start = splitPoints.get(0);
- Date startDate = longToDate(start, sqlDataType);
- if (sqlDataType == Types.TIMESTAMP) {
- // The lower bound's nanos value needs to match the actual lower-bound
- // nanos.
- try {
- ((java.sql.Timestamp) startDate).setNanos(
- results.getTimestamp(1).getNanos());
- } catch (NullPointerException npe) {
- // If the lower bound was NULL, we'll get an NPE; just ignore it and
- // don't set nanos.
- }
- }
-
- for (int i = 1; i < splitPoints.size(); i++) {
- long end = splitPoints.get(i);
- Date endDate = longToDate(end, sqlDataType);
-
- if (i == splitPoints.size() - 1) {
- if (sqlDataType == Types.TIMESTAMP) {
- // The upper bound's nanos value needs to match the actual
- // upper-bound nanos.
- try {
- ((java.sql.Timestamp) endDate).setNanos(
- results.getTimestamp(2).getNanos());
- } catch (NullPointerException npe) {
- // If the upper bound was NULL, we'll get an NPE; just ignore it
- // and don't set nanos.
- }
- }
- // This is the last one; use a closed interval.
- splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
- lowClausePrefix + dateToString(startDate),
- colName + " <= " + dateToString(endDate)));
- } else {
- // Normal open-interval case.
- splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
- lowClausePrefix + dateToString(startDate),
- highClausePrefix + dateToString(endDate)));
- }
-
- start = end;
- startDate = endDate;
- }
-
- if (minVal == Long.MIN_VALUE || maxVal == Long.MIN_VALUE) {
- // Add an extra split to handle the null case that we saw.
- splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
- colName + " IS NULL", colName + " IS NULL"));
- }
-
- return splits;
- }
-
- /**
- Retrieve the value from the column in a type-appropriate manner and
- return its timestamp since the epoch. If the column is null, then return
- Long.MIN_VALUE. This will cause a special split to be generated for the
- NULL case, but may also cause poorly-balanced splits if most of the
- actual dates are positive time since the epoch, etc.
- */
- private long resultSetColToLong(ResultSet rs, int colNum, int sqlDataType)
- throws SQLException {
- try {
- switch (sqlDataType) {
- case Types.DATE:
- return rs.getDate(colNum).getTime();
- case Types.TIME:
- return rs.getTime(colNum).getTime();
- case Types.TIMESTAMP:
- return rs.getTimestamp(colNum).getTime();
- default:
- throw new SQLException("Not a date-type field");
- }
- } catch (NullPointerException npe) {
- // null column. return minimum long value.
- LOG.warn("Encountered a NULL date in the split column. "
- + "Splits may be poorly balanced.");
- return Long.MIN_VALUE;
- }
- }
-
- /** Parse the long-valued timestamp into the appropriate SQL date type. */
- private Date longToDate(long val, int sqlDataType) {
- switch (sqlDataType) {
- case Types.DATE:
- return new java.sql.Date(val);
- case Types.TIME:
- return new java.sql.Time(val);
- case Types.TIMESTAMP:
- return new java.sql.Timestamp(val);
- default: // Shouldn't ever hit this case.
- return null;
- }
- }
+public class DateSplitter extends org.apache.sqoop.mapreduce.db.DateSplitter {
- /**
- * Given a Date 'd', format it as a string for use in a SQL date
- * comparison operation.
- * @param d the date to format.
- * @return the string representing this date in SQL with any appropriate
- * quotation characters, etc.
- */
- protected String dateToString(Date d) {
- return "'" + d.toString() + "'";
- }
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/FloatSplitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/FloatSplitter.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/FloatSplitter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/FloatSplitter.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,88 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package com.cloudera.sqoop.mapreduce.db;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import com.cloudera.sqoop.config.ConfigurationHelper;
/**
* Implement DBSplitter over floating-point values.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.FloatSplitter instead.
+ * @see org.apache.sqoop.mapreduce.db.FloatSplitter
*/
-public class FloatSplitter implements DBSplitter {
-
- private static final Log LOG = LogFactory.getLog(FloatSplitter.class);
-
- private static final double MIN_INCREMENT = 10000 * Double.MIN_VALUE;
-
- public List<InputSplit> split(Configuration conf, ResultSet results,
- String colName) throws SQLException {
-
- LOG.warn("Generating splits for a floating-point index column. Due to the");
- LOG.warn("imprecise representation of floating-point values in Java, this");
- LOG.warn("may result in an incomplete import.");
- LOG.warn("You are strongly encouraged to choose an integral split column.");
-
- List<InputSplit> splits = new ArrayList<InputSplit>();
-
- if (results.getString(1) == null && results.getString(2) == null) {
- // Range is null to null. Return a null split accordingly.
- splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
- colName + " IS NULL", colName + " IS NULL"));
- return splits;
- }
-
- double minVal = results.getDouble(1);
- double maxVal = results.getDouble(2);
-
- // Use this as a hint. May need an extra task if the size doesn't
- // divide cleanly.
- int numSplits = ConfigurationHelper.getConfNumMaps(conf);
- double splitSize = (maxVal - minVal) / (double) numSplits;
-
- if (splitSize < MIN_INCREMENT) {
- splitSize = MIN_INCREMENT;
- }
-
- String lowClausePrefix = colName + " >= ";
- String highClausePrefix = colName + " < ";
-
- double curLower = minVal;
- double curUpper = curLower + splitSize;
-
- while (curUpper < maxVal) {
- splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
- lowClausePrefix + Double.toString(curLower),
- highClausePrefix + Double.toString(curUpper)));
-
- curLower = curUpper;
- curUpper += splitSize;
- }
-
- // Catch any overage and create the closed interval for the last split.
- if (curLower <= maxVal || splits.size() == 1) {
- splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
- lowClausePrefix + Double.toString(curUpper),
- colName + " <= " + Double.toString(maxVal)));
- }
-
- if (results.getString(1) == null || results.getString(2) == null) {
- // At least one extrema is null; add a null split.
- splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
- colName + " IS NULL", colName + " IS NULL"));
- }
+public class FloatSplitter
+ extends org.apache.sqoop.mapreduce.db.FloatSplitter {
- return splits;
- }
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/IntegerSplitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/IntegerSplitter.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/IntegerSplitter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/IntegerSplitter.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,133 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package com.cloudera.sqoop.mapreduce.db;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import com.cloudera.sqoop.config.ConfigurationHelper;
/**
* Implement DBSplitter over integer values.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.IntegerSplitter instead.
+ * @see org.apache.sqoop.mapreduce.db.IntegerSplitter
*/
-public class IntegerSplitter implements DBSplitter {
+public class IntegerSplitter
+ extends org.apache.sqoop.mapreduce.db.IntegerSplitter {
public static final Log LOG =
- LogFactory.getLog(IntegerSplitter.class.getName());
-
- public List<InputSplit> split(Configuration conf, ResultSet results,
- String colName) throws SQLException {
-
- long minVal = results.getLong(1);
- long maxVal = results.getLong(2);
-
- String lowClausePrefix = colName + " >= ";
- String highClausePrefix = colName + " < ";
-
- int numSplits = ConfigurationHelper.getConfNumMaps(conf);
- if (numSplits < 1) {
- numSplits = 1;
- }
-
- if (results.getString(1) == null && results.getString(2) == null) {
- // Range is null to null. Return a null split accordingly.
- List<InputSplit> splits = new ArrayList<InputSplit>();
- splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
- colName + " IS NULL", colName + " IS NULL"));
- return splits;
- }
-
- // Get all the split points together.
- List<Long> splitPoints = split(numSplits, minVal, maxVal);
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Splits: [%,28d to %,28d] into %d parts",
- minVal, maxVal, numSplits));
- for (int i = 0; i < splitPoints.size(); i++) {
- LOG.debug(String.format("%,28d", splitPoints.get(i)));
- }
- }
- List<InputSplit> splits = new ArrayList<InputSplit>();
-
- // Turn the split points into a set of intervals.
- long start = splitPoints.get(0);
- for (int i = 1; i < splitPoints.size(); i++) {
- long end = splitPoints.get(i);
-
- if (i == splitPoints.size() - 1) {
- // This is the last one; use a closed interval.
- splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
- lowClausePrefix + Long.toString(start),
- colName + " <= " + Long.toString(end)));
- } else {
- // Normal open-interval case.
- splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
- lowClausePrefix + Long.toString(start),
- highClausePrefix + Long.toString(end)));
- }
-
- start = end;
- }
-
- if (results.getString(1) == null || results.getString(2) == null) {
- // At least one extrema is null; add a null split.
- splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
- colName + " IS NULL", colName + " IS NULL"));
- }
-
- return splits;
- }
-
- /**
- * Returns a list of longs one element longer than the list of input splits.
- * This represents the boundaries between input splits.
- * All splits are open on the top end, except the last one.
- *
- * So the list [0, 5, 8, 12, 18] would represent splits capturing the
- * intervals:
- *
- * [0, 5)
- * [5, 8)
- * [8, 12)
- * [12, 18] note the closed interval for the last split.
- */
- List<Long> split(long numSplits, long minVal, long maxVal)
- throws SQLException {
-
- List<Long> splits = new ArrayList<Long>();
-
- // We take the min-max interval and divide by the numSplits and also
- // calculate a remainder. Because of integer division rules, numsplits *
- // splitSize + minVal will always be <= maxVal. We then use the remainder
- // and add 1 if the current split index is less than the < the remainder.
- // This is guaranteed to add up to remainder and not surpass the value.
- long splitSize = (maxVal - minVal) / numSplits;
- long remainder = (maxVal - minVal) % numSplits;
- long curVal = minVal;
-
- // This will honor numSplits as long as split size > 0. If split size is
- // 0, it will have remainder splits.
- for (int i = 0; i <= numSplits; i++) {
- splits.add(curVal);
- if (curVal >= maxVal) {
- break;
- }
- curVal += splitSize;
- curVal += (i < remainder) ? 1 : 0;
- }
-
- if (splits.size() == 1) {
- // make a valid singleton split
- splits.add(maxVal);
- }
-
- return splits;
- }
+ org.apache.sqoop.mapreduce.db.IntegerSplitter.LOG;
}