You are viewing a plain-text version of this content. The hyperlink to its canonical version was lost in this plain-text rendering.
Posted to commits@sqoop.apache.org by bl...@apache.org on 2012/02/28 20:56:29 UTC
svn commit: r1294802 - in /incubator/sqoop/trunk/src:
java/org/apache/sqoop/tool/ImportTool.java
test/com/cloudera/sqoop/TestIncrementalImport.java
Author: blee
Date: Tue Feb 28 19:56:29 2012
New Revision: 1294802
URL: http://svn.apache.org/viewvc?rev=1294802&view=rev
Log:
SQOOP-444 Support incremental import for free form queries
Modified:
incubator/sqoop/trunk/src/java/org/apache/sqoop/tool/ImportTool.java
incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java
Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/tool/ImportTool.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/tool/ImportTool.java?rev=1294802&r1=1294801&r2=1294802&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/tool/ImportTool.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/tool/ImportTool.java Tue Feb 28 19:56:29 2012
@@ -163,23 +163,38 @@ public class ImportTool extends com.clou
*/
private Object getMaxColumnId(SqoopOptions options) throws SQLException {
StringBuilder sb = new StringBuilder();
+ String query;
+
sb.append("SELECT MAX(");
sb.append(options.getIncrementalTestColumn());
sb.append(") FROM ");
- sb.append(options.getTableName());
- String where = options.getWhereClause();
- if (null != where) {
- sb.append(" WHERE ");
- sb.append(where);
+ if (options.getTableName() != null) {
+ // Table import: MAX over the table, restricted by the user's WHERE clause if any.
+ sb.append(options.getTableName());
+
+ String where = options.getWhereClause();
+ if (null != where) {
+ sb.append(" WHERE ");
+ sb.append(where);
+ }
+ query = sb.toString();
+ } else {
+ // Free-form query import: MAX over the query as a derived table (so the check column must be in its select list).
+ sb.append("(");
+ sb.append(options.getSqlQuery());
+ sb.append(") sqoop_import_query_alias");
+ // Replace every $CONDITIONS token with an always-true predicate for this MAX probe.
+ query = sb.toString().replaceAll("\\$CONDITIONS", "(1 = 1)");
}
Connection conn = manager.getConnection();
Statement s = null;
ResultSet rs = null;
try {
+ LOG.info("Maximal id query for incremental import: " + query);
s = conn.createStatement();
- rs = s.executeQuery(sb.toString());
+ rs = s.executeQuery(query);
if (!rs.next()) {
// This probably means the table is empty.
LOG.warn("Unexpected: empty results for max value query?");
@@ -334,16 +349,24 @@ public class ImportTool extends com.clou
LOG.info("Upper bound value: " + nextIncrementalValue);
- String prevWhereClause = options.getWhereClause();
- if (null != prevWhereClause) {
- sb.append(" AND (");
- sb.append(prevWhereClause);
- sb.append(")");
- }
-
- String newConstraints = sb.toString();
- options.setWhereClause(newConstraints);
+ if (options.getTableName() != null) {
+ // Table import: AND the new bounds with any pre-existing WHERE clause.
+ String prevWhereClause = options.getWhereClause();
+ if (null != prevWhereClause) {
+ sb.append(" AND (");
+ sb.append(prevWhereClause);
+ sb.append(")");
+ }
+ String newConstraints = sb.toString();
+ options.setWhereClause(newConstraints);
+ } else {
+ // Free-form query import: replace each $CONDITIONS with "<bounds> AND $CONDITIONS".
+ sb.append(" AND $CONDITIONS");
+ String newQuery = options.getSqlQuery().replace(
+ "$CONDITIONS", sb.toString());
+ options.setSqlQuery(newQuery);
+ }
// Save this state for next time.
SqoopOptions recordOptions = options.getParent();
if (null == recordOptions) {
@@ -863,12 +886,6 @@ public class ImportTool extends com.clou
"You must specify an incremental import mode with --"
+ INCREMENT_TYPE_ARG + ". " + HELP_STR);
}
-
- if (options.getIncrementalMode() != SqoopOptions.IncrementalMode.None
- && options.getTableName() == null) {
- throw new InvalidOptionsException("Incremental imports require a table."
- + HELP_STR);
- }
}
@Override
Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java?rev=1294802&r1=1294801&r2=1294802&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestIncrementalImport.java Tue Feb 28 19:56:29 2012
@@ -390,6 +390,44 @@ public class TestIncrementalImport exten
}
/**
+ * Build the argument list for an incremental import of a free-form --query.
+ * @return a mutable arg list; --check-column is "id" only for append without timestamp
+ */
+ private List<String> getArgListForQuery(String query, String directoryName,
+ boolean commonArgs, boolean isAppend, boolean appendTimestamp) {
+ List<String> args = new ArrayList<String>();
+ if (commonArgs) {
+ CommonArgs.addHadoopFlags(args);
+ }
+ args.add("--connect");
+ args.add(SOURCE_DB_URL);
+ args.add("--query");
+ args.add(query);
+ args.add("--target-dir");
+ args.add(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR
+ + System.getProperty("file.separator") + directoryName);
+ if (isAppend) {
+ args.add("--incremental");
+ args.add("append");
+ if (!appendTimestamp) {
+ args.add("--check-column");
+ args.add("id");
+ } else {
+ args.add("--check-column");
+ args.add("last_modified");
+ }
+ } else {
+ args.add("--incremental");
+ args.add("lastmodified");
+ args.add("--check-column");
+ args.add("last_modified");
+ }
+ args.add("-m");
+ args.add("1");
+
+ return args;
+ }
+ /**
* Create a job with the specified name, where the job performs
* an import configured with 'jobArgs'.
*/
@@ -527,6 +565,37 @@ public class TestIncrementalImport exten
assertDirOfNumbers(TABLE_NAME, 20);
}
+ public void testEmptyThenFullJobAppendWithQuery() throws Exception {
+ // Free-form --query variant: import an empty table (nothing happens),
+ // then insert rows in two batches and check each run picks them up.
+
+ final String TABLE_NAME = "withQuery";
+ createIdTable(TABLE_NAME, 0);
+ clearDir(TABLE_NAME);
+
+ final String QUERY = "SELECT id FROM withQuery WHERE $CONDITIONS";
+ // Append mode with check column "id" (isAppend=true, appendTimestamp=false).
+ List<String> args = getArgListForQuery(QUERY, TABLE_NAME,
+ false, true, false);
+ createJob(TABLE_NAME, args);
+ runJob(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 0);
+
+ // Now add some rows.
+ insertIdRows(TABLE_NAME, 0, 10);
+
+ // Running the job a second time should import 10 rows.
+ runJob(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 10);
+
+ // Add some more rows.
+ insertIdRows(TABLE_NAME, 10, 20);
+
+ // Import only those rows.
+ runJob(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 20);
+ }
+
public void testAppend() throws Exception {
// Create a table with data in it; import it.
// Then add more data, verify that only the incremental data is pulled.