You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by bl...@apache.org on 2012/02/28 20:56:29 UTC

svn commit: r1294802 - in /incubator/sqoop/trunk/src: java/org/apache/sqoop/tool/ImportTool.java test/com/cloudera/sqoop/TestIncrementalImport.java

Author: blee
Date: Tue Feb 28 19:56:29 2012
New Revision: 1294802

URL: http://svn.apache.org/viewvc?rev=1294802&view=rev
Log:
SQOOP-444 Support incremental import for free form queries

Modified:
    incubator/sqoop/trunk/src/java/org/apache/sqoop/tool/ImportTool.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/tool/ImportTool.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/tool/ImportTool.java?rev=1294802&r1=1294801&r2=1294802&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/tool/ImportTool.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/tool/ImportTool.java Tue Feb 28 19:56:29 2012
@@ -163,23 +163,38 @@ public class ImportTool extends com.clou
    */
   private Object getMaxColumnId(SqoopOptions options) throws SQLException {
     StringBuilder sb = new StringBuilder();
+    String query;
+
     sb.append("SELECT MAX(");
     sb.append(options.getIncrementalTestColumn());
     sb.append(") FROM ");
-    sb.append(options.getTableName());
 
-    String where = options.getWhereClause();
-    if (null != where) {
-      sb.append(" WHERE ");
-      sb.append(where);
+    if (options.getTableName() != null) {
+      // Table import
+      sb.append(options.getTableName());
+
+      String where = options.getWhereClause();
+      if (null != where) {
+        sb.append(" WHERE ");
+        sb.append(where);
+      }
+      query = sb.toString();
+    } else {
+      // Free form table based import
+      sb.append("(");
+      sb.append(options.getSqlQuery());
+      sb.append(") sqoop_import_query_alias");
+
+      query = sb.toString().replaceAll("\\$CONDITIONS", "(1 = 1)");
     }
 
     Connection conn = manager.getConnection();
     Statement s = null;
     ResultSet rs = null;
     try {
+      LOG.info("Maximal id query for free form incremental import: " + query);
       s = conn.createStatement();
-      rs = s.executeQuery(sb.toString());
+      rs = s.executeQuery(query);
       if (!rs.next()) {
         // This probably means the table is empty.
         LOG.warn("Unexpected: empty results for max value query?");
@@ -334,16 +349,24 @@ public class ImportTool extends com.clou
 
     LOG.info("Upper bound value: " + nextIncrementalValue);
 
-    String prevWhereClause = options.getWhereClause();
-    if (null != prevWhereClause) {
-      sb.append(" AND (");
-      sb.append(prevWhereClause);
-      sb.append(")");
-    }
-
-    String newConstraints = sb.toString();
-    options.setWhereClause(newConstraints);
+    if (options.getTableName() != null) {
+      // Table based import
+      String prevWhereClause = options.getWhereClause();
+      if (null != prevWhereClause) {
+        sb.append(" AND (");
+        sb.append(prevWhereClause);
+        sb.append(")");
+      }
 
+      String newConstraints = sb.toString();
+      options.setWhereClause(newConstraints);
+    } else {
+      // Incremental based import
+      sb.append(" AND $CONDITIONS");
+      String newQuery = options.getSqlQuery().replace(
+        "$CONDITIONS", sb.toString());
+      options.setSqlQuery(newQuery);
+    }
     // Save this state for next time.
     SqoopOptions recordOptions = options.getParent();
     if (null == recordOptions) {
@@ -863,12 +886,6 @@ public class ImportTool extends com.clou
           "You must specify an incremental import mode with --"
           + INCREMENT_TYPE_ARG + ". " + HELP_STR);
     }
-
-    if (options.getIncrementalMode() != SqoopOptions.IncrementalMode.None
-        && options.getTableName() == null) {
-      throw new InvalidOptionsException("Incremental imports require a table."
-          + HELP_STR);
-    }
   }
 
   @Override

Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java?rev=1294802&r1=1294801&r2=1294802&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java Tue Feb 28 19:56:29 2012
@@ -390,6 +390,44 @@ public class TestIncrementalImport exten
   }
 
   /**
+   * Return list of arguments to import by query.
+   * @return
+   */
+  private List<String> getArgListForQuery(String query, String directoryName,
+    boolean commonArgs, boolean isAppend, boolean appendTimestamp) {
+    List<String> args = new ArrayList<String>();
+    if (commonArgs) {
+      CommonArgs.addHadoopFlags(args);
+    }
+    args.add("--connect");
+    args.add(SOURCE_DB_URL);
+    args.add("--query");
+    args.add(query);
+    args.add("--target-dir");
+    args.add(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR
+      + System.getProperty("file.separator") + directoryName);
+    if (isAppend) {
+      args.add("--incremental");
+      args.add("append");
+      if (!appendTimestamp) {
+        args.add("--check-column");
+        args.add("id");
+      } else {
+        args.add("--check-column");
+        args.add("last_modified");
+      }
+    } else {
+      args.add("--incremental");
+      args.add("lastmodified");
+      args.add("--check-column");
+      args.add("last_modified");
+    }
+    args.add("-m");
+    args.add("1");
+
+    return args;
+  }
+  /**
    * Create a job with the specified name, where the job performs
    * an import configured with 'jobArgs'.
    */
@@ -527,6 +565,37 @@ public class TestIncrementalImport exten
     assertDirOfNumbers(TABLE_NAME, 20);
   }
 
+  public void testEmptyThenFullJobAppendWithQuery() throws Exception {
+    // Create an empty table. Import it; nothing happens.
+    // Add some rows. Verify they are appended.
+
+    final String TABLE_NAME = "withQuery";
+    createIdTable(TABLE_NAME, 0);
+    clearDir(TABLE_NAME);
+
+    final String QUERY = "SELECT id FROM withQuery WHERE $CONDITIONS";
+
+    List<String> args = getArgListForQuery(QUERY, TABLE_NAME,
+      false, true, false);
+    createJob(TABLE_NAME, args);
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 0);
+
+    // Now add some rows.
+    insertIdRows(TABLE_NAME, 0, 10);
+
+    // Running the job a second time should import 10 rows.
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 10);
+
+    // Add some more rows.
+    insertIdRows(TABLE_NAME, 10, 20);
+
+    // Import only those rows.
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 20);
+  }
+
   public void testAppend() throws Exception {
     // Create a table with data in it; import it.
     // Then add more data, verify that only the incremental data is pulled.