You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/03/15 06:01:50 UTC
git commit: SQOOP-929: Add more Netezza direct mode tests
Updated Branches:
refs/heads/trunk 05976e709 -> b4b9920c4
SQOOP-929: Add more Netezza direct mode tests
(Venkat Ranganathan via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/b4b9920c
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/b4b9920c
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/b4b9920c
Branch: refs/heads/trunk
Commit: b4b9920c4ffb8980b285538186fc90ae689cee2e
Parents: 05976e7
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Thu Mar 14 22:01:09 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Thu Mar 14 22:01:09 2013 -0700
----------------------------------------------------------------------
.../manager/DirectNetezzaExportManualTest.java | 260 ++++---------
.../sqoop/manager/NetezzaExportManualTest.java | 246 +++++++++++++
.../sqoop/manager/NetezzaImportManualTest.java | 283 +++++++++++----
.../cloudera/sqoop/manager/NetezzaTestUtils.java | 2 +-
4 files changed, 536 insertions(+), 255 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4b9920c/src/test/com/cloudera/sqoop/manager/DirectNetezzaExportManualTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/DirectNetezzaExportManualTest.java b/src/test/com/cloudera/sqoop/manager/DirectNetezzaExportManualTest.java
index bbcd138..938ffc5 100644
--- a/src/test/com/cloudera/sqoop/manager/DirectNetezzaExportManualTest.java
+++ b/src/test/com/cloudera/sqoop/manager/DirectNetezzaExportManualTest.java
@@ -18,56 +18,30 @@
package com.cloudera.sqoop.manager;
-import java.io.BufferedWriter;
import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.Statement;
import java.sql.SQLException;
+import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.sqoop.manager.DirectNetezzaManager;
import org.junit.After;
import org.junit.Before;
+import org.junit.Test;
+
import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.TestExport;
+import com.cloudera.sqoop.TestExport.ColumnGenerator;
/**
* Test the DirectNetezzaManager implementation's exportJob() functionality.
*/
-public class DirectNetezzaExportManualTest extends TestExport {
-
- public static final Log LOG = LogFactory.getLog(
- DirectNetezzaExportManualTest.class.getName());
-
- static final String TABLE_PREFIX = "EMPNZ";
+public class DirectNetezzaExportManualTest extends NetezzaExportManualTest {
- // instance variables populated during setUp, used during tests.
- private DirectNetezzaManager manager;
- private Connection conn;
+ public static final Log LOG = LogFactory
+ .getLog(DirectNetezzaExportManualTest.class.getName());
- @Override
- protected Connection getConnection() {
- return conn;
- }
-
- @Override
- protected boolean useHsqldbTestServer() {
- return false;
- }
-
- @Override
- protected String getConnectString() {
- return NetezzaTestUtils.getNZConnectString();
- }
+ static final String TABLE_PREFIX = "EMPNZ_D_EXP";
@Override
protected String getTablePrefix() {
@@ -75,50 +49,14 @@ public class DirectNetezzaExportManualTest extends TestExport {
}
@Override
- protected String getDropTableStatement(String tableName) {
- return "DROP TABLE " + tableName;
- }
-
- @Before
- public void setUp() {
- super.setUp();
- conn = getConnection();
- SqoopOptions options = new SqoopOptions(
- NetezzaTestUtils.getNZConnectString(), getTableName());
- options.setUsername(NetezzaTestUtils.getNZUser());
- options.setPassword(NetezzaTestUtils.getNZPassword());
- this.manager = new DirectNetezzaManager(options);
-
- try {
- this.conn = manager.getConnection();
- this.conn.setAutoCommit(false);
- } catch (SQLException sqlE) {
- LOG.error("Encountered SQL Exception: " + sqlE);
- sqlE.printStackTrace();
- fail("SQLException when running test setUp(): " + sqlE);
- }
- }
-
- @After
- public void tearDown() {
- super.tearDown();
- if (null != manager) {
- try {
- manager.close();
- } catch (SQLException sqlE) {
- LOG.error("Got SQLException: " + sqlE.toString());
- fail("Got SQLException: " + sqlE.toString());
- }
- }
- this.conn = null;
- this.manager = null;
-
+ protected boolean isDirectMode() {
+ return true;
}
@Override
- protected String [] getCodeGenArgv(String... extraArgs) {
+ protected String[] getCodeGenArgv(String... extraArgs) {
- String [] moreArgs = new String[extraArgs.length + 4];
+ String[] moreArgs = new String[extraArgs.length + 4];
int i = 0;
for (i = 0; i < extraArgs.length; i++) {
moreArgs[i] = extraArgs[i];
@@ -134,153 +72,117 @@ public class DirectNetezzaExportManualTest extends TestExport {
}
@Override
- protected String [] getArgv(boolean includeHadoopFlags,
- int rowsPerStatement, int statementsPerTx, String... additionalArgv) {
-
- String [] subArgv = newStrArray(additionalArgv, "--direct",
- "--username", NetezzaTestUtils.getNZUser(), "--password",
- NetezzaTestUtils.getNZPassword());
- return super.getArgv(includeHadoopFlags, rowsPerStatement,
- statementsPerTx, subArgv);
+ protected String[] getArgv(boolean includeHadoopFlags, int rowsPerStatement,
+ int statementsPerTx, String... additionalArgv) {
+
+ String[] argV = super.getArgv(includeHadoopFlags,
+ rowsPerStatement, statementsPerTx);
+ String[] subArgV = newStrArray(argV, "--direct",
+ "--username", NetezzaTestUtils.getNZUser(), "--password",
+ NetezzaTestUtils.getNZPassword());
+ String[] newArgV = new String[subArgV.length + additionalArgv.length];
+ int i = 0;
+ for (String s : subArgV) {
+ newArgV[i++] = s;
+ }
+ for (String s: additionalArgv) {
+ newArgV[i++] = s;
+ }
+ return newArgV;
}
-
/**
* Create the table definition to export to, removing any prior table. By
* specifying ColumnGenerator arguments, you can add extra columns to the
* table of arbitrary type.
*/
@Override
- public void createTable(ColumnGenerator... extraColumns) throws SQLException {
- PreparedStatement statement = conn.prepareStatement(
- getDropTableStatement(getTableName()), ResultSet.TYPE_FORWARD_ONLY,
- ResultSet.CONCUR_READ_ONLY);
- try {
- statement.executeUpdate();
- conn.commit();
- } catch (SQLException sqle) {
- conn.rollback();
- } finally {
- statement.close();
- }
-
- StringBuilder sb = new StringBuilder();
- sb.append("CREATE TABLE ");
- sb.append(getTableName());
- sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
- int colNum = 0;
- for (ColumnGenerator gen : extraColumns) {
- sb.append(", " + forIdx(colNum++) + " " + gen.getType());
- }
- sb.append(")");
-
- statement = conn.prepareStatement(sb.toString(),
- ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
- try {
- statement.executeUpdate();
- conn.commit();
- } finally {
- statement.close();
- }
+ public void createTable(ColumnGenerator... extraColumns)
+ throws SQLException {
+ createTableNZ(getTableName(), extraColumns);
}
+
/**
- * Test an authenticated export using netezza external table import.
+ * Creates the staging table.
+ * @param extraColumns extra columns that go in the staging table
+ * @throws SQLException if an error occurs during export
*/
- public void testAuthExport() throws IOException, SQLException {
+ @Override
+ public void createStagingTable(ColumnGenerator... extraColumns)
+ throws SQLException {
+ createTableNZ(getStagingTableName(), extraColumns);
+ }
+
+ private void runNetezzaTest(String tableName, String[] argv,
+ ColumnGenerator...extraCols) throws IOException {
SqoopOptions options = new SqoopOptions(
- NetezzaTestUtils.getNZConnectString(),
- getTableName());
+ NetezzaTestUtils.getNZConnectString(), getTableName());
options.setUsername(NetezzaTestUtils.getNZUser());
options.setPassword(NetezzaTestUtils.getNZPassword());
+ LOG.info("Running export with argv : " + Arrays.toString(argv));
manager = new DirectNetezzaManager(options);
- Connection connection = null;
- Statement st = null;
-
- String tableName = getTableName();
-
try {
- connection = manager.getConnection();
- connection.setAutoCommit(false);
- st = connection.createStatement();
-
- // create a target database table.
- try {
- st.executeUpdate("DROP TABLE " + tableName);
- } catch(SQLException sqle) {
- LOG.info("Ignoring exception from DROP TABLE : " + sqle.getMessage());
- connection.rollback();
- }
-
- LOG.info("Creating table " + tableName);
-
- st.executeUpdate("CREATE TABLE " + tableName + " ("
- + "id INT NOT NULL PRIMARY KEY, "
- + "msg VARCHAR(24) NOT NULL)");
-
- connection.commit();
+ createTable(extraCols);
LOG.info("Created table " + tableName);
-
- // Write a file containing a record to export.
- Path tablePath = getTablePath();
- Path filePath = new Path(tablePath, "datafile");
- Configuration conf = new Configuration();
-
- FileSystem fs = FileSystem.get(conf);
- fs.mkdirs(tablePath);
- OutputStream os = fs.create(filePath);
- BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
- w.write(getRecordLine(0));
- w.write(getRecordLine(1));
- w.write(getRecordLine(2));
- w.close();
- os.close();
-
+ createExportFile(extraCols);
// run the export and verify that the results are good.
- runExport(getArgv(true, 10, 10,
- "--username", NetezzaTestUtils.getNZUser(),
- "--password", NetezzaTestUtils.getNZPassword(),
- "--connect", NetezzaTestUtils.getNZConnectString()));
- verifyExport(3, connection);
+ runExport(argv);
+ verifyExport(3, conn);
+ if (extraCols.length > 0) {
+ assertColMinAndMax(forIdx(0), extraCols[0]);
+ }
} catch (SQLException sqlE) {
LOG.error("Encountered SQL Exception: " + sqlE);
sqlE.printStackTrace();
fail("SQLException when accessing target table. " + sqlE);
- } finally {
- try {
- if (null != st) {
- st.close();
- }
- } catch (SQLException sqlE) {
- LOG.warn("Got SQLException when closing connection: " + sqlE);
- }
}
}
+ /**
+ * Test an authenticated export using netezza external table import.
+ */
+ @Test
+ public void testSimpleExport() throws IOException, SQLException {
+ String[] argv = getArgv(true, 10, 10);
+ runNetezzaTest(getTableName(), argv);
+ }
+
+ @Test
+ public void testValidExtraArgs() throws Exception {
+
+ String [] extraArgs = {
+ "--",
+ "--log-dir", "/tmp",
+ "--max-errors", "2",
+ };
+ String[] argv = getArgv(true, 10, 10, extraArgs);
+ runNetezzaTest(getTableName(), argv);
+ }
+
+
@Override
- public void testMultiMapTextExportWithStaging()
- throws IOException, SQLException {
+ public void testMultiMapTextExportWithStaging() throws IOException,
+ SQLException {
// disable this test as staging is not supported in direct mode
}
@Override
- public void testMultiTransactionWithStaging()
- throws IOException, SQLException {
+ public void testMultiTransactionWithStaging() throws IOException,
+ SQLException {
// disable this test as staging is not supported in direct mode
}
@Override
- public void testColumnsExport()
- throws IOException, SQLException {
+ public void testColumnsExport() throws IOException, SQLException {
// disable this test as it is not supported in direct mode
}
@Override
- public void testSequenceFileExport()
- throws IOException, SQLException {
+ public void testSequenceFileExport() throws IOException, SQLException {
// disable this test as it is not supported in direct mode
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4b9920c/src/test/com/cloudera/sqoop/manager/NetezzaExportManualTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/NetezzaExportManualTest.java b/src/test/com/cloudera/sqoop/manager/NetezzaExportManualTest.java
new file mode 100644
index 0000000..50d27fe
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/manager/NetezzaExportManualTest.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudera.sqoop.manager;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.manager.DirectNetezzaManager;
+import org.apache.sqoop.manager.NetezzaManager;
+import org.junit.Before;
+
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.TestExport;
+import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
+import com.cloudera.sqoop.testutil.CommonArgs;
+
+/**
+ * Test the Netezza implementation.
+ *
+ * This uses JDBC to export data from an Netezza database into HDFS. See
+ * DirectNetezzaExportManualTest for external table methods.
+ *
+ * Since this requires an Netezza Server installation, this class is named in
+ * such a way that Sqoop's default QA process does not run it. You need to run
+ * this manually with -Dtestcase=NetezzaExportManualTest.
+ *
+ */
+public class NetezzaExportManualTest extends TestExport {
+ public static final Log LOG = LogFactory.getLog(NetezzaExportManualTest.class
+ .getName());
+ static final String TABLE_PREFIX = "EMPNZ_EXP_";
+ // instance variables populated during setUp, used during tests
+ protected NetezzaManager manager;
+ protected Connection conn;
+
+
+ @Override
+ protected boolean useHsqldbTestServer() {
+ return false;
+ }
+
+ protected boolean isDirectMode() {
+ return false;
+ }
+
+ @Override
+ protected Connection getConnection() {
+ return conn;
+ }
+
+
+ @Override
+ protected String getConnectString() {
+ return NetezzaTestUtils.getNZConnectString();
+ }
+
+ @Override
+ protected String getTablePrefix() {
+ return TABLE_PREFIX;
+ }
+
+ @Override
+ protected String getDropTableStatement(String tableName) {
+ return "DROP TABLE " + tableName;
+ }
+
+ protected void createTableNZ(String tableName, ColumnGenerator...extraCols)
+ throws SQLException {
+ String sqlStatement = getDropTableStatement(tableName);
+ conn.rollback();
+ LOG.info("Executing drop statement : " + sqlStatement);
+ PreparedStatement statement = conn.prepareStatement(
+ sqlStatement, ResultSet.TYPE_FORWARD_ONLY,
+ ResultSet.CONCUR_READ_ONLY);
+ try {
+ statement.executeUpdate();
+ conn.commit();
+ } catch (SQLException sqle) {
+ conn.rollback();
+ } finally {
+ statement.close();
+ }
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("CREATE TABLE ");
+ sb.append(tableName);
+ sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
+ int colNum = 0;
+ for (ColumnGenerator gen : extraCols) {
+ sb.append(", " + forIdx(colNum++) + " " + gen.getType());
+ }
+ sb.append(")");
+ sqlStatement = sb.toString();
+ LOG.info("Executing create statement : " + sqlStatement);
+ statement = conn.prepareStatement(sqlStatement,
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ try {
+ statement.executeUpdate();
+ conn.commit();
+ } finally {
+ statement.close();
+ }
+ }
+
+ /**
+ * Create the table definition to export to, removing any prior table. By
+ * specifying ColumnGenerator arguments, you can add extra columns to the
+ * table of arbitrary type.
+ */
+ @Override
+ public void createTable(ColumnGenerator... extraColumns)
+ throws SQLException {
+ createTableNZ(getTableName(), extraColumns);
+ }
+
+ /**
+ * Creates the staging table.
+ * @param extraColumns extra columns that go in the staging table
+ * @throws SQLException if an error occurs during export
+ */
+ @Override
+ public void createStagingTable(ColumnGenerator... extraColumns)
+ throws SQLException {
+ createTableNZ(getStagingTableName(), extraColumns);
+ }
+
+
+
+ @Before
+ public void setUp() {
+ super.setUp();
+ SqoopOptions options = new SqoopOptions(
+ NetezzaTestUtils.getNZConnectString(), getTableName());
+ options.setUsername(NetezzaTestUtils.getNZUser());
+ options.setPassword(NetezzaTestUtils.getNZPassword());
+ if (isDirectMode()) {
+ this.manager = new DirectNetezzaManager(options);
+ } else {
+ this.manager = new NetezzaManager(options);
+ }
+
+ try {
+ this.conn = manager.getConnection();
+ this.conn.setAutoCommit(false);
+ } catch (SQLException sqlE) {
+ LOG.error("Encountered SQL Exception: " + sqlE);
+ sqlE.printStackTrace();
+ fail("SQLException when running test setUp(): " + sqlE);
+ }
+ }
+
+
+
+ @Override
+ protected String[] getArgv(boolean includeHadoopFlags, int rowsPerStatement,
+ int statementsPerTx, String... additionalArgv) {
+
+ String[] argV = super.getArgv(includeHadoopFlags,
+ rowsPerStatement, statementsPerTx);
+ String[] subArgV = newStrArray(argV,
+ "--username", NetezzaTestUtils.getNZUser(), "--password",
+ NetezzaTestUtils.getNZPassword());
+ String[] newArgV = new String[subArgV.length + additionalArgv.length];
+ int i = 0;
+ for (String s : subArgV) {
+ newArgV[i++] = s;
+ }
+ for (String s: additionalArgv) {
+ newArgV[i++] = s;
+ }
+ return newArgV;
+ }
+
+ @Override
+ protected String[] getCodeGenArgv(String... extraArgs) {
+ String[] moreArgs;
+
+ moreArgs = new String[extraArgs.length + 4];
+
+ int i = 0;
+ for (i = 0; i < extraArgs.length; i++) {
+ moreArgs[i] = extraArgs[i];
+ }
+
+ // Add username argument for netezza.
+ moreArgs[i++] = "--username";
+ moreArgs[i++] = NetezzaTestUtils.getNZUser();
+ moreArgs[i++] = "--password";
+ moreArgs[i++] = NetezzaTestUtils.getNZPassword();
+
+ return super.getCodeGenArgv(moreArgs);
+ }
+
+ protected void createExportFile(ColumnGenerator...extraCols)
+ throws IOException, SQLException {
+ String ext = ".txt";
+
+ Path tablePath = getTablePath();
+ Path filePath = new Path(tablePath, "part0" + ext);
+
+ Configuration conf = new Configuration();
+ if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
+ conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
+ }
+ FileSystem fs = FileSystem.get(conf);
+ fs.mkdirs(tablePath);
+ OutputStream os = fs.create(filePath);
+
+
+ BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+ for (int i = 0; i < 3; i++) {
+ String line = getRecordLine(i, extraCols);
+ w.write(line);
+ LOG.debug("Create Export file - Writing line : " + line);
+ }
+ w.close();
+ os.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4b9920c/src/test/com/cloudera/sqoop/manager/NetezzaImportManualTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/NetezzaImportManualTest.java b/src/test/com/cloudera/sqoop/manager/NetezzaImportManualTest.java
index 97399fe..3482dd8 100644
--- a/src/test/com/cloudera/sqoop/manager/NetezzaImportManualTest.java
+++ b/src/test/com/cloudera/sqoop/manager/NetezzaImportManualTest.java
@@ -23,6 +23,8 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
@@ -43,26 +45,24 @@ import com.cloudera.sqoop.testutil.ImportJobTestCase;
import com.cloudera.sqoop.util.FileListing;
/**
- * Test the Netezza implementation.
+ * Test the Netezza implementation.
*
- * This uses JDBC to import data from an Netezza database into HDFS.
+ * This uses both JDBC and external tables to import data from an Netezza
+ * database into HDFS.
*
- * Since this requires an Netezza SErver installation, this class is named
- * in such a way that Sqoop's default QA process does not run it. You need to
- * run this manually with -Dtestcase=NetezzaManagerImportManualTest.
+ * Since this requires an Netezza Server installation, this class is named in
+ * such a way that Sqoop's default QA process does not run it. You need to run
+ * this manually with -Dtestcase=NetezzaImportManualTest.
*
*/
public class NetezzaImportManualTest extends ImportJobTestCase {
- public static final Log LOG = LogFactory
- .getLog(NetezzaImportManualTest.class.getName());
-
-
+ public static final Log LOG = LogFactory.
+ getLog(NetezzaImportManualTest.class.getName());
// instance variables populated during setUp, used during tests
private NetezzaManager manager;
-
-
+ private Connection conn;
@Override
protected boolean useHsqldbTestServer() {
return false;
@@ -70,72 +70,101 @@ public class NetezzaImportManualTest extends ImportJobTestCase {
@Override
protected String getTableName() {
- return NetezzaTestUtils.TABLE_NAME;
+ return NetezzaTestUtils.TABLE_NAME + "_IMP_";
}
- @Before
- public void setUp() {
- super.setUp();
- SqoopOptions options = new SqoopOptions(
- NetezzaTestUtils.getNZConnectString(), getTableName());
- options.setUsername(NetezzaTestUtils.getNZUser());
- options.setPassword(NetezzaTestUtils.getNZPassword());
+ private void createTable(String tableName, String... extraColumns)
+ throws SQLException {
+ PreparedStatement statement = conn.prepareStatement("DROP TABLE "
+ + tableName, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ try {
+ statement.executeUpdate();
+ conn.commit();
+ } catch (SQLException sqle) {
+ conn.rollback();
+ } finally {
+ statement.close();
+ }
- manager = new NetezzaManager(options);
+ StringBuilder sb = new StringBuilder();
+ sb.append("CREATE TABLE " + tableName + " (");
+ sb.append("id INT NOT NULL PRIMARY KEY, ");
+ sb.append("name VARCHAR(24) NOT NULL, ");
+ sb.append("start_date DATE, ");
+ sb.append("Salary FLOAT, ");
+ sb.append("Fired BOOL, ");
+ sb.append("dept VARCHAR(32) ");
+ for (String col : extraColumns) {
+ sb.append(", " + col + " INTEGER");
+ }
+ sb.append(")");
- // Drop the existing table, if there is one.
- Connection conn = null;
- Statement stmt = null;
+ statement = conn.prepareStatement(sb.toString(),
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
try {
- conn = manager.getConnection();
- stmt = conn.createStatement();
- stmt.execute("DROP TABLE " + getTableName());
- } catch (SQLException sqlE) {
- LOG.info("Table was not dropped: " + sqlE.getMessage());
+ statement.executeUpdate();
+ conn.commit();
} finally {
- try {
- if (null != stmt) {
- stmt.close();
- }
- } catch (Exception ex) {
- LOG.warn("Exception while closing stmt", ex);
- }
+ statement.close();
}
+ }
- // Create and populate table
+ private void populateTable(String tableName) throws SQLException {
+ Statement statement = conn.createStatement();
try {
- conn = manager.getConnection();
- conn.setAutoCommit(false);
- stmt = conn.createStatement();
-
- // create the database table and populate it with data.
- stmt.executeUpdate("CREATE TABLE " + getTableName() + " ("
- + "id INT NOT NULL, " + "name VARCHAR(24) NOT NULL, "
- + "salary FLOAT, " + "dept VARCHAR(32), " + "PRIMARY KEY (id))");
-
- stmt.executeUpdate("INSERT INTO " + getTableName() + " VALUES("
- + "1,'Aaron', " + "1000000.00,'engineering')");
- stmt.executeUpdate("INSERT INTO " + getTableName() + " VALUES("
- + "2,'Bob', " + "400.00,'sales')");
- stmt.executeUpdate("INSERT INTO " + getTableName() + " VALUES("
- + "3,'Fred', 15.00," + "'marketing')");
+ statement.executeUpdate("INSERT INTO " + tableName
+ + " VALUES(1,'Aaron','2009-05-14',1000000.00,TRUE,'engineering')");
+ statement.executeUpdate("INSERT INTO " + tableName
+ + " VALUES(2,'Bob','2009-04-20',400.00,TRUE,'sales')");
+ statement.executeUpdate("INSERT INTO " + tableName
+ + " VALUES(3,'Fred','2009-01-23',15.00,FALSE,'marketing')");
conn.commit();
- } catch (SQLException sqlE) {
- LOG.error("Encountered SQL Exception: ", sqlE);
- sqlE.printStackTrace();
- fail("SQLException when running test setUp(): " + sqlE);
} finally {
- try {
- if (null != stmt) {
- stmt.close();
- }
- } catch (Exception ex) {
- LOG.warn("Exception while closing connection/stmt", ex);
- }
+ statement.close();
+ }
+ }
+
+ private void populateTableWithNull(String tableName) throws SQLException{
+ Statement statement = conn.createStatement();
+ try {
+ statement.executeUpdate("INSERT INTO " + tableName
+ + " VALUES(1,'Aaron','2009-05-14',1000000.00,TRUE,"
+ + "'engineering',NULL,1)");
+ statement.executeUpdate("INSERT INTO " + tableName
+ + " VALUES(2,'Bob','2009-04-20',400.00,TRUE,'sales',NULL,2)");
+ statement.executeUpdate("INSERT INTO " + tableName
+ + " VALUES(3,'Fred','2009-01-23',15.00,FALSE,'marketing',NULL,3)");
+ conn.commit();
+ } finally {
+ statement.close();
+ }
+ }
+
+ public void setUpData() {
+ SqoopOptions options = new SqoopOptions(
+ NetezzaTestUtils.getNZConnectString(), getTableName());
+ options.setUsername(NetezzaTestUtils.getNZUser());
+ options.setPassword(NetezzaTestUtils.getNZPassword());
+ try {
+ manager = new NetezzaManager(options);
+ conn = manager.getConnection();
+ createTable(getTableName());
+ populateTable(getTableName());
+ String tableNameWithNull = getTableName() + "_W_N";
+ createTable(tableNameWithNull, new String[] { "col0", "col1" });
+ populateTableWithNull(tableNameWithNull);
+ } catch (SQLException sqlE) {
+ fail("Setup failed with SQLException " + sqlE);
}
}
+ @Before
+ public void setUp() {
+ super.setUp();
+ setUpData();
+ }
+
@After
public void tearDown() {
super.tearDown();
@@ -147,24 +176,50 @@ public class NetezzaImportManualTest extends ImportJobTestCase {
}
}
- @Test
- public void testNetezzaImport() throws IOException {
+ private String[] getExpectedResults() {
+ String [] expectedResults = {
+ "1,Aaron,2009-05-14,1000000.0,true,engineering",
+ "2,Bob,2009-04-20,400.0,true,sales",
+ "3,Fred,2009-01-23,15.0,false,marketing",
+ };
- runNetezzaTest(getExpectedResults());
+ return expectedResults;
}
+ private String[] getDirectModeExpectedResults() {
+ String [] expectedResults = {
+ "1,Aaron,2009-05-14,1000000,T,engineering",
+ "2,Bob,2009-04-20,400,T,sales",
+ "3,Fred,2009-01-23,15,F,marketing",
+ };
+ return expectedResults;
+ }
+ private String[] getExpectedResultsWithNulls() {
+ String [] expectedResults = {
+ "1,Aaron,2009-05-14,1000000.0,true,engineering,\\N,1",
+ "2,Bob,2009-04-20,400.0,true,sales,\\N,2",
+ "3,Fred,2009-01-23,15.0,false,marketing,\\N,3",
+ };
- private String[] getExpectedResults() {
- return new String[] { "1,Aaron,1000000.0,engineering", "2,Bob,400.0,sales",
- "3,Fred,15.0,marketing", };
+ return expectedResults;
}
- private String[] getArgv() {
+ private String[] getDirectModeExpectedResultsWithNulls() {
+ String [] expectedResults = {
+ "1,Aaron,2009-05-14,1000000,T,engineering,nvl,1",
+ "2,Bob,2009-04-20,400,T,sales,nvl,2",
+ "3,Fred,2009-01-23,15,F,marketing,nvl,3",
+ };
+
+ return expectedResults;
+ }
+ private String[] getArgv(boolean isDirect, String tableName,
+ String... extraArgs) {
ArrayList<String> args = new ArrayList<String>();
CommonArgs.addHadoopFlags(args);
args.add("--table");
- args.add(getTableName());
+ args.add(tableName);
args.add("--warehouse-dir");
args.add(getWarehouseDir());
args.add("--connect");
@@ -175,14 +230,25 @@ public class NetezzaImportManualTest extends ImportJobTestCase {
args.add(NetezzaTestUtils.getNZPassword());
args.add("--num-mappers");
args.add("1");
+
+ if (isDirect) {
+ args.add("--direct");
+ }
+ for (String arg : extraArgs) {
+ args.add(arg);
+ }
return args.toArray(new String[args.size()]);
}
- private void runNetezzaTest(String[] expectedResults) throws IOException {
+ private void runNetezzaTest(boolean isDirect, String tableName,
+ String[] expectedResults, String... extraArgs) throws IOException {
Path warehousePath = new Path(this.getWarehouseDir());
- Path tablePath = new Path(warehousePath, getTableName());
- Path filePath = new Path(tablePath, "part-m-00000");
+ Path tablePath = new Path(warehousePath, tableName);
+
+ Path filePath;
+
+ filePath = new Path(tablePath, "part-m-00000");
File tableFile = new File(tablePath.toString());
if (tableFile.exists() && tableFile.isDirectory()) {
@@ -190,7 +256,7 @@ public class NetezzaImportManualTest extends ImportJobTestCase {
FileListing.recursiveDeleteDir(tableFile);
}
- String[] argv = getArgv();
+ String[] argv = getArgv(isDirect, tableName, extraArgs);
try {
runImport(argv);
} catch (IOException ioe) {
@@ -200,7 +266,7 @@ public class NetezzaImportManualTest extends ImportJobTestCase {
}
File f = new File(filePath.toString());
- assertTrue("Could not find imported data file", f.exists());
+ assertTrue("Could not find imported data file : " + f, f.exists());
BufferedReader r = null;
try {
// Read through the file and make sure it's all there.
@@ -208,6 +274,7 @@ public class NetezzaImportManualTest extends ImportJobTestCase {
String[] s = new String[3];
for (int i = 0; i < s.length; ++i) {
s[i] = r.readLine();
+ LOG.info("Line read from file = " + s[i]);
}
Arrays.sort(s);
for (int i = 0; i < expectedResults.length; ++i) {
@@ -222,4 +289,70 @@ public class NetezzaImportManualTest extends ImportJobTestCase {
}
}
+ @Test
+ public void testNetezzaImport() throws IOException {
+
+ runNetezzaTest(false, getTableName(), getExpectedResults());
+ }
+
+ @Test
+ public void testDirectImport() throws IOException {
+ runNetezzaTest(true, getTableName(), getDirectModeExpectedResults());
+ }
+
+ @Test
+ public void testListTables() throws IOException {
+ SqoopOptions options = new SqoopOptions(
+ NetezzaTestUtils.getNZConnectString(), getTableName());
+ options.setUsername(NetezzaTestUtils.getNZUser());
+ options.setPassword(NetezzaTestUtils.getNZPassword());
+
+ ConnManager mgr = new NetezzaManager(options);
+ String[] tables = mgr.listTables();
+ Arrays.sort(tables);
+ assertTrue(getTableName() + " is not found!",
+ Arrays.binarySearch(tables, getTableName()) >= 0);
+ }
+
+ @Test
+ public void testIncrementalImport() throws IOException {
+ String[] expectedResults = {};
+
+ String[] extraArgs = { "--incremental", "lastmodified", "--check-column",
+ "START_DATE", };
+
+ runNetezzaTest(false, getTableName(), expectedResults, extraArgs);
+ }
+
+ @Test
+ public void testNullEscapeCharacters() throws Exception {
+
+
+ String [] extraArgs = {
+ "--null-string", "\\\\N",
+ "--null-non-string", "\\\\N",
+ };
+
+ String[] expectedResultsWithNulls =
+ getExpectedResultsWithNulls();
+ String tableNameWithNull = getTableName() + "_W_N";
+
+ runNetezzaTest(false, tableNameWithNull, expectedResultsWithNulls,
+ extraArgs);
+ }
+
+ @Test
+ public void testValidExtraArgs() throws Exception {
+
+ String [] extraArgs = {
+ "--",
+ "--log-dir", "/tmp",
+ "--max-errors", "2",
+ };
+ String[] expectedResults = getDirectModeExpectedResults();
+ String tableName = getTableName();
+
+ runNetezzaTest(true, tableName, expectedResults,
+ extraArgs);
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4b9920c/src/test/com/cloudera/sqoop/manager/NetezzaTestUtils.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/NetezzaTestUtils.java b/src/test/com/cloudera/sqoop/manager/NetezzaTestUtils.java
index 9f4c07c..4bf05b8 100644
--- a/src/test/com/cloudera/sqoop/manager/NetezzaTestUtils.java
+++ b/src/test/com/cloudera/sqoop/manager/NetezzaTestUtils.java
@@ -45,7 +45,7 @@ public final class NetezzaTestUtils {
public static final String NZ_DB_NAME = System.getProperty(
"sqoop.test.netezza.db.name", "SQOOP");
public static final String TABLE_NAME = System.getProperty(
- "sqoop.test.netezza.table.name", "EMP");
+ "sqoop.test.netezza.table.name", "EMPNZ");
private NetezzaTestUtils() { }