You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the original page and is not preserved in this plain-text copy.
Posted to commits@sqoop.apache.org by ar...@apache.org on 2011/08/25 08:27:06 UTC

svn commit: r1161404 - in /incubator/sqoop/trunk/src: java/com/cloudera/sqoop/manager/ConnManager.java java/com/cloudera/sqoop/manager/OracleManager.java java/com/cloudera/sqoop/tool/ImportTool.java test/com/cloudera/sqoop/TestIncrementalImport.java

Author: arvind
Date: Thu Aug 25 06:27:05 2011
New Revision: 1161404

URL: http://svn.apache.org/viewvc?rev=1161404&view=rev
Log:
SQOOP-321. Support date/time for incremental append imports.

(Bilung Lee via Arvind Prabhakar)

Modified:
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ImportTool.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java?rev=1161404&r1=1161403&r2=1161404&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java Thu Aug 25 06:27:05 2011
@@ -302,6 +302,14 @@ public abstract class ConnManager {
   }
 
   /**
+   * Given a date/time, return the quoted string that can
+   * be inserted into a SQL statement, representing that date/time.
+   */
+  public String datetimeToQueryString(String datetime, int columnType) {
+    return "'" + datetime + "'";
+  }
+
+  /**
    * This method allows the ConnManager to override the creation of an
    * input-bounds query that is used to create splits when running import
    * based on free-form query. Any non-null return value is used, whereas a null

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java?rev=1161404&r1=1161403&r2=1161404&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java Thu Aug 25 06:27:05 2011
@@ -27,6 +27,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -535,6 +536,19 @@ public class OracleManager extends Gener
   }
 
   @Override
+  public String datetimeToQueryString(String datetime, int columnType) {
+    if (columnType == Types.TIMESTAMP) {
+      return "TO_TIMESTAMP('" + datetime + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
+    } else if (columnType == Types.DATE) {
+      return "TO_DATE('" + datetime + "', 'YYYY-MM-DD HH24:MI:SS')";
+    } else {
+      String msg = "Column type is neither timestamp nor date!";
+      LOG.error(msg);
+      throw new RuntimeException(msg);
+    }
+  }
+
+  @Override
   public boolean supportsStagingForExport() {
     return true;
   }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ImportTool.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ImportTool.java?rev=1161404&r1=1161403&r2=1161404&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ImportTool.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ImportTool.java Thu Aug 25 06:27:05 2011
@@ -20,13 +20,12 @@ package com.cloudera.sqoop.tool;
 
 import java.io.IOException;
 
-import java.math.BigDecimal;
-
 import java.sql.Connection;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.sql.Timestamp;
+import java.sql.Types;
 import java.util.List;
 import java.util.Map;
 
@@ -66,6 +65,9 @@ public class ImportTool extends BaseSqoo
   // a single table).
   private boolean allTables;
 
+  // store check column type for incremental option
+  private int checkColumnType;
+
   public ImportTool() {
     this("import", false);
   }
@@ -159,7 +161,7 @@ public class ImportTool extends BaseSqoo
    * Return the max value in the incremental-import test column. This
    * value must be numeric.
    */
