Posted to commits@sqoop.apache.org by an...@apache.org on 2018/01/18 14:04:09 UTC
[23/32] sqoop git commit: SQOOP-3273: Removing com.cloudera.sqoop packages
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/com/cloudera/sqoop/TestExportUpdate.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestExportUpdate.java b/src/test/com/cloudera/sqoop/TestExportUpdate.java
deleted file mode 100644
index 683f591..0000000
--- a/src/test/com/cloudera/sqoop/TestExportUpdate.java
+++ /dev/null
@@ -1,710 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.sqoop;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.StringUtils;
-
-import com.cloudera.sqoop.testutil.CommonArgs;
-import com.cloudera.sqoop.testutil.ExportJobTestCase;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-/**
- * Test that we can update a copy of data in the database,
- * based on newer data in HDFS.
- */
-public class TestExportUpdate extends ExportJobTestCase {
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Override
- protected String getTablePrefix() {
- return "UPDATE_TABLE_";
- }
-
- /**
- * Create the underlying table to update.
- */
- private void populateDatabase(int numRows) throws SQLException {
- Connection conn = getConnection();
-
- PreparedStatement statement = conn.prepareStatement(
- "CREATE TABLE " + getTableName()
- + " (A INT NOT NULL, B VARCHAR(32), C INT)");
- try {
- statement.executeUpdate();
- conn.commit();
- } finally {
- statement.close();
- statement = null;
- }
-
- try {
- for (int i = 0; i < numRows; i++) {
- statement = conn.prepareStatement("INSERT INTO " + getTableName()
- + " VALUES (" + i + ", 'foo" + i + "', " + i + ")");
- statement.executeUpdate();
- statement.close();
- statement = null;
- }
- } finally {
- if (null != statement) {
- statement.close();
- }
- }
-
- conn.commit();
- }
-
- /**
- * <p>Creates a table with three columns - A INT, B INT and C VARCHAR(32).
- * The table is populated with records in sets of three, with the total
- * number of unique values of A equal to the specified aMax value. For
- * each value of A, there will be three records with values of B ranging
- * from 0-2, and a corresponding value of C.</p>
- * <p>For example if <tt>aMax = 2</tt>, the table will contain the
- * following records:
- * <pre>
- * A | B | C
- * ----------------------
- * 0 | 0 | 0foo0
- * 0 | 1 | 0foo1
- * 0 | 2 | 0foo2
- * 1 | 0 | 1foo0
- * 1 | 1 | 1foo1
- * 1 | 2 | 1foo2
- * </pre></p>
- * @param aMax the number of unique values of column A to generate
- * @throws SQLException
- */
- private void createMultiKeyTable(int aMax) throws SQLException {
- Connection conn = getConnection();
-
- PreparedStatement statement = conn.prepareStatement(
- "CREATE TABLE " + getTableName()
- + " (A INT NOT NULL, B INT NOT NULL, C VARCHAR(32))");
- try {
- statement.executeUpdate();
- conn.commit();
- } finally {
- statement.close();
- statement = null;
- }
-
- try {
- for (int i = 0; i< aMax; i++) {
- for (int j = 0; j < 3; j++) {
- statement = conn.prepareStatement("INSERT INTO " + getTableName()
- + " VALUES (" + i + ", " + j + ", '"
- + i + "foo" + j + "')");
- statement.executeUpdate();
- statement.close();
- statement = null;
- }
- }
- } finally {
- if (null != statement) {
- statement.close();
- }
- }
-
- conn.commit();
- }
-
- /**
- * <p>Creates update files for the multi-key update test. The total number
- * of update records will be the number of files times aKeysPerFile times
- * the number of bKeyValues. Column A values start at the specified
- * <tt>startAtValue</tt>, and for each value there will be one record per
- * entry in <tt>bKeyValues</tt>.</p>
- * @param numFiles number of files to create
- * @param aKeysPerFile number of record sets with distinct column A values
- * @param startAtValue the starting value of column A
- * @param bKeyValues the list of values for column B
- * @throws IOException
- */
- private void createMultiKeyUpdateFiles(int numFiles, int aKeysPerFile,
- int startAtValue, int[] bKeyValues)
- throws IOException {
- Configuration conf = getConf();
- if (!isOnPhysicalCluster()) {
- conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
- }
- FileSystem fs = FileSystem.get(conf);
-
- int aValue = startAtValue;
- for (int i = 0; i < numFiles; i++) {
- OutputStream os = fs.create(new Path(getTablePath(), "" + i + ".txt"));
- BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
-
- for (int j = 0; j < aKeysPerFile; j++) {
- for (int k = 0; k < bKeyValues.length; k++) {
- w.write(getUpdateStringForMultiKeyRow(aValue, bKeyValues[k]));
- }
- aValue++;
- }
-
- w.close();
- os.close();
- }
- }
-
- /**
- * Generate a string of text representing an update for one row
- * of the multi-key table. The values of columns A and B are given
- * and the value of column C is generated as <em>a</em>bar<em>b</em>.
- * @param a the value of column a
- * @param b the value of column b
- */
- private String getUpdateStringForMultiKeyRow(int a, int b) {
- StringBuilder sb = new StringBuilder();
- sb.append(a).append("\t").append(b).append("\t").append(a);
- sb.append("bar").append(b).append("\n");
-
- return sb.toString();
- }
-
-
- /**
- * Create a set of files that will be used as the input to the update
- * process.
- * @param numFiles the number of files to generate
- * @param updatesPerFile the number of rows to create in each file
- * @param keyCol a value between 0 and 2 specifying whether 'a',
- * 'b', or 'c' (see {@link #populateDatabase(int)}) is the key column to
- * keep the same.
- * @param startOffsets is an optional list of row ids/values for a/c
- * which are the record ids at which the update files begin.
- * For instance, if numFiles=3, updatesPerFile=2, and keyCol=0 then
- * if startOffsets is {5, 10, 12}, files will be generated to update
- * rows with A=5,6; A=10,11; A=12,13.
- *
- * If startOffsets is empty or underspecified (given numFiles), then
- * subsequent files will start immediately after the previous file.
- */
- private void createUpdateFiles(int numFiles, int updatesPerFile,
- int keyCol, int... startOffsets) throws IOException {
- Configuration conf = getConf();
- if (!isOnPhysicalCluster()) {
- conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
- }
- FileSystem fs = FileSystem.get(conf);
-
- int rowId = 0;
- for (int i = 0; i < numFiles; i++) {
- OutputStream os = fs.create(new Path(getTablePath(), "" + i + ".txt"));
- BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
-
- if (null != startOffsets && startOffsets.length > i) {
- // If a start offset has been specified for this file, go there.
- // Otherwise, just carry over from the previous file iteration.
- rowId = startOffsets[i];
- }
-
- for (int j = 0; j < updatesPerFile; j++) {
- w.write(getUpdateStringForRow(keyCol, rowId++));
- }
-
- w.close();
- os.close();
- }
- }
-
- /**
- * Generate a string of text representing an update for one row
- * of the database. keyCol is a value in [0, 2] representing which
- * column is kept fixed. rowId specifies the row being updated.
- */
- private String getUpdateStringForRow(int keyCol, int rowId) {
- StringBuilder sb = new StringBuilder();
-
- int [] rowInts = new int[3]; // There are 3 columns in the table.
- for (int i = 0; i < 3; i++) {
- if (keyCol == i) {
- // Keep this column fixed.
- rowInts[i] = rowId;
- } else {
- // Update the int in this column.
- rowInts[i] = rowId * 2;
- }
- }
-
- sb.append(rowInts[0]);
- sb.append("\tfoo");
- sb.append(rowInts[1]);
- sb.append("\t");
- sb.append(rowInts[2]);
- sb.append("\n");
-
- return sb.toString();
- }
-
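To make the format above concrete (values chosen purely for illustration): with keyCol = 0, a call like getUpdateStringForRow(0, 1) keeps column A at the row id and doubles the other two values, yielding the tab-separated line

    1	foo2	2

which is why the tests below expect row A=1 to read (1, foo2, 2) after an update keyed on A.
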
- /**
- * Verifies the number of rows in the table.
- */
- private void verifyRowCount(int expectedCount) throws SQLException {
- String query = "SELECT COUNT(*) FROM " + getTableName();
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- try {
- Connection conn = getConnection();
- statement = conn.prepareStatement(query);
- rs = statement.executeQuery();
-
- boolean success = rs.next();
- assertTrue("Expected at least one result", success);
-
- int trueCount = rs.getInt(1);
- assertEquals(expectedCount, trueCount);
-
- // This query should have returned exactly one row.
- success = rs.next();
- assertFalse("Expected no more than one output record", success);
- } finally {
- if (null != rs) {
- try {
- rs.close();
- } catch (SQLException sqle) {
- LOG.error("Error closing result set: "
- + StringUtils.stringifyException(sqle));
- }
- }
-
- if (null != statement) {
- statement.close();
- }
- }
- }
-
- private void verifyMultiKeyRow(String[] keyColumnNames, int[] keyValues,
- Object ...expectedVals) throws SQLException {
- StringBuilder querySb = new StringBuilder("SELECT A, B, C FROM ");
- querySb.append(getTableName()).append(" WHERE ");
- boolean first = true;
- for (int i = 0; i< keyColumnNames.length; i++) {
- if (first) {
- first = false;
- } else {
- querySb.append(" AND ");
- }
- querySb.append(keyColumnNames[i]).append(" = ");
- querySb.append(keyValues[i]);
- }
-
- String query = querySb.toString();
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- try {
- Connection conn = getConnection();
- statement = conn.prepareStatement(query);
- rs = statement.executeQuery();
-
- boolean success = rs.next();
- assertTrue("Expected at least one output record", success);
-
- // Assert that all three columns have the correct values.
- for (int i = 0; i < expectedVals.length; i++) {
- String expected = expectedVals[i].toString();
- String result = rs.getString(i + 1);
- assertEquals("Invalid response for column " + i + "; got " + result
- + " when expected " + expected, expected, result);
- }
-
- // This query should have returned exactly one row.
- success = rs.next();
- assertFalse("Expected no more than one output record", success);
- } finally {
- if (null != rs) {
- try {
- rs.close();
- } catch (SQLException sqle) {
- LOG.error("Error closing result set: "
- + StringUtils.stringifyException(sqle));
- }
- }
-
- if (null != statement) {
- statement.close();
- }
- }
- }
-
- /**
- * Verify that a particular row has the expected values.
- */
- private void verifyRow(String keyColName, String keyVal,
- String... expectedVals) throws SQLException {
- String query = "SELECT A, B, C FROM " + getTableName() + " WHERE "
- + keyColName + " = " + keyVal;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- try {
- Connection conn = getConnection();
- statement = conn.prepareStatement(query);
- rs = statement.executeQuery();
-
- boolean success = rs.next();
- assertTrue("Expected at least one output record", success);
-
- // Assert that all three columns have the correct values.
- for (int i = 0; i < expectedVals.length; i++) {
- String expected = expectedVals[i];
- String result = rs.getString(i + 1);
- assertEquals("Invalid response for column " + i + "; got " + result
- + " when expected " + expected, expected, result);
- }
-
- // This query should have returned exactly one row.
- success = rs.next();
- assertFalse("Expected no more than one output record", success);
- } finally {
- if (null != rs) {
- try {
- rs.close();
- } catch (SQLException sqle) {
- LOG.error("Error closing result set: "
- + StringUtils.stringifyException(sqle));
- }
- }
-
- if (null != statement) {
- statement.close();
- }
- }
- }
-
- private void runUpdate(int numMappers, String updateCol) throws IOException {
- runExport(getArgv(true, 2, 2, "-m", "" + numMappers,
- "--update-key", updateCol));
- }
-
- @Test
- public void testBasicUpdate() throws Exception {
- // Test that we can do a single-task single-file update.
- // This updates the entire database.
-
- populateDatabase(10);
- createUpdateFiles(1, 10, 0, 0);
- runUpdate(1, "A");
- verifyRowCount(10);
- // Check a few rows...
- verifyRow("A", "0", "0", "foo0", "0");
- verifyRow("A", "1", "1", "foo2", "2");
- verifyRow("A", "9", "9", "foo18", "18");
- }
-
- /**
- * Creates a table with two columns that together act as unique keys
- * and then modifies a subset of the rows via update.
- * @throws Exception
- */
- @Test
- public void testMultiKeyUpdate() throws Exception {
- createMultiKeyTable(3);
-
- createMultiKeyUpdateFiles(1, 1, 1, new int[] {0, 1, 3});
-
- runExport(getArgv(true, 2, 2, "-m", "1",
- "--update-key", "A,B"));
- verifyRowCount(9);
- // Check a few rows...
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 0, 0 }, 0, 0, "0foo0");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 0, 1 }, 0, 1, "0foo1");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 0, 2 }, 0, 2, "0foo2");
-
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 1, 0 }, 1, 0, "1bar0");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 1, 1 }, 1, 1, "1bar1");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 1, 2 }, 1, 2, "1foo2");
-
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 2, 0 }, 2, 0, "2foo0");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 2, 1 }, 2, 1, "2foo1");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 2, 2 }, 2, 2, "2foo2");
-
- }
-
- /**
- * Creates a table with two columns that together act as unique keys
- * and then modifies a subset of the rows via update.
- * @throws Exception
- */
- @Test
- public void testMultiKeyUpdateMultipleFilesNoUpdate() throws Exception {
- createMultiKeyTable(4);
-
- createMultiKeyUpdateFiles(2, 1, 1, new int[] {3, 4, 5});
-
- runExport(getArgv(true, 2, 2, "-m", "1",
- "--update-key", "A,B"));
- verifyRowCount(12);
- // Check a few rows...
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 0, 0 }, 0, 0, "0foo0");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 0, 1 }, 0, 1, "0foo1");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 0, 2 }, 0, 2, "0foo2");
-
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 1, 0 }, 1, 0, "1foo0");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 1, 1 }, 1, 1, "1foo1");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 1, 2 }, 1, 2, "1foo2");
-
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 2, 0 }, 2, 0, "2foo0");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 2, 1 }, 2, 1, "2foo1");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 2, 2 }, 2, 2, "2foo2");
-
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 3, 0 }, 3, 0, "3foo0");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 3, 1 }, 3, 1, "3foo1");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 3, 2 }, 3, 2, "3foo2");
- }
-
- /**
- * Creates a table with two columns that together act as unique keys
- * and then modifies a subset of the rows via update.
- * @throws Exception
- */
- @Test
- public void testMultiKeyUpdateMultipleFilesFullUpdate() throws Exception {
- createMultiKeyTable(4);
-
- createMultiKeyUpdateFiles(2, 2, 0, new int[] {0, 1, 2});
-
- runExport(getArgv(true, 2, 2, "-m", "1",
- "--update-key", "A,B"));
- verifyRowCount(12);
- // Check a few rows...
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 0, 0 }, 0, 0, "0bar0");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 0, 1 }, 0, 1, "0bar1");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 0, 2 }, 0, 2, "0bar2");
-
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 1, 0 }, 1, 0, "1bar0");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 1, 1 }, 1, 1, "1bar1");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 1, 2 }, 1, 2, "1bar2");
-
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 2, 0 }, 2, 0, "2bar0");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 2, 1 }, 2, 1, "2bar1");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 2, 2 }, 2, 2, "2bar2");
-
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 3, 0 }, 3, 0, "3bar0");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 3, 1 }, 3, 1, "3bar1");
- verifyMultiKeyRow(new String[] { "A", "B"},
- new int[] { 3, 2 }, 3, 2, "3bar2");
- }
-
- @Test
- public void testEmptyTable() throws Exception {
- // Test that an empty table will "accept" updates that modify
- // no rows; no new data is injected into the database.
- populateDatabase(0);
- createUpdateFiles(1, 10, 0, 0);
- runUpdate(1, "A");
- verifyRowCount(0);
- }
-
- @Test
- public void testEmptyFiles() throws Exception {
- // An empty input file results in no changes to a db table.
- populateDatabase(10);
- createUpdateFiles(1, 0, 0);
- runUpdate(1, "A");
- verifyRowCount(10);
- // Check that a few rows have not changed at all.
- verifyRow("A", "0", "0", "foo0", "0");
- verifyRow("A", "1", "1", "foo1", "1");
- verifyRow("A", "9", "9", "foo9", "9");
- }
-
- @Test
- public void testStringCol() throws Exception {
- // Test that we can do modifications based on the string "B" column.
- populateDatabase(10);
- createUpdateFiles(1, 10, 1);
- runUpdate(1, "B");
- verifyRowCount(10);
- verifyRow("B", "'foo0'", "0", "foo0", "0");
- verifyRow("B", "'foo1'", "2", "foo1", "2");
- verifyRow("B", "'foo9'", "18", "foo9", "18");
- }
-
- @Test
- public void testLastCol() throws Exception {
- // Test that we can do modifications based on the third int column.
- populateDatabase(10);
- createUpdateFiles(1, 10, 2);
- runUpdate(1, "C");
- verifyRowCount(10);
- verifyRow("C", "0", "0", "foo0", "0");
- verifyRow("C", "1", "2", "foo2", "1");
- verifyRow("C", "9", "18", "foo18", "9");
- }
-
- @Test
- public void testMultiMaps() throws Exception {
- // Test that we can handle multiple map tasks.
- populateDatabase(20);
- createUpdateFiles(2, 10, 0);
- runUpdate(1, "A");
- verifyRowCount(20);
- verifyRow("A", "0", "0", "foo0", "0");
- verifyRow("A", "1", "1", "foo2", "2");
- verifyRow("A", "9", "9", "foo18", "18");
- verifyRow("A", "10", "10", "foo20", "20");
- verifyRow("A", "15", "15", "foo30", "30");
- verifyRow("A", "19", "19", "foo38", "38");
- }
-
- @Test
- public void testSubsetUpdate() throws Exception {
- // Update only a few rows in the middle of the table.
- populateDatabase(10);
- createUpdateFiles(1, 5, 0, 3); // only rows A=3..7 change.
- runUpdate(1, "A");
- verifyRowCount(10);
-
- // Verify these rows are unchanged.
- verifyRow("A", "0", "0", "foo0", "0");
- verifyRow("A", "2", "2", "foo2", "2");
- verifyRow("A", "8", "8", "foo8", "8");
- verifyRow("A", "9", "9", "foo9", "9");
-
- // Verify these rows have been updated.
- verifyRow("A", "3", "3", "foo6", "6");
- verifyRow("A", "5", "5", "foo10", "10");
- verifyRow("A", "7", "7", "foo14", "14");
- }
-
- @Test
- public void testSubsetUpdate2() throws Exception {
- // Update only some of the rows in the db. Also include some
- // updates that do not affect actual rows in the table.
- // These should just be ignored.
-
- populateDatabase(10);
- // Create two files that update four rows each.
- // File0 updates A=-2..1 (-2 and -1 don't exist).
- // File1 updates A=8..11 (10 and 11 don't exist).
- createUpdateFiles(2, 4, 0, -2, 8);
- runUpdate(2, "A");
- verifyRowCount(10);
-
- // Verify these rows are unchanged.
- verifyRow("A", "4", "4", "foo4", "4");
- verifyRow("A", "7", "7", "foo7", "7");
-
- // Verify these updates succeeded.
- verifyRow("A", "1", "1", "foo2", "2");
- verifyRow("A", "8", "8", "foo16", "16");
- verifyRow("A", "9", "9", "foo18", "18");
- }
-
- /**
- * Test updating only subset of the columns.
- *
- * @throws Exception
- */
- @Test
- public void testUpdateColumnSubset() throws Exception {
- populateDatabase(4);
- createUpdateFiles(1, 3, 0);
-
- runExport(getArgv(true, 2, 2, "-m", "1",
- "--update-key", "A", "--columns", "A,B"));
-
- verifyRowCount(4);
-
- // First row should not have any changes (even though it was updated)
- verifyRow("A", "0", "0", "foo0", "0");
-
- // Second row has an updated column B, but C should be left untouched
- verifyRow("A", "1", "1", "foo2", "1");
-
- // Third row has an updated column B, but C should be left untouched
- verifyRow("A", "2", "2", "foo4", "2");
-
- // Last row should be completely untouched
- verifyRow("A", "3", "3", "foo3", "3");
- }
-
- /**
- * Parameter --columns must be a superset of --update-key in order for
- * CompilationManager and other parts of the framework to work correctly.
- *
- * @throws Exception
- */
- @Test
- public void testUpdateColumnNotInColumns() throws Exception {
- populateDatabase(1);
-
- thrown.expect(IOException.class);
- thrown.reportMissingExceptionWithMessage("Expected IOException as --columns is not a superset of --update-key");
- runExport(getArgv(true, 2, 2, "-m", "1",
- "--update-key", "A", "--columns", "B"));
- }
-
-}
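
For reference, the update-mode export these tests drive is equivalent to a command line of roughly the following shape (a sketch only: the connect string, table name, and export directory are illustrative placeholders; the flags mirror the argument lists assembled by runUpdate() and testMultiKeyUpdate()):

    sqoop export \
        --connect jdbc:hsqldb:hsql://localhost/db \
        --table UPDATE_TABLE_0 \
        --export-dir /path/to/update/files \
        --update-key A,B \
        -m 1

With --update-key set, Sqoop generates UPDATE statements keyed on the listed columns rather than INSERTs, so input records that match no existing row simply leave the table unchanged.
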
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/com/cloudera/sqoop/TestFreeFormQueryImport.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestFreeFormQueryImport.java b/src/test/com/cloudera/sqoop/TestFreeFormQueryImport.java
deleted file mode 100644
index 4f9e652..0000000
--- a/src/test/com/cloudera/sqoop/TestFreeFormQueryImport.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.sqoop;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-
-import com.cloudera.sqoop.testutil.CommonArgs;
-import com.cloudera.sqoop.testutil.ImportJobTestCase;
-import org.junit.After;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test free form query import.
- */
-public class TestFreeFormQueryImport extends ImportJobTestCase {
-
- private Log log;
-
- public TestFreeFormQueryImport() {
- this.log = LogFactory.getLog(TestFreeFormQueryImport.class.getName());
- }
-
- /**
- * @return the Log object to use for reporting during this test
- */
- protected Log getLogger() {
- return log;
- }
-
- /** the names of the tables we're creating. */
- private List<String> tableNames;
-
- @After
- public void tearDown() {
- // Clean up the database on our way out.
- for (String tableName : tableNames) {
- try {
- dropTableIfExists(tableName);
- } catch (SQLException e) {
- log.warn("Error trying to drop table '" + tableName
- + "' on tearDown: " + e);
- }
- }
- super.tearDown();
- }
-
- /**
- * Create the argv to pass to Sqoop.
- * @param splitByCol column of the table used to split work.
- * @param query free form query to be used.
- * @return the argv as an array of strings.
- */
- protected String [] getArgv(String splitByCol, String query) {
- ArrayList<String> args = new ArrayList<String>();
-
- CommonArgs.addHadoopFlags(args);
-
- args.add("--connect");
- args.add(getConnectString());
- args.add("--target-dir");
- args.add(getWarehouseDir());
- args.add("--split-by");
- args.add(splitByCol);
- args.add("--num-mappers");
- args.add("2");
- args.add("--query");
- args.add(query);
-
- return args.toArray(new String[0]);
- }
-
- /**
- * Create two tables that share a common id column. Run a free-form query
- * import over the result set produced by joining the two tables on the
- * id column.
- */
- @Test
- public void testSimpleJoin() throws IOException {
- tableNames = new ArrayList<String>();
-
- String [] types1 = { "SMALLINT", };
- String [] vals1 = { "1", };
- String tableName1 = getTableName();
- createTableWithColTypes(types1, vals1);
- tableNames.add(tableName1);
-
- incrementTableNum();
-
- String [] types2 = { "SMALLINT", "VARCHAR(32)", };
- String [] vals2 = { "1", "'foo'", };
- String tableName2 = getTableName();
- createTableWithColTypes(types2, vals2);
- tableNames.add(tableName2);
-
- String query = "SELECT "
- + tableName1 + "." + getColName(0) + ", "
- + tableName2 + "." + getColName(1) + " "
- + "FROM " + tableName1 + " JOIN " + tableName2 + " ON ("
- + tableName1 + "." + getColName(0) + " = "
- + tableName2 + "." + getColName(0) + ") WHERE "
- + tableName1 + "." + getColName(0) + " < 3 AND $CONDITIONS";
-
- runImport(getArgv(tableName1 + "." + getColName(0), query));
-
- Path warehousePath = new Path(this.getWarehouseDir());
- Path filePath = new Path(warehousePath, "part-m-00000");
- String expectedVal = "1,foo";
-
- BufferedReader reader = null;
- if (!isOnPhysicalCluster()) {
- reader = new BufferedReader(
- new InputStreamReader(new FileInputStream(
- new File(filePath.toString()))));
- } else {
- FileSystem dfs = FileSystem.get(getConf());
- FSDataInputStream dis = dfs.open(filePath);
- reader = new BufferedReader(new InputStreamReader(dis));
- }
- try {
- String line = reader.readLine();
- assertEquals("QueryResult expected a different string",
- expectedVal, line);
- } finally {
- IOUtils.closeStream(reader);
- }
- }
-}
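
The free-form import exercised above reduces to a command along these lines (a sketch; the table and column names and the paths are placeholders). Note that a query passed via --query must contain the literal $CONDITIONS token in its WHERE clause, which Sqoop substitutes with per-split predicates, and --split-by is needed when running more than one mapper:

    sqoop import \
        --connect jdbc:hsqldb:hsql://localhost/db \
        --query 'SELECT t1.id, t2.name FROM t1 JOIN t2 ON (t1.id = t2.id) WHERE $CONDITIONS' \
        --split-by t1.id \
        --target-dir /tmp/warehouse \
        --num-mappers 2
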
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/com/cloudera/sqoop/TestIncrementalImport.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestIncrementalImport.java b/src/test/com/cloudera/sqoop/TestIncrementalImport.java
deleted file mode 100644
index 1faa52b..0000000
--- a/src/test/com/cloudera/sqoop/TestIncrementalImport.java
+++ /dev/null
@@ -1,1348 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.sqoop;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import com.cloudera.sqoop.metastore.SavedJobsTestBase;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.sqoop.hive.HiveImport;
-import com.cloudera.sqoop.manager.ConnManager;
-import com.cloudera.sqoop.manager.HsqldbManager;
-import com.cloudera.sqoop.manager.ManagerFactory;
-import com.cloudera.sqoop.metastore.JobData;
-import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
-import com.cloudera.sqoop.testutil.CommonArgs;
-import com.cloudera.sqoop.tool.ImportTool;
-import com.cloudera.sqoop.tool.JobTool;
-import org.apache.sqoop.metastore.AutoGenericJobStorage;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-
-import static org.junit.Assert.*;
-
-/**
- * Test the incremental import functionality.
- *
- * These all make use of the auto-connect hsqldb-based metastore.
- * The metastore URL is configured to be in-memory, and all state is
- * dropped between individual tests.
- */
-
-public class TestIncrementalImport {
-
- public static final Log LOG = LogFactory.getLog(
- TestIncrementalImport.class.getName());
-
- // The database we read from.
- public static final String SOURCE_DB_URL = "jdbc:hsqldb:mem:incremental";
- public static final String AUTO_STORAGE_PASSWORD = "";
- public static final String AUTO_STORAGE_USERNAME = "SA";
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Before
- public void setUp() throws Exception {
- resetSourceDataSchema();
- }
-
- public static void resetSourceDataSchema() throws SQLException {
- SqoopOptions options = new SqoopOptions();
- options.setConnectString(SOURCE_DB_URL);
- options.setUsername(AUTO_STORAGE_USERNAME);
- options.setPassword(AUTO_STORAGE_PASSWORD);
- SavedJobsTestBase.resetSchema(options);
- }
-
- public static Configuration newConf() {
- Configuration conf = new Configuration();
- conf.set(AutoGenericJobStorage.AUTO_STORAGE_USER_KEY, AUTO_STORAGE_USERNAME);
- conf.set(AutoGenericJobStorage.AUTO_STORAGE_PASS_KEY, AUTO_STORAGE_PASSWORD);
- conf.set(AutoGenericJobStorage.AUTO_STORAGE_CONNECT_STRING_KEY,
- SOURCE_DB_URL);
- return conf;
- }
-
- /**
- * Assert that a table has a specified number of rows.
- */
- private void assertRowCount(String table, int numRows) throws SQLException {
- SqoopOptions options = new SqoopOptions();
- options.setConnectString(SOURCE_DB_URL);
- HsqldbManager manager = new HsqldbManager(options);
- Connection c = manager.getConnection();
- PreparedStatement s = null;
- ResultSet rs = null;
- try {
- s = c.prepareStatement("SELECT COUNT(*) FROM " + manager.escapeTableName(table));
- rs = s.executeQuery();
- if (!rs.next()) {
- fail("No resultset");
- }
- int realNumRows = rs.getInt(1);
- assertEquals(numRows, realNumRows);
- LOG.info("Expected " + numRows + " rows -- ok.");
- } finally {
- if (null != s) {
- try {
- s.close();
- } catch (SQLException sqlE) {
- LOG.warn("exception: " + sqlE);
- }
- }
-
- if (null != rs) {
- try {
- rs.close();
- } catch (SQLException sqlE) {
- LOG.warn("exception: " + sqlE);
- }
- }
- }
- }
-
- /**
- * Insert rows with id = [low, hi) into tableName.
- */
- private void insertIdRows(String tableName, int low, int hi)
- throws SQLException {
- SqoopOptions options = new SqoopOptions();
- options.setConnectString(SOURCE_DB_URL);
- HsqldbManager manager = new HsqldbManager(options);
- Connection c = manager.getConnection();
- PreparedStatement s = null;
- try {
- s = c.prepareStatement("INSERT INTO " + manager.escapeTableName(tableName) + " VALUES(?)");
- for (int i = low; i < hi; i++) {
- s.setInt(1, i);
- s.executeUpdate();
- }
-
- c.commit();
- } finally {
- if(s != null) {
- s.close();
- }
- }
- }
-
- /**
- * Insert rows with id = [low, hi) into tableName with
- * the timestamp column set to the specified ts.
- */
- private void insertIdTimestampRows(String tableName, int low, int hi,
- Timestamp ts) throws SQLException {
- LOG.info("Inserting id rows in [" + low + ", " + hi + ") @ " + ts);
- SqoopOptions options = new SqoopOptions();
- options.setConnectString(SOURCE_DB_URL);
- HsqldbManager manager = new HsqldbManager(options);
- Connection c = manager.getConnection();
- PreparedStatement s = null;
- try {
- s = c.prepareStatement("INSERT INTO " + manager.escapeTableName(tableName) + " VALUES(?,?)");
- for (int i = low; i < hi; i++) {
- s.setInt(1, i);
- s.setTimestamp(2, ts);
- s.executeUpdate();
- }
-
- c.commit();
- } finally {
- s.close();
- }
- }
-
- /**
- * Insert rows with id = [low, hi) into tableName with
- * id converted to string.
- */
- private void insertIdVarcharRows(String tableName, int low, int hi)
- throws SQLException {
- LOG.info("Inserting rows in [" + low + ", " + hi + ")");
- SqoopOptions options = new SqoopOptions();
- options.setConnectString(SOURCE_DB_URL);
- HsqldbManager manager = new HsqldbManager(options);
- Connection c = manager.getConnection();
- PreparedStatement s = null;
- try {
- s = c.prepareStatement("INSERT INTO " + manager.escapeTableName(tableName) + " VALUES(?)");
- for (int i = low; i < hi; i++) {
- s.setString(1, Integer.toString(i));
- s.executeUpdate();
- }
- c.commit();
- } finally {
- s.close();
- }
- }
-
- /**
- * Create a table with an 'id' column full of integers.
- */
- private void createIdTable(String tableName, int insertRows)
- throws SQLException {
- SqoopOptions options = new SqoopOptions();
- options.setConnectString(SOURCE_DB_URL);
- HsqldbManager manager = new HsqldbManager(options);
- Connection c = manager.getConnection();
- PreparedStatement s = null;
- try {
- s = c.prepareStatement("CREATE TABLE " + manager.escapeTableName(tableName) + "(id INT NOT NULL)");
- s.executeUpdate();
- c.commit();
- insertIdRows(tableName, 0, insertRows);
- } finally {
- s.close();
- }
- }
-
- /**
- * Create a table with an 'id' column full of integers and a
- * last_modified column with timestamps.
- */
- private void createTimestampTable(String tableName, int insertRows,
- Timestamp baseTime) throws SQLException {
- SqoopOptions options = new SqoopOptions();
- options.setConnectString(SOURCE_DB_URL);
- HsqldbManager manager = new HsqldbManager(options);
- Connection c = manager.getConnection();
- PreparedStatement s = null;
- try {
- s = c.prepareStatement("CREATE TABLE " + manager.escapeTableName(tableName) + "(id INT NOT NULL, "
- + "last_modified TIMESTAMP)");
- s.executeUpdate();
- c.commit();
- insertIdTimestampRows(tableName, 0, insertRows, baseTime);
- } finally {
- s.close();
- }
- }
-
- /**
- * Create a table with an 'id' column of type varchar(20).
- */
- private void createIdVarcharTable(String tableName,
- int insertRows) throws SQLException {
- SqoopOptions options = new SqoopOptions();
- options.setConnectString(SOURCE_DB_URL);
- HsqldbManager manager = new HsqldbManager(options);
- Connection c = manager.getConnection();
- PreparedStatement s = null;
- try {
- s = c.prepareStatement("CREATE TABLE " + manager.escapeTableName(tableName) + "(id varchar(20) NOT NULL)");
- s.executeUpdate();
- c.commit();
- insertIdVarcharRows(tableName, 0, insertRows);
- } finally {
- s.close();
- }
- }
-
- /**
- * Delete all files in a directory for a table.
- */
- public void clearDir(String tableName) {
- try {
- FileSystem fs = FileSystem.getLocal(new Configuration());
- Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
- Path tableDir = new Path(warehouse, tableName);
- fs.delete(tableDir, true);
- } catch (Exception e) {
- fail("Got unexpected exception: " + StringUtils.stringifyException(e));
- }
- }
-
- /**
- * Look at a directory that should contain files full of an imported 'id'
- * column. Assert that all numbers in [0, expectedNums) are present
- * in order.
- */
- public void assertDirOfNumbers(String tableName, int expectedNums) {
- try {
- FileSystem fs = FileSystem.getLocal(new Configuration());
- Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
- Path tableDir = new Path(warehouse, tableName);
- FileStatus [] stats = fs.listStatus(tableDir);
- String [] fileNames = new String[stats.length];
- for (int i = 0; i < stats.length; i++) {
- fileNames[i] = stats[i].getPath().toString();
- }
-
- Arrays.sort(fileNames);
-
- // Read all the files in sorted order, adding the value lines to the list.
- List<String> receivedNums = new ArrayList<String>();
- for (String fileName : fileNames) {
- if (fileName.startsWith("_") || fileName.startsWith(".")) {
- continue;
- }
-
- BufferedReader r = new BufferedReader(
- new InputStreamReader(fs.open(new Path(fileName))));
- try {
- while (true) {
- String s = r.readLine();
- if (null == s) {
- break;
- }
-
- receivedNums.add(s.trim());
- }
- } finally {
- r.close();
- }
- }
-
- assertEquals(expectedNums, receivedNums.size());
-
- // Compare the received values with the expected set.
- for (int i = 0; i < expectedNums; i++) {
- assertEquals((int) i, (int) Integer.valueOf(receivedNums.get(i)));
- }
- } catch (Exception e) {
- fail("Got unexpected exception: " + StringUtils.stringifyException(e));
- }
- }
-
- /**
- * Look at a directory that should contain files full of an imported 'id'
- * column and 'last_modified' column. Assert that all numbers in [0, expectedNums) are present
- * in order.
- */
- public void assertDirOfNumbersAndTimestamps(String tableName, int expectedNums) {
- try {
- FileSystem fs = FileSystem.getLocal(new Configuration());
- Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
- Path tableDir = new Path(warehouse, tableName);
- FileStatus [] stats = fs.listStatus(tableDir);
- String [] fileNames = new String[stats.length];
- for (int i = 0; i < stats.length; i++) {
- fileNames[i] = stats[i].getPath().toString();
- }
-
- Arrays.sort(fileNames);
-
- // Read all the files in sorted order, adding the value lines to the list.
- List<String> receivedNums = new ArrayList<String>();
- for (String fileName : fileNames) {
- if (fileName.startsWith("_") || fileName.startsWith(".")) {
- continue;
- }
-
- BufferedReader r = new BufferedReader(
- new InputStreamReader(fs.open(new Path(fileName))));
- try {
- while (true) {
- String s = r.readLine();
- if (null == s) {
- break;
- }
-
- receivedNums.add(s.trim());
- }
- } finally {
- r.close();
- }
- }
-
- assertEquals(expectedNums, receivedNums.size());
-
- // Compare the received values with the expected set.
- for (int i = 0; i < expectedNums; i++) {
- assertEquals((int) i, (int) Integer.valueOf(receivedNums.get(i).split(",")[0]));
- }
- } catch (Exception e) {
- fail("Got unexpected exception: " + StringUtils.stringifyException(e));
- }
- }
-
- /**
- * Assert that a directory contains a file with exactly one line
- * in it, containing the prescribed number 'val'.
- */
- public void assertFirstSpecificNumber(String tableName, int val) {
- try {
- FileSystem fs = FileSystem.getLocal(new Configuration());
- Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
- Path tableDir = new Path(warehouse, tableName);
- FileStatus [] stats = fs.listStatus(tableDir);
- String [] filePaths = new String[stats.length];
- for (int i = 0; i < stats.length; i++) {
- filePaths[i] = stats[i].getPath().toString();
- }
-
- // Read the first file that is not a hidden file.
- boolean foundVal = false;
- for (String filePath : filePaths) {
- String fileName = new Path(filePath).getName();
- if (fileName.startsWith("_") || fileName.startsWith(".")) {
- continue;
- }
-
- if (foundVal) {
- // Make sure we don't have two or more "real" files in the dir.
- fail("Got an extra data-containing file in this directory.");
- }
-
- BufferedReader r = new BufferedReader(
- new InputStreamReader(fs.open(new Path(filePath))));
- try {
- String s = r.readLine();
- if (null == s) {
- fail("Unexpected empty file " + filePath + ".");
- }
- assertEquals(val, (int) Integer.valueOf(s.trim()));
-
- String nextLine = r.readLine();
- if (nextLine != null) {
- fail("Expected only one result, but got another line: " + nextLine);
- }
-
- // Successfully got the value we were looking for.
- foundVal = true;
- } finally {
- r.close();
- }
- }
- } catch (IOException e) {
- fail("Got unexpected exception: " + StringUtils.stringifyException(e));
- }
- }
-
- /**
- * Assert that a directory contains a single data file whose first
- * line starts with the prescribed number 'val'.
- */
- public void assertSpecificNumber(String tableName, int val) {
- try {
- FileSystem fs = FileSystem.getLocal(new Configuration());
- Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
- Path tableDir = new Path(warehouse, tableName);
- FileStatus [] stats = fs.listStatus(tableDir);
- String [] filePaths = new String[stats.length];
- for (int i = 0; i < stats.length; i++) {
- filePaths[i] = stats[i].getPath().toString();
- }
-
- // Read the first file that is not a hidden file.
- boolean foundVal = false;
- for (String filePath : filePaths) {
- String fileName = new Path(filePath).getName();
- if (fileName.startsWith("_") || fileName.startsWith(".")) {
- continue;
- }
-
- if (foundVal) {
- // Make sure we don't have two or more "real" files in the dir.
- fail("Got an extra data-containing file in this directory.");
- }
-
- BufferedReader r = new BufferedReader(
- new InputStreamReader(fs.open(new Path(filePath))));
- try {
- String s = r.readLine();
- if (val == (int) Integer.valueOf(s.trim().split(",")[0])) {
- if (foundVal) {
- fail("Expected only one result, but got another line: " + s);
- }
- foundVal = true;
- }
- } finally {
- r.close();
- }
- }
- } catch (IOException e) {
- fail("Got unexpected exception: " + StringUtils.stringifyException(e));
- }
- }
-
- public void runImport(SqoopOptions options, List<String> args) {
- try {
- Sqoop importer = new Sqoop(new ImportTool(), options.getConf(), options);
- int ret = Sqoop.runSqoop(importer, args.toArray(new String[0]));
- assertEquals("Failure during job", 0, ret);
- } catch (Exception e) {
- LOG.error("Got exception running Sqoop: "
- + StringUtils.stringifyException(e));
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Return a list of arguments to import the specified table.
- */
- private List<String> getArgListForTable(String tableName, boolean commonArgs,
- boolean isAppend) {
- return getArgListForTable(tableName, commonArgs, isAppend, false);
- }
-
- /**
- * Return a list of arguments to import the specified table.
- */
- private List<String> getArgListForTable(String tableName, boolean commonArgs,
- boolean isAppend, boolean appendTimestamp) {
- List<String> args = new ArrayList<String>();
- if (commonArgs) {
- CommonArgs.addHadoopFlags(args);
- }
- args.add("--connect");
- args.add(SOURCE_DB_URL);
- args.add("--table");
- args.add(tableName);
- args.add("--warehouse-dir");
- args.add(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
- if (isAppend) {
- args.add("--incremental");
- args.add("append");
- if (!appendTimestamp) {
- args.add("--check-column");
- args.add("ID");
- } else {
- args.add("--check-column");
- args.add("LAST_MODIFIED");
- }
- } else {
- args.add("--incremental");
- args.add("lastmodified");
- args.add("--check-column");
- args.add("LAST_MODIFIED");
- }
- args.add("--columns");
- args.add("ID");
- args.add("-m");
- args.add("1");
-
- return args;
- }
-
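Spelled out as a command line, the append-mode argument list built above corresponds to roughly the following (a sketch; the warehouse path and table name are placeholders):

    sqoop import \
        --connect jdbc:hsqldb:mem:incremental \
        --table someTable \
        --warehouse-dir /tmp/local-warehouse \
        --incremental append \
        --check-column ID \
        --columns ID \
        -m 1

In append mode Sqoop imports only rows whose check column exceeds the recorded --last-value; the lastmodified branch instead passes --incremental lastmodified with --check-column LAST_MODIFIED.
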
- /**
- * Return a list of arguments to import by query.
- * @return the list of arguments
- */
- private List<String> getArgListForQuery(String query, String directoryName,
- boolean commonArgs, boolean isAppend, boolean appendTimestamp) {
- List<String> args = new ArrayList<String>();
- if (commonArgs) {
- CommonArgs.addHadoopFlags(args);
- }
-
- String [] directoryNames = directoryName.split("/");
- String className = directoryNames[directoryNames.length -1];
-
- args.add("--connect");
- args.add(SOURCE_DB_URL);
- args.add("--query");
- args.add(query);
- args.add("--class-name");
- args.add(className);
- args.add("--target-dir");
- args.add(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR
- + System.getProperty("file.separator") + directoryName);
- if (isAppend) {
- args.add("--incremental");
- args.add("append");
- if (!appendTimestamp) {
- args.add("--check-column");
- args.add("ID");
- } else {
- args.add("--check-column");
- args.add("LAST_MODIFIED");
- }
- } else {
- args.add("--incremental");
- args.add("lastmodified");
- args.add("--check-column");
- args.add("LAST_MODIFIED");
- }
- args.add("-m");
- args.add("1");
-
- return args;
- }
-
- /**
- * Create a job with the specified name, where the job performs
- * an import configured with 'jobArgs'.
- */
- private void createJob(String jobName, List<String> jobArgs) {
- createJob(jobName, jobArgs, newConf());
- }
-
- /**
- * Create a job with the specified name, where the job performs
- * an import configured with 'jobArgs', using the provided configuration
- * as defaults.
- */
- private void createJob(String jobName, List<String> jobArgs,
- Configuration conf) {
- try {
- SqoopOptions options = new SqoopOptions();
- options.setConf(conf);
- Sqoop makeJob = new Sqoop(new JobTool(), conf, options);
-
- List<String> args = new ArrayList<String>();
- args.add("--create");
- args.add(jobName);
- args.add("--");
- args.add("import");
- args.addAll(jobArgs);
-
- int ret = Sqoop.runSqoop(makeJob, args.toArray(new String[0]));
- assertEquals("Failure to create job", 0, ret);
- } catch (Exception e) {
- LOG.error("Got exception running Sqoop to create job: "
- + StringUtils.stringifyException(e));
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Run the specified job.
- */
- private void runJob(String jobName) {
- runJob(jobName, newConf());
- }
-
- /**
- * Run the specified job.
- */
- private void runJob(String jobName, Configuration conf) {
- try {
- SqoopOptions options = new SqoopOptions();
- options.setConf(conf);
- Sqoop runJob = new Sqoop(new JobTool(), conf, options);
-
- List<String> args = new ArrayList<String>();
- args.add("--exec");
- args.add(jobName);
-
- int ret = Sqoop.runSqoop(runJob, args.toArray(new String[0]));
- assertEquals("Failure to run job", 0, ret);
- } catch (Exception e) {
- LOG.error("Got exception running Sqoop to run job: "
- + StringUtils.stringifyException(e));
- throw new RuntimeException(e);
- }
- }
-
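createJob() and runJob() mirror the saved-job workflow on the command line, e.g. (the job name and import arguments here are illustrative):

    sqoop job --create myJob -- import --connect jdbc:hsqldb:mem:incremental --table someTable --incremental append --check-column ID
    sqoop job --exec myJob

The metastore persists the job definition along with its incremental last-value state, which is what lets a re-executed job pick up only rows added since the previous run.
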
- // Incremental import of an empty table, no metastore.
- @Test
- public void testEmptyAppendImport() throws Exception {
- final String TABLE_NAME = "emptyAppend1";
- createIdTable(TABLE_NAME, 0);
- List<String> args = getArgListForTable(TABLE_NAME, true, true);
-
- Configuration conf = newConf();
- SqoopOptions options = new SqoopOptions();
- options.setConf(conf);
- runImport(options, args);
-
- assertDirOfNumbers(TABLE_NAME, 0);
- }
-
- // Incremental import of a filled table, no metastore.
- @Test
- public void testFullAppendImport() throws Exception {
- final String TABLE_NAME = "fullAppend1";
- createIdTable(TABLE_NAME, 10);
- List<String> args = getArgListForTable(TABLE_NAME, true, true);
-
- Configuration conf = newConf();
- SqoopOptions options = new SqoopOptions();
- options.setConf(conf);
- runImport(options, args);
-
- assertDirOfNumbers(TABLE_NAME, 10);
- }
-
- @Test
- public void testEmptyJobAppend() throws Exception {
- // Create a job and run an import on an empty table.
- // Nothing should happen.
-
- final String TABLE_NAME = "emptyJob";
- createIdTable(TABLE_NAME, 0);
-
- List<String> args = getArgListForTable(TABLE_NAME, false, true);
- createJob("emptyJob", args);
- runJob("emptyJob");
- assertDirOfNumbers(TABLE_NAME, 0);
-
- // Running the job a second time should result in
- // nothing happening, it's still empty.
- runJob("emptyJob");
- assertDirOfNumbers(TABLE_NAME, 0);
- }
-
- @Test
- public void testEmptyThenFullJobAppend() throws Exception {
- // Create an empty table. Import it; nothing happens.
- // Add some rows. Verify they are appended.
-
- final String TABLE_NAME = "emptyThenFull";
- createIdTable(TABLE_NAME, 0);
-
- List<String> args = getArgListForTable(TABLE_NAME, false, true);
- createJob(TABLE_NAME, args);
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 0);
-
- // Now add some rows.
- insertIdRows(TABLE_NAME, 0, 10);
-
- // Running the job a second time should import 10 rows.
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 10);
-
- // Add some more rows.
- insertIdRows(TABLE_NAME, 10, 20);
-
- // Import only those rows.
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 20);
- }
-
- @Test
- public void testEmptyThenFullJobAppendWithQuery() throws Exception {
- // Create an empty table. Import it; nothing happens.
- // Add some rows. Verify they are appended.
-
- final String TABLE_NAME = "withQuery";
- createIdTable(TABLE_NAME, 0);
- clearDir(TABLE_NAME);
-
- final String QUERY = "SELECT id FROM \"withQuery\" WHERE $CONDITIONS";
-
- List<String> args = getArgListForQuery(QUERY, TABLE_NAME,
- false, true, false);
- createJob(TABLE_NAME, args);
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 0);
-
- // Now add some rows.
- insertIdRows(TABLE_NAME, 0, 10);
-
- // Running the job a second time should import 10 rows.
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 10);
-
- // Add some more rows.
- insertIdRows(TABLE_NAME, 10, 20);
-
- // Import only those rows.
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 20);
- }
-
- @Test
- public void testAppend() throws Exception {
- // Create a table with data in it; import it.
- // Then add more data, verify that only the incremental data is pulled.
-
- final String TABLE_NAME = "append";
- createIdTable(TABLE_NAME, 10);
-
- List<String> args = getArgListForTable(TABLE_NAME, false, true);
- createJob(TABLE_NAME, args);
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 10);
-
- // Add some more rows.
- insertIdRows(TABLE_NAME, 10, 20);
-
- // Import only those rows.
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 20);
- }
-
- @Test
- public void testEmptyLastModified() throws Exception {
- final String TABLE_NAME = "emptyLastModified";
- createTimestampTable(TABLE_NAME, 0, null);
- List<String> args = getArgListForTable(TABLE_NAME, true, false);
-
- Configuration conf = newConf();
- SqoopOptions options = new SqoopOptions();
- options.setConf(conf);
- runImport(options, args);
-
- assertDirOfNumbers(TABLE_NAME, 0);
- }
-
- @Test
- public void testEmptyLastModifiedWithNonExistingParentDirectory() throws Exception {
- final String TABLE_NAME = "emptyLastModifiedNoParent";
- final String QUERY = "SELECT id, last_modified FROM \"" + TABLE_NAME + "\" WHERE $CONDITIONS";
- final String DIRECTORY = "non-existing/parents/" + TABLE_NAME;
- createTimestampTable(TABLE_NAME, 0, null);
- List<String> args = getArgListForQuery(QUERY, DIRECTORY, true, false, false);
-
- Configuration conf = newConf();
- SqoopOptions options = new SqoopOptions();
- options.setConf(conf);
- runImport(options, args);
-
- assertDirOfNumbers(DIRECTORY, 0);
- }
-
- @Test
- public void testFullLastModifiedImport() throws Exception {
- // Given a table of rows imported in the past,
- // see that they are imported.
- final String TABLE_NAME = "fullLastModified";
- Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
- createTimestampTable(TABLE_NAME, 10, thePast);
-
- List<String> args = getArgListForTable(TABLE_NAME, true, false);
-
- Configuration conf = newConf();
- SqoopOptions options = new SqoopOptions();
- options.setConf(conf);
- runImport(options, args);
-
- assertDirOfNumbers(TABLE_NAME, 10);
- }
-
- @Test
- public void testNoImportFromTheFuture() throws Exception {
- // If rows have last-modified dates set in the future w.r.t. an
- // import, do not import these rows.
-
- final String TABLE_NAME = "futureLastModified";
- Timestamp theFuture = new Timestamp(System.currentTimeMillis() + 1000000);
- createTimestampTable(TABLE_NAME, 10, theFuture);
-
- List<String> args = getArgListForTable(TABLE_NAME, true, false);
-
- Configuration conf = newConf();
- SqoopOptions options = new SqoopOptions();
- options.setConf(conf);
- runImport(options, args);
-
- assertDirOfNumbers(TABLE_NAME, 0);
- }
-
- @Test
- public void testEmptyJobLastMod() throws Exception {
- // Create a job and run an import on an empty table.
- // Nothing should happen.
-
- final String TABLE_NAME = "emptyJobLastMod";
- createTimestampTable(TABLE_NAME, 0, null);
-
- List<String> args = getArgListForTable(TABLE_NAME, false, false);
- args.add("--append");
- createJob("emptyJobLastMod", args);
- runJob("emptyJobLastMod");
- assertDirOfNumbers(TABLE_NAME, 0);
-
- // Running the job a second time should result in
- // nothing happening, it's still empty.
- runJob("emptyJobLastMod");
- assertDirOfNumbers(TABLE_NAME, 0);
- }
-
- @Test
- public void testEmptyThenFullJobLastMod() throws Exception {
- // Create an empty table. Import it; nothing happens.
- // Add some rows. Verify they are appended.
-
- final String TABLE_NAME = "emptyThenFullTimestamp";
- createTimestampTable(TABLE_NAME, 0, null);
-
- List<String> args = getArgListForTable(TABLE_NAME, false, false);
- args.add("--append");
- createJob(TABLE_NAME, args);
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 0);
-
- long importWasBefore = System.currentTimeMillis();
-
- // Let some time elapse.
- Thread.sleep(50);
-
- long rowsAddedTime = System.currentTimeMillis() - 5;
-
- // Check: we are adding rows after the previous import time
- // and before the current time.
- assertTrue(rowsAddedTime > importWasBefore);
- assertTrue(rowsAddedTime < System.currentTimeMillis());
-
- insertIdTimestampRows(TABLE_NAME, 0, 10, new Timestamp(rowsAddedTime));
-
- // Running the job a second time should import 10 rows.
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 10);
-
- // Add some more rows.
- importWasBefore = System.currentTimeMillis();
- Thread.sleep(50);
- rowsAddedTime = System.currentTimeMillis() - 5;
- assertTrue(rowsAddedTime > importWasBefore);
- assertTrue(rowsAddedTime < System.currentTimeMillis());
- insertIdTimestampRows(TABLE_NAME, 10, 20, new Timestamp(rowsAddedTime));
-
- // Import only those rows.
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 20);
- }
-
- @Test
- public void testAppendWithTimestamp() throws Exception {
- // Create a table with data in it; import it.
- // Then add more data, verify that only the incremental data is pulled.
-
- final String TABLE_NAME = "appendTimestamp";
- Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
- createTimestampTable(TABLE_NAME, 10, thePast);
-
- List<String> args = getArgListForTable(TABLE_NAME, false, false);
- args.add("--append");
- createJob(TABLE_NAME, args);
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 10);
-
- // Add some more rows.
- long importWasBefore = System.currentTimeMillis();
- Thread.sleep(50);
- long rowsAddedTime = System.currentTimeMillis() - 5;
- assertTrue(rowsAddedTime > importWasBefore);
- assertTrue(rowsAddedTime < System.currentTimeMillis());
- insertIdTimestampRows(TABLE_NAME, 10, 20, new Timestamp(rowsAddedTime));
-
- // Import only those rows.
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 20);
- }
-
- @Test
- public void testAppendWithString() throws Exception {
- // Create a table with string column in it;
- // incrementally import it on the string column - it should fail.
-
- final String TABLE_NAME = "appendString";
- createIdVarcharTable(TABLE_NAME, 10);
-
- List<String> args = getArgListForTable(TABLE_NAME, false, true);
- args.add("--append");
- createJob(TABLE_NAME, args);
-
- thrown.expect(RuntimeException.class);
- thrown.reportMissingExceptionWithMessage("Expected incremental import on varchar column to fail");
- runJob(TABLE_NAME);
- }
-
- @Test
- public void testModifyWithTimestamp() throws Exception {
- // Create a table with data in it; import it.
- // Then modify some existing rows, and verify that we only grab
- // those rows.
-
- final String TABLE_NAME = "modifyTimestamp";
- Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
- createTimestampTable(TABLE_NAME, 10, thePast);
-
- List<String> args = getArgListForTable(TABLE_NAME, false, false);
- createJob(TABLE_NAME, args);
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 10);
-
- // Modify a row.
- long importWasBefore = System.currentTimeMillis();
- Thread.sleep(50);
- long rowsAddedTime = System.currentTimeMillis() - 5;
- assertTrue(rowsAddedTime > importWasBefore);
- assertTrue(rowsAddedTime < System.currentTimeMillis());
- SqoopOptions options = new SqoopOptions();
- options.setConnectString(SOURCE_DB_URL);
- HsqldbManager manager = new HsqldbManager(options);
- Connection c = manager.getConnection();
- PreparedStatement s = null;
- try {
- s = c.prepareStatement("UPDATE " + manager.escapeTableName(TABLE_NAME) + " SET id=?, last_modified=? WHERE id=?");
- s.setInt(1, 4000); // the first row should have '4000' in it now.
- s.setTimestamp(2, new Timestamp(rowsAddedTime));
- s.setInt(3, 0);
- s.executeUpdate();
- c.commit();
- } finally {
- if (null != s) {
- s.close();
- }
- }
-
- // Import only the modified row.
- clearDir(TABLE_NAME);
- runJob(TABLE_NAME);
- assertFirstSpecificNumber(TABLE_NAME, 4000);
- }
-
- @Test
- public void testUpdateModifyWithTimestamp() throws Exception {
- // Create a table with data in it; import it.
- // Then modify some existing rows, and verify that we only grab
- // those rows.
-
- final String TABLE_NAME = "updateModifyTimestamp";
- Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
- createTimestampTable(TABLE_NAME, 10, thePast);
-
- List<String> args = getArgListForTable(TABLE_NAME, false, false);
-
- Configuration conf = newConf();
- SqoopOptions options = new SqoopOptions();
- options.setConf(conf);
- runImport(options, args);
- assertDirOfNumbers(TABLE_NAME, 10);
-
- // Modify a row.
- long importWasBefore = System.currentTimeMillis();
- Thread.sleep(50);
- long rowsAddedTime = System.currentTimeMillis() - 5;
- assertTrue(rowsAddedTime > importWasBefore);
- assertTrue(rowsAddedTime < System.currentTimeMillis());
- SqoopOptions options2 = new SqoopOptions();
- options2.setConnectString(SOURCE_DB_URL);
- HsqldbManager manager = new HsqldbManager(options2);
- Connection c = manager.getConnection();
- PreparedStatement s = null;
- try {
- s = c.prepareStatement("UPDATE " + manager.escapeTableName(TABLE_NAME) + " SET id=?, last_modified=? WHERE id=?");
- s.setInt(1, 4000); // the first row should have '4000' in it now.
- s.setTimestamp(2, new Timestamp(rowsAddedTime));
- s.setInt(3, 0);
- s.executeUpdate();
- c.commit();
- } finally {
- if (null != s) {
- s.close();
- }
- }
-
- // Re-import rows changed since the first import and merge them on ID.
- args.add("--last-value");
- args.add(new Timestamp(importWasBefore).toString());
- args.add("--merge-key");
- args.add("ID");
- conf = newConf();
- options = new SqoopOptions();
- options.setConf(conf);
- runImport(options, args);
- assertSpecificNumber(TABLE_NAME, 4000);
- }
-
- @Test
- public void testUpdateModifyWithTimestampWithQuery() throws Exception {
- // Create a table with data in it; import it with a free-form query.
- // Then modify an existing row and verify that the merge import
- // picks up only that change.
-
- final String TABLE_NAME = "UpdateModifyWithTimestampWithQuery";
- Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
- createTimestampTable(TABLE_NAME, 10, thePast);
-
- final String QUERY = "SELECT id, last_modified FROM \"UpdateModifyWithTimestampWithQuery\" WHERE $CONDITIONS";
-
- List<String> args = getArgListForQuery(QUERY, TABLE_NAME,
- true, false, false);
-
- Configuration conf = newConf();
- SqoopOptions options = new SqoopOptions();
- options.setConf(conf);
- runImport(options, args);
- assertDirOfNumbersAndTimestamps(TABLE_NAME, 10);
-
- // Modify a row.
- long importWasBefore = System.currentTimeMillis();
- Thread.sleep(50);
- long rowsAddedTime = System.currentTimeMillis() - 5;
- assertTrue(rowsAddedTime > importWasBefore);
- assertTrue(rowsAddedTime < System.currentTimeMillis());
- SqoopOptions options2 = new SqoopOptions();
- options2.setConnectString(SOURCE_DB_URL);
- HsqldbManager manager = new HsqldbManager(options2);
- Connection c = manager.getConnection();
- PreparedStatement s = null;
- try {
- s = c.prepareStatement("UPDATE " + manager.escapeTableName(TABLE_NAME) + " SET id=?, last_modified=? WHERE id=?");
- s.setInt(1, 4000); // the first row should have '4000' in it now.
- s.setTimestamp(2, new Timestamp(rowsAddedTime));
- s.setInt(3, 0);
- s.executeUpdate();
- c.commit();
- } finally {
- if (null != s) {
- s.close();
- }
- }
-
- // Re-import rows changed since the first import and merge them on ID.
- args.add("--last-value");
- args.add(new Timestamp(importWasBefore).toString());
- args.add("--merge-key");
- args.add("ID");
- conf = newConf();
- options = new SqoopOptions();
- options.setConf(conf);
- runImport(options, args);
- assertSpecificNumber(TABLE_NAME, 4000);
- }
-
- @Test
- public void testUpdateModifyWithTimestampJob() throws Exception {
- // Create a table with data in it; import it.
- // Then modify some existing rows, and verify that we only grab
- // those rows.
-
- final String TABLE_NAME = "updateModifyTimestampJob";
- Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
- createTimestampTable(TABLE_NAME, 10, thePast);
-
- List<String> args = getArgListForTable(TABLE_NAME, false, false);
- args.add("--merge-key");
- args.add("ID");
- createJob(TABLE_NAME, args);
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 10);
-
- // Modify a row.
- long importWasBefore = System.currentTimeMillis();
- Thread.sleep(50);
- long rowsAddedTime = System.currentTimeMillis() - 5;
- assertTrue(rowsAddedTime > importWasBefore);
- assertTrue(rowsAddedTime < System.currentTimeMillis());
- SqoopOptions options2 = new SqoopOptions();
- options2.setConnectString(SOURCE_DB_URL);
- HsqldbManager manager = new HsqldbManager(options2);
- Connection c = manager.getConnection();
- PreparedStatement s = null;
- try {
- s = c.prepareStatement("UPDATE " + manager.escapeTableName(TABLE_NAME) + " SET id=?, last_modified=? WHERE id=?");
- s.setInt(1, 4000); // the first row should have '4000' in it now.
- s.setTimestamp(2, new Timestamp(rowsAddedTime));
- s.setInt(3, 0);
- s.executeUpdate();
- c.commit();
- } finally {
- if (null != s) {
- s.close();
- }
- }
-
- // Re-run the saved job; it should pick up and merge the modified row.
- runJob(TABLE_NAME);
- assertSpecificNumber(TABLE_NAME, 4000);
- }
-
- /**
- * ManagerFactory returning an HSQLDB ConnManager which allows you to
- * specify the current database timestamp.
- */
- public static class InstrumentHsqldbManagerFactory extends ManagerFactory {
- @Override
- public ConnManager accept(JobData data) {
- LOG.info("Using instrumented manager");
- return new InstrumentHsqldbManager(data.getSqoopOptions());
- }
- }
-
- /**
- * Hsqldb ConnManager that lets you set the current reported timestamp
- * from the database, to allow testing of boundary conditions for imports.
- */
- public static class InstrumentHsqldbManager extends HsqldbManager {
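- // The timestamp is static so the test can set it before the
- // factory instantiates this manager inside the job.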
- private static Timestamp curTimestamp;
-
- public InstrumentHsqldbManager(SqoopOptions options) {
- super(options);
- }
-
- @Override
- public Timestamp getCurrentDbTimestamp() {
- return InstrumentHsqldbManager.curTimestamp;
- }
-
- public static void setCurrentDbTimestamp(Timestamp t) {
- InstrumentHsqldbManager.curTimestamp = t;
- }
- }
-
- @Test
- public void testTimestampBoundary() throws Exception {
- // Run an import, and then insert rows with the last-modified timestamp
- // set to the exact time when the first import runs. Run a second import
- // and ensure that we pick up the new data.
-
- long now = System.currentTimeMillis();
-
- final String TABLE_NAME = "boundaryTimestamp";
- Timestamp thePast = new Timestamp(now - 100);
- createTimestampTable(TABLE_NAME, 10, thePast);
-
- Timestamp firstJobTime = new Timestamp(now);
- InstrumentHsqldbManager.setCurrentDbTimestamp(firstJobTime);
-
- // Configure the job to use the instrumented Hsqldb manager.
- Configuration conf = newConf();
- conf.set(ConnFactory.FACTORY_CLASS_NAMES_KEY,
- InstrumentHsqldbManagerFactory.class.getName());
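- // With the instrumented factory registered first, the job's
- // ConnManager is an InstrumentHsqldbManager, so the checkpoint
- // timestamp it records comes from the test, not the database clock.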
-
- List<String> args = getArgListForTable(TABLE_NAME, false, false);
- args.add("--append");
- createJob(TABLE_NAME, args, conf);
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 10);
-
- // Add some more rows with the timestamp equal to the job run timestamp.
- insertIdTimestampRows(TABLE_NAME, 10, 20, firstJobTime);
- assertRowCount(TABLE_NAME, 20);
-
- // Run a second job with the clock advanced by 100 ms.
- Timestamp secondJobTime = new Timestamp(now + 100);
- InstrumentHsqldbManager.setCurrentDbTimestamp(secondJobTime);
-
- // Import only those rows.
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 20);
- }
-
- @Test
- public void testIncrementalAppendTimestamp() throws Exception {
- // Run an import, and then insert rows with the last-modified timestamp
- // set to the exact time when the first import runs. Run a second import
- // and ensure that we pick up the new data.
-
- long now = System.currentTimeMillis();
-
- final String TABLE_NAME = "incrementalAppendTimestamp";
- Timestamp thePast = new Timestamp(now - 100);
- createTimestampTable(TABLE_NAME, 10, thePast);
-
- Timestamp firstJobTime = new Timestamp(now);
- InstrumentHsqldbManager.setCurrentDbTimestamp(firstJobTime);
-
- // Configure the job to use the instrumented Hsqldb manager.
- Configuration conf = newConf();
- conf.set(ConnFactory.FACTORY_CLASS_NAMES_KEY,
- InstrumentHsqldbManagerFactory.class.getName());
-
- List<String> args = getArgListForTable(TABLE_NAME, false, true, true);
- createJob(TABLE_NAME, args, conf);
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 10);
-
- // Add some more rows with the timestamp equal to the job run timestamp.
- insertIdTimestampRows(TABLE_NAME, 10, 20, firstJobTime);
- assertRowCount(TABLE_NAME, 20);
-
- // Run a second job with the clock advanced by 100 ms.
- Timestamp secondJobTime = new Timestamp(now + 100);
- InstrumentHsqldbManager.setCurrentDbTimestamp(secondJobTime);
-
- // Import only those rows.
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 20);
- }
-
- @Test
- public void testIncrementalHiveAppendEmptyThenFull() throws Exception {
- // Tests the incremental Hive append feature (SQOOP-2470).
- final String TABLE_NAME = "incrementalHiveAppendEmptyThenFull";
- Configuration conf = newConf();
- conf.set(ConnFactory.FACTORY_CLASS_NAMES_KEY,
- InstrumentHsqldbManagerFactory.class.getName());
- clearDir(TABLE_NAME);
- createIdTable(TABLE_NAME, 0);
- List<String> args = new ArrayList<String>();
- args.add("--connect");
- args.add(SOURCE_DB_URL);
- args.add("--table");
- args.add(TABLE_NAME);
- args.add("--warehouse-dir");
- args.add(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
- args.add("--hive-import");
- args.add("--hive-table");
- args.add(TABLE_NAME + "hive");
- args.add("--incremental");
- args.add("append");
- args.add("--check-column");
- args.add("ID");
- args.add("-m");
- args.add("1");
- createJob(TABLE_NAME, args, conf);
- HiveImport.setTestMode(true);
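- // In test mode the import does not launch a real Hive process; the
- // 'expected.script' property set below names the script this test
- // expects Sqoop to generate (assumption based on how the property
- // is used here).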
- String hiveHome = org.apache.sqoop.SqoopOptions.getHiveHomeDefault();
- assertNotNull("hive.home was not set", hiveHome);
- String testDataPath = new Path(new Path(hiveHome), "scripts/"
- + "incrementalHiveAppendEmpty.q").toString();
- System.clearProperty("expected.script");
- System.setProperty("expected.script",
- new File(testDataPath).getAbsolutePath());
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 0);
- // Now add some rows.
- insertIdRows(TABLE_NAME, 0, 10);
- String testDataPath10 = new Path(new Path(hiveHome), "scripts/"
- + "incrementalHiveAppend10.q").toString();
- System.clearProperty("expected.script");
- System.setProperty("expected.script",
- new File(testDataPath10).getAbsolutePath());
- // Running the job a second time should import 10 rows.
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 10);
- // Add some more rows.
- insertIdRows(TABLE_NAME, 10, 20);
- String testDataPath20 = new Path(new Path(hiveHome), "scripts/"
- + "incrementalHiveAppend20.q").toString();
- System.clearProperty("expected.script");
- System.setProperty("expected.script",
- new File(testDataPath20).getAbsolutePath());
- // Import only those rows.
- runJob(TABLE_NAME);
- assertDirOfNumbers(TABLE_NAME, 20);
- }
-
- // SQOOP-1890
- @Test
- public void testTableNameWithSpecialCharacters() throws Exception {
- // Table name with special characters to verify proper table name escaping
- final String TABLE_NAME = "my-table.ext";
- createIdTable(TABLE_NAME, 0);
-
- // Now add some rows.
- insertIdRows(TABLE_NAME, 0, 10);
-
- List<String> args = getArgListForTable(TABLE_NAME, false, true);
- createJob("emptyJob", args);
- runJob("emptyJob");
- assertDirOfNumbers(TABLE_NAME, 10);
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/com/cloudera/sqoop/TestMerge.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestMerge.java b/src/test/com/cloudera/sqoop/TestMerge.java
deleted file mode 100644
index 9639f84..0000000
--- a/src/test/com/cloudera/sqoop/TestMerge.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.sqoop;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.sql.Connection;
-import java.sql.Timestamp;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import com.cloudera.sqoop.testutil.CommonArgs;
-import com.cloudera.sqoop.testutil.HsqldbTestServer;
-import com.cloudera.sqoop.SqoopOptions.FileLayout;
-import com.cloudera.sqoop.SqoopOptions.IncrementalMode;
-import com.cloudera.sqoop.manager.ConnManager;
-import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
-import com.cloudera.sqoop.tool.CodeGenTool;
-import com.cloudera.sqoop.tool.ImportTool;
-import com.cloudera.sqoop.tool.MergeTool;
-import com.cloudera.sqoop.util.ClassLoaderStack;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.mapred.FsInput;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.junit.Before;
-import org.junit.Test;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetReader;
-import org.kitesdk.data.Datasets;
-
-import static org.apache.avro.generic.GenericData.Record;
-import static org.junit.Assert.fail;
-
-/**
- * Test that the merge tool works.
- */
-public class TestMerge extends BaseSqoopTestCase {
-
- private static final Log LOG =
- LogFactory.getLog(TestMerge.class.getName());
-
- protected ConnManager manager;
- protected Connection conn;
-
- public static final String SOURCE_DB_URL = "jdbc:hsqldb:mem:merge";
-
- private static final List<List<Integer>> initRecords = Arrays.asList(
- Arrays.asList(0, 0),
- Arrays.asList(1, 42));
-
- private static final List<List<Integer>> newRecords = Arrays.asList(
- Arrays.asList(1, 43),
- Arrays.asList(3, 313));
-
- private static final List<List<Integer>> mergedRecords = Arrays.asList(
- Arrays.asList(0, 0),
- Arrays.asList(1, 43),
- Arrays.asList(3, 313));
-
- @Before
- public void setUp() {
- super.setUp();
- manager = getManager();
- try {
- conn = manager.getConnection();
- } catch (SQLException e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
-
- public static final String TABLE_NAME = "MergeTable";
- private static final String OLD_PATH = "merge_old";
- private static final String NEW_PATH = "merge_new";
- private static final String FINAL_PATH = "merge_final";
-
- public Configuration newConf() {
- Configuration conf = new Configuration();
- if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
- conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
- }
- conf.set("mapred.job.tracker", "local");
- return conf;
- }
-
- /**
- * Create a SqoopOptions to connect to the manager.
- */
- public SqoopOptions getSqoopOptions(Configuration conf) {
- SqoopOptions options = new SqoopOptions(conf);
- options.setConnectString(HsqldbTestServer.getDbUrl());
-
- return options;
- }
-
- protected void createTable(List<List<Integer>> records) throws SQLException {
- PreparedStatement s = conn.prepareStatement("DROP TABLE \"" + TABLE_NAME + "\" IF EXISTS");
- try {
- s.executeUpdate();
- } finally {
- s.close();
- }
-
- s = conn.prepareStatement("CREATE TABLE \"" + TABLE_NAME
- + "\" (id INT NOT NULL PRIMARY KEY, val INT, LASTMOD timestamp)");
- try {
- s.executeUpdate();
- } finally {
- s.close();
- }
-
- for (List<Integer> record : records) {
- final String values = StringUtils.join(record, ", ");
- s = conn
- .prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (" + values + ", now())");
- try {
- s.executeUpdate();
- } finally {
- s.close();
- }
- }
-
- conn.commit();
- }
-
- @Test
- public void testTextFileMerge() throws Exception {
- runMergeTest(SqoopOptions.FileLayout.TextFile);
- }
-
- @Test
- public void testAvroFileMerge() throws Exception {
- runMergeTest(SqoopOptions.FileLayout.AvroDataFile);
- }
-
- @Test
- public void testParquetFileMerge() throws Exception {
- runMergeTest(SqoopOptions.FileLayout.ParquetFile);
- }
-
- public void runMergeTest(SqoopOptions.FileLayout fileLayout) throws Exception {
- createTable(initRecords);
-
- // Create a jar to use for the merging process; we'll load it
- // into the current thread's classloader when the merge runs.
- // It must use a different class name than the imports because of
- // ClassLoaderStack limitations within the same JVM.
- final String MERGE_CLASS_NAME = "ClassForMerging";
- SqoopOptions options = getSqoopOptions(newConf());
- options.setTableName(TABLE_NAME);
- options.setClassName(MERGE_CLASS_NAME);
-
- CodeGenTool codeGen = new CodeGenTool();
- Sqoop codeGenerator = new Sqoop(codeGen, options.getConf(), options);
- int ret = Sqoop.runSqoop(codeGenerator, new String[0]);
- if (0 != ret) {
- fail("Nonzero exit from codegen: " + ret);
- }
-
- List<String> jars = codeGen.getGeneratedJarFiles();
- String jarFileName = jars.get(0);
-
- // Now do the imports.
- importData(OLD_PATH, fileLayout);
-
- // Check that we got records that meet our expected values.
- checkData(OLD_PATH, initRecords, fileLayout);
-
- Thread.sleep(25);
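- // Let the clock tick so the replacement rows inserted by the next
- // createTable() call get later LASTMOD values from now().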
-
- // Modify the data in the warehouse.
- createTable(newRecords);
-
- Thread.sleep(25);
-
- // Do another import, into the "new" dir.
- importData(NEW_PATH, fileLayout);
-
- checkData(NEW_PATH, newRecords, fileLayout);
-
- // Now merge the results!
- ClassLoaderStack.addJarFile(jarFileName, MERGE_CLASS_NAME);
- Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
- options = getSqoopOptions(newConf());
- options.setMergeOldPath(new Path(warehouse, OLD_PATH).toString());
- options.setMergeNewPath(new Path(warehouse, NEW_PATH).toString());
- options.setMergeKeyCol("ID");
- options.setTargetDir(new Path(warehouse, FINAL_PATH).toString());
- options.setClassName(MERGE_CLASS_NAME);
- options.setExistingJarName(jarFileName);
-
- MergeTool mergeTool = new MergeTool();
- Sqoop merger = new Sqoop(mergeTool, options.getConf(), options);
- ret = Sqoop.runSqoop(merger, new String[0]);
- if (0 != ret) {
- fail("Merge failed with exit code " + ret);
- }
-
- checkData(FINAL_PATH, mergedRecords, fileLayout);
- }
-
- private void checkData(String dataDir, List<List<Integer>> records,
- SqoopOptions.FileLayout fileLayout) throws Exception {
- for (List<Integer> record : records) {
- assertRecordStartsWith(record, dataDir, fileLayout);
- }
- }
-
- private boolean valueMatches(GenericRecord genericRecord, List<Integer> recordVals) {
- return recordVals.get(0).equals(genericRecord.get(0))
- && recordVals.get(1).equals(genericRecord.get(1));
- }
-
- private void importData(String targetDir, SqoopOptions.FileLayout fileLayout) {
- SqoopOptions options;
- options = getSqoopOptions(newConf());
- options.setTableName(TABLE_NAME);
- options.setNumMappers(1);
- options.setFileLayout(fileLayout);
- options.setDeleteMode(true);
-
- Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
- options.setTargetDir(new Path(warehouse, targetDir).toString());
-
- ImportTool importTool = new ImportTool();
- Sqoop importer = new Sqoop(importTool, options.getConf(), options);
- int ret = Sqoop.runSqoop(importer, new String[0]);
- if (0 != ret) {
- fail("Initial import failed with exit code " + ret);
- }
- }
-
- /**
- * @return true if the text file at path 'p' contains a line that
- * starts with the comma-joined values of 'record'.
- */
- protected boolean checkTextFileForLine(FileSystem fs, Path p, List<Integer> record)
- throws IOException {
- final String prefix = StringUtils.join(record, ',');
- BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(p)));
- try {
- while (true) {
- String in = r.readLine();
- if (null == in) {
- break; // done with the file.
- }
-
- if (in.startsWith(prefix)) {
- return true;
- }
- }
- } finally {
- r.close();
- }
-
- return false;
- }
-
- private boolean checkAvroFileForLine(FileSystem fs, Path p, List<Integer> record)
- throws IOException {
- SeekableInput in = new FsInput(p, new Configuration());
- DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
- FileReader<GenericRecord> reader = DataFileReader.openReader(in, datumReader);
- reader.sync(0);
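- // sync(0) positions the reader at the first sync marker past byte 0,
- // i.e. at the start of the file's data blocks.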
-
- while (reader.hasNext()) {
- if (valueMatches(reader.next(), record)) {
- return true;
- }
- }
-
- return false;
- }
-
- private boolean checkParquetFileForLine(FileSystem fileSystem, Path path,
- List<Integer> record) throws IOException {
- Dataset<Record> parquetRecords = Datasets.load("dataset:" + path.getParent(), Record.class);
- DatasetReader<Record> datasetReader = null;
- try {
- datasetReader = parquetRecords.newReader();
- for (GenericRecord genericRecord : datasetReader) {
- if (valueMatches(genericRecord, record)) {
- return true;
- }
- }
- } finally {
- if (datasetReader != null) {
- datasetReader.close();
- }
- }
-
- return false;
- }
-
- protected boolean checkFileForLine(FileSystem fs, Path p, SqoopOptions.FileLayout fileLayout,
- List<Integer> record) throws IOException {
- boolean result = false;
- switch (fileLayout) {
- case TextFile:
- result = checkTextFileForLine(fs, p, record);
- break;
- case AvroDataFile:
- result = checkAvroFileForLine(fs, p, record);
- break;
- case ParquetFile:
- result = checkParquetFileForLine(fs, p, record);
- break;
- }
- return result;
- }
-
- /**
- * Return true if a file in 'dirName' contains a record, in the given
- * file layout, that starts with the values in 'record'.
- */
- protected boolean recordStartsWith(List<Integer> record, String dirName,
- SqoopOptions.FileLayout fileLayout)
- throws Exception {
- Path warehousePath = new Path(LOCAL_WAREHOUSE_DIR);
- Path targetPath = new Path(warehousePath, dirName);
-
- FileSystem fs = FileSystem.getLocal(new Configuration());
- FileStatus [] files = fs.listStatus(targetPath);
-
- if (null == files || files.length == 0) {
- fail("Got no import files!");
- }
-
- for (FileStatus stat : files) {
- Path p = stat.getPath();
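- // Only inspect MapReduce output files ("part-*") and Parquet data files.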
- if (p.getName().startsWith("part-") || p.getName().endsWith(".parquet")) {
- if (checkFileForLine(fs, p, fileLayout, record)) {
- // We found the line. Nothing further to do.
- return true;
- }
- }
- }
-
- return false;
- }
-
- protected void assertRecordStartsWith(List<Integer> record, String dirName,
- SqoopOptions.FileLayout fileLayout) throws Exception {
- if (!recordStartsWith(record, dirName, fileLayout)) {
- fail("No record found that starts with [" + StringUtils.join(record, ", ") + "] in " + dirName);
- }
- }
-}