Posted to commits@sqoop.apache.org by an...@apache.org on 2018/01/18 14:03:57 UTC

[11/32] sqoop git commit: SQOOP-3273: Removing com.cloudera.sqoop packages

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestFreeFormQueryImport.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestFreeFormQueryImport.java b/src/test/org/apache/sqoop/TestFreeFormQueryImport.java
new file mode 100644
index 0000000..2df4352
--- /dev/null
+++ b/src/test/org/apache/sqoop/TestFreeFormQueryImport.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+import org.apache.sqoop.testutil.CommonArgs;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test free-form query import.
+ */
+public class TestFreeFormQueryImport extends ImportJobTestCase {
+
+  private Log log;
+
+  public TestFreeFormQueryImport() {
+    this.log = LogFactory.getLog(TestFreeFormQueryImport.class.getName());
+  }
+
+  /**
+   * @return the Log object to use for reporting during this test
+   */
+  protected Log getLogger() {
+    return log;
+  }
+
+  /** The names of the tables we're creating. */
+  private List<String> tableNames;
+
+  @After
+  public void tearDown() {
+    // Clean up the database on our way out.
+    for (String tableName : tableNames) {
+      try {
+        dropTableIfExists(tableName);
+      } catch (SQLException e) {
+        log.warn("Error trying to drop table '" + tableName
+            + "' on tearDown: " + e);
+      }
+    }
+    super.tearDown();
+  }
+
+  /**
+   * Create the argv to pass to Sqoop.
+   * @param splitByCol column of the table used to split work.
+   * @param query free form query to be used.
+   * @return the argv as an array of strings.
+   */
+  protected String [] getArgv(String splitByCol, String query) {
+    ArrayList<String> args = new ArrayList<String>();
+
+    CommonArgs.addHadoopFlags(args);
+
+    args.add("--connect");
+    args.add(getConnectString());
+    args.add("--target-dir");
+    args.add(getWarehouseDir());
+    args.add("--split-by");
+    args.add(splitByCol);
+    args.add("--num-mappers");
+    args.add("2");
+    args.add("--query");
+    args.add(query);
+
+    return args.toArray(new String[0]);
+  }
+
+  /**
+   * Create two tables that share the common id column.  Run free-form query
+   * import on the result table that is created by joining the two tables on
+   * the id column.
+   */
+  @Test
+  public void testSimpleJoin() throws IOException {
+    tableNames = new ArrayList<String>();
+
+    String [] types1 = { "SMALLINT", };
+    String [] vals1 = { "1", };
+    String tableName1 = getTableName();
+    createTableWithColTypes(types1, vals1);
+    tableNames.add(tableName1);
+
+    incrementTableNum();
+
+    String [] types2 = { "SMALLINT", "VARCHAR(32)", };
+    String [] vals2 = { "1", "'foo'", };
+    String tableName2 = getTableName();
+    createTableWithColTypes(types2, vals2);
+    tableNames.add(tableName2);
+
+    String query = "SELECT "
+        + tableName1 + "." + getColName(0) + ", "
+        + tableName2 + "." + getColName(1) + " "
+        + "FROM " + tableName1 + " JOIN " + tableName2 + " ON ("
+        + tableName1 + "." + getColName(0) + " = "
+        + tableName2 + "." + getColName(0) + ") WHERE "
+        + tableName1 + "." + getColName(0) + " < 3 AND $CONDITIONS";
+
+    runImport(getArgv(tableName1 + "." + getColName(0), query));
+
+    Path warehousePath = new Path(this.getWarehouseDir());
+    Path filePath = new Path(warehousePath, "part-m-00000");
+    String expectedVal = "1,foo";
+
+    BufferedReader reader = null;
+    if (!isOnPhysicalCluster()) {
+      reader = new BufferedReader(
+          new InputStreamReader(new FileInputStream(
+              new File(filePath.toString()))));
+    } else {
+      FileSystem dfs = FileSystem.get(getConf());
+      FSDataInputStream dis = dfs.open(filePath);
+      reader = new BufferedReader(new InputStreamReader(dis));
+    }
+    try {
+      String line = reader.readLine();
+      assertEquals("QueryResult expected a different string",
+          expectedVal, line);
+    } finally {
+      IOUtils.closeStream(reader);
+    }
+  }
+}
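
For context, the free-form query import exercised above boils down to the
following programmatic invocation. This is a minimal sketch, not part of the
patch: the connect string, target directory, and table/column names are
hypothetical stand-ins for values that ImportJobTestCase normally supplies,
and Sqoop replaces the $CONDITIONS token with each split's WHERE predicate.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.sqoop.Sqoop;
    import org.apache.sqoop.SqoopOptions;
    import org.apache.sqoop.tool.ImportTool;

    public class FreeFormImportSketch {
      public static void main(String[] argv) throws Exception {
        List<String> args = new ArrayList<String>();
        args.add("--connect");
        args.add("jdbc:hsqldb:mem:example");   // hypothetical connect string
        args.add("--target-dir");
        args.add("/tmp/freeform-import");      // hypothetical target dir
        args.add("--split-by");
        args.add("T1.ID");                     // hypothetical split column
        args.add("--num-mappers");
        args.add("2");
        args.add("--query");
        // $CONDITIONS is mandatory in a free-form query; Sqoop substitutes
        // the per-split range predicate for it at runtime.
        args.add("SELECT T1.ID, T2.NAME FROM T1 JOIN T2 ON (T1.ID = T2.ID) "
            + "WHERE T1.ID < 3 AND $CONDITIONS");

        SqoopOptions options = new SqoopOptions();
        Sqoop importer = new Sqoop(new ImportTool(), options.getConf(), options);
        int ret = Sqoop.runSqoop(importer, args.toArray(new String[0]));
        if (ret != 0) {
          throw new IllegalStateException("Import exited with status " + ret);
        }
      }
    }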

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestIncrementalImport.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestIncrementalImport.java b/src/test/org/apache/sqoop/TestIncrementalImport.java
new file mode 100644
index 0000000..1ab9802
--- /dev/null
+++ b/src/test/org/apache/sqoop/TestIncrementalImport.java
@@ -0,0 +1,1348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.sqoop.hive.HiveImport;
+import org.apache.sqoop.manager.ConnManager;
+import org.apache.sqoop.manager.HsqldbManager;
+import org.apache.sqoop.manager.ManagerFactory;
+import org.apache.sqoop.metastore.AutoGenericJobStorage;
+import org.apache.sqoop.metastore.JobData;
+import org.apache.sqoop.metastore.SavedJobsTestBase;
+import org.apache.sqoop.testutil.BaseSqoopTestCase;
+import org.apache.sqoop.testutil.CommonArgs;
+import org.apache.sqoop.tool.ImportTool;
+import org.apache.sqoop.tool.JobTool;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test the incremental import functionality.
+ *
+ * These tests all use the auto-connect hsqldb-based metastore.
+ * The metastore URL is configured to be in-memory, and all state
+ * is dropped between individual tests.
+ */
+public class TestIncrementalImport {
+
+  public static final Log LOG = LogFactory.getLog(
+      TestIncrementalImport.class.getName());
+
+  // Which database we read from.
+  public static final String SOURCE_DB_URL = "jdbc:hsqldb:mem:incremental";
+  public static final String AUTO_STORAGE_PASSWORD = "";
+  public static final String AUTO_STORAGE_USERNAME = "SA";
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Before
+  public void setUp() throws Exception {
+    resetSourceDataSchema();
+  }
+
+  public static void resetSourceDataSchema() throws SQLException {
+    SqoopOptions options = new SqoopOptions();
+    options.setConnectString(SOURCE_DB_URL);
+    options.setUsername(AUTO_STORAGE_USERNAME);
+    options.setPassword(AUTO_STORAGE_PASSWORD);
+    SavedJobsTestBase.resetSchema(options);
+  }
+
+  public static Configuration newConf() {
+    Configuration conf = new Configuration();
+    conf.set(AutoGenericJobStorage.AUTO_STORAGE_USER_KEY, AUTO_STORAGE_USERNAME);
+    conf.set(AutoGenericJobStorage.AUTO_STORAGE_PASS_KEY, AUTO_STORAGE_PASSWORD);
+    conf.set(AutoGenericJobStorage.AUTO_STORAGE_CONNECT_STRING_KEY,
+            SOURCE_DB_URL);
+    return conf;
+  }
+
+  /**
+   * Assert that a table has a specified number of rows.
+   */
+  private void assertRowCount(String table, int numRows) throws SQLException {
+    SqoopOptions options = new SqoopOptions();
+    options.setConnectString(SOURCE_DB_URL);
+    HsqldbManager manager = new HsqldbManager(options);
+    Connection c = manager.getConnection();
+    PreparedStatement s = null;
+    ResultSet rs = null;
+    try {
+      s = c.prepareStatement("SELECT COUNT(*) FROM " + manager.escapeTableName(table));
+      rs = s.executeQuery();
+      if (!rs.next()) {
+        fail("No resultset");
+      }
+      int realNumRows = rs.getInt(1);
+      assertEquals(numRows, realNumRows);
+      LOG.info("Expected " + numRows + " rows -- ok.");
+    } finally {
+      if (null != s) {
+        try {
+          s.close();
+        } catch (SQLException sqlE) {
+          LOG.warn("exception: " + sqlE);
+        }
+      }
+
+      if (null != rs) {
+        try {
+          rs.close();
+        } catch (SQLException sqlE) {
+          LOG.warn("exception: " + sqlE);
+        }
+      }
+    }
+  }
+
+  /**
+   * Insert rows with id = [low, hi) into tableName.
+   */
+  private void insertIdRows(String tableName, int low, int hi)
+      throws SQLException {
+    SqoopOptions options = new SqoopOptions();
+    options.setConnectString(SOURCE_DB_URL);
+    HsqldbManager manager = new HsqldbManager(options);
+    Connection c = manager.getConnection();
+    PreparedStatement s = null;
+    try {
+      s = c.prepareStatement("INSERT INTO " + manager.escapeTableName(tableName) + " VALUES(?)");
+      for (int i = low; i < hi; i++) {
+        s.setInt(1, i);
+        s.executeUpdate();
+      }
+
+      c.commit();
+    } finally {
+      if(s != null) {
+        s.close();
+      }
+    }
+  }
+
+  /**
+   * Insert rows with id = [low, hi) into tableName with
+   * the timestamp column set to the specified ts.
+   */
+  private void insertIdTimestampRows(String tableName, int low, int hi,
+      Timestamp ts) throws SQLException {
+    LOG.info("Inserting id rows in [" + low + ", " + hi + ") @ " + ts);
+    SqoopOptions options = new SqoopOptions();
+    options.setConnectString(SOURCE_DB_URL);
+    HsqldbManager manager = new HsqldbManager(options);
+    Connection c = manager.getConnection();
+    PreparedStatement s = null;
+    try {
+      s = c.prepareStatement("INSERT INTO " + manager.escapeTableName(tableName) + " VALUES(?,?)");
+      for (int i = low; i < hi; i++) {
+        s.setInt(1, i);
+        s.setTimestamp(2, ts);
+        s.executeUpdate();
+      }
+
+      c.commit();
+    } finally {
+      s.close();
+    }
+  }
+
+  /**
+   * Insert rows with id = [low, hi) into tableName with
+   * id converted to string.
+   */
+  private void insertIdVarcharRows(String tableName, int low, int hi)
+      throws SQLException {
+    LOG.info("Inserting rows in [" + low + ", " + hi + ")");
+    SqoopOptions options = new SqoopOptions();
+    options.setConnectString(SOURCE_DB_URL);
+    HsqldbManager manager = new HsqldbManager(options);
+    Connection c = manager.getConnection();
+    PreparedStatement s = null;
+    try {
+      s = c.prepareStatement("INSERT INTO " + manager.escapeTableName(tableName) + " VALUES(?)");
+      for (int i = low; i < hi; i++) {
+        s.setString(1, Integer.toString(i));
+        s.executeUpdate();
+      }
+      c.commit();
+    } finally {
+      s.close();
+    }
+  }
+
+  /**
+   * Create a table with an 'id' column full of integers.
+   */
+  private void createIdTable(String tableName, int insertRows)
+      throws SQLException {
+    SqoopOptions options = new SqoopOptions();
+    options.setConnectString(SOURCE_DB_URL);
+    HsqldbManager manager = new HsqldbManager(options);
+    Connection c = manager.getConnection();
+    PreparedStatement s = null;
+    try {
+      s = c.prepareStatement("CREATE TABLE " + manager.escapeTableName(tableName) + "(id INT NOT NULL)");
+      s.executeUpdate();
+      c.commit();
+      insertIdRows(tableName, 0, insertRows);
+    } finally {
+      s.close();
+    }
+  }
+
+  /**
+   * Create a table with an 'id' column full of integers and a
+   * last_modified column with timestamps.
+   */
+  private void createTimestampTable(String tableName, int insertRows,
+      Timestamp baseTime) throws SQLException {
+    SqoopOptions options = new SqoopOptions();
+    options.setConnectString(SOURCE_DB_URL);
+    HsqldbManager manager = new HsqldbManager(options);
+    Connection c = manager.getConnection();
+    PreparedStatement s = null;
+    try {
+      s = c.prepareStatement("CREATE TABLE " + manager.escapeTableName(tableName) + "(id INT NOT NULL, "
+          + "last_modified TIMESTAMP)");
+      s.executeUpdate();
+      c.commit();
+      insertIdTimestampRows(tableName, 0, insertRows, baseTime);
+    } finally {
+      s.close();
+    }
+  }
+
+  /**
+   * Create a table with an 'id' column of type varchar(20).
+   */
+  private void createIdVarcharTable(String tableName,
+       int insertRows) throws SQLException {
+    SqoopOptions options = new SqoopOptions();
+    options.setConnectString(SOURCE_DB_URL);
+    HsqldbManager manager = new HsqldbManager(options);
+    Connection c = manager.getConnection();
+    PreparedStatement s = null;
+    try {
+      s = c.prepareStatement("CREATE TABLE " + manager.escapeTableName(tableName) + "(id varchar(20) NOT NULL)");
+      s.executeUpdate();
+      c.commit();
+      insertIdVarcharRows(tableName, 0, insertRows);
+    } finally {
+      s.close();
+    }
+  }
+
+  /**
+   * Delete all files in a directory for a table.
+   */
+  public void clearDir(String tableName) {
+    try {
+      FileSystem fs = FileSystem.getLocal(new Configuration());
+      Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+      Path tableDir = new Path(warehouse, tableName);
+      fs.delete(tableDir, true);
+    } catch (Exception e) {
+      fail("Got unexpected exception: " + StringUtils.stringifyException(e));
+    }
+  }
+
+  /**
+   * Look at a directory that should contain files full of an imported 'id'
+   * column. Assert that all numbers in [0, expectedNums) are present
+   * in order.
+   */
+  public void assertDirOfNumbers(String tableName, int expectedNums) {
+    try {
+      FileSystem fs = FileSystem.getLocal(new Configuration());
+      Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+      Path tableDir = new Path(warehouse, tableName);
+      FileStatus [] stats = fs.listStatus(tableDir);
+      String [] fileNames = new String[stats.length];
+      for (int i = 0; i < stats.length; i++) {
+        fileNames[i] = stats[i].getPath().toString();
+      }
+
+      Arrays.sort(fileNames);
+
+      // Read all the files in sorted order, adding the value lines to the list.
+      List<String> receivedNums = new ArrayList<String>();
+      for (String fileName : fileNames) {
+        if (fileName.startsWith("_") || fileName.startsWith(".")) {
+          continue;
+        }
+
+        BufferedReader r = new BufferedReader(
+            new InputStreamReader(fs.open(new Path(fileName))));
+        try {
+          while (true) {
+            String s = r.readLine();
+            if (null == s) {
+              break;
+            }
+
+            receivedNums.add(s.trim());
+          }
+        } finally {
+          r.close();
+        }
+      }
+
+      assertEquals(expectedNums, receivedNums.size());
+
+      // Compare the received values with the expected set.
+      for (int i = 0; i < expectedNums; i++) {
+        assertEquals((int) i, (int) Integer.valueOf(receivedNums.get(i)));
+      }
+    } catch (Exception e) {
+      fail("Got unexpected exception: " + StringUtils.stringifyException(e));
+    }
+  }
+
+  /**
+   * Look at a directory that should contain files full of an imported 'id'
+   * column and a 'last_modified' column. Assert that all numbers in
+   * [0, expectedNums) are present in order as the first field of each line.
+   */
+  public void assertDirOfNumbersAndTimestamps(String tableName, int expectedNums) {
+    try {
+      FileSystem fs = FileSystem.getLocal(new Configuration());
+      Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+      Path tableDir = new Path(warehouse, tableName);
+      FileStatus [] stats = fs.listStatus(tableDir);
+      String [] fileNames = new String[stats.length];
+      for (int i = 0; i < stats.length; i++) {
+        fileNames[i] = stats[i].getPath().toString();
+      }
+
+      Arrays.sort(fileNames);
+
+      // Read all the files in sorted order, adding the value lines to the list.
+      List<String> receivedNums = new ArrayList<String>();
+      for (String fileName : fileNames) {
+        if (fileName.startsWith("_") || fileName.startsWith(".")) {
+          continue;
+        }
+
+        BufferedReader r = new BufferedReader(
+            new InputStreamReader(fs.open(new Path(fileName))));
+        try {
+          while (true) {
+            String s = r.readLine();
+            if (null == s) {
+              break;
+            }
+
+            receivedNums.add(s.trim());
+          }
+        } finally {
+          r.close();
+        }
+      }
+
+      assertEquals(expectedNums, receivedNums.size());
+
+      // Compare the received values with the expected set.
+      for (int i = 0; i < expectedNums; i++) {
+        assertEquals((int) i, (int) Integer.valueOf(receivedNums.get(i).split(",")[0]));
+      }
+    } catch (Exception e) {
+      fail("Got unexpected exception: " + StringUtils.stringifyException(e));
+    }
+  }
+
+  /**
+   * Assert that a directory contains a file with exactly one line
+   * in it, containing the prescribed number 'val'.
+   */
+  public void assertFirstSpecificNumber(String tableName, int val) {
+    try {
+      FileSystem fs = FileSystem.getLocal(new Configuration());
+      Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+      Path tableDir = new Path(warehouse, tableName);
+      FileStatus [] stats = fs.listStatus(tableDir);
+      String [] filePaths = new String[stats.length];
+      for (int i = 0; i < stats.length; i++) {
+        filePaths[i] = stats[i].getPath().toString();
+      }
+
+      // Read the first file that is not a hidden file.
+      boolean foundVal = false;
+      for (String filePath : filePaths) {
+        String fileName = new Path(filePath).getName();
+        if (fileName.startsWith("_") || fileName.startsWith(".")) {
+          continue;
+        }
+
+        if (foundVal) {
+          // Make sure we don't have two or more "real" files in the dir.
+          fail("Got an extra data-containing file in this directory.");
+        }
+
+        BufferedReader r = new BufferedReader(
+            new InputStreamReader(fs.open(new Path(filePath))));
+        try {
+          String s = r.readLine();
+          if (null == s) {
+            fail("Unexpected empty file " + filePath + ".");
+          }
+          assertEquals(val, (int) Integer.valueOf(s.trim()));
+
+          String nextLine = r.readLine();
+          if (nextLine != null) {
+            fail("Expected only one result, but got another line: " + nextLine);
+          }
+
+          // Successfully got the value we were looking for.
+          foundVal = true;
+        } finally {
+          r.close();
+        }
+      }
+      assertTrue("Did not find expected value " + val + " in table "
+          + tableName, foundVal);
+    } catch (IOException e) {
+      fail("Got unexpected exception: " + StringUtils.stringifyException(e));
+    }
+  }
+
+  /**
+   * Assert that a directory's data file contains a first line whose
+   * first comma-separated field is the prescribed number 'val'.
+   */
+  public void assertSpecificNumber(String tableName, int val) {
+    try {
+      FileSystem fs = FileSystem.getLocal(new Configuration());
+      Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+      Path tableDir = new Path(warehouse, tableName);
+      FileStatus [] stats = fs.listStatus(tableDir);
+      String [] filePaths = new String[stats.length];
+      for (int i = 0; i < stats.length; i++) {
+        filePaths[i] = stats[i].getPath().toString();
+      }
+
+      // Read the first file that is not a hidden file.
+      boolean foundVal = false;
+      for (String filePath : filePaths) {
+        String fileName = new Path(filePath).getName();
+        if (fileName.startsWith("_") || fileName.startsWith(".")) {
+          continue;
+        }
+
+        if (foundVal) {
+          // Make sure we don't have two or more "real" files in the dir.
+          fail("Got an extra data-containing file in this directory.");
+        }
+
+        BufferedReader r = new BufferedReader(
+            new InputStreamReader(fs.open(new Path(filePath))));
+        try {
+          String s = r.readLine();
+          if (null == s) {
+            fail("Unexpected empty file " + filePath + ".");
+          }
+          if (val == (int) Integer.valueOf(s.trim().split(",")[0])) {
+            foundVal = true;
+          }
+        } finally {
+          r.close();
+        }
+      }
+      assertTrue("Did not find expected value " + val + " in table "
+          + tableName, foundVal);
+    } catch (IOException e) {
+      fail("Got unexpected exception: " + StringUtils.stringifyException(e));
+    }
+  }
+
+  public void runImport(SqoopOptions options, List<String> args) {
+    try {
+      Sqoop importer = new Sqoop(new ImportTool(), options.getConf(), options);
+      int ret = Sqoop.runSqoop(importer, args.toArray(new String[0]));
+      assertEquals("Failure during job", 0, ret);
+    } catch (Exception e) {
+      LOG.error("Got exception running Sqoop: "
+          + StringUtils.stringifyException(e));
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Return a list of arguments to import the specified table.
+   */
+  private List<String> getArgListForTable(String tableName, boolean commonArgs,
+      boolean isAppend) {
+    return getArgListForTable(tableName, commonArgs, isAppend, false);
+  }
+
+  /**
+   * Return a list of arguments to import the specified table.
+   */
+  private List<String> getArgListForTable(String tableName, boolean commonArgs,
+      boolean isAppend, boolean appendTimestamp) {
+    List<String> args = new ArrayList<String>();
+    if (commonArgs) {
+      CommonArgs.addHadoopFlags(args);
+    }
+    args.add("--connect");
+    args.add(SOURCE_DB_URL);
+    args.add("--table");
+    args.add(tableName);
+    args.add("--warehouse-dir");
+    args.add(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+    if (isAppend) {
+      args.add("--incremental");
+      args.add("append");
+      if (!appendTimestamp) {
+        args.add("--check-column");
+        args.add("ID");
+      } else {
+        args.add("--check-column");
+        args.add("LAST_MODIFIED");
+      }
+    } else {
+      args.add("--incremental");
+      args.add("lastmodified");
+      args.add("--check-column");
+      args.add("LAST_MODIFIED");
+    }
+    args.add("--columns");
+    args.add("ID");
+    args.add("-m");
+    args.add("1");
+
+    return args;
+  }
+
+  /**
+   * Return a list of arguments to import via a free-form query.
+   */
+  private List<String> getArgListForQuery(String query, String directoryName,
+    boolean commonArgs, boolean isAppend, boolean appendTimestamp) {
+    List<String> args = new ArrayList<String>();
+    if (commonArgs) {
+      CommonArgs.addHadoopFlags(args);
+    }
+
+    String [] directoryNames = directoryName.split("/");
+    String className = directoryNames[directoryNames.length - 1];
+
+    args.add("--connect");
+    args.add(SOURCE_DB_URL);
+    args.add("--query");
+    args.add(query);
+    args.add("--class-name");
+    args.add(className);
+    args.add("--target-dir");
+    args.add(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR
+      + System.getProperty("file.separator") + directoryName);
+    if (isAppend) {
+      args.add("--incremental");
+      args.add("append");
+      if (!appendTimestamp) {
+        args.add("--check-column");
+        args.add("ID");
+      } else {
+        args.add("--check-column");
+        args.add("LAST_MODIFIED");
+      }
+    } else {
+      args.add("--incremental");
+      args.add("lastmodified");
+      args.add("--check-column");
+      args.add("LAST_MODIFIED");
+    }
+    args.add("-m");
+    args.add("1");
+
+    return args;
+  }
+
+  /**
+   * Create a job with the specified name, where the job performs
+   * an import configured with 'jobArgs'.
+   */
+  private void createJob(String jobName, List<String> jobArgs) {
+    createJob(jobName, jobArgs, newConf());
+  }
+
+  /**
+   * Create a job with the specified name, where the job performs
+   * an import configured with 'jobArgs', using the provided configuration
+   * as defaults.
+   */
+  private void createJob(String jobName, List<String> jobArgs,
+      Configuration conf) {
+    try {
+      SqoopOptions options = new SqoopOptions();
+      options.setConf(conf);
+      Sqoop makeJob = new Sqoop(new JobTool(), conf, options);
+
+      List<String> args = new ArrayList<String>();
+      args.add("--create");
+      args.add(jobName);
+      args.add("--");
+      args.add("import");
+      args.addAll(jobArgs);
+
+      int ret = Sqoop.runSqoop(makeJob, args.toArray(new String[0]));
+      assertEquals("Failure to create job", 0, ret);
+    } catch (Exception e) {
+      LOG.error("Got exception running Sqoop to create job: "
+          + StringUtils.stringifyException(e));
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Run the specified job.
+   */
+  private void runJob(String jobName) {
+    runJob(jobName, newConf());
+  }
+
+  /**
+   * Run the specified job.
+   */
+  private void runJob(String jobName, Configuration conf) {
+    try {
+      SqoopOptions options = new SqoopOptions();
+      options.setConf(conf);
+      Sqoop runJob = new Sqoop(new JobTool(), conf, options);
+
+      List<String> args = new ArrayList<String>();
+      args.add("--exec");
+      args.add(jobName);
+
+      int ret = Sqoop.runSqoop(runJob, args.toArray(new String[0]));
+      assertEquals("Failure to run job", 0, ret);
+    } catch (Exception e) {
+      LOG.error("Got exception running Sqoop to run job: "
+          + StringUtils.stringifyException(e));
+      throw new RuntimeException(e);
+    }
+  }
+
+  // Incremental import of an empty table, no metastore.
+  @Test
+  public void testEmptyAppendImport() throws Exception {
+    final String TABLE_NAME = "emptyAppend1";
+    createIdTable(TABLE_NAME, 0);
+    List<String> args = getArgListForTable(TABLE_NAME, true, true);
+
+    Configuration conf = newConf();
+    SqoopOptions options = new SqoopOptions();
+    options.setConf(conf);
+    runImport(options, args);
+
+    assertDirOfNumbers(TABLE_NAME, 0);
+  }
+
+  // Incremental import of a filled table, no metastore.
+  @Test
+  public void testFullAppendImport() throws Exception {
+    final String TABLE_NAME = "fullAppend1";
+    createIdTable(TABLE_NAME, 10);
+    List<String> args = getArgListForTable(TABLE_NAME, true, true);
+
+    Configuration conf = newConf();
+    SqoopOptions options = new SqoopOptions();
+    options.setConf(conf);
+    runImport(options, args);
+
+    assertDirOfNumbers(TABLE_NAME, 10);
+  }
+
+  @Test
+  public void testEmptyJobAppend() throws Exception {
+    // Create a job and run an import on an empty table.
+    // Nothing should happen.
+
+    final String TABLE_NAME = "emptyJob";
+    createIdTable(TABLE_NAME, 0);
+
+    List<String> args = getArgListForTable(TABLE_NAME, false, true);
+    createJob("emptyJob", args);
+    runJob("emptyJob");
+    assertDirOfNumbers(TABLE_NAME, 0);
+
+    // Running the job a second time should result in
+    // nothing happening; the table is still empty.
+    runJob("emptyJob");
+    assertDirOfNumbers(TABLE_NAME, 0);
+  }
+
+  @Test
+  public void testEmptyThenFullJobAppend() throws Exception {
+    // Create an empty table. Import it; nothing happens.
+    // Add some rows. Verify they are appended.
+
+    final String TABLE_NAME = "emptyThenFull";
+    createIdTable(TABLE_NAME, 0);
+
+    List<String> args = getArgListForTable(TABLE_NAME, false, true);
+    createJob(TABLE_NAME, args);
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 0);
+
+    // Now add some rows.
+    insertIdRows(TABLE_NAME, 0, 10);
+
+    // Running the job a second time should import 10 rows.
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 10);
+
+    // Add some more rows.
+    insertIdRows(TABLE_NAME, 10, 20);
+
+    // Import only those rows.
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 20);
+  }
+
+  @Test
+  public void testEmptyThenFullJobAppendWithQuery() throws Exception {
+    // Create an empty table. Import it; nothing happens.
+    // Add some rows. Verify they are appended.
+
+    final String TABLE_NAME = "withQuery";
+    createIdTable(TABLE_NAME, 0);
+    clearDir(TABLE_NAME);
+
+    final String QUERY = "SELECT id FROM \"withQuery\" WHERE $CONDITIONS";
+
+    List<String> args = getArgListForQuery(QUERY, TABLE_NAME,
+      false, true, false);
+    createJob(TABLE_NAME, args);
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 0);
+
+    // Now add some rows.
+    insertIdRows(TABLE_NAME, 0, 10);
+
+    // Running the job a second time should import 10 rows.
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 10);
+
+    // Add some more rows.
+    insertIdRows(TABLE_NAME, 10, 20);
+
+    // Import only those rows.
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 20);
+  }
+
+  @Test
+  public void testAppend() throws Exception {
+    // Create a table with data in it; import it.
+    // Then add more data, verify that only the incremental data is pulled.
+
+    final String TABLE_NAME = "append";
+    createIdTable(TABLE_NAME, 10);
+
+    List<String> args = getArgListForTable(TABLE_NAME, false, true);
+    createJob(TABLE_NAME, args);
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 10);
+
+    // Add some more rows.
+    insertIdRows(TABLE_NAME, 10, 20);
+
+    // Import only those rows.
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 20);
+  }
+
+  @Test
+  public void testEmptyLastModified() throws Exception {
+    final String TABLE_NAME = "emptyLastModified";
+    createTimestampTable(TABLE_NAME, 0, null);
+    List<String> args = getArgListForTable(TABLE_NAME, true, false);
+
+    Configuration conf = newConf();
+    SqoopOptions options = new SqoopOptions();
+    options.setConf(conf);
+    runImport(options, args);
+
+    assertDirOfNumbers(TABLE_NAME, 0);
+  }
+
+  @Test
+  public void testEmptyLastModifiedWithNonExistingParentDirectory() throws Exception {
+    final String TABLE_NAME = "emptyLastModifiedNoParent";
+    final String QUERY = "SELECT id, last_modified FROM \"" + TABLE_NAME + "\" WHERE $CONDITIONS";
+    final String DIRECTORY = "non-existing/parents/" + TABLE_NAME;
+    createTimestampTable(TABLE_NAME, 0, null);
+    List<String> args = getArgListForQuery(QUERY, DIRECTORY, true, false, false);
+
+    Configuration conf = newConf();
+    SqoopOptions options = new SqoopOptions();
+    options.setConf(conf);
+    runImport(options, args);
+
+    assertDirOfNumbers(DIRECTORY, 0);
+  }
+
+  @Test
+  public void testFullLastModifiedImport() throws Exception {
+    // Given a table of rows imported in the past,
+    // see that they are imported.
+    final String TABLE_NAME = "fullLastModified";
+    Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
+    createTimestampTable(TABLE_NAME, 10, thePast);
+
+    List<String> args = getArgListForTable(TABLE_NAME, true, false);
+
+    Configuration conf = newConf();
+    SqoopOptions options = new SqoopOptions();
+    options.setConf(conf);
+    runImport(options, args);
+
+    assertDirOfNumbers(TABLE_NAME, 10);
+  }
+
+  @Test
+  public void testNoImportFromTheFuture() throws Exception {
+    // Rows whose last-modified timestamp lies in the future relative
+    // to the import time must not be imported.
+
+    final String TABLE_NAME = "futureLastModified";
+    Timestamp theFuture = new Timestamp(System.currentTimeMillis() + 1000000);
+    createTimestampTable(TABLE_NAME, 10, theFuture);
+
+    List<String> args = getArgListForTable(TABLE_NAME, true, false);
+
+    Configuration conf = newConf();
+    SqoopOptions options = new SqoopOptions();
+    options.setConf(conf);
+    runImport(options, args);
+
+    assertDirOfNumbers(TABLE_NAME, 0);
+  }
+
+  @Test
+  public void testEmptyJobLastMod() throws Exception {
+    // Create a job and run an import on an empty table.
+    // Nothing should happen.
+
+    final String TABLE_NAME = "emptyJobLastMod";
+    createTimestampTable(TABLE_NAME, 0, null);
+
+    List<String> args = getArgListForTable(TABLE_NAME, false, false);
+    args.add("--append");
+    createJob("emptyJobLastMod", args);
+    runJob("emptyJobLastMod");
+    assertDirOfNumbers(TABLE_NAME, 0);
+
+    // Running the job a second time should result in
+    // nothing happening; the table is still empty.
+    runJob("emptyJobLastMod");
+    assertDirOfNumbers(TABLE_NAME, 0);
+  }
+
+  @Test
+  public void testEmptyThenFullJobLastMod() throws Exception {
+    // Create an empty table. Import it; nothing happens.
+    // Add some rows. Verify they are appended.
+
+    final String TABLE_NAME = "emptyThenFullTimestamp";
+    createTimestampTable(TABLE_NAME, 0, null);
+
+    List<String> args = getArgListForTable(TABLE_NAME, false, false);
+    args.add("--append");
+    createJob(TABLE_NAME, args);
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 0);
+
+    long importWasBefore = System.currentTimeMillis();
+
+    // Let some time elapse.
+    Thread.sleep(50);
+
+    long rowsAddedTime = System.currentTimeMillis() - 5;
+
+    // Check: we are adding rows after the previous import time
+    // and before the current time.
+    assertTrue(rowsAddedTime > importWasBefore);
+    assertTrue(rowsAddedTime < System.currentTimeMillis());
+
+    insertIdTimestampRows(TABLE_NAME, 0, 10, new Timestamp(rowsAddedTime));
+
+    // Running the job a second time should import 10 rows.
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 10);
+
+    // Add some more rows.
+    importWasBefore = System.currentTimeMillis();
+    Thread.sleep(50);
+    rowsAddedTime = System.currentTimeMillis() - 5;
+    assertTrue(rowsAddedTime > importWasBefore);
+    assertTrue(rowsAddedTime < System.currentTimeMillis());
+    insertIdTimestampRows(TABLE_NAME, 10, 20, new Timestamp(rowsAddedTime));
+
+    // Import only those rows.
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 20);
+  }
+
+  @Test
+  public void testAppendWithTimestamp() throws Exception {
+    // Create a table with data in it; import it.
+    // Then add more data, verify that only the incremental data is pulled.
+
+    final String TABLE_NAME = "appendTimestamp";
+    Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
+    createTimestampTable(TABLE_NAME, 10, thePast);
+
+    List<String> args = getArgListForTable(TABLE_NAME, false, false);
+    args.add("--append");
+    createJob(TABLE_NAME, args);
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 10);
+
+    // Add some more rows.
+    long importWasBefore = System.currentTimeMillis();
+    Thread.sleep(50);
+    long rowsAddedTime = System.currentTimeMillis() - 5;
+    assertTrue(rowsAddedTime > importWasBefore);
+    assertTrue(rowsAddedTime < System.currentTimeMillis());
+    insertIdTimestampRows(TABLE_NAME, 10, 20, new Timestamp(rowsAddedTime));
+
+    // Import only those rows.
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 20);
+  }
+
+  @Test
+  public void testAppendWithString() throws Exception {
+    // Create a table with string column in it;
+    // incrementally import it on the string column - it should fail.
+
+    final String TABLE_NAME = "appendString";
+    createIdVarcharTable(TABLE_NAME, 10);
+
+    List<String> args = getArgListForTable(TABLE_NAME, false, true);
+    args.add("--append");
+    createJob(TABLE_NAME, args);
+
+    thrown.expect(RuntimeException.class);
+    thrown.reportMissingExceptionWithMessage("Expected incremental import on varchar column to fail");
+    runJob(TABLE_NAME);
+  }
+
+  @Test
+  public void testModifyWithTimestamp() throws Exception {
+    // Create a table with data in it; import it.
+    // Then modify some existing rows, and verify that we only grab
+    // those rows.
+
+    final String TABLE_NAME = "modifyTimestamp";
+    Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
+    createTimestampTable(TABLE_NAME, 10, thePast);
+
+    List<String> args = getArgListForTable(TABLE_NAME, false, false);
+    createJob(TABLE_NAME, args);
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 10);
+
+    // Modify a row.
+    long importWasBefore = System.currentTimeMillis();
+    Thread.sleep(50);
+    long rowsAddedTime = System.currentTimeMillis() - 5;
+    assertTrue(rowsAddedTime > importWasBefore);
+    assertTrue(rowsAddedTime < System.currentTimeMillis());
+    SqoopOptions options = new SqoopOptions();
+    options.setConnectString(SOURCE_DB_URL);
+    HsqldbManager manager = new HsqldbManager(options);
+    Connection c = manager.getConnection();
+    PreparedStatement s = null;
+    try {
+      s = c.prepareStatement("UPDATE " + manager.escapeTableName(TABLE_NAME) + " SET id=?, last_modified=? WHERE id=?");
+      s.setInt(1, 4000); // the first row should have '4000' in it now.
+      s.setTimestamp(2, new Timestamp(rowsAddedTime));
+      s.setInt(3, 0);
+      s.executeUpdate();
+      c.commit();
+    } finally {
+      s.close();
+    }
+
+    // Import only the new row.
+    clearDir(TABLE_NAME);
+    runJob(TABLE_NAME);
+    assertFirstSpecificNumber(TABLE_NAME, 4000);
+  }
+
+  @Test
+  public void testUpdateModifyWithTimestamp() throws Exception {
+    // Create a table with data in it; import it.
+    // Then modify some existing rows, and verify that we only grab
+    // those rows.
+
+    final String TABLE_NAME = "updateModifyTimestamp";
+    Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
+    createTimestampTable(TABLE_NAME, 10, thePast);
+
+    List<String> args = getArgListForTable(TABLE_NAME, false, false);
+
+    Configuration conf = newConf();
+    SqoopOptions options = new SqoopOptions();
+    options.setConf(conf);
+    runImport(options, args);
+    assertDirOfNumbers(TABLE_NAME, 10);
+
+    // Modify a row.
+    long importWasBefore = System.currentTimeMillis();
+    Thread.sleep(50);
+    long rowsAddedTime = System.currentTimeMillis() - 5;
+    assertTrue(rowsAddedTime > importWasBefore);
+    assertTrue(rowsAddedTime < System.currentTimeMillis());
+    SqoopOptions options2 = new SqoopOptions();
+    options2.setConnectString(SOURCE_DB_URL);
+    HsqldbManager manager = new HsqldbManager(options2);
+    Connection c = manager.getConnection();
+    PreparedStatement s = null;
+    try {
+      s = c.prepareStatement("UPDATE " + manager.escapeTableName(TABLE_NAME) + " SET id=?, last_modified=? WHERE id=?");
+      s.setInt(1, 4000); // the first row should have '4000' in it now.
+      s.setTimestamp(2, new Timestamp(rowsAddedTime));
+      s.setInt(3, 0);
+      s.executeUpdate();
+      c.commit();
+    } finally {
+      s.close();
+    }
+
+    // Update the new row.
+    args.add("--last-value");
+    args.add(new Timestamp(importWasBefore).toString());
+    args.add("--merge-key");
+    args.add("ID");
+    conf = newConf();
+    options = new SqoopOptions();
+    options.setConf(conf);
+    runImport(options, args);
+    assertSpecificNumber(TABLE_NAME, 4000);
+  }
+
+  @Test
+  public void testUpdateModifyWithTimestampWithQuery() throws Exception {
+    // Create an empty table. Import it; nothing happens.
+    // Add some rows. Verify they are appended.
+
+    final String TABLE_NAME = "UpdateModifyWithTimestampWithQuery";
+    Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
+    createTimestampTable(TABLE_NAME, 10, thePast);
+
+    final String QUERY = "SELECT id, last_modified FROM \"UpdateModifyWithTimestampWithQuery\" WHERE $CONDITIONS";
+
+    List<String> args = getArgListForQuery(QUERY, TABLE_NAME,
+        true, false, false);
+
+    Configuration conf = newConf();
+    SqoopOptions options = new SqoopOptions();
+    options.setConf(conf);
+    runImport(options, args);
+    assertDirOfNumbersAndTimestamps(TABLE_NAME, 10);
+
+    // Modify a row.
+    long importWasBefore = System.currentTimeMillis();
+    Thread.sleep(50);
+    long rowsAddedTime = System.currentTimeMillis() - 5;
+    assertTrue(rowsAddedTime > importWasBefore);
+    assertTrue(rowsAddedTime < System.currentTimeMillis());
+    SqoopOptions options2 = new SqoopOptions();
+    options2.setConnectString(SOURCE_DB_URL);
+    HsqldbManager manager = new HsqldbManager(options2);
+    Connection c = manager.getConnection();
+    PreparedStatement s = null;
+    try {
+      s = c.prepareStatement("UPDATE " + manager.escapeTableName(TABLE_NAME) + " SET id=?, last_modified=? WHERE id=?");
+      s.setInt(1, 4000); // the first row should have '4000' in it now.
+      s.setTimestamp(2, new Timestamp(rowsAddedTime));
+      s.setInt(3, 0);
+      s.executeUpdate();
+      c.commit();
+    } finally {
+      s.close();
+    }
+
+    // Update the new row.
+    args.add("--last-value");
+    args.add(new Timestamp(importWasBefore).toString());
+    args.add("--merge-key");
+    args.add("ID");
+    conf = newConf();
+    options = new SqoopOptions();
+    options.setConf(conf);
+    runImport(options, args);
+    assertSpecificNumber(TABLE_NAME, 4000);
+  }
+
+  @Test
+  public void testUpdateModifyWithTimestampJob() throws Exception {
+    // Create a table with data in it; import it.
+    // Then modify some existing rows, and verify that we only grab
+    // those rows.
+
+    final String TABLE_NAME = "updateModifyTimestampJob";
+    Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
+    createTimestampTable(TABLE_NAME, 10, thePast);
+
+    List<String> args = getArgListForTable(TABLE_NAME, false, false);
+    args.add("--merge-key");
+    args.add("ID");
+    createJob(TABLE_NAME, args);
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 10);
+
+    // Modify a row.
+    long importWasBefore = System.currentTimeMillis();
+    Thread.sleep(50);
+    long rowsAddedTime = System.currentTimeMillis() - 5;
+    assertTrue(rowsAddedTime > importWasBefore);
+    assertTrue(rowsAddedTime < System.currentTimeMillis());
+    SqoopOptions options2 = new SqoopOptions();
+    options2.setConnectString(SOURCE_DB_URL);
+    HsqldbManager manager = new HsqldbManager(options2);
+    Connection c = manager.getConnection();
+    PreparedStatement s = null;
+    try {
+      s = c.prepareStatement("UPDATE " + manager.escapeTableName(TABLE_NAME) + " SET id=?, last_modified=? WHERE id=?");
+      s.setInt(1, 4000); // the first row should have '4000' in it now.
+      s.setTimestamp(2, new Timestamp(rowsAddedTime));
+      s.setInt(3, 0);
+      s.executeUpdate();
+      c.commit();
+    } finally {
+      s.close();
+    }
+
+    // Update the new row.
+    runJob(TABLE_NAME);
+    assertSpecificNumber(TABLE_NAME, 4000);
+  }
+
+  /**
+   * ManagerFactory returning an HSQLDB ConnManager which allows you to
+   * specify the current database timestamp.
+   */
+  public static class InstrumentHsqldbManagerFactory extends ManagerFactory {
+    @Override
+    public ConnManager accept(JobData data) {
+      LOG.info("Using instrumented manager");
+      return new InstrumentHsqldbManager(data.getSqoopOptions());
+    }
+  }
+
+  /**
+   * Hsqldb ConnManager that lets you set the current reported timestamp
+   * from the database, to allow testing of boundary conditions for imports.
+   */
+  public static class InstrumentHsqldbManager extends HsqldbManager {
+    private static Timestamp curTimestamp;
+
+    public InstrumentHsqldbManager(SqoopOptions options) {
+      super(options);
+    }
+
+    @Override
+    public Timestamp getCurrentDbTimestamp() {
+      return InstrumentHsqldbManager.curTimestamp;
+    }
+
+    public static void setCurrentDbTimestamp(Timestamp t) {
+      InstrumentHsqldbManager.curTimestamp = t;
+    }
+  }
+
+  @Test
+  public void testTimestampBoundary() throws Exception {
+    // Run an import, and then insert rows with the last-modified timestamp
+    // set to the exact time when the first import runs. Run a second import
+    // and ensure that we pick up the new data.
+
+    long now = System.currentTimeMillis();
+
+    final String TABLE_NAME = "boundaryTimestamp";
+    Timestamp thePast = new Timestamp(now - 100);
+    createTimestampTable(TABLE_NAME, 10, thePast);
+
+    Timestamp firstJobTime = new Timestamp(now);
+    InstrumentHsqldbManager.setCurrentDbTimestamp(firstJobTime);
+
+    // Configure the job to use the instrumented Hsqldb manager.
+    Configuration conf = newConf();
+    conf.set(ConnFactory.FACTORY_CLASS_NAMES_KEY,
+        InstrumentHsqldbManagerFactory.class.getName());
+
+    List<String> args = getArgListForTable(TABLE_NAME, false, false);
+    args.add("--append");
+    createJob(TABLE_NAME, args, conf);
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 10);
+
+    // Add some more rows with the timestamp equal to the job run timestamp.
+    insertIdTimestampRows(TABLE_NAME, 10, 20, firstJobTime);
+    assertRowCount(TABLE_NAME, 20);
+
+    // Run a second job with the clock advanced by 100 ms.
+    Timestamp secondJobTime = new Timestamp(now + 100);
+    InstrumentHsqldbManager.setCurrentDbTimestamp(secondJobTime);
+
+    // Import only those rows.
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 20);
+  }
+
+  @Test
+  public void testIncrementalAppendTimestamp() throws Exception {
+    // Run an import, and then insert rows with the last-modified timestamp
+    // set to the exact time when the first import runs. Run a second import
+    // and ensure that we pick up the new data.
+
+    long now = System.currentTimeMillis();
+
+    final String TABLE_NAME = "incrementalAppendTimestamp";
+    Timestamp thePast = new Timestamp(now - 100);
+    createTimestampTable(TABLE_NAME, 10, thePast);
+
+    Timestamp firstJobTime = new Timestamp(now);
+    InstrumentHsqldbManager.setCurrentDbTimestamp(firstJobTime);
+
+    // Configure the job to use the instrumented Hsqldb manager.
+    Configuration conf = newConf();
+    conf.set(ConnFactory.FACTORY_CLASS_NAMES_KEY,
+        InstrumentHsqldbManagerFactory.class.getName());
+
+    List<String> args = getArgListForTable(TABLE_NAME, false, true, true);
+    createJob(TABLE_NAME, args, conf);
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 10);
+
+    // Add some more rows with the timestamp equal to the job run timestamp.
+    insertIdTimestampRows(TABLE_NAME, 10, 20, firstJobTime);
+    assertRowCount(TABLE_NAME, 20);
+
+    // Run a second job with the clock advanced by 100 ms.
+    Timestamp secondJobTime = new Timestamp(now + 100);
+    InstrumentHsqldbManager.setCurrentDbTimestamp(secondJobTime);
+
+    // Import only those rows.
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 20);
+  }
+
+  @Test
+  public void testIncrementalHiveAppendEmptyThenFull() throws Exception {
+    // This tests the incremental Hive append feature (SQOOP-2470).
+    final String TABLE_NAME = "incrementalHiveAppendEmptyThenFull";
+    Configuration conf = newConf();
+    conf.set(ConnFactory.FACTORY_CLASS_NAMES_KEY,
+        InstrumentHsqldbManagerFactory.class.getName());
+    clearDir(TABLE_NAME);
+    createIdTable(TABLE_NAME, 0);
+    List<String> args = new ArrayList<String>();
+    args.add("--connect");
+    args.add(SOURCE_DB_URL);
+    args.add("--table");
+    args.add(TABLE_NAME);
+    args.add("--warehouse-dir");
+    args.add(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+    args.add("--hive-import");
+    args.add("--hive-table");
+    args.add(TABLE_NAME + "hive");
+    args.add("--incremental");
+    args.add("append");
+    args.add("--check-column");
+    args.add("ID");
+    args.add("-m");
+    args.add("1");
+    createJob(TABLE_NAME, args, conf);
+    HiveImport.setTestMode(true);
+    String hiveHome = org.apache.sqoop.SqoopOptions.getHiveHomeDefault();
+    assertNotNull("hive.home was not set", hiveHome);
+    String testDataPath = new Path(new Path(hiveHome),
+        "scripts/incrementalHiveAppendEmpty.q").toString();
+    System.setProperty("expected.script",
+        new File(testDataPath).getAbsolutePath());
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 0);
+
+    // Now add some rows.
+    insertIdRows(TABLE_NAME, 0, 10);
+    String testDataPath10 = new Path(new Path(hiveHome),
+        "scripts/incrementalHiveAppend10.q").toString();
+    System.setProperty("expected.script",
+        new File(testDataPath10).getAbsolutePath());
+
+    // Running the job a second time should import 10 rows.
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 10);
+
+    // Add some more rows.
+    insertIdRows(TABLE_NAME, 10, 20);
+    String testDataPath20 = new Path(new Path(hiveHome),
+        "scripts/incrementalHiveAppend20.q").toString();
+    System.setProperty("expected.script",
+        new File(testDataPath20).getAbsolutePath());
+
+    // Import only those rows.
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 20);
+  }
+
+  // SQOOP-1890
+  @Test
+  public void testTableNameWithSpecialCharacters() throws Exception {
+    // Table name with special characters to verify proper table name escaping
+    final String TABLE_NAME = "my-table.ext";
+    createIdTable(TABLE_NAME, 0);
+
+    // Now add some rows.
+    insertIdRows(TABLE_NAME, 0, 10);
+
+    List<String> args = getArgListForTable(TABLE_NAME, false, true);
+    createJob("emptyJob", args);
+    runJob("emptyJob");
+    assertDirOfNumbers(TABLE_NAME, 10);
+  }
+
+}
+
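
For context, the saved-job tests above follow one pattern: create an import
job once with JobTool, then re-execute it, letting the metastore record the
last seen value of the check column so that each run appends only new rows
(the lastmodified variant swaps in --incremental lastmodified with
--check-column LAST_MODIFIED, optionally adding --merge-key). A minimal
sketch, not part of the patch; the job name, connect string, table, and
warehouse directory are hypothetical:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.Sqoop;
    import org.apache.sqoop.SqoopOptions;
    import org.apache.sqoop.tool.JobTool;

    public class IncrementalAppendJobSketch {
      public static void main(String[] argv) throws Exception {
        Configuration conf = new Configuration();

        // Create the saved job; everything after "--" defines the import.
        List<String> createArgs = new ArrayList<String>();
        createArgs.add("--create");
        createArgs.add("appendJob");               // hypothetical job name
        createArgs.add("--");
        createArgs.add("import");
        createArgs.add("--connect");
        createArgs.add("jdbc:hsqldb:mem:example"); // hypothetical URL
        createArgs.add("--table");
        createArgs.add("SOMETABLE");               // hypothetical table
        createArgs.add("--warehouse-dir");
        createArgs.add("/tmp/warehouse");          // hypothetical directory
        createArgs.add("--incremental");
        createArgs.add("append");
        createArgs.add("--check-column");
        createArgs.add("ID");
        createArgs.add("-m");
        createArgs.add("1");

        SqoopOptions createOptions = new SqoopOptions();
        createOptions.setConf(conf);
        Sqoop create = new Sqoop(new JobTool(), conf, createOptions);
        int ret = Sqoop.runSqoop(create, createArgs.toArray(new String[0]));
        if (ret != 0) {
          throw new IllegalStateException("Job creation failed: " + ret);
        }

        // Execute the job; each run imports only rows whose ID exceeds the
        // last value saved by the previous run.
        List<String> execArgs = new ArrayList<String>();
        execArgs.add("--exec");
        execArgs.add("appendJob");
        SqoopOptions execOptions = new SqoopOptions();
        execOptions.setConf(conf);
        Sqoop exec = new Sqoop(new JobTool(), conf, execOptions);
        ret = Sqoop.runSqoop(exec, execArgs.toArray(new String[0]));
        if (ret != 0) {
          throw new IllegalStateException("Job execution failed: " + ret);
        }
      }
    }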

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestMerge.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestMerge.java b/src/test/org/apache/sqoop/TestMerge.java
new file mode 100644
index 0000000..8eef8d4
--- /dev/null
+++ b/src/test/org/apache/sqoop/TestMerge.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.manager.ConnManager;
+import org.apache.sqoop.testutil.BaseSqoopTestCase;
+import org.apache.sqoop.testutil.CommonArgs;
+import org.apache.sqoop.testutil.HsqldbTestServer;
+import org.apache.sqoop.tool.CodeGenTool;
+import org.apache.sqoop.tool.ImportTool;
+import org.apache.sqoop.tool.MergeTool;
+import org.apache.sqoop.util.ClassLoaderStack;
+import org.junit.Before;
+import org.junit.Test;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetReader;
+import org.kitesdk.data.Datasets;
+
+import static org.apache.avro.generic.GenericData.Record;
+import static org.junit.Assert.fail;
+
+/**
+ * Test that the merge tool works.
+ */
+public class TestMerge extends BaseSqoopTestCase {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestMerge.class.getName());
+
+  protected ConnManager manager;
+  protected Connection conn;
+
+  public static final String SOURCE_DB_URL = "jdbc:hsqldb:mem:merge";
+
+  private static final List<List<Integer>> initRecords = Arrays.asList(
+      Arrays.asList(0, 0),
+      Arrays.asList(1, 42));
+
+  private static final List<List<Integer>> newRecords = Arrays.asList(
+      Arrays.asList(1, 43),
+      Arrays.asList(3, 313));
+
+  private static final List<List<Integer>> mergedRecords = Arrays.asList(
+      Arrays.asList(0, 0),
+      Arrays.asList(1, 43),
+      Arrays.asList(3, 313));
+
+  @Before
+  public void setUp() {
+    super.setUp();
+    manager = getManager();
+    try {
+      conn = manager.getConnection();
+    } catch (SQLException e) {
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static final String TABLE_NAME = "MergeTable";
+  private static final String OLD_PATH = "merge_old";
+  private static final String NEW_PATH = "merge_new";
+  private static final String FINAL_PATH = "merge_final";
+
+  public Configuration newConf() {
+    Configuration conf = new Configuration();
+    if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
+      conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
+    }
+    conf.set("mapred.job.tracker", "local");
+    return conf;
+  }
+
+  /**
+   * Create a SqoopOptions to connect to the manager.
+   */
+  public SqoopOptions getSqoopOptions(Configuration conf) {
+    SqoopOptions options = new SqoopOptions(conf);
+    options.setConnectString(HsqldbTestServer.getDbUrl());
+
+    return options;
+  }
+
+  protected void createTable(List<List<Integer>> records) throws SQLException {
+    PreparedStatement s = conn.prepareStatement("DROP TABLE \"" + TABLE_NAME + "\" IF EXISTS");
+    try {
+      s.executeUpdate();
+    } finally {
+      s.close();
+    }
+
+    s = conn.prepareStatement("CREATE TABLE \"" + TABLE_NAME
+        + "\" (id INT NOT NULL PRIMARY KEY, val INT, LASTMOD timestamp)");
+    try {
+      s.executeUpdate();
+    } finally {
+      s.close();
+    }
+
+    for (List<Integer> record : records) {
+      final String values = StringUtils.join(record, ", ");
+      s = conn
+          .prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (" + values + ", now())");
+      try {
+        s.executeUpdate();
+      } finally {
+        s.close();
+      }
+    }
+
+    conn.commit();
+  }
+
+  @Test
+  public void testTextFileMerge() throws Exception {
+    runMergeTest(SqoopOptions.FileLayout.TextFile);
+  }
+
+  @Test
+  public void testAvroFileMerge() throws Exception {
+    runMergeTest(SqoopOptions.FileLayout.AvroDataFile);
+  }
+
+  @Test
+  public void testParquetFileMerge() throws Exception {
+    runMergeTest(SqoopOptions.FileLayout.ParquetFile);
+  }
+
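+  /**
+   * Import TABLE_NAME twice (an initial snapshot, then an updated one)
+   * into separate directories and merge the two on the ID key. This is
+   * roughly what the merge tool does from the command line, e.g.:
+   *
+   *   sqoop merge --onto merge_old --new-data merge_new \
+   *     --target-dir merge_final --merge-key ID \
+   *     --class-name ClassForMerging --jar-file <generated jar>
+   */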
+  public void runMergeTest(SqoopOptions.FileLayout fileLayout) throws Exception {
+    createTable(initRecords);
+
+    // Create a jar to use for the merging process; we'll load it
+    // into the current thread's context classloader when the merge
+    // runs. It must use a different class name than the imports did,
+    // since reusing a class name causes ClassLoaderStack issues
+    // within the same JVM.
+    final String MERGE_CLASS_NAME = "ClassForMerging";
+    SqoopOptions options = getSqoopOptions(newConf());
+    options.setTableName(TABLE_NAME);
+    options.setClassName(MERGE_CLASS_NAME);
+
+    CodeGenTool codeGen = new CodeGenTool();
+    Sqoop codeGenerator = new Sqoop(codeGen, options.getConf(), options);
+    int ret = Sqoop.runSqoop(codeGenerator, new String[0]);
+    if (0 != ret) {
+      fail("Nonzero exit from codegen: " + ret);
+    }
+
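+    // The codegen run above produced a jar containing MERGE_CLASS_NAME;
+    // keep its filename so the merge job can reuse the generated class.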
+    List<String> jars = codeGen.getGeneratedJarFiles();
+    String jarFileName = jars.get(0);
+
+    // Now do the imports.
+    importData(OLD_PATH, fileLayout);
+
+    // Check that we got records that meet our expected values.
+    checkData(OLD_PATH, initRecords, fileLayout);
+
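+    // Sleep around the table reload so the LASTMOD timestamps of the
+    // regenerated rows are strictly newer than those of the first import.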
+    Thread.sleep(25);
+
+    // Modify the data in the warehouse.
+    createTable(newRecords);
+
+    Thread.sleep(25);
+
+    // Do another import, into the "new" dir.
+    importData(NEW_PATH, fileLayout);
+
+    checkData(NEW_PATH, newRecords, fileLayout);
+
+    // Now merge the results!
+    ClassLoaderStack.addJarFile(jarFileName, MERGE_CLASS_NAME);
+    Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+    options = getSqoopOptions(newConf());
+    options.setMergeOldPath(new Path(warehouse, OLD_PATH).toString());
+    options.setMergeNewPath(new Path(warehouse, NEW_PATH).toString());
+    options.setMergeKeyCol("ID");
+    options.setTargetDir(new Path(warehouse, FINAL_PATH).toString());
+    options.setClassName(MERGE_CLASS_NAME);
+    options.setExistingJarName(jarFileName);
+
+    MergeTool mergeTool = new MergeTool();
+    Sqoop merger = new Sqoop(mergeTool, options.getConf(), options);
+    ret = Sqoop.runSqoop(merger, new String[0]);
+    if (0 != ret) {
+      fail("Merge failed with exit code " + ret);
+    }
+
+    checkData(FINAL_PATH, mergedRecords, fileLayout);
+  }
+
+  private void checkData(String dataDir, List<List<Integer>> records,
+      SqoopOptions.FileLayout fileLayout) throws Exception {
+    for (List<Integer> record : records) {
+      assertRecordStartsWith(record, dataDir, fileLayout);
+    }
+  }
+
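+  // Compares by field position: the generated record schema lists ID
+  // first and VAL second, mirroring the column order of TABLE_NAME.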
+  private boolean valueMatches(GenericRecord genericRecord, List<Integer> recordVals) {
+    return recordVals.get(0).equals(genericRecord.get(0))
+        && recordVals.get(1).equals(genericRecord.get(1));
+  }
+
+  private void importData(String targetDir, SqoopOptions.FileLayout fileLayout) {
+    SqoopOptions options;
+    options = getSqoopOptions(newConf());
+    options.setTableName(TABLE_NAME);
+    options.setNumMappers(1);
+    options.setFileLayout(fileLayout);
+    options.setDeleteMode(true);
+
+    Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+    options.setTargetDir(new Path(warehouse, targetDir).toString());
+
+    ImportTool importTool = new ImportTool();
+    Sqoop importer = new Sqoop(importTool, options.getConf(), options);
+    int ret = Sqoop.runSqoop(importer, new String[0]);
+    if (0 != ret) {
+      fail("Initial import failed with exit code " + ret);
+    }
+  }
+
+  /**
+   * @return true if the text file at path 'p' contains a line that
+   * starts with the comma-joined values of 'record'.
+   */
+  protected boolean checkTextFileForLine(FileSystem fs, Path p, List<Integer> record)
+      throws IOException {
+    final String prefix = StringUtils.join(record, ',');
+    BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(p)));
+    try {
+      while (true) {
+        String in = r.readLine();
+        if (null == in) {
+          break; // done with the file.
+        }
+
+        if (in.startsWith(prefix)) {
+          return true;
+        }
+      }
+    } finally {
+      r.close();
+    }
+
+    return false;
+  }
+
+  private boolean checkAvroFileForLine(FileSystem fs, Path p, List<Integer> record)
+      throws IOException {
+    SeekableInput in = new FsInput(p, new Configuration());
+    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+    FileReader<GenericRecord> reader = DataFileReader.openReader(in, datumReader);
+    try {
+      // Position the reader at the first sync point so iteration covers
+      // the whole file.
+      reader.sync(0);
+      while (reader.hasNext()) {
+        if (valueMatches(reader.next(), record)) {
+          return true;
+        }
+      }
+    } finally {
+      // Close the reader so the underlying FsInput stream is released.
+      reader.close();
+    }
+
+    return false;
+  }
+
+  private boolean checkParquetFileForLine(FileSystem fs, Path p, List<Integer> record)
+      throws IOException {
+    // Parquet imports are materialized as a Kite dataset rooted at the
+    // directory holding the .parquet files, so load the parent directory
+    // rather than the individual file.
+    Dataset<Record> parquetRecords = Datasets.load("dataset:" + p.getParent(), Record.class);
+    DatasetReader<Record> datasetReader = null;
+    try {
+      datasetReader = parquetRecords.newReader();
+      for (GenericRecord genericRecord : datasetReader) {
+        if (valueMatches(genericRecord, record)) {
+          return true;
+        }
+      }
+    } finally {
+      if (datasetReader != null) {
+        datasetReader.close();
+      }
+    }
+
+    return false;
+  }
+
+  protected boolean checkFileForLine(FileSystem fs, Path p, SqoopOptions.FileLayout fileLayout,
+      List<Integer> record) throws IOException {
+    boolean result = false;
+    switch (fileLayout) {
+      case TextFile:
+        result = checkTextFileForLine(fs, p, record);
+        break;
+      case AvroDataFile:
+        result = checkAvroFileForLine(fs, p, record);
+        break;
+      case ParquetFile:
+        result = checkParquetFileForLine(fs, p, record);
+        break;
+    }
+    return result;
+  }
+
+  /**
+   * Return true if some data file in 'dirName' contains a record whose
+   * fields start with the values in 'record'.
+   */
+  protected boolean recordStartsWith(List<Integer> record, String dirName,
+      SqoopOptions.FileLayout fileLayout)
+      throws Exception {
+    Path warehousePath = new Path(LOCAL_WAREHOUSE_DIR);
+    Path targetPath = new Path(warehousePath, dirName);
+
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    FileStatus [] files = fs.listStatus(targetPath);
+
+    if (null == files || files.length == 0) {
+      fail("Got no import files!");
+    }
+
+    for (FileStatus stat : files) {
+      Path p = stat.getPath();
+      if (p.getName().startsWith("part-") || p.getName().endsWith(".parquet")) {
+        if (checkFileForLine(fs, p, fileLayout, record)) {
+          // We found the line. Nothing further to do.
+          return true;
+        }
+      }
+    }
+
+    return false;
+  }
+
+  protected void assertRecordStartsWith(List<Integer> record, String dirName,
+      SqoopOptions.FileLayout fileLayout) throws Exception {
+    if (!recordStartsWith(record, dirName, fileLayout)) {
+      fail("No record found that starts with [" + StringUtils.join(record, ", ") + "] in " + dirName);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestMultiCols.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestMultiCols.java b/src/test/org/apache/sqoop/TestMultiCols.java
new file mode 100644
index 0000000..1c932e9
--- /dev/null
+++ b/src/test/org/apache/sqoop/TestMultiCols.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.junit.Test;
+
+/**
+ * Test cases that import rows containing multiple columns,
+ * some of which may contain null values.
+ *
+ * Also test loading only selected columns from the db.
+ */
+public class TestMultiCols extends ImportJobTestCase {
+
+  public static final Log LOG = LogFactory.getLog(
+      TestMultiCols.class.getName());
+
+  /**
+   * Do a full import verification test on a table containing one row.
+   * @param types the types of the columns to insert
+   * @param insertVals the SQL text to use to insert each value
+   * @param validateLine the text to expect as a toString() of the entire row,
+   * as imported by the tool
+   */
+  private void verifyTypes(String [] types, String [] insertVals,
+      String validateLine) {
+    verifyTypes(types, insertVals, validateLine, null);
+  }
+
+  private void verifyTypes(String [] types, String [] insertVals,
+      String validateLine, String [] importColumns) {
+
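+    // createTableWithColTypes names the generated columns DATA_COL0,
+    // DATA_COL1, ..., which is why the --columns tests below refer to
+    // those names.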
+    createTableWithColTypes(types, insertVals);
+    verifyImport(validateLine, importColumns);
+    LOG.debug("Verified input line as " + validateLine + " -- ok!");
+  }
+
+  @Test
+  public void testThreeStrings() {
+    String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" };
+    String [] insertVals = { "'foo'", "'bar'", "'baz'" };
+    String validateLine = "foo,bar,baz";
+
+    verifyTypes(types, insertVals, validateLine);
+  }
+
+  @Test
+  public void testStringsWithNull1() {
+    String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" };
+    String [] insertVals = { "'foo'", "null", "'baz'" };
+    String validateLine = "foo,null,baz";
+
+    verifyTypes(types, insertVals, validateLine);
+  }
+
+  @Test
+  public void testStringsWithNull2() {
+    String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" };
+    String [] insertVals = { "null", "'foo'", "'baz'" };
+    String validateLine = "null,foo,baz";
+
+    verifyTypes(types, insertVals, validateLine);
+  }
+
+  @Test
+  public void testStringsWithNull3() {
+    String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" };
+    String [] insertVals = { "'foo'", "'baz'", "null"};
+    String validateLine = "foo,baz,null";
+
+    verifyTypes(types, insertVals, validateLine);
+  }
+
+  @Test
+  public void testThreeInts() {
+    String [] types = { "INTEGER", "INTEGER", "INTEGER" };
+    String [] insertVals = { "1", "2", "3" };
+    String validateLine = "1,2,3";
+
+    verifyTypes(types, insertVals, validateLine);
+  }
+
+  @Test
+  public void testIntsWithNulls() {
+    String [] types = { "INTEGER", "INTEGER", "INTEGER" };
+    String [] insertVals = { "1", "null", "3" };
+    String validateLine = "1,null,3";
+
+    verifyTypes(types, insertVals, validateLine);
+  }
+
+  @Test
+  public void testMixed1() {
+    String [] types = { "INTEGER", "VARCHAR(32)", "DATE" };
+    String [] insertVals = { "1", "'meep'", "'2009-12-31'" };
+    String validateLine = "1,meep,2009-12-31";
+
+    verifyTypes(types, insertVals, validateLine);
+  }
+
+  @Test
+  public void testMixed2() {
+    String [] types = { "INTEGER", "VARCHAR(32)", "DATE" };
+    String [] insertVals = { "null", "'meep'", "'2009-12-31'" };
+    String validateLine = "null,meep,2009-12-31";
+
+    verifyTypes(types, insertVals, validateLine);
+  }
+
+  @Test
+  public void testMixed3() {
+    String [] types = { "INTEGER", "VARCHAR(32)", "DATE" };
+    String [] insertVals = { "1", "'meep'", "null" };
+    String validateLine = "1,meep,null";
+
+    verifyTypes(types, insertVals, validateLine);
+  }
+
+  @Test
+  public void testMixed4() {
+    String [] types = { "NUMERIC", "INTEGER", "NUMERIC" };
+    String [] insertVals = { "-42", "17", "33333333333333333333333.1714" };
+    String validateLine = "-42,17,33333333333333333333333.1714";
+
+    verifyTypes(types, insertVals, validateLine);
+  }
+
+  @Test
+  public void testMixed5() {
+    String [] types = { "NUMERIC", "INTEGER", "NUMERIC" };
+    String [] insertVals = { "null", "17", "33333333333333333333333.0" };
+    String validateLine = "null,17,33333333333333333333333.0";
+
+    verifyTypes(types, insertVals, validateLine);
+  }
+
+  @Test
+  public void testMixed6() {
+    String [] types = { "NUMERIC", "INTEGER", "NUMERIC" };
+    String [] insertVals = { "33333333333333333333333", "17", "-42"};
+    String validateLine = "33333333333333333333333,17,-42";
+
+    verifyTypes(types, insertVals, validateLine);
+  }
+
+  //////////////////////////////////////////////////////////////////////////
+  // the tests below here test the --columns parameter and ensure that
+  // we can selectively import only certain columns.
+  //////////////////////////////////////////////////////////////////////////
+
+  @Test
+  public void testSkipFirstCol() {
+    String [] types = { "NUMERIC", "INTEGER", "NUMERIC" };
+    String [] insertVals = { "33333333333333333333333", "17", "-42"};
+    String validateLine = "17,-42";
+
+    String [] loadCols = {"DATA_COL1", "DATA_COL2"};
+
+    verifyTypes(types, insertVals, validateLine, loadCols);
+  }
+
+  @Test
+  public void testSkipSecondCol() {
+    String [] types = { "NUMERIC", "INTEGER", "NUMERIC" };
+    String [] insertVals = { "33333333333333333333333", "17", "-42"};
+    String validateLine = "33333333333333333333333,-42";
+
+    String [] loadCols = {"DATA_COL0", "DATA_COL2"};
+
+    verifyTypes(types, insertVals, validateLine, loadCols);
+  }
+
+  @Test
+  public void testSkipThirdCol() {
+    String [] types = { "NUMERIC", "INTEGER", "NUMERIC" };
+    String [] insertVals = { "33333333333333333333333", "17", "-42"};
+    String validateLine = "33333333333333333333333,17";
+
+    String [] loadCols = {"DATA_COL0", "DATA_COL1"};
+
+    verifyTypes(types, insertVals, validateLine, loadCols);
+  }
+
+  /**
+   * Test that the columns argument can handle a single comma-separated
+   * string of column names, i.e.:
+   *   --columns "DATA_COL0,DATA_COL1,DATA_COL2"
+   * passed as two arguments (the flag and its value) on a sqoop
+   * command line.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testSingleColumnsArg() throws IOException {
+    String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" };
+    String [] insertVals = { "'foo'", "'bar'", "'baz'" };
+    String validateLine = "foo,bar,baz";
+    String [] loadCols = {"DATA_COL0,DATA_COL1,DATA_COL2"};
+
+    verifyTypes(types, insertVals, validateLine, loadCols);
+  }
+
+  /**
+   * Test that the columns argument can handle spaces between column
+   * names, i.e.:
+   *   --columns "DATA_COL0, DATA_COL1, DATA_COL2"
+   * passed as two arguments (the flag and its value) on a sqoop
+   * command line.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testColumnsWithSpaces() throws IOException {
+    String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" };
+    String [] insertVals = { "'foo'", "'bar'", "'baz'" };
+    String validateLine = "foo,bar,baz";
+    String [] loadCols = {"DATA_COL0, DATA_COL1, DATA_COL2"};
+
+    verifyTypes(types, insertVals, validateLine, loadCols);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestMultiMaps.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestMultiMaps.java b/src/test/org/apache/sqoop/TestMultiMaps.java
new file mode 100644
index 0000000..050e268
--- /dev/null
+++ b/src/test/org/apache/sqoop/TestMultiMaps.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.Utils;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.SqoopOptions.InvalidOptionsException;
+import org.apache.sqoop.orm.CompilationManager;
+import org.apache.sqoop.testutil.*;
+import org.apache.sqoop.tool.ImportTool;
+import org.apache.sqoop.util.ClassLoaderStack;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Test that using multiple mapper splits works.
+ */
+public class TestMultiMaps extends ImportJobTestCase {
+
+  /**
+   * Create the argv to pass to Sqoop.
+   * @return the argv as an array of strings.
+   */
+  protected String [] getArgv(boolean includeHadoopFlags, String [] colNames,
+      String splitByCol) {
+    String columnsString = "";
+    for (String col : colNames) {
+      columnsString += col + ",";
+    }
+
+    ArrayList<String> args = new ArrayList<String>();
+
+    if (includeHadoopFlags) {
+      CommonArgs.addHadoopFlags(args);
+    }
+
+    args.add("--table");
+    args.add(HsqldbTestServer.getTableName());
+    args.add("--columns");
+    args.add(columnsString);
+    args.add("--split-by");
+    args.add(splitByCol);
+    args.add("--warehouse-dir");
+    args.add(getWarehouseDir());
+    args.add("--connect");
+    args.add(HsqldbTestServer.getUrl());
+    args.add("--as-sequencefile");
+    args.add("--num-mappers");
+    args.add("2");
+
+    return args.toArray(new String[0]);
+  }
+
+  // This test uses the two-integer-column table from HsqldbTestServer.
+  protected String getTableName() {
+    return HsqldbTestServer.getTableName();
+  }
+
+  /** @return a list of Path objects for each data file */
+  protected List<Path> getDataFilePaths() throws IOException {
+    List<Path> paths = new ArrayList<Path>();
+    Configuration conf = new Configuration();
+    if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
+      conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
+    }
+    FileSystem fs = FileSystem.get(conf);
+
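+    // The OutputFilesFilter skips bookkeeping entries such as _SUCCESS
+    // and _logs, leaving only the part-* data files.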
+    FileStatus [] stats = fs.listStatus(getTablePath(),
+        new Utils.OutputFileUtils.OutputFilesFilter());
+
+    for (FileStatus stat : stats) {
+      paths.add(stat.getPath());
+    }
+
+    return paths;
+  }
+
+  /**
+   * Given a comma-delimited list of integers, parse the first one.
+   * @param str a comma-delimited list of values, the first of which is an int.
+   * @return the first field of the string, parsed as an int
+   */
+  private int getFirstInt(String str) {
+    String [] parts = str.split(",");
+    return Integer.parseInt(parts[0]);
+  }
+
+  public void runMultiMapTest(String splitByCol, int expectedSum)
+      throws IOException {
+
+    String [] columns = HsqldbTestServer.getFieldNames();
+    ClassLoader prevClassLoader = null;
+    SequenceFile.Reader reader = null;
+
+    String [] argv = getArgv(true, columns, splitByCol);
+    runImport(argv);
+    try {
+      ImportTool importTool = new ImportTool();
+      SqoopOptions opts = importTool.parseArguments(
+          getArgv(false, columns, splitByCol),
+          null, null, true);
+
+      CompilationManager compileMgr = new CompilationManager(opts);
+      String jarFileName = compileMgr.getJarFilename();
+
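+      // Load the generated ORM jar onto the classloader stack so the
+      // SequenceFile values (instances of the generated record class)
+      // can be deserialized; the previous loader is restored in the
+      // finally block.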
+      prevClassLoader = ClassLoaderStack.addJarFile(jarFileName,
+          getTableName());
+
+      List<Path> paths = getDataFilePaths();
+      Configuration conf = new Configuration();
+      int curSum = 0;
+
+      // We expect multiple files. We need to open all the files and sum up the
+      // first column across all of them.
+      for (Path p : paths) {
+        reader = SeqFileReader.getSeqFileReader(p.toString());
+
+        // here we can actually instantiate (k, v) pairs.
+        Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+        Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+        // We know that these values are two ints separated by a ','
+        // character.  Since this is all dynamic, though, we don't want to
+        // actually link against the class and use its methods. So we just
+        // parse this back into int fields manually.  Sum them up and ensure
+        // that we get the expected total for the first column, to verify that
+        // we got all the results from the db into the file.
+
+        // now sum up everything in the file.
+        while (reader.next(key) != null) {
+          reader.getCurrentValue(val);
+          curSum += getFirstInt(val.toString());
+        }
+
+        IOUtils.closeStream(reader);
+        reader = null;
+      }
+
+      assertEquals("Total sum of first db column mismatch", expectedSum,
+          curSum);
+    } catch (InvalidOptionsException ioe) {
+      fail(ioe.toString());
+    } catch (ParseException pe) {
+      fail(pe.toString());
+    } finally {
+      IOUtils.closeStream(reader);
+
+      if (null != prevClassLoader) {
+        ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
+      }
+    }
+  }
+
+  @Test
+  public void testSplitByFirstCol() throws IOException {
+    runMultiMapTest("INTFIELD1", HsqldbTestServer.getFirstColSum());
+  }
+}