-  private BigDecimal getMaxColumnId(SqoopOptions options) throws SQLException {
+  private Object getMaxColumnId(SqoopOptions options) throws SQLException {
     StringBuilder sb = new StringBuilder();
     sb.append("SELECT MAX(");
     sb.append(options.getIncrementalTestColumn());
@@ -184,7 +186,17 @@ public class ImportTool extends BaseSqoo
         return null;
       }
 
-      return rs.getBigDecimal(1);
+      ResultSetMetaData rsmd = rs.getMetaData();
+      checkColumnType = rsmd.getColumnType(1);
+      if (checkColumnType == Types.TIMESTAMP) {
+        return rs.getTimestamp(1);
+      } else if (checkColumnType == Types.DATE) {
+        return rs.getDate(1);
+      } else if (checkColumnType == Types.TIME) {
+        return rs.getTime(1);
+      } else {
+        return rs.getObject(1);
+      }
     } finally {
       try {
         if (null != rs) {
@@ -205,6 +217,16 @@ public class ImportTool extends BaseSqoo
   }
 
   /**
+   * Determine if a column is date/time.
+   * @return true if column type is TIMESTAMP, DATE, or TIME.
+   */
+  private boolean isDateTimeColumn(int columnType) {
+    return (columnType == Types.TIMESTAMP)
+        || (columnType == Types.DATE)
+        || (columnType == Types.TIME);
+  }
+
+  /**
    * Initialize the constraints which set the incremental import range.
    * @return false if an import is not necessary, because the dataset has not
    * changed.
@@ -224,24 +246,30 @@ public class ImportTool extends BaseSqoo
     SqoopOptions.IncrementalMode incrementalMode = options.getIncrementalMode();
     String nextIncrementalValue = null;
 
+    Object nextVal;
     switch (incrementalMode) {
     case AppendRows:
       try {
-        BigDecimal nextVal = getMaxColumnId(options);
-        if (null != nextVal) {
-          nextIncrementalValue = nextVal.toString();
+        nextVal = getMaxColumnId(options);
+        if (isDateTimeColumn(checkColumnType)) {
+          nextIncrementalValue = (nextVal == null) ? null
+            : manager.datetimeToQueryString(nextVal.toString(),
+                                            checkColumnType);
+        } else {
+          nextIncrementalValue = (nextVal == null) ? null : nextVal.toString();
         }
       } catch (SQLException sqlE) {
         throw new IOException(sqlE);
       }
       break;
     case DateLastModified:
-      Timestamp dbTimestamp = manager.getCurrentDbTimestamp();
-      if (null == dbTimestamp) {
+      checkColumnType = Types.TIMESTAMP;
+      nextVal = manager.getCurrentDbTimestamp();
+      if (null == nextVal) {
         throw new IOException("Could not get current time from database");
       }
-
-      nextIncrementalValue = manager.timestampToQueryString(dbTimestamp);
+      nextIncrementalValue = manager.datetimeToQueryString(nextVal.toString(),
+          checkColumnType);
       break;
     default:
       throw new ImportException("Undefined incremental import type: "
@@ -253,12 +281,13 @@ public class ImportTool extends BaseSqoo
     StringBuilder sb = new StringBuilder();
     String prevEndpoint = options.getIncrementalLastValue();
 
-    if (incrementalMode == SqoopOptions.IncrementalMode.DateLastModified
-        && null != prevEndpoint && !prevEndpoint.contains("\'")) {
-      // Incremental imports based on timestamps should be 'quoted' in
+    if (isDateTimeColumn(checkColumnType) && null != prevEndpoint
+        && !prevEndpoint.startsWith("\'") && !prevEndpoint.endsWith("\'")) {
+      // Incremental imports based on date/time should be 'quoted' in
       // ANSI SQL. If the user didn't specify single-quotes, put them
       // around, here.
-      prevEndpoint = "'" + prevEndpoint + "'";
+      prevEndpoint = manager.datetimeToQueryString(prevEndpoint,
+          checkColumnType);
     }
 
     String checkColName = manager.escapeColName(
@@ -320,7 +349,8 @@ public class ImportTool extends BaseSqoo
     if (null == recordOptions) {
       recordOptions = options;
     }
-    recordOptions.setIncrementalLastValue(nextIncrementalValue);
+    recordOptions.setIncrementalLastValue(
+        (nextVal == null) ? null : nextVal.toString());
 
     return true;
   }

Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java?rev=1161404&r1=1161403&r2=1161404&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java Thu Aug 25 06:27:05 2011
@@ -347,6 +347,14 @@ public class TestIncrementalImport exten
    */
   private List<String> getArgListForTable(String tableName, boolean commonArgs,
       boolean isAppend) {
+    return getArgListForTable(tableName, commonArgs, isAppend, false);
+  }
+
+  /**
+   * Return a list of arguments to import the specified table.
+   */
+  private List<String> getArgListForTable(String tableName, boolean commonArgs,
+      boolean isAppend, boolean appendTimestamp) {
     List<String> args = new ArrayList<String>();
     if (commonArgs) {
       CommonArgs.addHadoopFlags(args);
@@ -360,8 +368,13 @@ public class TestIncrementalImport exten
     if (isAppend) {
       args.add("--incremental");
       args.add("append");
-      args.add("--check-column");
-      args.add("id");
+      if (!appendTimestamp) {
+        args.add("--check-column");
+        args.add("id");
+      } else {
+        args.add("--check-column");
+        args.add("last_modified");
+      }
     } else {
       args.add("--incremental");
       args.add("lastmodified");
@@ -785,5 +798,42 @@ public class TestIncrementalImport exten
     runJob(TABLE_NAME);
     assertDirOfNumbers(TABLE_NAME, 20);
   }
+
+  public void testIncrementalAppendTimestamp() throws Exception {
+    // Run an import, and then insert rows with the last-modified timestamp
+    // set to the exact time when the first import runs. Run a second import
+    // and ensure that we pick up the new data.
+
+    long now = System.currentTimeMillis();
+
+    final String TABLE_NAME = "incrementalAppendTimestamp";
+    Timestamp thePast = new Timestamp(now - 100);
+    createTimestampTable(TABLE_NAME, 10, thePast);
+
+    Timestamp firstJobTime = new Timestamp(now);
+    InstrumentHsqldbManager.setCurrentDbTimestamp(firstJobTime);
+
+    // Configure the job to use the instrumented Hsqldb manager.
+    Configuration conf = newConf();
+    conf.set(ConnFactory.FACTORY_CLASS_NAMES_KEY,
+        InstrumentHsqldbManagerFactory.class.getName());
+
+    List<String> args = getArgListForTable(TABLE_NAME, false, true, true);
+    createJob(TABLE_NAME, args, conf);
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 10);
+
+    // Add some more rows with the timestamp equal to the job run timestamp.
+    insertIdTimestampRows(TABLE_NAME, 10, 20, firstJobTime);
+    assertRowCount(TABLE_NAME, 20);
+
+    // Run a second job with the clock advanced by 100 ms.
+    Timestamp secondJobTime = new Timestamp(now + 100);
+    InstrumentHsqldbManager.setCurrentDbTimestamp(secondJobTime);
+
+    // Import only those rows.
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 20);
+  }
 }