You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ar...@apache.org on 2011/08/25 08:27:06 UTC
svn commit: r1161404 - in /incubator/sqoop/trunk/src:
java/com/cloudera/sqoop/manager/ConnManager.java
java/com/cloudera/sqoop/manager/OracleManager.java
java/com/cloudera/sqoop/tool/ImportTool.java
test/com/cloudera/sqoop/TestIncrementalImport.java
Author: arvind
Date: Thu Aug 25 06:27:05 2011
New Revision: 1161404
URL: http://svn.apache.org/viewvc?rev=1161404&view=rev
Log:
SQOOP-321. Support date/time for incremental append imports.
(Bilung Lee via Arvind Prabhakar)
Modified:
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ImportTool.java
incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java?rev=1161404&r1=1161403&r2=1161404&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java Thu Aug 25 06:27:05 2011
@@ -302,6 +302,14 @@ public abstract class ConnManager {
}
/**
+ * Given a date/time, return the quoted string that can
+ * be inserted into a SQL statement, representing that date/time.
+ * <p>This default implementation ignores {@code columnType} and emits a
+ * plain SQL character literal. Embedded single quotes are doubled so the
+ * value cannot terminate the literal early. Managers whose database needs
+ * an explicit conversion function should override this method.
+ * @param datetime the date/time value in string form
+ * @param columnType the java.sql.Types code of the column being compared
+ * @return the value as a single-quoted SQL literal
+ */
+ public String datetimeToQueryString(String datetime, int columnType) {
+ // String.valueOf keeps the historical "'null'" output for a null input.
+ String escaped = String.valueOf(datetime).replace("'", "''");
+ return "'" + escaped + "'";
+ }
+
+ /**
* This method allows the ConnManager to override the creation of an
* input-bounds query that is used to create splits when running import
* based on free-form query. Any non-null return value is used, whereas a null
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java?rev=1161404&r1=1161403&r2=1161404&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java Thu Aug 25 06:27:05 2011
@@ -27,6 +27,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
+import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -535,6 +536,19 @@ public class OracleManager extends Gener
}
@Override
+ public String datetimeToQueryString(String datetime, int columnType) {
+ // Oracle needs an explicit conversion function rather than a bare literal.
+ // Embedded single quotes are doubled so they cannot close the literal early.
+ // String.valueOf keeps the historical "null" text for a null input.
+ String literal = String.valueOf(datetime).replace("'", "''");
+ if (columnType == Types.TIMESTAMP) {
+ return "TO_TIMESTAMP('" + literal + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
+ } else if (columnType == Types.DATE) {
+ return "TO_DATE('" + literal + "', 'YYYY-MM-DD HH24:MI:SS')";
+ }
+ // Any other type, including java.sql.Types.TIME (which callers may treat
+ // as date/time), has no conversion here. Report the rejected type code.
+ // IllegalArgumentException is a RuntimeException, so existing handlers
+ // still catch it.
+ String msg = "Column type is neither timestamp nor date! (java.sql.Types code "
+ + columnType + ")";
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+
+ @Override
public boolean supportsStagingForExport() {
return true;
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ImportTool.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ImportTool.java?rev=1161404&r1=1161403&r2=1161404&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ImportTool.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ImportTool.java Thu Aug 25 06:27:05 2011
@@ -20,13 +20,12 @@ package com.cloudera.sqoop.tool;
import java.io.IOException;
-import java.math.BigDecimal;
-
import java.sql.Connection;
import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
-import java.sql.Timestamp;
+import java.sql.Types;
import java.util.List;
import java.util.Map;
@@ -66,6 +65,9 @@ public class ImportTool extends BaseSqoo
// a single table).
private boolean allTables;
+ // store check column type for incremental option
+ private int checkColumnType;
+
public ImportTool() {
this("import", false);
}
@@ -159,7 +161,7 @@ public class ImportTool extends BaseSqoo
* Return the max value in the incremental-import test column. This
* value must be numeric.
*/
- private BigDecimal getMaxColumnId(SqoopOptions options) throws SQLException {
+ private Object getMaxColumnId(SqoopOptions options) throws SQLException {
StringBuilder sb = new StringBuilder();
sb.append("SELECT MAX(");
sb.append(options.getIncrementalTestColumn());
@@ -184,7 +186,17 @@ public class ImportTool extends BaseSqoo
return null;
}
- return rs.getBigDecimal(1);
+ ResultSetMetaData rsmd = rs.getMetaData();
+ checkColumnType = rsmd.getColumnType(1);
+ if (checkColumnType == Types.TIMESTAMP) {
+ return rs.getTimestamp(1);
+ } else if (checkColumnType == Types.DATE) {
+ return rs.getDate(1);
+ } else if (checkColumnType == Types.TIME) {
+ return rs.getTime(1);
+ } else {
+ return rs.getObject(1);
+ }
} finally {
try {
if (null != rs) {
@@ -205,6 +217,16 @@ public class ImportTool extends BaseSqoo
}
/**
+ * Report whether a JDBC column type code denotes a date/time value.
+ * @param columnType a java.sql.Types constant
+ * @return true for TIMESTAMP, DATE or TIME; false for every other type.
+ */
+ private boolean isDateTimeColumn(int columnType) {
+ switch (columnType) {
+ case Types.TIMESTAMP:
+ case Types.DATE:
+ case Types.TIME:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ /**
* Initialize the constraints which set the incremental import range.
* @return false if an import is not necessary, because the dataset has not
* changed.
@@ -224,24 +246,30 @@ public class ImportTool extends BaseSqoo
SqoopOptions.IncrementalMode incrementalMode = options.getIncrementalMode();
String nextIncrementalValue = null;
+ Object nextVal;
switch (incrementalMode) {
case AppendRows:
try {
- BigDecimal nextVal = getMaxColumnId(options);
- if (null != nextVal) {
- nextIncrementalValue = nextVal.toString();
+ nextVal = getMaxColumnId(options);
+ if (isDateTimeColumn(checkColumnType)) {
+ nextIncrementalValue = (nextVal == null) ? null
+ : manager.datetimeToQueryString(nextVal.toString(),
+ checkColumnType);
+ } else {
+ nextIncrementalValue = (nextVal == null) ? null : nextVal.toString();
}
} catch (SQLException sqlE) {
throw new IOException(sqlE);
}
break;
case DateLastModified:
- Timestamp dbTimestamp = manager.getCurrentDbTimestamp();
- if (null == dbTimestamp) {
+ checkColumnType = Types.TIMESTAMP;
+ nextVal = manager.getCurrentDbTimestamp();
+ if (null == nextVal) {
throw new IOException("Could not get current time from database");
}
-
- nextIncrementalValue = manager.timestampToQueryString(dbTimestamp);
+ nextIncrementalValue = manager.datetimeToQueryString(nextVal.toString(),
+ checkColumnType);
break;
default:
throw new ImportException("Undefined incremental import type: "
@@ -253,12 +281,13 @@ public class ImportTool extends BaseSqoo
StringBuilder sb = new StringBuilder();
String prevEndpoint = options.getIncrementalLastValue();
- if (incrementalMode == SqoopOptions.IncrementalMode.DateLastModified
- && null != prevEndpoint && !prevEndpoint.contains("\'")) {
- // Incremental imports based on timestamps should be 'quoted' in
+ if (isDateTimeColumn(checkColumnType) && null != prevEndpoint
+ && !prevEndpoint.startsWith("\'") && !prevEndpoint.endsWith("\'")) {
+ // Incremental imports based on date/time should be 'quoted' in
// ANSI SQL. If the user didn't specify single-quotes, put them
// around, here.
- prevEndpoint = "'" + prevEndpoint + "'";
+ prevEndpoint = manager.datetimeToQueryString(prevEndpoint,
+ checkColumnType);
}
String checkColName = manager.escapeColName(
@@ -320,7 +349,8 @@ public class ImportTool extends BaseSqoo
if (null == recordOptions) {
recordOptions = options;
}
- recordOptions.setIncrementalLastValue(nextIncrementalValue);
+ recordOptions.setIncrementalLastValue(
+ (nextVal == null) ? null : nextVal.toString());
return true;
}
Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java?rev=1161404&r1=1161403&r2=1161404&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java Thu Aug 25 06:27:05 2011
@@ -347,6 +347,14 @@ public class TestIncrementalImport exten
*/
private List<String> getArgListForTable(String tableName, boolean commonArgs,
boolean isAppend) {
+ // Without an explicit choice, append imports check the "id" column.
+ boolean appendTimestamp = false;
+ return getArgListForTable(tableName, commonArgs, isAppend, appendTimestamp);
+ }
+
+ /**
+ * Return a list of arguments to import the specified table.
+ */
+ private List<String> getArgListForTable(String tableName, boolean commonArgs,
+ boolean isAppend, boolean appendTimestamp) {
List<String> args = new ArrayList<String>();
if (commonArgs) {
CommonArgs.addHadoopFlags(args);
@@ -360,8 +368,13 @@ public class TestIncrementalImport exten
if (isAppend) {
args.add("--incremental");
args.add("append");
- args.add("--check-column");
- args.add("id");
+ if (!appendTimestamp) {
+ args.add("--check-column");
+ args.add("id");
+ } else {
+ args.add("--check-column");
+ args.add("last_modified");
+ }
} else {
args.add("--incremental");
args.add("lastmodified");
@@ -785,5 +798,42 @@ public class TestIncrementalImport exten
runJob(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 20);
}
+
+ public void testIncrementalAppendTimestamp() throws Exception {
+ // Exercise "--incremental append" with the TIMESTAMP column last_modified
+ // as the check column. Import the seeded rows, insert rows whose
+ // last_modified is later than every seeded row, re-run the saved job,
+ // and verify that the new rows were picked up.
+ // NOTE(review): in append mode ImportTool bounds the import by
+ // MAX(check column), not by the database's current timestamp. The
+ // instrumented clock settings below presumably do not affect which rows
+ // are selected; confirm before relying on them.
+
+ long now = System.currentTimeMillis();
+
+ final String TABLE_NAME = "incrementalAppendTimestamp";
+ // Presumably seeds 10 rows stamped 100 ms before "now"; verify against
+ // createTimestampTable.
+ Timestamp thePast = new Timestamp(now - 100);
+ createTimestampTable(TABLE_NAME, 10, thePast);
+
+ Timestamp firstJobTime = new Timestamp(now);
+ InstrumentHsqldbManager.setCurrentDbTimestamp(firstJobTime);
+
+ // Configure the job to use the instrumented Hsqldb manager.
+ Configuration conf = newConf();
+ conf.set(ConnFactory.FACTORY_CLASS_NAMES_KEY,
+ InstrumentHsqldbManagerFactory.class.getName());
+
+ // Append mode with last_modified as the check column (final "true").
+ List<String> args = getArgListForTable(TABLE_NAME, false, true, true);
+ createJob(TABLE_NAME, args, conf);
+ runJob(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 10);
+
+ // Add 10 more rows stamped with firstJobTime, which is later than the
+ // seeded rows' timestamp, so they exceed the MAX recorded by the first run.
+ insertIdTimestampRows(TABLE_NAME, 10, 20, firstJobTime);
+ assertRowCount(TABLE_NAME, 20);
+
+ // Advance the instrumented clock by 100 ms before the second run.
+ Timestamp secondJobTime = new Timestamp(now + 100);
+ InstrumentHsqldbManager.setCurrentDbTimestamp(secondJobTime);
+
+ // The second run should import only the newly inserted rows. Afterwards
+ // the output is expected to cover all 20 rows.
+ runJob(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 20);
+ }
}