You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by an...@apache.org on 2018/01/18 14:03:57 UTC
[11/32] sqoop git commit: SQOOP-3273: Removing com.cloudera.sqoop
packages
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestFreeFormQueryImport.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestFreeFormQueryImport.java b/src/test/org/apache/sqoop/TestFreeFormQueryImport.java
new file mode 100644
index 0000000..2df4352
--- /dev/null
+++ b/src/test/org/apache/sqoop/TestFreeFormQueryImport.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+import org.apache.sqoop.testutil.CommonArgs;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test free form query import.
+ *
+ * Two tables sharing an id column are created, joined through a
+ * free-form --query import, and the first line of the first output
+ * file is compared against the expected joined record.
+ */
+public class TestFreeFormQueryImport extends ImportJobTestCase {
+
+  private Log log;
+
+  public TestFreeFormQueryImport() {
+    this.log = LogFactory.getLog(TestFreeFormQueryImport.class.getName());
+  }
+
+  /**
+   * @return the Log object to use for reporting during this test
+   */
+  protected Log getLogger() {
+    return log;
+  }
+
+  /**
+   * The names of the tables we're creating. Initialized eagerly so that
+   * tearDown() never dereferences null, even for a test that registers
+   * no tables.
+   */
+  private final List<String> tableNames = new ArrayList<String>();
+
+  /**
+   * Drop every table registered by the test, then run the parent cleanup.
+   * A failure to drop one table is logged and does not stop the others.
+   */
+  @After
+  public void tearDown() {
+    try {
+      // Clean up the database on our way out.
+      for (String tableName : tableNames) {
+        try {
+          dropTableIfExists(tableName);
+        } catch (SQLException e) {
+          log.warn("Error trying to drop table '" + tableName
+              + "' on tearDown: " + e);
+        }
+      }
+    } finally {
+      // Always give the parent class its chance to clean up.
+      super.tearDown();
+    }
+  }
+
+  /**
+   * Create the argv to pass to Sqoop.
+   * @param splitByCol column of the table used to split work.
+   * @param query free form query to be used.
+   * @return the argv as an array of strings.
+   */
+  protected String [] getArgv(String splitByCol, String query) {
+    ArrayList<String> args = new ArrayList<String>();
+
+    CommonArgs.addHadoopFlags(args);
+
+    args.add("--connect");
+    args.add(getConnectString());
+    args.add("--target-dir");
+    args.add(getWarehouseDir());
+    args.add("--split-by");
+    args.add(splitByCol);
+    args.add("--num-mappers");
+    args.add("2");
+    args.add("--query");
+    args.add(query);
+
+    return args.toArray(new String[0]);
+  }
+
+  /**
+   * Create two tables that share the common id column. Run free-form query
+   * import on the result table that is created by joining the two tables on
+   * the id column.
+   */
+  @Test
+  public void testSimpleJoin() throws IOException {
+    String [] types1 = { "SMALLINT", };
+    String [] vals1 = { "1", };
+    String tableName1 = getTableName();
+    createTableWithColTypes(types1, vals1);
+    tableNames.add(tableName1);
+
+    incrementTableNum();
+
+    String [] types2 = { "SMALLINT", "VARCHAR(32)", };
+    String [] vals2 = { "1", "'foo'", };
+    String tableName2 = getTableName();
+    createTableWithColTypes(types2, vals2);
+    tableNames.add(tableName2);
+
+    String query = "SELECT "
+        + tableName1 + "." + getColName(0) + ", "
+        + tableName2 + "." + getColName(1) + " "
+        + "FROM " + tableName1 + " JOIN " + tableName2 + " ON ("
+        + tableName1 + "." + getColName(0) + " = "
+        + tableName2 + "." + getColName(0) + ") WHERE "
+        + tableName1 + "." + getColName(0) + " < 3 AND $CONDITIONS";
+
+    runImport(getArgv(tableName1 + "." + getColName(0), query));
+
+    Path warehousePath = new Path(this.getWarehouseDir());
+    Path filePath = new Path(warehousePath, "part-m-00000");
+    String expectedVal = "1,foo";
+
+    // Read through the local file API unless running on a physical cluster,
+    // in which case the file is opened through the Hadoop FileSystem.
+    BufferedReader reader = null;
+    if (!isOnPhysicalCluster()) {
+      reader = new BufferedReader(
+          new InputStreamReader(new FileInputStream(
+              new File(filePath.toString()))));
+    } else {
+      FileSystem dfs = FileSystem.get(getConf());
+      FSDataInputStream dis = dfs.open(filePath);
+      reader = new BufferedReader(new InputStreamReader(dis));
+    }
+    try {
+      String line = reader.readLine();
+      assertEquals("QueryResult expected a different string",
+          expectedVal, line);
+    } finally {
+      IOUtils.closeStream(reader);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestIncrementalImport.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestIncrementalImport.java b/src/test/org/apache/sqoop/TestIncrementalImport.java
new file mode 100644
index 0000000..1ab9802
--- /dev/null
+++ b/src/test/org/apache/sqoop/TestIncrementalImport.java
@@ -0,0 +1,1348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.sqoop.metastore.SavedJobsTestBase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.sqoop.hive.HiveImport;
+import org.apache.sqoop.manager.ConnManager;
+import org.apache.sqoop.manager.HsqldbManager;
+import org.apache.sqoop.manager.ManagerFactory;
+import org.apache.sqoop.metastore.JobData;
+import org.apache.sqoop.testutil.BaseSqoopTestCase;
+import org.apache.sqoop.testutil.CommonArgs;
+import org.apache.sqoop.tool.ImportTool;
+import org.apache.sqoop.tool.JobTool;
+import org.apache.sqoop.metastore.AutoGenericJobStorage;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+
+import static org.junit.Assert.*;
+
+/**
+ * Test the incremental import functionality.
+ *
+ * These all make use of the auto-connect hsqldb-based metastore.
+ * The metastore URL is configured to be in-memory, and all
+ * state is dropped between individual tests.
+ */
+
+public class TestIncrementalImport {
+
+ // Logger shared by all tests and helpers in this class.
+ public static final Log LOG = LogFactory.getLog(
+ TestIncrementalImport.class.getName());
+
+ // What database do we read from.
+ // newConf() also hands this URL and the credentials below to the
+ // auto-connect job storage.
+ public static final String SOURCE_DB_URL = "jdbc:hsqldb:mem:incremental";
+ public static final String AUTO_STORAGE_PASSWORD = "";
+ public static final String AUTO_STORAGE_USERNAME = "SA";
+
+ // JUnit rule for declaring expected exceptions; expects none by default.
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ /**
+  * Reset the source database schema before every test.
+  */
+ @Before
+ public void setUp() throws Exception {
+   TestIncrementalImport.resetSourceDataSchema();
+ }
+
+ /**
+  * Reset the schema of the source database by delegating to
+  * SavedJobsTestBase.resetSchema() with the source URL and credentials.
+  */
+ public static void resetSourceDataSchema() throws SQLException {
+   final SqoopOptions sourceOptions = new SqoopOptions();
+   sourceOptions.setConnectString(SOURCE_DB_URL);
+   sourceOptions.setUsername(AUTO_STORAGE_USERNAME);
+   sourceOptions.setPassword(AUTO_STORAGE_PASSWORD);
+
+   SavedJobsTestBase.resetSchema(sourceOptions);
+ }
+
+ /**
+  * Build a fresh Configuration whose auto-connect job storage settings
+  * (user, password, connect string) point at the source database.
+  */
+ public static Configuration newConf() {
+   final Configuration storageConf = new Configuration();
+   storageConf.set(AutoGenericJobStorage.AUTO_STORAGE_USER_KEY,
+       AUTO_STORAGE_USERNAME);
+   storageConf.set(AutoGenericJobStorage.AUTO_STORAGE_PASS_KEY,
+       AUTO_STORAGE_PASSWORD);
+   storageConf.set(AutoGenericJobStorage.AUTO_STORAGE_CONNECT_STRING_KEY,
+       SOURCE_DB_URL);
+   return storageConf;
+ }
+
+ /**
+  * Verify that the given table in the source database holds exactly
+  * numRows rows; the test fails otherwise.
+  */
+ private void assertRowCount(String table, int numRows) throws SQLException {
+   SqoopOptions opts = new SqoopOptions();
+   opts.setConnectString(SOURCE_DB_URL);
+   HsqldbManager mgr = new HsqldbManager(opts);
+   Connection conn = mgr.getConnection();
+   PreparedStatement countStmt = null;
+   ResultSet results = null;
+   try {
+     String countSql = "SELECT COUNT(*) FROM " + mgr.escapeTableName(table);
+     countStmt = conn.prepareStatement(countSql);
+     results = countStmt.executeQuery();
+     boolean hasRow = results.next();
+     if (!hasRow) {
+       fail("No resultset");
+     }
+     assertEquals(numRows, results.getInt(1));
+     LOG.info("Expected " + numRows + " rows -- ok.");
+   } finally {
+     // Close the statement, then the result set; close failures are
+     // only logged.
+     if (countStmt != null) {
+       try {
+         countStmt.close();
+       } catch (SQLException closeEx) {
+         LOG.warn("exception: " + closeEx);
+       }
+     }
+
+     if (results != null) {
+       try {
+         results.close();
+       } catch (SQLException closeEx) {
+         LOG.warn("exception: " + closeEx);
+       }
+     }
+   }
+ }
+
+ /**
+  * Insert one row per integer id in the half-open range [low, hi) into
+  * tableName, then commit.
+  */
+ private void insertIdRows(String tableName, int low, int hi)
+     throws SQLException {
+   SqoopOptions opts = new SqoopOptions();
+   opts.setConnectString(SOURCE_DB_URL);
+   HsqldbManager mgr = new HsqldbManager(opts);
+   Connection conn = mgr.getConnection();
+   PreparedStatement insertStmt = null;
+   try {
+     String insertSql =
+         "INSERT INTO " + mgr.escapeTableName(tableName) + " VALUES(?)";
+     insertStmt = conn.prepareStatement(insertSql);
+     int id = low;
+     while (id < hi) {
+       insertStmt.setInt(1, id);
+       insertStmt.executeUpdate();
+       id++;
+     }
+
+     conn.commit();
+   } finally {
+     if (null != insertStmt) {
+       insertStmt.close();
+     }
+   }
+ }
+
+ /**
+  * Insert rows with id = [low, hi) into tableName with
+  * the timestamp column set to the specified ts.
+  *
+  * @param tableName table to insert into
+  * @param low first id to insert (inclusive)
+  * @param hi end of the id range (exclusive)
+  * @param ts value written to the second column of every row
+  * @throws SQLException if preparing, executing or committing fails
+  */
+ private void insertIdTimestampRows(String tableName, int low, int hi,
+     Timestamp ts) throws SQLException {
+   LOG.info("Inserting id rows in [" + low + ", " + hi + ") @ " + ts);
+   SqoopOptions options = new SqoopOptions();
+   options.setConnectString(SOURCE_DB_URL);
+   HsqldbManager manager = new HsqldbManager(options);
+   Connection c = manager.getConnection();
+   PreparedStatement s = null;
+   try {
+     s = c.prepareStatement("INSERT INTO " + manager.escapeTableName(tableName) + " VALUES(?,?)");
+     for (int i = low; i < hi; i++) {
+       s.setInt(1, i);
+       s.setTimestamp(2, ts);
+       s.executeUpdate();
+     }
+
+     c.commit();
+   } finally {
+     // prepareStatement() may have thrown, leaving s null. Guard the close
+     // so an NPE does not replace the original SQLException.
+     if (s != null) {
+       s.close();
+     }
+   }
+ }
+
+ /**
+  * Insert rows with id = [low, hi) into tableName with
+  * id converted to string.
+  *
+  * @param tableName table to insert into
+  * @param low first id to insert (inclusive)
+  * @param hi end of the id range (exclusive)
+  * @throws SQLException if preparing, executing or committing fails
+  */
+ private void insertIdVarcharRows(String tableName, int low, int hi)
+     throws SQLException {
+   LOG.info("Inserting rows in [" + low + ", " + hi + ")");
+   SqoopOptions options = new SqoopOptions();
+   options.setConnectString(SOURCE_DB_URL);
+   HsqldbManager manager = new HsqldbManager(options);
+   Connection c = manager.getConnection();
+   PreparedStatement s = null;
+   try {
+     s = c.prepareStatement("INSERT INTO " + manager.escapeTableName(tableName) + " VALUES(?)");
+     for (int i = low; i < hi; i++) {
+       s.setString(1, Integer.toString(i));
+       s.executeUpdate();
+     }
+     c.commit();
+   } finally {
+     // prepareStatement() may have thrown, leaving s null. Guard the close
+     // so an NPE does not replace the original SQLException.
+     if (s != null) {
+       s.close();
+     }
+   }
+ }
+
+ /**
+  * Create a table with an 'id' column full of integers.
+  *
+  * @param tableName name of the table to create
+  * @param insertRows number of rows to insert, with ids [0, insertRows)
+  * @throws SQLException if creating or populating the table fails
+  */
+ private void createIdTable(String tableName, int insertRows)
+     throws SQLException {
+   SqoopOptions options = new SqoopOptions();
+   options.setConnectString(SOURCE_DB_URL);
+   HsqldbManager manager = new HsqldbManager(options);
+   Connection c = manager.getConnection();
+   PreparedStatement s = null;
+   try {
+     s = c.prepareStatement("CREATE TABLE " + manager.escapeTableName(tableName) + "(id INT NOT NULL)");
+     s.executeUpdate();
+     c.commit();
+     insertIdRows(tableName, 0, insertRows);
+   } finally {
+     // prepareStatement() may have thrown, leaving s null. Guard the close
+     // so an NPE does not replace the original SQLException.
+     if (s != null) {
+       s.close();
+     }
+   }
+ }
+
+ /**
+  * Create a table with an 'id' column full of integers and a
+  * last_modified column with timestamps.
+  *
+  * @param tableName name of the table to create
+  * @param insertRows number of rows to insert, with ids [0, insertRows)
+  * @param baseTime timestamp stored in last_modified for every inserted row
+  * @throws SQLException if creating or populating the table fails
+  */
+ private void createTimestampTable(String tableName, int insertRows,
+     Timestamp baseTime) throws SQLException {
+   SqoopOptions options = new SqoopOptions();
+   options.setConnectString(SOURCE_DB_URL);
+   HsqldbManager manager = new HsqldbManager(options);
+   Connection c = manager.getConnection();
+   PreparedStatement s = null;
+   try {
+     s = c.prepareStatement("CREATE TABLE " + manager.escapeTableName(tableName) + "(id INT NOT NULL, "
+         + "last_modified TIMESTAMP)");
+     s.executeUpdate();
+     c.commit();
+     insertIdTimestampRows(tableName, 0, insertRows, baseTime);
+   } finally {
+     // prepareStatement() may have thrown, leaving s null. Guard the close
+     // so an NPE does not replace the original SQLException.
+     if (s != null) {
+       s.close();
+     }
+   }
+ }
+
+ /**
+  * Create a table with an 'id' column of type varchar(20).
+  *
+  * @param tableName name of the table to create
+  * @param insertRows number of rows to insert, with ids [0, insertRows)
+  *        stored as strings
+  * @throws SQLException if creating or populating the table fails
+  */
+ private void createIdVarcharTable(String tableName,
+     int insertRows) throws SQLException {
+   SqoopOptions options = new SqoopOptions();
+   options.setConnectString(SOURCE_DB_URL);
+   HsqldbManager manager = new HsqldbManager(options);
+   Connection c = manager.getConnection();
+   PreparedStatement s = null;
+   try {
+     s = c.prepareStatement("CREATE TABLE " + manager.escapeTableName(tableName) + "(id varchar(20) NOT NULL)");
+     s.executeUpdate();
+     c.commit();
+     insertIdVarcharRows(tableName, 0, insertRows);
+   } finally {
+     // prepareStatement() may have thrown, leaving s null. Guard the close
+     // so an NPE does not replace the original SQLException.
+     if (s != null) {
+       s.close();
+     }
+   }
+ }
+
+ /**
+  * Recursively remove the table's directory under the local warehouse
+  * directory. Any exception fails the test.
+  */
+ public void clearDir(String tableName) {
+   try {
+     Path tableDir = new Path(
+         new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR), tableName);
+     FileSystem localFs = FileSystem.getLocal(new Configuration());
+     localFs.delete(tableDir, true);
+   } catch (Exception ex) {
+     fail("Got unexpected exception: " + StringUtils.stringifyException(ex));
+   }
+ }
+
+ /**
+  * Look at a directory that should contain files full of an imported 'id'
+  * column. Assert that all numbers in [0, expectedNums) are present
+  * in order.
+  *
+  * Files whose name starts with '_' or '.' are skipped.
+  *
+  * @param tableName directory name under the local warehouse directory
+  * @param expectedNums number of lines expected across all data files
+  */
+ public void assertDirOfNumbers(String tableName, int expectedNums) {
+   try {
+     FileSystem fs = FileSystem.getLocal(new Configuration());
+     Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+     Path tableDir = new Path(warehouse, tableName);
+     FileStatus [] stats = fs.listStatus(tableDir);
+     String [] filePaths = new String[stats.length];
+     for (int i = 0; i < stats.length; i++) {
+       filePaths[i] = stats[i].getPath().toString();
+     }
+
+     Arrays.sort(filePaths);
+
+     // Read all the files in sorted order, adding the value lines to the list.
+     List<String> receivedNums = new ArrayList<String>();
+     for (String filePath : filePaths) {
+       // Test the file name, not the full path string: the full path never
+       // starts with '_' or '.', so hidden files would never be skipped.
+       String fileName = new Path(filePath).getName();
+       if (fileName.startsWith("_") || fileName.startsWith(".")) {
+         continue;
+       }
+
+       BufferedReader r = new BufferedReader(
+           new InputStreamReader(fs.open(new Path(filePath))));
+       try {
+         while (true) {
+           String s = r.readLine();
+           if (null == s) {
+             break;
+           }
+
+           receivedNums.add(s.trim());
+         }
+       } finally {
+         r.close();
+       }
+     }
+
+     assertEquals(expectedNums, receivedNums.size());
+
+     // Compare the received values with the expected set.
+     for (int i = 0; i < expectedNums; i++) {
+       assertEquals(i, Integer.parseInt(receivedNums.get(i)));
+     }
+   } catch (Exception e) {
+     fail("Got unexpected exception: " + StringUtils.stringifyException(e));
+   }
+ }
+
+ /**
+  * Look at a directory that should contain files full of an imported 'id'
+  * column and 'last_modified' column. Assert that all numbers in
+  * [0, expectedNums) are present in order.
+  *
+  * Only the first comma-separated field of each line is checked. Files
+  * whose name starts with '_' or '.' are skipped.
+  *
+  * @param tableName directory name under the local warehouse directory
+  * @param expectedNums number of lines expected across all data files
+  */
+ public void assertDirOfNumbersAndTimestamps(String tableName, int expectedNums) {
+   try {
+     FileSystem fs = FileSystem.getLocal(new Configuration());
+     Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+     Path tableDir = new Path(warehouse, tableName);
+     FileStatus [] stats = fs.listStatus(tableDir);
+     String [] filePaths = new String[stats.length];
+     for (int i = 0; i < stats.length; i++) {
+       filePaths[i] = stats[i].getPath().toString();
+     }
+
+     Arrays.sort(filePaths);
+
+     // Read all the files in sorted order, adding the value lines to the list.
+     List<String> receivedNums = new ArrayList<String>();
+     for (String filePath : filePaths) {
+       // Test the file name, not the full path string: the full path never
+       // starts with '_' or '.', so hidden files would never be skipped.
+       String fileName = new Path(filePath).getName();
+       if (fileName.startsWith("_") || fileName.startsWith(".")) {
+         continue;
+       }
+
+       BufferedReader r = new BufferedReader(
+           new InputStreamReader(fs.open(new Path(filePath))));
+       try {
+         while (true) {
+           String s = r.readLine();
+           if (null == s) {
+             break;
+           }
+
+           receivedNums.add(s.trim());
+         }
+       } finally {
+         r.close();
+       }
+     }
+
+     assertEquals(expectedNums, receivedNums.size());
+
+     // Compare the id field of each received line with the expected value.
+     for (int i = 0; i < expectedNums; i++) {
+       assertEquals(i, Integer.parseInt(receivedNums.get(i).split(",")[0]));
+     }
+   } catch (Exception e) {
+     fail("Got unexpected exception: " + StringUtils.stringifyException(e));
+   }
+ }
+
+ /**
+  * Check that the table's directory holds a single visible data file
+  * consisting of exactly one line, and that this line is the number 'val'.
+  * Files whose name starts with '_' or '.' are ignored.
+  */
+ public void assertFirstSpecificNumber(String tableName, int val) {
+   try {
+     FileSystem localFs = FileSystem.getLocal(new Configuration());
+     Path tableDir = new Path(
+         new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR), tableName);
+     FileStatus [] entries = localFs.listStatus(tableDir);
+
+     boolean seenDataFile = false;
+     for (FileStatus entry : entries) {
+       String filePath = entry.getPath().toString();
+       String name = new Path(filePath).getName();
+       boolean hidden = name.startsWith("_") || name.startsWith(".");
+       if (hidden) {
+         continue;
+       }
+
+       // A second visible file means more than one data file was written.
+       if (seenDataFile) {
+         fail("Got an extra data-containing file in this directory.");
+       }
+
+       BufferedReader in = new BufferedReader(
+           new InputStreamReader(localFs.open(new Path(filePath))));
+       try {
+         String first = in.readLine();
+         if (first == null) {
+           fail("Unexpected empty file " + filePath + ".");
+         }
+         assertEquals(val, (int) Integer.valueOf(first.trim()));
+
+         String second = in.readLine();
+         if (null != second) {
+           fail("Expected only one result, but got another line: " + second);
+         }
+
+         // The file held exactly the value we were looking for.
+         seenDataFile = true;
+       } finally {
+         in.close();
+       }
+     }
+   } catch (IOException ioe) {
+     fail("Got unexpected exception: " + StringUtils.stringifyException(ioe));
+   }
+ }
+
+ /**
+  * Scan the visible files of a table's directory, comparing the first
+  * comma-separated field of each file's first line with 'val'.
+  *
+  * Once a file matches, any further visible file in the directory fails
+  * the test. Files whose name starts with '_' or '.' are ignored. An empty
+  * visible file fails the test.
+  *
+  * NOTE(review): the method does not fail when no file matches 'val';
+  * confirm whether callers rely on that before tightening it.
+  *
+  * @param tableName directory name under the local warehouse directory
+  * @param val number expected in the first field of the first line
+  */
+ public void assertSpecificNumber(String tableName, int val) {
+   try {
+     FileSystem fs = FileSystem.getLocal(new Configuration());
+     Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+     Path tableDir = new Path(warehouse, tableName);
+     FileStatus [] stats = fs.listStatus(tableDir);
+     String [] filePaths = new String[stats.length];
+     for (int i = 0; i < stats.length; i++) {
+       filePaths[i] = stats[i].getPath().toString();
+     }
+
+     boolean foundVal = false;
+     for (String filePath : filePaths) {
+       String fileName = new Path(filePath).getName();
+       if (fileName.startsWith("_") || fileName.startsWith(".")) {
+         continue;
+       }
+
+       if (foundVal) {
+         // Make sure we don't have two or more "real" files in the dir.
+         fail("Got an extra data-containing file in this directory.");
+       }
+
+       BufferedReader r = new BufferedReader(
+           new InputStreamReader(fs.open(new Path(filePath))));
+       try {
+         String s = r.readLine();
+         if (null == s) {
+           // readLine() returns null for an empty file; report it instead
+           // of hitting a NullPointerException below.
+           fail("Unexpected empty file " + filePath + ".");
+         }
+         // foundVal is always false here (checked above), so a match
+         // simply records that the value was seen.
+         if (val == Integer.parseInt(s.trim().split(",")[0])) {
+           foundVal = true;
+         }
+       } finally {
+         r.close();
+       }
+     }
+   } catch (IOException e) {
+     fail("Got unexpected exception: " + StringUtils.stringifyException(e));
+   }
+ }
+
+ /**
+  * Run the import tool with the given options and arguments and assert
+  * that it exits with status 0. Exceptions are logged and rethrown wrapped
+  * in a RuntimeException.
+  */
+ public void runImport(SqoopOptions options, List<String> args) {
+   try {
+     String [] argv = args.toArray(new String[0]);
+     Sqoop importTool = new Sqoop(new ImportTool(), options.getConf(), options);
+     int exitCode = Sqoop.runSqoop(importTool, argv);
+     assertEquals("Failure during job", 0, exitCode);
+   } catch (Exception ex) {
+     LOG.error("Got exception running Sqoop: "
+         + StringUtils.stringifyException(ex));
+     throw new RuntimeException(ex);
+   }
+ }
+
+ /**
+ * Return a list of arguments to import the specified table.
+ */
+ private List<String> getArgListForTable(String tableName, boolean commonArgs,
+ boolean isAppend) {
+ return getArgListForTable(tableName, commonArgs, isAppend, false);
+ }
+
+ /**
+  * Build the argument list for an incremental import of the given table
+  * into the local warehouse directory.
+  *
+  * The incremental mode is "append" when isAppend is set and "lastmodified"
+  * otherwise. The check column is ID only for an append without
+  * appendTimestamp; in every other case it is LAST_MODIFIED.
+  */
+ private List<String> getArgListForTable(String tableName, boolean commonArgs,
+     boolean isAppend, boolean appendTimestamp) {
+   List<String> argList = new ArrayList<String>();
+   if (commonArgs) {
+     CommonArgs.addHadoopFlags(argList);
+   }
+
+   String incrementalMode = isAppend ? "append" : "lastmodified";
+   String checkColumn = (isAppend && !appendTimestamp) ? "ID" : "LAST_MODIFIED";
+
+   argList.addAll(Arrays.asList(
+       "--connect", SOURCE_DB_URL,
+       "--table", tableName,
+       "--warehouse-dir", BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR,
+       "--incremental", incrementalMode,
+       "--check-column", checkColumn,
+       "--columns", "ID",
+       "-m", "1"));
+
+   return argList;
+ }
+
+ /**
+  * Build the argument list for an incremental free-form query import.
+  *
+  * The generated class name is the last '/'-separated segment of
+  * directoryName, and the target directory is directoryName under the
+  * local warehouse directory. Incremental mode and check column are chosen
+  * as for table imports: ID only for an append without appendTimestamp,
+  * LAST_MODIFIED otherwise.
+  *
+  * @return the list of arguments
+  */
+ private List<String> getArgListForQuery(String query, String directoryName,
+     boolean commonArgs, boolean isAppend, boolean appendTimestamp) {
+   List<String> argList = new ArrayList<String>();
+   if (commonArgs) {
+     CommonArgs.addHadoopFlags(argList);
+   }
+
+   String [] segments = directoryName.split("/");
+   String className = segments[segments.length - 1];
+   String targetDir = BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR
+       + System.getProperty("file.separator") + directoryName;
+
+   String incrementalMode = isAppend ? "append" : "lastmodified";
+   String checkColumn = (isAppend && !appendTimestamp) ? "ID" : "LAST_MODIFIED";
+
+   argList.addAll(Arrays.asList(
+       "--connect", SOURCE_DB_URL,
+       "--query", query,
+       "--class-name", className,
+       "--target-dir", targetDir,
+       "--incremental", incrementalMode,
+       "--check-column", checkColumn,
+       "-m", "1"));
+
+   return argList;
+ }
+ /**
+  * Create a saved job called jobName that performs an import configured
+  * with 'jobArgs', using a fresh configuration from newConf().
+  */
+ private void createJob(String jobName, List<String> jobArgs) {
+   Configuration freshConf = newConf();
+   createJob(jobName, jobArgs, freshConf);
+ }
+
+ /**
+  * Create a saved job called jobName that performs an import configured
+  * with 'jobArgs', with the supplied configuration as defaults. Asserts
+  * that the job tool exits with status 0; exceptions are logged and
+  * rethrown wrapped in a RuntimeException.
+  */
+ private void createJob(String jobName, List<String> jobArgs,
+     Configuration conf) {
+   try {
+     SqoopOptions jobOptions = new SqoopOptions();
+     jobOptions.setConf(conf);
+     Sqoop jobCreator = new Sqoop(new JobTool(), conf, jobOptions);
+
+     // --create <name> -- import <import args...>
+     List<String> argv = new ArrayList<String>(
+         Arrays.asList("--create", jobName, "--", "import"));
+     argv.addAll(jobArgs);
+
+     int exitCode = Sqoop.runSqoop(jobCreator, argv.toArray(new String[0]));
+     assertEquals("Failure to create job", 0, exitCode);
+   } catch (Exception ex) {
+     LOG.error("Got exception running Sqoop to create job: "
+         + StringUtils.stringifyException(ex));
+     throw new RuntimeException(ex);
+   }
+ }
+
+ /**
+  * Execute the saved job called jobName with a fresh configuration from
+  * newConf().
+  */
+ private void runJob(String jobName) {
+   Configuration freshConf = newConf();
+   runJob(jobName, freshConf);
+ }
+
+ /**
+  * Execute the saved job called jobName with the supplied configuration.
+  * Asserts that the job tool exits with status 0; exceptions are logged
+  * and rethrown wrapped in a RuntimeException.
+  */
+ private void runJob(String jobName, Configuration conf) {
+   try {
+     SqoopOptions jobOptions = new SqoopOptions();
+     jobOptions.setConf(conf);
+     Sqoop jobRunner = new Sqoop(new JobTool(), conf, jobOptions);
+
+     String [] argv = new String [] { "--exec", jobName };
+
+     int exitCode = Sqoop.runSqoop(jobRunner, argv);
+     assertEquals("Failure to run job", 0, exitCode);
+   } catch (Exception ex) {
+     LOG.error("Got exception running Sqoop to run job: "
+         + StringUtils.stringifyException(ex));
+     throw new RuntimeException(ex);
+   }
+ }
+
+ /**
+  * Incremental append import of an empty table, run directly through the
+  * import tool without a saved job; no rows are expected.
+  */
+ @Test
+ public void testEmptyAppendImport() throws Exception {
+   final String table = "emptyAppend1";
+   createIdTable(table, 0);
+
+   SqoopOptions importOptions = new SqoopOptions();
+   importOptions.setConf(newConf());
+   runImport(importOptions, getArgListForTable(table, true, true));
+
+   assertDirOfNumbers(table, 0);
+ }
+
+ /**
+  * Incremental append import of a table holding ten rows, run directly
+  * through the import tool without a saved job; all ten are expected.
+  */
+ @Test
+ public void testFullAppendImport() throws Exception {
+   final String table = "fullAppend1";
+   final int rowCount = 10;
+   createIdTable(table, rowCount);
+
+   SqoopOptions importOptions = new SqoopOptions();
+   importOptions.setConf(newConf());
+   runImport(importOptions, getArgListForTable(table, true, true));
+
+   assertDirOfNumbers(table, rowCount);
+ }
+
+ /**
+  * A saved append job run twice against an empty table imports nothing
+  * either time.
+  */
+ @Test
+ public void testEmptyJobAppend() throws Exception {
+   // The job shares its name with the table.
+   final String name = "emptyJob";
+   createIdTable(name, 0);
+
+   createJob(name, getArgListForTable(name, false, true));
+
+   // First run: the table is empty, so nothing is imported.
+   runJob(name);
+   assertDirOfNumbers(name, 0);
+
+   // Second run: still empty, still nothing.
+   runJob(name);
+   assertDirOfNumbers(name, 0);
+ }
+
+ /**
+  * A saved append job first runs against an empty table, then picks up
+  * rows added in two later batches.
+  */
+ @Test
+ public void testEmptyThenFullJobAppend() throws Exception {
+   final String name = "emptyThenFull";
+   createIdTable(name, 0);
+
+   createJob(name, getArgListForTable(name, false, true));
+
+   // Nothing to import yet.
+   runJob(name);
+   assertDirOfNumbers(name, 0);
+
+   // First batch: ids 0..9 appear after the next run.
+   insertIdRows(name, 0, 10);
+   runJob(name);
+   assertDirOfNumbers(name, 10);
+
+   // Second batch: ids 10..19 are appended to the earlier output.
+   insertIdRows(name, 10, 20);
+   runJob(name);
+   assertDirOfNumbers(name, 20);
+ }
+
+ /**
+  * Same scenario as the table-based empty-then-full append test, but the
+  * saved job imports through a free-form query.
+  */
+ @Test
+ public void testEmptyThenFullJobAppendWithQuery() throws Exception {
+   final String name = "withQuery";
+   createIdTable(name, 0);
+   clearDir(name);
+
+   final String selectIds = "SELECT id FROM \"withQuery\" WHERE $CONDITIONS";
+
+   createJob(name, getArgListForQuery(selectIds, name, false, true, false));
+
+   // Nothing to import yet.
+   runJob(name);
+   assertDirOfNumbers(name, 0);
+
+   // First batch: ids 0..9 appear after the next run.
+   insertIdRows(name, 0, 10);
+   runJob(name);
+   assertDirOfNumbers(name, 10);
+
+   // Second batch: ids 10..19 are appended to the earlier output.
+   insertIdRows(name, 10, 20);
+   runJob(name);
+   assertDirOfNumbers(name, 20);
+ }
+
+ /**
+  * A saved append job imports a pre-filled table, then picks up the rows
+  * added afterwards.
+  */
+ @Test
+ public void testAppend() throws Exception {
+   final String name = "append";
+   createIdTable(name, 10);
+
+   createJob(name, getArgListForTable(name, false, true));
+
+   // Initial import of the ten existing rows.
+   runJob(name);
+   assertDirOfNumbers(name, 10);
+
+   // Ten more rows; the directory then holds ids 0..19.
+   insertIdRows(name, 10, 20);
+   runJob(name);
+   assertDirOfNumbers(name, 20);
+ }
+
+ /**
+  * A lastmodified import of an empty timestamp table produces no rows.
+  */
+ @Test
+ public void testEmptyLastModified() throws Exception {
+   final String table = "emptyLastModified";
+   createTimestampTable(table, 0, null);
+
+   SqoopOptions importOptions = new SqoopOptions();
+   importOptions.setConf(newConf());
+   runImport(importOptions, getArgListForTable(table, true, false));
+
+   assertDirOfNumbers(table, 0);
+ }
+
+ /**
+  * A lastmodified query import of an empty table into a target directory
+  * nested under "non-existing/parents" produces no rows.
+  */
+ @Test
+ public void testEmptyLastModifiedWithNonExistingParentDirectory() throws Exception {
+   final String table = "emptyLastModifiedNoParent";
+   final String selectAll =
+       "SELECT id, last_modified FROM \"" + table + "\" WHERE $CONDITIONS";
+   final String targetSubDir = "non-existing/parents/" + table;
+   createTimestampTable(table, 0, null);
+
+   SqoopOptions importOptions = new SqoopOptions();
+   importOptions.setConf(newConf());
+   runImport(importOptions,
+       getArgListForQuery(selectAll, targetSubDir, true, false, false));
+
+   assertDirOfNumbers(targetSubDir, 0);
+ }
+
+ /**
+  * Rows whose last_modified lies slightly in the past are all picked up
+  * by a lastmodified import.
+  */
+ @Test
+ public void testFullLastModifiedImport() throws Exception {
+   final String table = "fullLastModified";
+   // 100 ms before "now".
+   Timestamp slightlyEarlier = new Timestamp(System.currentTimeMillis() - 100);
+   createTimestampTable(table, 10, slightlyEarlier);
+
+   SqoopOptions importOptions = new SqoopOptions();
+   importOptions.setConf(newConf());
+   runImport(importOptions, getArgListForTable(table, true, false));
+
+   assertDirOfNumbers(table, 10);
+ }
+
+ /**
+  * Rows whose last_modified lies in the future relative to the import
+  * must not be imported.
+  */
+ @Test
+ public void testNoImportFromTheFuture() throws Exception {
+   final String table = "futureLastModified";
+   // 1,000,000 ms after "now".
+   Timestamp wellAhead = new Timestamp(System.currentTimeMillis() + 1000000);
+   createTimestampTable(table, 10, wellAhead);
+
+   SqoopOptions importOptions = new SqoopOptions();
+   importOptions.setConf(newConf());
+   runImport(importOptions, getArgListForTable(table, true, false));
+
+   assertDirOfNumbers(table, 0);
+ }
+
+ /**
+  * A saved lastmodified job (with --append) run twice against an empty
+  * timestamp table imports nothing either time.
+  */
+ @Test
+ public void testEmptyJobLastMod() throws Exception {
+   // The job shares its name with the table.
+   final String name = "emptyJobLastMod";
+   createTimestampTable(name, 0, null);
+
+   List<String> jobArgs = getArgListForTable(name, false, false);
+   jobArgs.add("--append");
+   createJob(name, jobArgs);
+
+   // First run: the table is empty, so nothing is imported.
+   runJob(name);
+   assertDirOfNumbers(name, 0);
+
+   // Second run: still empty, still nothing.
+   runJob(name);
+   assertDirOfNumbers(name, 0);
+ }
+
+ // Saved lastmodified job (with --append) run against an empty timestamp
+ // table, then again after each of two batches of rows whose timestamps
+ // fall between the previous run and "now".
+ @Test
+ public void testEmptyThenFullJobLastMod() throws Exception {
+ // Create an empty table. Import it; nothing happens.
+ // Add some rows. Verify they are appended.
+
+ final String TABLE_NAME = "emptyThenFullTimestamp";
+ createTimestampTable(TABLE_NAME, 0, null);
+
+ List<String> args = getArgListForTable(TABLE_NAME, false, false);
+ args.add("--append");
+ createJob(TABLE_NAME, args);
+ runJob(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 0);
+
+ // Captured after the first run returned, so the first import happened
+ // before this instant.
+ long importWasBefore = System.currentTimeMillis();
+
+ // Let some time elapse.
+ Thread.sleep(50);
+
+ // Back-date the new rows by 5 ms so they are strictly in the past.
+ long rowsAddedTime = System.currentTimeMillis() - 5;
+
+ // Check: we are adding rows after the previous import time
+ // and before the current time.
+ assertTrue(rowsAddedTime > importWasBefore);
+ assertTrue(rowsAddedTime < System.currentTimeMillis());
+
+ insertIdTimestampRows(TABLE_NAME, 0, 10, new Timestamp(rowsAddedTime));
+
+ // Running the job a second time should import 10 rows.
+ runJob(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 10);
+
+ // Add some more rows.
+ // Same timing steps as above: mark the time, wait, back-date by 5 ms.
+ importWasBefore = System.currentTimeMillis();
+ Thread.sleep(50);
+ rowsAddedTime = System.currentTimeMillis() - 5;
+ assertTrue(rowsAddedTime > importWasBefore);
+ assertTrue(rowsAddedTime < System.currentTimeMillis());
+ insertIdTimestampRows(TABLE_NAME, 10, 20, new Timestamp(rowsAddedTime));
+
+ // Import only those rows.
+ // The directory then holds ids 0..19 in total.
+ runJob(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 20);
+ }
+
+ @Test
+ public void testAppendWithTimestamp() throws Exception {
+   // Import a pre-populated table through a saved job, add more rows,
+   // and check that re-running the job appends the additional rows.
+   final String TABLE_NAME = "appendTimestamp";
+   final Timestamp hundredMillisAgo =
+       new Timestamp(System.currentTimeMillis() - 100);
+   createTimestampTable(TABLE_NAME, 10, hundredMillisAgo);
+
+   final List<String> jobArgs = getArgListForTable(TABLE_NAME, false, false);
+   jobArgs.add("--append");
+   createJob(TABLE_NAME, jobArgs);
+   runJob(TABLE_NAME);
+   assertDirOfNumbers(TABLE_NAME, 10);
+
+   // Insert rows 10 to 20 with a timestamp that lies after the first
+   // import but before "now".
+   final long previousImportMillis = System.currentTimeMillis();
+   Thread.sleep(50);
+   final long insertMillis = System.currentTimeMillis() - 5;
+   assertTrue(previousImportMillis < insertMillis);
+   assertTrue(System.currentTimeMillis() > insertMillis);
+   insertIdTimestampRows(TABLE_NAME, 10, 20, new Timestamp(insertMillis));
+
+   // Re-run the job; the target directory should now hold 20 rows.
+   runJob(TABLE_NAME);
+   assertDirOfNumbers(TABLE_NAME, 20);
+ }
+
+ @Test
+ public void testAppendWithString() throws Exception {
+   // An incremental import keyed on a string column is expected to fail
+   // with a RuntimeException when the job runs.
+   final String TABLE_NAME = "appendString";
+   createIdVarcharTable(TABLE_NAME, 10);
+
+   final List<String> jobArgs = getArgListForTable(TABLE_NAME, false, true);
+   jobArgs.add("--append");
+   createJob(TABLE_NAME, jobArgs);
+
+   // Arm the expectation only now, so that a failure during setup above
+   // is not mistaken for the expected one.
+   final String missingExceptionMessage =
+       "Expected incremental import on varchar column to fail";
+   thrown.expect(RuntimeException.class);
+   thrown.reportMissingExceptionWithMessage(missingExceptionMessage);
+   runJob(TABLE_NAME);
+ }
+
+ @Test
+ public void testModifyWithTimestamp() throws Exception {
+   // Create a table with data in it; import it.
+   // Then modify some existing rows, and verify that we only grab
+   // those rows.
+
+   final String TABLE_NAME = "modifyTimestamp";
+   Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
+   createTimestampTable(TABLE_NAME, 10, thePast);
+
+   List<String> args = getArgListForTable(TABLE_NAME, false, false);
+   createJob(TABLE_NAME, args);
+   runJob(TABLE_NAME);
+   assertDirOfNumbers(TABLE_NAME, 10);
+
+   // Modify a row, stamping it with a time that lies after the first
+   // import but before "now".
+   long importWasBefore = System.currentTimeMillis();
+   Thread.sleep(50);
+   long rowsAddedTime = System.currentTimeMillis() - 5;
+   assertTrue(rowsAddedTime > importWasBefore);
+   assertTrue(rowsAddedTime < System.currentTimeMillis());
+   SqoopOptions options = new SqoopOptions();
+   options.setConnectString(SOURCE_DB_URL);
+   HsqldbManager manager = new HsqldbManager(options);
+   Connection c = manager.getConnection();
+   PreparedStatement s = null;
+   try {
+     s = c.prepareStatement("UPDATE " + manager.escapeTableName(TABLE_NAME) + " SET id=?, last_modified=? WHERE id=?");
+     s.setInt(1, 4000); // the first row should have '4000' in it now.
+     s.setTimestamp(2, new Timestamp(rowsAddedTime));
+     s.setInt(3, 0);
+     s.executeUpdate();
+     c.commit();
+   } finally {
+     // s is still null when prepareStatement() itself threw; closing
+     // unconditionally would raise an NPE that hides the real failure.
+     if (s != null) {
+       s.close();
+     }
+   }
+
+   // Import only the new row.
+   clearDir(TABLE_NAME);
+   runJob(TABLE_NAME);
+   assertFirstSpecificNumber(TABLE_NAME, 4000);
+ }
+ @Test
+ public void testUpdateModifyWithTimestamp() throws Exception {
+   // Create a table with data in it; import it.
+   // Then modify some existing rows, and verify that we only grab
+   // those rows.
+
+   final String TABLE_NAME = "updateModifyTimestamp";
+   Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
+   createTimestampTable(TABLE_NAME, 10, thePast);
+
+   List<String> args = getArgListForTable(TABLE_NAME, false, false);
+
+   Configuration conf = newConf();
+   SqoopOptions options = new SqoopOptions();
+   options.setConf(conf);
+   runImport(options, args);
+   assertDirOfNumbers(TABLE_NAME, 10);
+
+   // Modify a row, stamping it with a time that lies after the first
+   // import but before "now".
+   long importWasBefore = System.currentTimeMillis();
+   Thread.sleep(50);
+   long rowsAddedTime = System.currentTimeMillis() - 5;
+   assertTrue(rowsAddedTime > importWasBefore);
+   assertTrue(rowsAddedTime < System.currentTimeMillis());
+   SqoopOptions options2 = new SqoopOptions();
+   options2.setConnectString(SOURCE_DB_URL);
+   HsqldbManager manager = new HsqldbManager(options2);
+   Connection c = manager.getConnection();
+   PreparedStatement s = null;
+   try {
+     s = c.prepareStatement("UPDATE " + manager.escapeTableName(TABLE_NAME) + " SET id=?, last_modified=? WHERE id=?");
+     s.setInt(1, 4000); // the first row should have '4000' in it now.
+     s.setTimestamp(2, new Timestamp(rowsAddedTime));
+     s.setInt(3, 0);
+     s.executeUpdate();
+     c.commit();
+   } finally {
+     // s is still null when prepareStatement() itself threw; closing
+     // unconditionally would raise an NPE that hides the real failure.
+     if (s != null) {
+       s.close();
+     }
+   }
+
+   // Update the new row: re-import from the time of the first import,
+   // merging on the ID column.
+   args.add("--last-value");
+   args.add(new Timestamp(importWasBefore).toString());
+   args.add("--merge-key");
+   args.add("ID");
+   conf = newConf();
+   options = new SqoopOptions();
+   options.setConf(conf);
+   runImport(options, args);
+   assertSpecificNumber(TABLE_NAME, 4000);
+ }
+
+ @Test
+ public void testUpdateModifyWithTimestampWithQuery() throws Exception {
+   // Create a table with data in it; import it using a free-form query.
+   // Then modify an existing row and re-import with --last-value and
+   // --merge-key, and verify that the modified value shows up.
+
+   final String TABLE_NAME = "UpdateModifyWithTimestampWithQuery";
+   Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
+   createTimestampTable(TABLE_NAME, 10, thePast);
+
+   final String QUERY = "SELECT id, last_modified FROM \"UpdateModifyWithTimestampWithQuery\" WHERE $CONDITIONS";
+
+   List<String> args = getArgListForQuery(QUERY, TABLE_NAME,
+       true, false, false);
+
+   Configuration conf = newConf();
+   SqoopOptions options = new SqoopOptions();
+   options.setConf(conf);
+   runImport(options, args);
+   assertDirOfNumbersAndTimestamps(TABLE_NAME, 10);
+
+   // Modify a row, stamping it with a time that lies after the first
+   // import but before "now".
+   long importWasBefore = System.currentTimeMillis();
+   Thread.sleep(50);
+   long rowsAddedTime = System.currentTimeMillis() - 5;
+   assertTrue(rowsAddedTime > importWasBefore);
+   assertTrue(rowsAddedTime < System.currentTimeMillis());
+   SqoopOptions options2 = new SqoopOptions();
+   options2.setConnectString(SOURCE_DB_URL);
+   HsqldbManager manager = new HsqldbManager(options2);
+   Connection c = manager.getConnection();
+   PreparedStatement s = null;
+   try {
+     s = c.prepareStatement("UPDATE " + manager.escapeTableName(TABLE_NAME) + " SET id=?, last_modified=? WHERE id=?");
+     s.setInt(1, 4000); // the first row should have '4000' in it now.
+     s.setTimestamp(2, new Timestamp(rowsAddedTime));
+     s.setInt(3, 0);
+     s.executeUpdate();
+     c.commit();
+   } finally {
+     // s is still null when prepareStatement() itself threw; closing
+     // unconditionally would raise an NPE that hides the real failure.
+     if (s != null) {
+       s.close();
+     }
+   }
+
+   // Update the new row: re-import from the time of the first import,
+   // merging on the ID column.
+   args.add("--last-value");
+   args.add(new Timestamp(importWasBefore).toString());
+   args.add("--merge-key");
+   args.add("ID");
+   conf = newConf();
+   options = new SqoopOptions();
+   options.setConf(conf);
+   runImport(options, args);
+   assertSpecificNumber(TABLE_NAME, 4000);
+ }
+
+ @Test
+ public void testUpdateModifyWithTimestampJob() throws Exception {
+   // Create a table with data in it; import it.
+   // Then modify some existing rows, and verify that we only grab
+   // those rows.
+
+   final String TABLE_NAME = "updateModifyTimestampJob";
+   Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
+   createTimestampTable(TABLE_NAME, 10, thePast);
+
+   List<String> args = getArgListForTable(TABLE_NAME, false, false);
+   args.add("--merge-key");
+   args.add("ID");
+   createJob(TABLE_NAME, args);
+   runJob(TABLE_NAME);
+   assertDirOfNumbers(TABLE_NAME, 10);
+
+   // Modify a row, stamping it with a time that lies after the first
+   // import but before "now".
+   long importWasBefore = System.currentTimeMillis();
+   Thread.sleep(50);
+   long rowsAddedTime = System.currentTimeMillis() - 5;
+   assertTrue(rowsAddedTime > importWasBefore);
+   assertTrue(rowsAddedTime < System.currentTimeMillis());
+   SqoopOptions options2 = new SqoopOptions();
+   options2.setConnectString(SOURCE_DB_URL);
+   HsqldbManager manager = new HsqldbManager(options2);
+   Connection c = manager.getConnection();
+   PreparedStatement s = null;
+   try {
+     s = c.prepareStatement("UPDATE " + manager.escapeTableName(TABLE_NAME) + " SET id=?, last_modified=? WHERE id=?");
+     s.setInt(1, 4000); // the first row should have '4000' in it now.
+     s.setTimestamp(2, new Timestamp(rowsAddedTime));
+     s.setInt(3, 0);
+     s.executeUpdate();
+     c.commit();
+   } finally {
+     // s is still null when prepareStatement() itself threw; closing
+     // unconditionally would raise an NPE that hides the real failure.
+     if (s != null) {
+       s.close();
+     }
+   }
+
+   // Update the new row.
+   runJob(TABLE_NAME);
+   assertSpecificNumber(TABLE_NAME, 4000);
+ }
+
+ /**
+  * ManagerFactory whose {@code accept} always hands back an
+  * {@link InstrumentHsqldbManager}, i.e. an HSQLDB ConnManager for which
+  * the current database timestamp can be specified by the test.
+  */
+ public static class InstrumentHsqldbManagerFactory extends ManagerFactory {
+   @Override
+   public ConnManager accept(JobData data) {
+     LOG.info("Using instrumented manager");
+     final SqoopOptions jobOptions = data.getSqoopOptions();
+     return new InstrumentHsqldbManager(jobOptions);
+   }
+ }
+
+ /**
+  * Hsqldb ConnManager that reports a test-supplied value as the current
+  * database timestamp, to allow testing of boundary conditions for imports.
+  * The value is held in a static field, so it is shared by all instances.
+  */
+ public static class InstrumentHsqldbManager extends HsqldbManager {
+   private static Timestamp curTimestamp;
+
+   public InstrumentHsqldbManager(SqoopOptions options) {
+     super(options);
+   }
+
+   /** Sets the timestamp that every instance reports from now on. */
+   public static void setCurrentDbTimestamp(Timestamp t) {
+     curTimestamp = t;
+   }
+
+   /** Returns the value most recently passed to setCurrentDbTimestamp. */
+   @Override
+   public Timestamp getCurrentDbTimestamp() {
+     return curTimestamp;
+   }
+ }
+
+ @Test
+ public void testTimestampBoundary() throws Exception {
+   // Run a job whose reported database time is pinned by the instrumented
+   // manager, insert rows stamped with exactly that time, and make sure
+   // a second run (with the reported time advanced) imports them.
+
+   final long baseMillis = System.currentTimeMillis();
+
+   final String TABLE_NAME = "boundaryTimestamp";
+   createTimestampTable(TABLE_NAME, 10, new Timestamp(baseMillis - 100));
+
+   final Timestamp firstRunTime = new Timestamp(baseMillis);
+   InstrumentHsqldbManager.setCurrentDbTimestamp(firstRunTime);
+
+   // Route manager creation through the instrumented factory.
+   final Configuration jobConf = newConf();
+   jobConf.set(ConnFactory.FACTORY_CLASS_NAMES_KEY,
+       InstrumentHsqldbManagerFactory.class.getName());
+
+   final List<String> jobArgs = getArgListForTable(TABLE_NAME, false, false);
+   jobArgs.add("--append");
+   createJob(TABLE_NAME, jobArgs, jobConf);
+   runJob(TABLE_NAME);
+   assertDirOfNumbers(TABLE_NAME, 10);
+
+   // New rows carry exactly the timestamp reported for the first run.
+   insertIdTimestampRows(TABLE_NAME, 10, 20, firstRunTime);
+   assertRowCount(TABLE_NAME, 20);
+
+   // For the second run the reported time moves forward by 100 ms.
+   final Timestamp secondRunTime = new Timestamp(baseMillis + 100);
+   InstrumentHsqldbManager.setCurrentDbTimestamp(secondRunTime);
+
+   // The boundary rows must be imported by this run.
+   runJob(TABLE_NAME);
+   assertDirOfNumbers(TABLE_NAME, 20);
+ }
+
+ @Test
+ public void testIncrementalAppendTimestamp() throws Exception {
+   // Run a job whose reported database time is pinned by the instrumented
+   // manager, insert rows stamped with exactly that time, and make sure
+   // a second run (with the reported time advanced) imports them.
+   // The job arguments come from the four-argument getArgListForTable
+   // overload; no "--append" flag is added here explicitly.
+
+   final long baseMillis = System.currentTimeMillis();
+
+   final String TABLE_NAME = "incrementalAppendTimestamp";
+   createTimestampTable(TABLE_NAME, 10, new Timestamp(baseMillis - 100));
+
+   final Timestamp firstRunTime = new Timestamp(baseMillis);
+   InstrumentHsqldbManager.setCurrentDbTimestamp(firstRunTime);
+
+   // Route manager creation through the instrumented factory.
+   final Configuration jobConf = newConf();
+   jobConf.set(ConnFactory.FACTORY_CLASS_NAMES_KEY,
+       InstrumentHsqldbManagerFactory.class.getName());
+
+   final List<String> jobArgs =
+       getArgListForTable(TABLE_NAME, false, true, true);
+   createJob(TABLE_NAME, jobArgs, jobConf);
+   runJob(TABLE_NAME);
+   assertDirOfNumbers(TABLE_NAME, 10);
+
+   // New rows carry exactly the timestamp reported for the first run.
+   insertIdTimestampRows(TABLE_NAME, 10, 20, firstRunTime);
+   assertRowCount(TABLE_NAME, 20);
+
+   // For the second run the reported time moves forward by 100 ms.
+   final Timestamp secondRunTime = new Timestamp(baseMillis + 100);
+   InstrumentHsqldbManager.setCurrentDbTimestamp(secondRunTime);
+
+   // The boundary rows must be imported by this run.
+   runJob(TABLE_NAME);
+   assertDirOfNumbers(TABLE_NAME, 20);
+ }
+ @Test
+ public void testIncrementalHiveAppendEmptyThenFull() throws Exception {
+   // This is to test Incremental Hive append feature. SQOOP-2470
+   // The job is run three times (empty table, 10 rows, 20 rows); before
+   // each run the "expected.script" system property is pointed at the
+   // matching script under <hive home>/scripts.
+   final String TABLE_NAME = "incrementalHiveAppendEmptyThenFull";
+   Configuration conf = newConf();
+   conf.set(ConnFactory.FACTORY_CLASS_NAMES_KEY,
+       InstrumentHsqldbManagerFactory.class.getName());
+   clearDir(TABLE_NAME);
+   createIdTable(TABLE_NAME, 0);
+   List<String> args = new ArrayList<String>();
+   args.add("--connect");
+   args.add(SOURCE_DB_URL);
+   args.add("--table");
+   args.add(TABLE_NAME);
+   args.add("--warehouse-dir");
+   args.add(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+   args.add("--hive-import");
+   args.add("--hive-table");
+   args.add(TABLE_NAME + "hive");
+   args.add("--incremental");
+   args.add("append");
+   args.add("--check-column");
+   args.add("ID");
+   args.add("-m");
+   args.add("1");
+   createJob(TABLE_NAME, args, conf);
+   HiveImport.setTestMode(true);
+   String hiveHome = org.apache.sqoop.SqoopOptions.getHiveHomeDefault();
+   assertNotNull("hive.home was not set", hiveHome);
+
+   // First run: the table is empty. (setProperty replaces any previous
+   // value, so no separate clearProperty call is needed.)
+   String testDataPath = new Path(new Path(hiveHome), "scripts/"
+       + "incrementalHiveAppendEmpty.q").toString();
+   System.setProperty("expected.script",
+       new File(testDataPath).getAbsolutePath());
+   runJob(TABLE_NAME);
+   assertDirOfNumbers(TABLE_NAME, 0);
+
+   // Now add some rows.
+   insertIdRows(TABLE_NAME, 0, 10);
+   String testDataPath10 = new Path(new Path(hiveHome), "scripts/"
+       + "incrementalHiveAppend10.q").toString();
+   System.setProperty("expected.script",
+       new File(testDataPath10).getAbsolutePath());
+   // Running the job a second time should import 10 rows.
+   runJob(TABLE_NAME);
+   assertDirOfNumbers(TABLE_NAME, 10);
+
+   // Add some more rows.
+   insertIdRows(TABLE_NAME, 10, 20);
+   String testDataPath20 = new Path(new Path(hiveHome), "scripts/"
+       + "incrementalHiveAppend20.q").toString();
+   System.setProperty("expected.script",
+       new File(testDataPath20).getAbsolutePath());
+   // Import only those rows.
+   runJob(TABLE_NAME);
+   assertDirOfNumbers(TABLE_NAME, 20);
+ }
+
+ // SQOOP-1890
+ @Test
+ public void testTableNameWithSpecialCharacters() throws Exception {
+   // The table name contains '-' and '.', so the import only works if
+   // the table name is escaped properly.
+   final String TABLE_NAME = "my-table.ext";
+   final String jobName = "emptyJob";
+
+   // Create the table empty, then fill it with ten rows.
+   createIdTable(TABLE_NAME, 0);
+   insertIdRows(TABLE_NAME, 0, 10);
+
+   final List<String> jobArgs = getArgListForTable(TABLE_NAME, false, true);
+   createJob(jobName, jobArgs);
+   runJob(jobName);
+   assertDirOfNumbers(TABLE_NAME, 10);
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestMerge.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestMerge.java b/src/test/org/apache/sqoop/TestMerge.java
new file mode 100644
index 0000000..8eef8d4
--- /dev/null
+++ b/src/test/org/apache/sqoop/TestMerge.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.sqoop.testutil.CommonArgs;
+import org.apache.sqoop.testutil.HsqldbTestServer;
+import org.apache.sqoop.manager.ConnManager;
+import org.apache.sqoop.testutil.BaseSqoopTestCase;
+import org.apache.sqoop.tool.CodeGenTool;
+import org.apache.sqoop.tool.ImportTool;
+import org.apache.sqoop.tool.MergeTool;
+import org.apache.sqoop.util.ClassLoaderStack;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Test;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetReader;
+import org.kitesdk.data.Datasets;
+
+import static org.apache.avro.generic.GenericData.Record;
+import static org.junit.Assert.fail;
+
+/**
+ * Test that the merge tool works.
+ */
+public class TestMerge extends BaseSqoopTestCase {
+
+ private static final Log LOG =
+ LogFactory.getLog(TestMerge.class.getName());
+
+ protected ConnManager manager;
+ protected Connection conn;
+
+ public static final String SOURCE_DB_URL = "jdbc:hsqldb:mem:merge";
+
+ private static final List<List<Integer>> initRecords = Arrays
+ .asList(Arrays.asList(new Integer(0), new Integer(0)),
+ Arrays.asList(new Integer(1), new Integer(42)));
+
+ private static final List<List<Integer>> newRecords = Arrays.asList(
+ Arrays.asList(new Integer(1), new Integer(43)),
+ Arrays.asList(new Integer(3), new Integer(313)));
+
+ private static final List<List<Integer>> mergedRecords = Arrays.asList(
+ Arrays.asList(new Integer(0), new Integer(0)),
+ Arrays.asList(new Integer(1), new Integer(43)),
+ Arrays.asList(new Integer(3), new Integer(313)));
+
+ @Before
+ public void setUp() {
+ super.setUp();
+ manager = getManager();
+ try {
+ conn = manager.getConnection();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static final String TABLE_NAME = "MergeTable";
+ private static final String OLD_PATH = "merge_old";
+ private static final String NEW_PATH = "merge_new";
+ private static final String FINAL_PATH = "merge_final";
+
+ public Configuration newConf() {
+ Configuration conf = new Configuration();
+ if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
+ conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
+ }
+ conf.set("mapred.job.tracker", "local");
+ return conf;
+ }
+
+ /**
+ * Create a SqoopOptions to connect to the manager.
+ */
+ public SqoopOptions getSqoopOptions(Configuration conf) {
+ SqoopOptions options = new SqoopOptions(conf);
+ options.setConnectString(HsqldbTestServer.getDbUrl());
+
+ return options;
+ }
+
+ protected void createTable(List<List<Integer>> records) throws SQLException {
+ PreparedStatement s = conn.prepareStatement("DROP TABLE \"" + TABLE_NAME + "\" IF EXISTS");
+ try {
+ s.executeUpdate();
+ } finally {
+ s.close();
+ }
+
+ s = conn.prepareStatement("CREATE TABLE \"" + TABLE_NAME
+ + "\" (id INT NOT NULL PRIMARY KEY, val INT, LASTMOD timestamp)");
+ try {
+ s.executeUpdate();
+ } finally {
+ s.close();
+ }
+
+ for (List<Integer> record : records) {
+ final String values = StringUtils.join(record, ", ");
+ s = conn
+ .prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (" + values + ", now())");
+ try {
+ s.executeUpdate();
+ } finally {
+ s.close();
+ }
+ }
+
+ conn.commit();
+ }
+
+ @Test
+ public void testTextFileMerge() throws Exception {
+ runMergeTest(SqoopOptions.FileLayout.TextFile);
+ }
+
+ @Test
+ public void testAvroFileMerge() throws Exception {
+ runMergeTest(SqoopOptions.FileLayout.AvroDataFile);
+ }
+
+ @Test
+ public void testParquetFileMerge() throws Exception {
+ runMergeTest(SqoopOptions.FileLayout.ParquetFile);
+ }
+
+ public void runMergeTest(SqoopOptions.FileLayout fileLayout) throws Exception {
+ createTable(initRecords);
+
+ // Create a jar to use for the merging process; we'll load it
+ // into the current thread CL for when this runs. This needs
+ // to contain a different class name than used for the imports
+ // due to classloaderstack issues in the same JVM.
+ final String MERGE_CLASS_NAME = "ClassForMerging";
+ SqoopOptions options = getSqoopOptions(newConf());
+ options.setTableName(TABLE_NAME);
+ options.setClassName(MERGE_CLASS_NAME);
+
+ CodeGenTool codeGen = new CodeGenTool();
+ Sqoop codeGenerator = new Sqoop(codeGen, options.getConf(), options);
+ int ret = Sqoop.runSqoop(codeGenerator, new String[0]);
+ if (0 != ret) {
+ fail("Nonzero exit from codegen: " + ret);
+ }
+
+ List<String> jars = codeGen.getGeneratedJarFiles();
+ String jarFileName = jars.get(0);
+
+ // Now do the imports.
+ importData(OLD_PATH, fileLayout);
+
+ // Check that we got records that meet our expected values.
+ checkData(OLD_PATH, initRecords, fileLayout);
+
+ Thread.sleep(25);
+
+ // Modify the data in the warehouse.
+ createTable(newRecords);
+
+ Thread.sleep(25);
+
+ // Do another import, into the "new" dir.
+ importData(NEW_PATH, fileLayout);
+
+ checkData(NEW_PATH, newRecords, fileLayout);
+
+ // Now merge the results!
+ ClassLoaderStack.addJarFile(jarFileName, MERGE_CLASS_NAME);
+ Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+ options = getSqoopOptions(newConf());
+ options.setMergeOldPath(new Path(warehouse, OLD_PATH).toString());
+ options.setMergeNewPath(new Path(warehouse, NEW_PATH).toString());
+ options.setMergeKeyCol("ID");
+ options.setTargetDir(new Path(warehouse, FINAL_PATH).toString());
+ options.setClassName(MERGE_CLASS_NAME);
+ options.setExistingJarName(jarFileName);
+
+ MergeTool mergeTool = new MergeTool();
+ Sqoop merger = new Sqoop(mergeTool, options.getConf(), options);
+ ret = Sqoop.runSqoop(merger, new String[0]);
+ if (0 != ret) {
+ fail("Merge failed with exit code " + ret);
+ }
+
+ checkData(FINAL_PATH, mergedRecords, fileLayout);
+ }
+
+ private void checkData(String dataDir, List<List<Integer>> records,
+ SqoopOptions.FileLayout fileLayout) throws Exception {
+ for (List<Integer> record : records) {
+ assertRecordStartsWith(record, dataDir, fileLayout);
+ }
+ }
+
+ private boolean valueMatches(GenericRecord genericRecord, List<Integer> recordVals) {
+ return recordVals.get(0).equals(genericRecord.get(0))
+ && recordVals.get(1).equals(genericRecord.get(1));
+ }
+
+ private void importData(String targetDir, SqoopOptions.FileLayout fileLayout) {
+ SqoopOptions options;
+ options = getSqoopOptions(newConf());
+ options.setTableName(TABLE_NAME);
+ options.setNumMappers(1);
+ options.setFileLayout(fileLayout);
+ options.setDeleteMode(true);
+
+ Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+ options.setTargetDir(new Path(warehouse, targetDir).toString());
+
+ ImportTool importTool = new ImportTool();
+ Sqoop importer = new Sqoop(importTool, options.getConf(), options);
+ int ret = Sqoop.runSqoop(importer, new String[0]);
+ if (0 != ret) {
+ fail("Initial import failed with exit code " + ret);
+ }
+ }
+
+ /**
+ * @return true if the file specified by path 'p' contains a line
+ * that starts with 'prefix'
+ */
+ protected boolean checkTextFileForLine(FileSystem fs, Path p, List<Integer> record)
+ throws IOException {
+ final String prefix = StringUtils.join(record, ',');
+ BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(p)));
+ try {
+ while (true) {
+ String in = r.readLine();
+ if (null == in) {
+ break; // done with the file.
+ }
+
+ if (in.startsWith(prefix)) {
+ return true;
+ }
+ }
+ } finally {
+ r.close();
+ }
+
+ return false;
+ }
+
+ private boolean checkAvroFileForLine(FileSystem fs, Path p, List<Integer> record)
+ throws IOException {
+ SeekableInput in = new FsInput(p, new Configuration());
+ DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+ FileReader<GenericRecord> reader = DataFileReader.openReader(in, datumReader);
+ reader.sync(0);
+
+ while (reader.hasNext()) {
+ if (valueMatches(reader.next(), record)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private boolean checkParquetFileForLine(FileSystem fileSystem, Path path, List<Integer> record) throws IOException
+ {
+ Dataset<Record> parquetRecords = Datasets.load("dataset:" + path.getParent(), Record.class);
+ DatasetReader<Record> datasetReader = null;
+ try {
+ datasetReader = parquetRecords.newReader();
+ for (GenericRecord genericRecord : datasetReader) {
+ if (valueMatches(genericRecord, record)) {
+ return true;
+ }
+ }
+ }
+ finally {
+ if (datasetReader != null) {
+ datasetReader.close();
+ }
+ }
+
+ return false;
+ }
+
+ protected boolean checkFileForLine(FileSystem fs, Path p, SqoopOptions.FileLayout fileLayout,
+ List<Integer> record) throws IOException {
+ boolean result = false;
+ switch (fileLayout) {
+ case TextFile:
+ result = checkTextFileForLine(fs, p, record);
+ break;
+ case AvroDataFile:
+ result = checkAvroFileForLine(fs, p, record);
+ break;
+ case ParquetFile:
+ result = checkParquetFileForLine(fs, p, record);
+ break;
+ }
+ return result;
+ }
+
+ /**
+ * Return true if there's a file in 'dirName' with a line that starts with
+ * 'prefix'.
+ */
+ protected boolean recordStartsWith(List<Integer> record, String dirName,
+ SqoopOptions.FileLayout fileLayout)
+ throws Exception {
+ Path warehousePath = new Path(LOCAL_WAREHOUSE_DIR);
+ Path targetPath = new Path(warehousePath, dirName);
+
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ FileStatus [] files = fs.listStatus(targetPath);
+
+ if (null == files || files.length == 0) {
+ fail("Got no import files!");
+ }
+
+ for (FileStatus stat : files) {
+ Path p = stat.getPath();
+ if (p.getName().startsWith("part-") || p.getName().endsWith(".parquet")) {
+ if (checkFileForLine(fs, p, fileLayout, record)) {
+ // We found the line. Nothing further to do.
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+ protected void assertRecordStartsWith(List<Integer> record, String dirName,
+ SqoopOptions.FileLayout fileLayout) throws Exception {
+ if (!recordStartsWith(record, dirName, fileLayout)) {
+ fail("No record found that starts with [" + StringUtils.join(record, ", ") + "] in " + dirName);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestMultiCols.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestMultiCols.java b/src/test/org/apache/sqoop/TestMultiCols.java
new file mode 100644
index 0000000..1c932e9
--- /dev/null
+++ b/src/test/org/apache/sqoop/TestMultiCols.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.junit.Test;
+
+/**
+ * Test cases that import rows containing multiple columns,
+ * some of which may contain null values.
+ *
+ * Also test loading only selected columns from the db.
+ */
+public class TestMultiCols extends ImportJobTestCase {
+
+ public static final Log LOG = LogFactory.getLog(
+ TestMultiCols.class.getName());
+
+ /**
+ * Do a full import verification test on a table containing one row.
+ * @param types the types of the columns to insert
+ * @param insertVals the SQL text to use to insert each value
+ * @param validateLine the text to expect as a toString() of the entire row,
+ * as imported by the tool
+ */
+ private void verifyTypes(String [] types , String [] insertVals,
+ String validateLine) {
+ verifyTypes(types, insertVals, validateLine, null);
+ }
+
+ private void verifyTypes(String [] types , String [] insertVals,
+ String validateLine, String [] importColumns) {
+
+ createTableWithColTypes(types, insertVals);
+ verifyImport(validateLine, importColumns);
+ LOG.debug("Verified input line as " + validateLine + " -- ok!");
+ }
+
+ @Test
+ public void testThreeStrings() {
+ String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" };
+ String [] insertVals = { "'foo'", "'bar'", "'baz'" };
+ String validateLine = "foo,bar,baz";
+
+ verifyTypes(types, insertVals, validateLine);
+ }
+
+ @Test
+ public void testStringsWithNull1() {
+ String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" };
+ String [] insertVals = { "'foo'", "null", "'baz'" };
+ String validateLine = "foo,null,baz";
+
+ verifyTypes(types, insertVals, validateLine);
+ }
+
+ @Test
+ public void testStringsWithNull2() {
+ String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" };
+ String [] insertVals = { "null", "'foo'", "'baz'" };
+ String validateLine = "null,foo,baz";
+
+ verifyTypes(types, insertVals, validateLine);
+ }
+
+ @Test
+ public void testStringsWithNull3() {
+ String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" };
+ String [] insertVals = { "'foo'", "'baz'", "null"};
+ String validateLine = "foo,baz,null";
+
+ verifyTypes(types, insertVals, validateLine);
+ }
+
+ @Test
+ public void testThreeInts() {
+ String [] types = { "INTEGER", "INTEGER", "INTEGER" };
+ String [] insertVals = { "1", "2", "3" };
+ String validateLine = "1,2,3";
+
+ verifyTypes(types, insertVals, validateLine);
+ }
+
+ @Test
+ public void testIntsWithNulls() {
+ String [] types = { "INTEGER", "INTEGER", "INTEGER" };
+ String [] insertVals = { "1", "null", "3" };
+ String validateLine = "1,null,3";
+
+ verifyTypes(types, insertVals, validateLine);
+ }
+
+ @Test
+ public void testMixed1() {
+ String [] types = { "INTEGER", "VARCHAR(32)", "DATE" };
+ String [] insertVals = { "1", "'meep'", "'2009-12-31'" };
+ String validateLine = "1,meep,2009-12-31";
+
+ verifyTypes(types, insertVals, validateLine);
+ }
+
+ @Test
+ public void testMixed2() {
+ String [] types = { "INTEGER", "VARCHAR(32)", "DATE" };
+ String [] insertVals = { "null", "'meep'", "'2009-12-31'" };
+ String validateLine = "null,meep,2009-12-31";
+
+ verifyTypes(types, insertVals, validateLine);
+ }
+
+ @Test
+ public void testMixed3() {
+ String [] types = { "INTEGER", "VARCHAR(32)", "DATE" };
+ String [] insertVals = { "1", "'meep'", "null" };
+ String validateLine = "1,meep,null";
+
+ verifyTypes(types, insertVals, validateLine);
+ }
+
+ @Test
+ public void testMixed4() {
+ String [] types = { "NUMERIC", "INTEGER", "NUMERIC" };
+ String [] insertVals = { "-42", "17", "33333333333333333333333.1714" };
+ String validateLine = "-42,17,33333333333333333333333.1714";
+
+ verifyTypes(types, insertVals, validateLine);
+ }
+
+ @Test
+ public void testMixed5() {
+ String [] types = { "NUMERIC", "INTEGER", "NUMERIC" };
+ String [] insertVals = { "null", "17", "33333333333333333333333.0" };
+ String validateLine = "null,17,33333333333333333333333.0";
+
+ verifyTypes(types, insertVals, validateLine);
+ }
+
+ @Test
+ public void testMixed6() {
+ String [] types = { "NUMERIC", "INTEGER", "NUMERIC" };
+ String [] insertVals = { "33333333333333333333333", "17", "-42"};
+ String validateLine = "33333333333333333333333,17,-42";
+
+ verifyTypes(types, insertVals, validateLine);
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ // the tests below here test the --columns parameter and ensure that
+ // we can selectively import only certain columns.
+ //////////////////////////////////////////////////////////////////////////
+
+ @Test
+ public void testSkipFirstCol() {
+ String [] types = { "NUMERIC", "INTEGER", "NUMERIC" };
+ String [] insertVals = { "33333333333333333333333", "17", "-42"};
+ String validateLine = "17,-42";
+
+ String [] loadCols = {"DATA_COL1", "DATA_COL2"};
+
+ verifyTypes(types, insertVals, validateLine, loadCols);
+ }
+
+ @Test
+ public void testSkipSecondCol() {
+ String [] types = { "NUMERIC", "INTEGER", "NUMERIC" };
+ String [] insertVals = { "33333333333333333333333", "17", "-42"};
+ String validateLine = "33333333333333333333333,-42";
+
+ String [] loadCols = {"DATA_COL0", "DATA_COL2"};
+
+ verifyTypes(types, insertVals, validateLine, loadCols);
+ }
+
+ @Test
+ public void testSkipThirdCol() {
+ String [] types = { "NUMERIC", "INTEGER", "NUMERIC" };
+ String [] insertVals = { "33333333333333333333333", "17", "-42"};
+ String validateLine = "33333333333333333333333,17";
+
+ String [] loadCols = {"DATA_COL0", "DATA_COL1"};
+
+ verifyTypes(types, insertVals, validateLine, loadCols);
+ }
+
+ /**
+ * This tests that the columns argument can handle comma-separated column
+ * names. So this is like having:
+ * --columns "DATA_COL0,DATA_COL1,DATA_COL2"
+ * as two args on a sqoop command line
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testSingleColumnsArg() throws IOException {
+ String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" };
+ String [] insertVals = { "'foo'", "'bar'", "'baz'" };
+ String validateLine = "foo,bar,baz";
+ String [] loadCols = {"DATA_COL0,DATA_COL1,DATA_COL2"};
+
+ verifyTypes(types, insertVals, validateLine, loadCols);
+ }
+
+ /**
+ * This tests that the columns argument can handle spaces between column
+ * names. So this is like having:
+ * --columns "DATA_COL0, DATA_COL1, DATA_COL2"
+ * as two args on a sqoop command line
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testColumnsWithSpaces() throws IOException {
+ String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" };
+ String [] insertVals = { "'foo'", "'bar'", "'baz'" };
+ String validateLine = "foo,bar,baz";
+ String [] loadCols = {"DATA_COL0, DATA_COL1, DATA_COL2"};
+
+ verifyTypes(types, insertVals, validateLine, loadCols);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestMultiMaps.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestMultiMaps.java b/src/test/org/apache/sqoop/TestMultiMaps.java
new file mode 100644
index 0000000..050e268
--- /dev/null
+++ b/src/test/org/apache/sqoop/TestMultiMaps.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.Utils;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.SqoopOptions.InvalidOptionsException;
+import org.apache.sqoop.orm.CompilationManager;
+import org.apache.sqoop.testutil.*;
+import org.apache.sqoop.tool.ImportTool;
+import org.apache.sqoop.util.ClassLoaderStack;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Test that using multiple mapper splits works.
+ */
+public class TestMultiMaps extends ImportJobTestCase {
+
+ /**
+ * Create the argv to pass to Sqoop.
+ * @return the argv as an array of strings.
+ */
+ protected String [] getArgv(boolean includeHadoopFlags, String [] colNames,
+ String splitByCol) {
+ String columnsString = "";
+ for (String col : colNames) {
+ columnsString += col + ",";
+ }
+
+ ArrayList<String> args = new ArrayList<String>();
+
+ if (includeHadoopFlags) {
+ CommonArgs.addHadoopFlags(args);
+ }
+
+ args.add("--table");
+ args.add(HsqldbTestServer.getTableName());
+ args.add("--columns");
+ args.add(columnsString);
+ args.add("--split-by");
+ args.add(splitByCol);
+ args.add("--warehouse-dir");
+ args.add(getWarehouseDir());
+ args.add("--connect");
+ args.add(HsqldbTestServer.getUrl());
+ args.add("--as-sequencefile");
+ args.add("--num-mappers");
+ args.add("2");
+
+ return args.toArray(new String[0]);
+ }
+
+ // this test just uses the two int table.
+ protected String getTableName() {
+ return HsqldbTestServer.getTableName();
+ }
+
+ /** @return a list of Path objects for each data file */
+ protected List<Path> getDataFilePaths() throws IOException {
+ List<Path> paths = new ArrayList<Path>();
+ Configuration conf = new Configuration();
+ if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
+ conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
+ }
+ FileSystem fs = FileSystem.get(conf);
+
+ FileStatus [] stats = fs.listStatus(getTablePath(),
+ new Utils.OutputFileUtils.OutputFilesFilter());
+
+ for (FileStatus stat : stats) {
+ paths.add(stat.getPath());
+ }
+
+ return paths;
+ }
+
+ /**
+ * Given a comma-delimited list of integers, grab and parse the first int.
+ * @param str a comma-delimited list of values, the first of which is an int.
+ * @return the first field in the string, cast to int
+ */
+ private int getFirstInt(String str) {
+ String [] parts = str.split(",");
+ return Integer.parseInt(parts[0]);
+ }
+
+ public void runMultiMapTest(String splitByCol, int expectedSum)
+ throws IOException {
+
+ String [] columns = HsqldbTestServer.getFieldNames();
+ ClassLoader prevClassLoader = null;
+ SequenceFile.Reader reader = null;
+
+ String [] argv = getArgv(true, columns, splitByCol);
+ runImport(argv);
+ try {
+ ImportTool importTool = new ImportTool();
+ SqoopOptions opts = importTool.parseArguments(
+ getArgv(false, columns, splitByCol),
+ null, null, true);
+
+ CompilationManager compileMgr = new CompilationManager(opts);
+ String jarFileName = compileMgr.getJarFilename();
+
+ prevClassLoader = ClassLoaderStack.addJarFile(jarFileName,
+ getTableName());
+
+ List<Path> paths = getDataFilePaths();
+ Configuration conf = new Configuration();
+ int curSum = 0;
+
+ // We expect multiple files. We need to open all the files and sum up the
+ // first column across all of them.
+ for (Path p : paths) {
+ reader = SeqFileReader.getSeqFileReader(p.toString());
+
+ // here we can actually instantiate (k, v) pairs.
+ Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+ // We know that these values are two ints separated by a ','
+ // character. Since this is all dynamic, though, we don't want to
+ // actually link against the class and use its methods. So we just
+ // parse this back into int fields manually. Sum them up and ensure
+ // that we get the expected total for the first column, to verify that
+ // we got all the results from the db into the file.
+
+ // now sum up everything in the file.
+ while (reader.next(key) != null) {
+ reader.getCurrentValue(val);
+ curSum += getFirstInt(val.toString());
+ }
+
+ IOUtils.closeStream(reader);
+ reader = null;
+ }
+
+ assertEquals("Total sum of first db column mismatch", expectedSum,
+ curSum);
+ } catch (InvalidOptionsException ioe) {
+ fail(ioe.toString());
+ } catch (ParseException pe) {
+ fail(pe.toString());
+ } finally {
+ IOUtils.closeStream(reader);
+
+ if (null != prevClassLoader) {
+ ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
+ }
+ }
+ }
+
+ @Test
+ public void testSplitByFirstCol() throws IOException {
+ runMultiMapTest("INTFIELD1", HsqldbTestServer.getFirstColSum());
+ }
+}