You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ar...@apache.org on 2011/09/28 19:43:11 UTC

svn commit: r1176981 - in /incubator/sqoop/trunk/src: docs/man/ docs/user/ java/com/cloudera/sqoop/ java/com/cloudera/sqoop/manager/ java/com/cloudera/sqoop/mapreduce/ java/com/cloudera/sqoop/mapreduce/db/ java/com/cloudera/sqoop/tool/ test/com/clouder...

Author: arvind
Date: Wed Sep 28 17:43:11 2011
New Revision: 1176981

URL: http://svn.apache.org/viewvc?rev=1176981&view=rev
Log:
SQOOP-331. Support for boundary query.

(Jarek Jarcec Cecho via Arvind Prabhakar)

Added:
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestBoundaryQuery.java
Modified:
    incubator/sqoop/trunk/src/docs/man/import-args.txt
    incubator/sqoop/trunk/src/docs/user/import.txt
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/SqlManager.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ImportTool.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestSqoopOptions.java

Modified: incubator/sqoop/trunk/src/docs/man/import-args.txt
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/docs/man/import-args.txt?rev=1176981&r1=1176980&r2=1176981&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/docs/man/import-args.txt (original)
+++ incubator/sqoop/trunk/src/docs/man/import-args.txt Wed Sep 28 17:43:11 2011
@@ -35,6 +35,9 @@ Import control options
 --as-textfile::
   Imports data as plain text (default)
 
+--boundary-query (query)::
+  Using following query to select minimal and maximal value of '--split-by' column for creating splits
+
 --columns (col,col,col...)::
   Columns to export from table
 

Modified: incubator/sqoop/trunk/src/docs/user/import.txt
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/docs/user/import.txt?rev=1176981&r1=1176980&r2=1176981&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/docs/user/import.txt (original)
+++ incubator/sqoop/trunk/src/docs/user/import.txt Wed Sep 28 17:43:11 2011
@@ -60,6 +60,7 @@ Argument                          Descri
 +\--as-avrodatafile+              Imports data to Avro Data Files
 +\--as-sequencefile+              Imports data to SequenceFiles
 +\--as-textfile+                  Imports data as plain text (default)
++\--boundary-query <statement>+   Boundary query to use for creating splits
 +\--columns <col,col,col...>+     Columns to import from table
 +\--direct+                       Use direct import fast path
 +\--direct-split-size <n>+        Split the input stream every 'n' bytes\
@@ -114,6 +115,11 @@ form +SELECT <column list> FROM <table n
 "id > 400"+. Only rows where the +id+ column has a value greater than
 400 will be imported.
 
+By default sqoop will use query +select min(<split-by>), max(<split-by>) from
+<table name>+ to find out boundaries for creating splits. In some cases this query
+is not the most optimal so you can specify any arbitrary query returning two
+numeric columns using +\--boundary-query+ argument.
+
 Free-form Query Imports
 ^^^^^^^^^^^^^^^^^^^^^^^
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java?rev=1176981&r1=1176980&r2=1176981&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java Wed Sep 28 17:43:11 2011
@@ -138,6 +138,7 @@ public class SqoopOptions implements Clo
   @StoredAsProperty("db.split.column") private String splitByCol;
   @StoredAsProperty("db.where.clause") private String whereClause;
   @StoredAsProperty("db.query") private String sqlQuery;
+  @StoredAsProperty("db.query.boundary") private String boundaryQuery;
   @StoredAsProperty("jdbc.driver.class") private String driverClassName;
   @StoredAsProperty("hdfs.warehouse.dir") private String warehouseDir;
   @StoredAsProperty("hdfs.target.dir") private String targetDir;
@@ -1200,6 +1201,14 @@ public class SqoopOptions implements Clo
     this.sqlQuery = sqlStatement;
   }
 
+  public String getBoundaryQuery() {
+    return boundaryQuery;
+  }
+
+  public void setBoundaryQuery(String sqlStatement) {
+    boundaryQuery = sqlStatement;
+  }
+
   /**
    * @return The JDBC driver class name specified with --driver.
    */

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/SqlManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/SqlManager.java?rev=1176981&r1=1176980&r2=1176981&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/SqlManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/SqlManager.java Wed Sep 28 17:43:11 2011
@@ -870,4 +870,8 @@ public abstract class SqlManager extends
       }
     }
   }
+
+  public String getInputBoundsQuery(String splitByCol, String sanitizedQuery) {
+    return options.getBoundaryQuery();
+  }
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java?rev=1176981&r1=1176980&r2=1176981&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java Wed Sep 28 17:43:11 2011
@@ -155,14 +155,24 @@ public class DataDrivenImportJob extends
         DataDrivenDBInputFormat.setInput(job, DBWritable.class,
             mgr.escapeTableName(tableName), whereClause,
             mgr.escapeColName(splitByCol), sqlColNames);
+
+        // If user specified boundary query on the command line propagate it to
+        // the job
+        if(options.getBoundaryQuery() != null) {
+          DataDrivenDBInputFormat.setBoundingQuery(job.getConfiguration(),
+                  options.getBoundaryQuery());
+        }
       } else {
         // Import a free-form query.
         String inputQuery = options.getSqlQuery();
         String sanitizedQuery = inputQuery.replace(
             DataDrivenDBInputFormat.SUBSTITUTE_TOKEN, " (1 = 1) ");
 
-        String inputBoundingQuery =
+        String inputBoundingQuery = options.getBoundaryQuery();
+
+        if(inputBoundingQuery == null) {
             mgr.getInputBoundsQuery(splitByCol, sanitizedQuery);
+        }
         if (inputBoundingQuery == null) {
             inputBoundingQuery = "SELECT MIN(" + splitByCol + "), MAX("
                     + splitByCol + ") FROM (" + sanitizedQuery + ") AS t1";

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java?rev=1176981&r1=1176980&r2=1176981&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java Wed Sep 28 17:43:11 2011
@@ -170,10 +170,14 @@ public class DataDrivenDBInputFormat<T e
   public List<InputSplit> getSplits(JobContext job) throws IOException {
 
     int targetNumTasks = ConfigurationHelper.getJobNumMaps(job);
-    if (1 == targetNumTasks) {
-      // There's no need to run a bounding vals query; just return a split
-      // that separates nothing. This can be considerably more optimal for a
-      // large table with no index.
+    String boundaryQuery = getDBConf().getInputBoundingQuery();
+
+    // If user do not forced us to use his boundary query and we don't have to
+    // bacause there is only one mapper we will return single split that
+    // separates nothing. This can be considerably more optimal for a large
+    // table with no index.
+    if (1 == targetNumTasks
+            && (boundaryQuery == null || boundaryQuery.isEmpty())) {
       List<InputSplit> singletonSplit = new ArrayList<InputSplit>();
       singletonSplit.add(new DataDrivenDBInputSplit("1=1", "1=1"));
       return singletonSplit;

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java?rev=1176981&r1=1176980&r2=1176981&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java Wed Sep 28 17:43:11 2011
@@ -136,6 +136,7 @@ public abstract class BaseSqoopTool exte
   public static final String CLASS_NAME_ARG = "class-name";
   public static final String JAR_FILE_NAME_ARG = "jar-file";
   public static final String SQL_QUERY_ARG = "query";
+  public static final String SQL_QUERY_BOUNDARY = "boundary-query";
   public static final String SQL_QUERY_SHORT_ARG = "e";
   public static final String VERBOSE_ARG = "verbose";
   public static final String HELP_ARG = "help";

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ImportTool.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ImportTool.java?rev=1176981&r1=1176980&r2=1176981&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ImportTool.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ImportTool.java Wed Sep 28 17:43:11 2011
@@ -527,6 +527,12 @@ public class ImportTool extends BaseSqoo
           .withDescription("Import results of SQL 'statement'")
           .withLongOpt(SQL_QUERY_ARG)
           .create(SQL_QUERY_SHORT_ARG));
+      importOpts.addOption(OptionBuilder.withArgName("statement")
+          .hasArg()
+          .withDescription("Set boundary query for retrieving max and min"
+              + " value of the primary key")
+          .withLongOpt(SQL_QUERY_BOUNDARY)
+          .create());
     }
 
     importOpts.addOption(OptionBuilder.withArgName("dir")
@@ -721,6 +727,10 @@ public class ImportTool extends BaseSqoo
         if (in.hasOption(SQL_QUERY_ARG)) {
           out.setSqlQuery(in.getOptionValue(SQL_QUERY_ARG));
         }
+
+        if(in.hasOption(SQL_QUERY_BOUNDARY)) {
+          out.setBoundaryQuery(in.getOptionValue(SQL_QUERY_BOUNDARY));
+        }
       }
 
       if (in.hasOption(WAREHOUSE_DIR_ARG)) {

Added: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestBoundaryQuery.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestBoundaryQuery.java?rev=1176981&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestBoundaryQuery.java (added)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestBoundaryQuery.java Wed Sep 28 17:43:11 2011
@@ -0,0 +1,166 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudera.sqoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
+import com.cloudera.sqoop.orm.CompilationManager;
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.HsqldbTestServer;
+import com.cloudera.sqoop.testutil.ImportJobTestCase;
+import com.cloudera.sqoop.testutil.SeqFileReader;
+import com.cloudera.sqoop.tool.ImportTool;
+import com.cloudera.sqoop.util.ClassLoaderStack;
+
+/**
+ * Test that --boundary-query works in Sqoop.
+ */
+public class TestBoundaryQuery extends ImportJobTestCase {
+
+  /**
+   * Create the argv to pass to Sqoop.
+   * @return the argv as an array of strings.
+   */
+  protected String [] getArgv(boolean includeHadoopFlags, String boundaryQuery,
+      String targetDir) {
+
+    ArrayList<String> args = new ArrayList<String>();
+
+    if (includeHadoopFlags) {
+      CommonArgs.addHadoopFlags(args);
+    }
+
+    args.add("--table");
+    args.add(HsqldbTestServer.getTableName());
+    args.add("--split-by");
+    args.add("INTFIELD1");
+    args.add("--connect");
+    args.add(HsqldbTestServer.getUrl());
+    args.add("--boundary-query");
+    args.add(boundaryQuery);
+    args.add("--as-sequencefile");
+    args.add("--target-dir");
+    args.add(targetDir);
+    args.add("--class-name");
+    args.add(getTableName());
+    args.add("--verbose");
+
+    return args.toArray(new String[0]);
+  }
+
+  // this test just uses the two int table.
+  protected String getTableName() {
+    return HsqldbTestServer.getTableName();
+  }
+
+
+  /**
+   * Given a comma-delimited list of integers, grab and parse the first int.
+   * @param str a comma-delimited list of values, the first of which is an int.
+   * @return the first field in the string, cast to int
+   */
+  private int getFirstInt(String str) {
+    String [] parts = str.split(",");
+    return Integer.parseInt(parts[0]);
+  }
+
+  public void runQueryTest(String query, int numExpectedResults,
+    int expectedSum, String targetDir)
+      throws IOException {
+
+    ClassLoader prevClassLoader = null;
+    SequenceFile.Reader reader = null;
+
+    String [] argv = getArgv(true, query, targetDir);
+    runImport(argv);
+    try {
+      SqoopOptions opts = new ImportTool().parseArguments(
+          getArgv(false, query, targetDir),
+          null, null, true);
+
+      CompilationManager compileMgr = new CompilationManager(opts);
+      String jarFileName = compileMgr.getJarFilename();
+
+      prevClassLoader = ClassLoaderStack.addJarFile(jarFileName,
+          getTableName());
+
+      reader = SeqFileReader.getSeqFileReader(getDataFilePath().toString());
+
+      // here we can actually instantiate (k, v) pairs.
+      Configuration conf = new Configuration();
+      Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+      Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+      if (reader.next(key) == null) {
+        fail("Empty SequenceFile during import");
+      }
+
+      // make sure that the value we think should be at the top, is.
+      reader.getCurrentValue(val);
+
+      // We know that these values are two ints separated by a ',' character.
+      // Since this is all dynamic, though, we don't want to actually link
+      // against the class and use its methods. So we just parse this back
+      // into int fields manually.  Sum them up and ensure that we get the
+      // expected total for the first column, to verify that we got all the
+      // results from the db into the file.
+      int curSum = getFirstInt(val.toString());
+      int totalResults = 1;
+
+      // now sum up everything else in the file.
+      while (reader.next(key) != null) {
+        reader.getCurrentValue(val);
+        curSum += getFirstInt(val.toString());
+        totalResults++;
+      }
+
+      assertEquals("Total sum of first db column mismatch", expectedSum,
+          curSum);
+      assertEquals("Incorrect number of results for query", numExpectedResults,
+          totalResults);
+    } catch (InvalidOptionsException ioe) {
+      fail(ioe.toString());
+    } catch (ParseException pe) {
+      fail(pe.toString());
+    } finally {
+      IOUtils.closeStream(reader);
+
+      if (null != prevClassLoader) {
+        ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
+      }
+    }
+  }
+
+  public void testBoundaryQuery() throws IOException {
+    System.out.println("PCYO");
+    String query = "select min(intfield1), max(intfield1) from "
+      + getTableName() +" where intfield1 in (3, 5)";
+
+    runQueryTest(query, 2, 8, getTablePath().toString());
+  }
+}

Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestSqoopOptions.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestSqoopOptions.java?rev=1176981&r1=1176980&r2=1176981&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestSqoopOptions.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestSqoopOptions.java Wed Sep 28 17:43:11 2011
@@ -248,6 +248,15 @@ public class TestSqoopOptions extends Te
     assertEquals("20110413", opts.getHivePartitionValue());
   }
 
+  public void testBoundaryQueryParams() throws Exception {
+    String[] args = {
+      "--boundary-query", "select 1, 2",
+    };
+
+    SqoopOptions opts = parse(args);
+    assertEquals("select 1, 2", opts.getBoundaryQuery());
+  }
+
   public void testPropertySerialization1() {
     // Test that if we write a SqoopOptions out to a Properties,
     // and then read it back in, we get all the same results.