You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by an...@apache.org on 2018/01/18 14:03:51 UTC
[05/32] sqoop git commit: SQOOP-3273: Removing com.cloudera.sqoop
packages
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/oracle/SystemImportTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/oracle/SystemImportTest.java b/src/test/org/apache/sqoop/manager/oracle/SystemImportTest.java
index f6e5c0e..e0a0462 100644
--- a/src/test/org/apache/sqoop/manager/oracle/SystemImportTest.java
+++ b/src/test/org/apache/sqoop/manager/oracle/SystemImportTest.java
@@ -44,10 +44,10 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.sqoop.manager.oracle.util.*;
import org.junit.Test;
-import com.cloudera.sqoop.lib.BlobRef;
-import com.cloudera.sqoop.lib.ClobRef;
-import com.cloudera.sqoop.lib.SqoopRecord;
-import com.cloudera.sqoop.manager.OracleUtils;
+import org.apache.sqoop.lib.BlobRef;
+import org.apache.sqoop.lib.ClobRef;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.manager.oracle.util.OracleUtils;
/**
* OraOop system tests of importing data from oracle to hadoop.
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/oracle/TestOraOopDataDrivenDBInputFormat.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/oracle/TestOraOopDataDrivenDBInputFormat.java b/src/test/org/apache/sqoop/manager/oracle/TestOraOopDataDrivenDBInputFormat.java
index 7d3abfd..e98fdfe 100644
--- a/src/test/org/apache/sqoop/manager/oracle/TestOraOopDataDrivenDBInputFormat.java
+++ b/src/test/org/apache/sqoop/manager/oracle/TestOraOopDataDrivenDBInputFormat.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
import org.junit.Assert;
import org.junit.Test;
-import com.cloudera.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.manager.oracle.OraOopConstants.
OraOopOracleBlockToSplitAllocationMethod;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/oracle/util/OracleUtils.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/oracle/util/OracleUtils.java b/src/test/org/apache/sqoop/manager/oracle/util/OracleUtils.java
new file mode 100644
index 0000000..6d752aa
--- /dev/null
+++ b/src/test/org/apache/sqoop/manager/oracle/util/OracleUtils.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.oracle.util;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.manager.ConnManager;
+
+/**
+ * Helper methods for Oracle testing.
+ */
+public final class OracleUtils {
+
+ public static final Log LOG = LogFactory.getLog(OracleUtils.class.getName());
+
+ // Express edition hardcoded name.
+ public static final String ORACLE_DATABASE_NAME = "xe";
+
+ public static final String CONNECT_STRING = System.getProperty("sqoop.test.oracle.connectstring", "jdbc:oracle:thin:@//localhost/" + ORACLE_DATABASE_NAME);
+ public static final String ORACLE_USER_NAME = System.getProperty("sqoop.test.oracle.username", "SQOOPTEST");
+ public static final String ORACLE_USER_PASS = System.getProperty("sqoop.test.oracle.password", "12345");
+
+ public static final String ORACLE_SECONDARY_USER_NAME = "SQOOPTEST2";
+ public static final String ORACLE_SECONDARY_USER_PASS = "ABCDEF";
+
+ public static final String ORACLE_INVALID_USER_NAME = "invalidusr";
+ public static final String SYSTEMTEST_TABLE_NAME = "ORAOOP_TEST";
+ public static final int SYSTEMTEST_NUM_ROWS = 100;
+ public static final int INTEGRATIONTEST_NUM_ROWS = 10000;
+ // Number of mappers if wanting to override default setting
+ public static final int NUM_MAPPERS = 0;
+ // Oracle degree of parallelism to use when creating table.
+ // If 0 we will calculate a recommended value
+ public static final int ORACLE_PARALLEL_DEGREE = 0;
+
+ private OracleUtils() { }
+
+ public static void setOracleAuth(SqoopOptions options) {
+ options.setUsername(ORACLE_USER_NAME);
+ options.setPassword(ORACLE_USER_PASS);
+ }
+
+ public static void setOracleSecondaryUserAuth(SqoopOptions options) {
+ options.setUsername(ORACLE_SECONDARY_USER_NAME);
+ options.setPassword(ORACLE_SECONDARY_USER_PASS);
+ }
+
+ /**
+  * Drops the given table, tolerating the case where it does not exist.
+  *
+  * <p>Runs the PL/SQL block built by {@link #getDropTableStatement(String)}
+  * on the connection returned by {@code manager.getConnection()} and commits.
+  * Auto-commit is switched off on that connection and the connection itself
+  * is left open; only the statement is closed here.
+  *
+  * @param tableName name of the table to drop; it is concatenated into the
+  *     SQL text unquoted
+  * @param manager source of the JDBC connection to run the drop on
+  * @throws SQLException if the connection cannot be obtained or the
+  *     statement or commit fails
+  */
+ public static void dropTable(String tableName, ConnManager manager)
+     throws SQLException {
+   Connection connection = null;
+   Statement st = null;
+
+   try {
+     connection = manager.getConnection();
+     connection.setAutoCommit(false);
+     st = connection.createStatement();
+
+     // Drop the table; the PL/SQL wrapper ignores errors raised by the DROP.
+     st.executeUpdate(getDropTableStatement(tableName));
+
+     connection.commit();
+   } finally {
+     try {
+       if (null != st) {
+         st.close();
+       }
+     } catch (SQLException sqlE) {
+       // Best effort: a failed close must not mask the outcome of the drop.
+       LOG.warn("Got SQLException when closing statement: " + sqlE, sqlE);
+     }
+   }
+ }
+
+ public static String getDropTableStatement(String tableName) {
+ return "BEGIN EXECUTE IMMEDIATE 'DROP TABLE " + tableName + "'; "
+ + "exception when others then null; end;";
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/postgresql/DirectPostgreSQLExportManualTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/postgresql/DirectPostgreSQLExportManualTest.java b/src/test/org/apache/sqoop/manager/postgresql/DirectPostgreSQLExportManualTest.java
new file mode 100644
index 0000000..22b202a
--- /dev/null
+++ b/src/test/org/apache/sqoop/manager/postgresql/DirectPostgreSQLExportManualTest.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.postgresql;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.PreparedStatement;
+import java.util.Arrays;
+import java.util.ArrayList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sqoop.TestExport;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.junit.Ignore;
+import org.junit.Test;
+
+
+/**
+ * Test the DirectPostgresqlManager implementations.
+ * DirectPostgresqlManager uses JDBC driver to facilitate it.
+ *
+ * Since this requires a Postgresql installation on your local machine to use,
+ * this class is named in such a way that Hadoop's default QA process does not
+ * run it.
+ *
+ * You need to run this manually with
+ * -Dtestcase=DirectPostgreSQLExportManualTest.
+ *
+ * You need to put Postgresql's JDBC driver library into lib dir.
+ *
+ * You need to create a sqooptest superuser and database and tablespace,
+ *
+ * $ sudo -u postgres createuser -U postgres -s sqooptest
+ * $ sudo -u postgres createdb -U sqooptest sqooptest
+ * $ psql -U sqooptest sqooptest
+ *
+ */
+public class DirectPostgreSQLExportManualTest extends TestExport {
+
+ public static final Log LOG =
+ LogFactory.getLog(DirectPostgreSQLExportManualTest.class.getName());
+ private DBConfiguration dbConf;
+
+ static final String HOST_URL =
+ System.getProperty("sqoop.test.postgresql.connectstring.host_url",
+ "jdbc:postgresql://localhost/");
+ static final String DATABASE =
+ System.getProperty("sqoop.test.postgresql.database", "sqooptest");
+ static final String USERNAME =
+ System.getProperty("sqoop.test.postgresql.username", "sqooptest");
+ static final String PASSWORD = System.getProperty(
+ "sqoop.test.postgresql.password");
+ static final String CONNECT_STRING = HOST_URL + DATABASE;
+
+ public DirectPostgreSQLExportManualTest() {
+ JobConf conf = new JobConf(getConf());
+ DBConfiguration.configureDB(conf,
+ "org.postgresql.Driver",
+ getConnectString(),
+ getUserName(),
+ PASSWORD, (Integer) null);
+ dbConf = new DBConfiguration(conf);
+ }
+
+ @Override
+ protected boolean useHsqldbTestServer() {
+ return false;
+ }
+
+ @Override
+ protected String getConnectString() {
+ return CONNECT_STRING;
+ }
+
+ protected String getUserName() {
+ return USERNAME;
+ }
+
+ @Override
+ protected String getTablePrefix() {
+ return super.getTablePrefix().toLowerCase();
+ }
+
+ @Override
+ protected String getTableName() {
+ return super.getTableName().toLowerCase();
+ }
+
+ @Override
+ public String getStagingTableName() {
+ return super.getStagingTableName().toLowerCase();
+ }
+
+ /**
+  * Opens a connection through {@code dbConf}, turns auto-commit off and
+  * issues {@code SET extra_float_digits TO 0} before handing it out.
+  *
+  * @return the prepared connection, or {@code null} if the driver class
+  *     cannot be found or any JDBC call fails (the failure is logged)
+  */
+ @Override
+ protected Connection getConnection() {
+   Connection conn = null;
+   try {
+     conn = dbConf.getConnection();
+     conn.setAutoCommit(false);
+     PreparedStatement stmt =
+         conn.prepareStatement("SET extra_float_digits TO 0");
+     try {
+       stmt.executeUpdate();
+     } finally {
+       // The statement is only needed for this one SET; release it now.
+       stmt.close();
+     }
+     conn.commit();
+     return conn;
+   } catch (SQLException sqlE) {
+     LOG.error("Could not get connection to test server: " + sqlE);
+     closeAfterFailure(conn);
+     return null;
+   } catch (ClassNotFoundException cnfE) {
+     LOG.error("Could not find driver class: " + cnfE);
+     return null;
+   }
+ }
+
+ /** Closes a half-initialised connection so a failed setup does not leak it. */
+ private void closeAfterFailure(Connection conn) {
+   if (conn == null) {
+     return;
+   }
+   try {
+     conn.close();
+   } catch (SQLException closeE) {
+     LOG.warn("Could not close connection after failed setup: " + closeE);
+   }
+ }
+
+ @Override
+ protected String getDropTableStatement(String tableName) {
+ return "DROP TABLE IF EXISTS " + tableName;
+ }
+
+ @Override
+ protected String[] getArgv(boolean includeHadoopFlags,
+ int rowsPerStatement,
+ int statementsPerTx,
+ String... additionalArgv) {
+ ArrayList<String> args =
+ new ArrayList<String>(Arrays.asList(additionalArgv));
+ args.add("--username");
+ args.add(getUserName());
+ args.add("--password");
+ args.add(PASSWORD);
+ args.add("--direct");
+ return super.getArgv(includeHadoopFlags,
+ rowsPerStatement,
+ statementsPerTx,
+ args.toArray(new String[0]));
+ }
+
+ @Override
+ protected String [] getCodeGenArgv(String... extraArgs) {
+ ArrayList<String> args = new ArrayList<String>(Arrays.asList(extraArgs));
+ args.add("--username");
+ args.add(getUserName());
+ args.add("--password");
+ args.add(PASSWORD);
+ return super.getCodeGenArgv(args.toArray(new String[0]));
+ }
+
+ @Ignore("Ignoring this test case as direct export does not support --columns option.")
+ @Override
+ @Test
+ public void testColumnsExport() throws IOException, SQLException {
+ }
+
+ @Ignore("Ignoring this test case as the scenario is not supported with direct export.")
+ @Override
+ @Test
+ public void testLessColumnsInFileThanInTable() throws IOException, SQLException {
+ }
+
+ @Ignore("Ignoring this test case as the scenario is not supported with direct export.")
+ @Override
+ @Test
+ public void testLessColumnsInFileThanInTableInputNullIntPassed() throws IOException, SQLException {
+ }
+
+ @Ignore("Ignoring this test case as the scenario is not supported with direct export.")
+ @Override
+ @Test
+ public void testLessColumnsInFileThanInTableInputNullStringPassed() throws IOException, SQLException {
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/postgresql/PGBulkloadManagerManualTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/postgresql/PGBulkloadManagerManualTest.java b/src/test/org/apache/sqoop/manager/postgresql/PGBulkloadManagerManualTest.java
new file mode 100644
index 0000000..8855316
--- /dev/null
+++ b/src/test/org/apache/sqoop/manager/postgresql/PGBulkloadManagerManualTest.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.postgresql;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Test;
+
+import org.apache.sqoop.TestExport;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+
+
+/**
+ * Test the PGBulkloadManager implementations.
+ * PGBulkloadManager uses both JDBC driver and pg_bulkload to facilitate it.
+ *
+ * Since this requires a Postgresql installation on your local machine to use,
+ * this class is named in such a way that Hadoop's default QA process does not
+ * run it.
+ *
+ * You need to run this manually with -Dtestcase=PGBulkloadManagerManualTest.
+ *
+ * You need to put Postgresql's JDBC driver library into lib dir.
+ *
+ * You need to create a sqooptest superuser and database and tablespace,
+ * and install pg_bulkload for sqooptest database:
+ *
+ * $ sudo -u postgres createuser -U postgres -s sqooptest
+ * $ sudo -u postgres createdb -U sqooptest sqooptest
+ * $ sudo -u postgres mkdir /var/pgdata/stagingtablespace
+ * $ psql -U sqooptest
+ * -f /usr/local/share/postgresql/contrib/pg_bulkload.sql sqooptest
+ * $ psql -U sqooptest sqooptest
+ * sqooptest=# CREATE USER sqooptest;
+ * sqooptest=# CREATE DATABASE sqooptest;
+ * sqooptest=# CREATE TABLESPACE sqooptest
+ * LOCATION '/var/pgdata/stagingtablespace';
+ * sqooptest=# \q
+ *
+ */
+public class PGBulkloadManagerManualTest extends TestExport {
+
+ public static final Log LOG =
+ LogFactory.getLog(PGBulkloadManagerManualTest.class.getName());
+ private DBConfiguration dbConf;
+ static final String HOST_URL =
+ System.getProperty("sqoop.test.postgresql.connectstring.host_url",
+ "jdbc:postgresql://localhost/");
+ static final String DATABASE =
+ System.getProperty("sqoop.test.postgresql.database", "sqooptest");
+ static final String TABLESPACE =
+ System.getProperty("sqoop.test.postgresql.tablespace", "sqooptest");
+ static final String USERNAME =
+ System.getProperty("sqoop.test.postgresql.username", "sqooptest");
+ static final String PG_BULKLOAD =
+ System.getProperty("sqoop.test.postgresql.pg_bulkload", "pg_bulkload");
+ static final String CONNECT_STRING = HOST_URL + DATABASE;
+
+ public PGBulkloadManagerManualTest() {
+ JobConf conf = new JobConf(getConf());
+ DBConfiguration.configureDB(conf,
+ "org.postgresql.Driver",
+ getConnectString(),
+ getUserName(),
+ (String) null, (Integer) null);
+ dbConf = new DBConfiguration(conf);
+ }
+
+
+ @Override
+ protected boolean useHsqldbTestServer() {
+ return false;
+ }
+
+
+ @Override
+ protected String getConnectString() {
+ return CONNECT_STRING;
+ }
+
+
+ protected String getUserName() {
+ return USERNAME;
+ }
+
+
+ @Override
+ protected String getTablePrefix() {
+ return super.getTablePrefix().toLowerCase();
+ }
+
+
+ @Override
+ protected String getTableName() {
+ return super.getTableName().toLowerCase();
+ }
+
+ @Override
+ public String getStagingTableName() {
+ return super.getStagingTableName().toLowerCase();
+ }
+
+
+ /**
+  * Opens a connection through {@code dbConf}, turns auto-commit off and
+  * issues {@code SET extra_float_digits TO 0} before handing it out.
+  *
+  * @return the prepared connection, or {@code null} if the driver class
+  *     cannot be found or any JDBC call fails (the failure is logged)
+  */
+ @Override
+ protected Connection getConnection() {
+   Connection conn = null;
+   try {
+     conn = dbConf.getConnection();
+     conn.setAutoCommit(false);
+     PreparedStatement stmt =
+         conn.prepareStatement("SET extra_float_digits TO 0");
+     try {
+       stmt.executeUpdate();
+     } finally {
+       // The statement is only needed for this one SET; release it now.
+       stmt.close();
+     }
+     conn.commit();
+     return conn;
+   } catch (SQLException sqlE) {
+     LOG.error("Could not get connection to test server: " + sqlE);
+     closeAfterFailure(conn);
+     return null;
+   } catch (ClassNotFoundException cnfE) {
+     LOG.error("Could not find driver class: " + cnfE);
+     return null;
+   }
+ }
+
+ /** Closes a half-initialised connection so a failed setup does not leak it. */
+ private void closeAfterFailure(Connection conn) {
+   if (conn == null) {
+     return;
+   }
+   try {
+     conn.close();
+   } catch (SQLException closeE) {
+     LOG.warn("Could not close connection after failed setup: " + closeE);
+   }
+ }
+
+
+ @Override
+ protected String getDropTableStatement(String tableName) {
+ return "DROP TABLE IF EXISTS " + tableName;
+ }
+
+
+ @Override
+ protected String[] getArgv(boolean includeHadoopFlags,
+ int rowsPerStatement,
+ int statementsPerTx,
+ String... additionalArgv) {
+ ArrayList<String> args =
+ new ArrayList<String>(Arrays.asList(additionalArgv));
+ args.add("-D");
+ args.add("pgbulkload.bin=" + PG_BULKLOAD);
+ args.add("--username");
+ args.add(getUserName());
+ args.add("--connection-manager");
+ args.add("org.apache.sqoop.manager.PGBulkloadManager");
+ args.add("--staging-table");
+ args.add("dummy");
+ args.add("--clear-staging-table");
+ return super.getArgv(includeHadoopFlags,
+ rowsPerStatement,
+ statementsPerTx,
+ args.toArray(new String[0]));
+ }
+
+
+ @Override
+ protected String [] getCodeGenArgv(String... extraArgs) {
+ ArrayList<String> args = new ArrayList<String>(Arrays.asList(extraArgs));
+ args.add("--username");
+ args.add(getUserName());
+ return super.getCodeGenArgv(args.toArray(new String[0]));
+ }
+
+
+ @Override
+ public void testColumnsExport() throws IOException, SQLException {
+ // PGBulkloadManager does not support --columns option.
+ }
+
+ @Test
+ public void testMultiReduceExport() throws IOException, SQLException {
+ multiFileTest(2, 10, 2, "-D", "mapred.reduce.tasks=2");
+ }
+
+ @Test
+ public void testMultiReduceExportWithNewProp()
+ throws IOException, SQLException {
+ multiFileTest(2, 10, 2, "-D", "mapreduce.job.reduces=2");
+ }
+
+ @Test
+ public void testExportWithTablespace() throws IOException, SQLException {
+ multiFileTest(1, 10, 1,
+ "-D", "pgbulkload.staging.tablespace=" + TABLESPACE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExportTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExportTest.java b/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExportTest.java
new file mode 100644
index 0000000..f86b119
--- /dev/null
+++ b/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExportTest.java
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.manager.postgresql;
+
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.testutil.CommonArgs;
+import org.apache.sqoop.testutil.ExportJobTestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sqoop.manager.ConnManager;
+import org.apache.sqoop.manager.PostgresqlManager;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
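+ * Export tests that run against a PostgreSQL database reached over JDBC:
+ * plain table, stored procedure, staging table, direct mode and custom
+ * schema exports.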
+ */
+public class PostgresqlExportTest extends ExportJobTestCase {
+ public static final Log LOG = LogFactory.getLog(
+ PostgresqlExportTest.class.getName());
+
+ static final String HOST_URL = System.getProperty(
+ "sqoop.test.postgresql.connectstring.host_url",
+ "jdbc:postgresql://localhost/");
+ static final String DATABASE_USER = System.getProperty(
+ "sqoop.test.postgresql.username",
+ "sqooptest");
+ static final String DATABASE_NAME = System.getProperty(
+ "sqoop.test.postgresql.database",
+ "sqooptest");
+ static final String PASSWORD = System.getProperty(
+ "sqoop.test.postgresql.password");
+
+ static final String TABLE_NAME = "EMPLOYEES_PG";
+ static final String PROCEDURE_NAME = "INSERT_AN_EMPLOYEE";
+ static final String STAGING_TABLE_NAME = "STAGING";
+ static final String SCHEMA_PUBLIC = "public";
+ static final String SCHEMA_SPECIAL = "special";
+ static final String CONNECT_STRING = HOST_URL + DATABASE_NAME;
+
+ protected Connection connection;
+
+ @Override
+ protected boolean useHsqldbTestServer() {
+ return false;
+ }
+
+ private String getDropTableStatement(String tableName, String schema) {
+ return "DROP TABLE IF EXISTS " + quoteTableOrSchemaName(schema) + "." + quoteTableOrSchemaName(tableName);
+ }
+
+ @Before
+ public void setUp() {
+ super.setUp();
+
+ LOG.debug("Setting up postgresql test: " + CONNECT_STRING);
+
+ try {
+ connection = DriverManager.getConnection(CONNECT_STRING, DATABASE_USER, PASSWORD);
+ connection.setAutoCommit(false);
+ } catch (SQLException ex) {
+ LOG.error("Can't create connection", ex);
+ throw new RuntimeException(ex);
+ }
+
+ createTable(TABLE_NAME, SCHEMA_PUBLIC);
+ createTable(STAGING_TABLE_NAME, SCHEMA_PUBLIC);
+ createTable(TABLE_NAME, SCHEMA_SPECIAL);
+ createTable(STAGING_TABLE_NAME, SCHEMA_SPECIAL);
+ createProcedure(PROCEDURE_NAME, SCHEMA_PUBLIC);
+
+ LOG.debug("setUp complete.");
+ }
+
+ /**
+  * Drops the tables created by {@code setUp}, runs the superclass tear-down
+  * and finally closes the test connection.
+  *
+  * <p>Clean-up problems are logged rather than thrown. If {@code setUp}
+  * never managed to open the connection, the database steps are skipped so
+  * the superclass tear-down still runs.
+  */
+ @Override
+ public void tearDown() {
+   if (connection != null) {
+     dropTestTables();
+   }
+
+   super.tearDown();
+
+   if (connection != null) {
+     try {
+       connection.close();
+     } catch (SQLException e) {
+       LOG.error("Ignoring exception in tearDown", e);
+     }
+   }
+ }
+
+ /** Drops every table created by setUp and commits; failures are only logged. */
+ private void dropTestTables() {
+   Statement stmt = null;
+   try {
+     stmt = connection.createStatement();
+     stmt.executeUpdate(getDropTableStatement(TABLE_NAME, SCHEMA_PUBLIC));
+     stmt.executeUpdate(getDropTableStatement(STAGING_TABLE_NAME, SCHEMA_PUBLIC));
+     stmt.executeUpdate(getDropTableStatement(TABLE_NAME, SCHEMA_SPECIAL));
+     stmt.executeUpdate(getDropTableStatement(STAGING_TABLE_NAME, SCHEMA_SPECIAL));
+     // setUp switches auto-commit off, so the drops only take effect once
+     // they are committed explicitly.
+     connection.commit();
+   } catch(SQLException e) {
+     LOG.error("Can't clean up the database:", e);
+   } finally {
+     if (stmt != null) {
+       try {
+         stmt.close();
+       } catch (SQLException e) {
+         LOG.warn("Ignoring exception when closing statement in tearDown", e);
+       }
+     }
+   }
+ }
+
+ private interface CreateIt {
+ void createIt(
+ Statement st,
+ String fullName,
+ ConnManager manager) throws SQLException;
+ }
+
+ private void createTable(String tableName, String schema) {
+ CreateIt createIt = new CreateIt() {
+ @Override
+ public void createIt(
+ Statement st,
+ String fullName,
+ ConnManager manager) throws SQLException {
+ st.executeUpdate("CREATE TABLE " + fullName + " ("
+ + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, "
+ + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, "
+ + manager.escapeColName("start_date") + " DATE, "
+ + manager.escapeColName("salary") + " FLOAT, "
+ + manager.escapeColName("dept") + " VARCHAR(32))");
+ }
+ };
+ create(tableName, "TABLE", schema, createIt);
+ }
+
+ private void createProcedure(String procedureName, String schema) {
+ CreateIt createIt = new CreateIt() {
+ @Override
+ public void createIt(
+ Statement st,
+ String fullName,
+ ConnManager manager) throws SQLException {
+ st.executeUpdate("CREATE OR REPLACE FUNCTION " + fullName + " ("
+ + "IN " + manager.escapeColName("id") + " INT,"
+ + "IN " + manager.escapeColName("name") + " VARCHAR(24),"
+ + "IN " + manager.escapeColName("start_date") + " DATE,"
+ + "IN " + manager.escapeColName("salary") + " FLOAT,"
+ + "IN " + manager.escapeColName("dept") + " VARCHAR(32)"
+ + ") "
+ + "RETURNS VOID "
+ + "AS $$ "
+ + "BEGIN "
+ + "INSERT INTO "
+ + quoteTableOrSchemaName(SCHEMA_PUBLIC)
+ + "."
+ + quoteTableOrSchemaName(TABLE_NAME)
+ + " ("
+ + manager.escapeColName("id")
+ +", "
+ + manager.escapeColName("name")
+ +", "
+ + manager.escapeColName("start_date")
+ +", "
+ + manager.escapeColName("salary")
+ +", "
+ + manager.escapeColName("dept")
+ + ") VALUES ("
+ + manager.escapeColName("id")
+ +", "
+ + manager.escapeColName("name")
+ +", "
+ + manager.escapeColName("start_date")
+ +", "
+ + manager.escapeColName("salary")
+ +", "
+ + manager.escapeColName("dept")
+ + ");"
+ + "END;"
+ + "$$ LANGUAGE plpgsql;");
+ }
+ };
+ create(procedureName, "FUNCTION", schema, createIt);
+ }
+
+ /**
+  * Creates a database object (table or function) inside the given schema,
+  * replacing any object of the same name and type.
+  *
+  * <p>Steps, all on the shared test {@code connection}: try to create the
+  * schema, try to drop the existing object, delegate the actual CREATE to
+  * {@code createIt}, then commit. Failures of the first two steps are logged
+  * and rolled back; any other {@link SQLException} fails the test.
+  *
+  * @param name unquoted object name
+  * @param type SQL object type keyword used in the DROP, e.g. {@code TABLE}
+  *     or {@code FUNCTION}
+  * @param schema unquoted schema name
+  * @param createIt callback that issues the CREATE statement
+  */
+ private void create(
+     String name,
+     String type,
+     String schema,
+     CreateIt createIt) {
+   SqoopOptions options = new SqoopOptions(CONNECT_STRING, name);
+   options.setUsername(DATABASE_USER);
+
+   ConnManager manager = null;
+   Statement st = null;
+
+   try {
+     manager = new PostgresqlManager(options);
+     st = connection.createStatement();
+
+     // Create the schema if it does not exist, the dummy way: always try to
+     // create it and ignore errors.
+     try {
+       st.executeUpdate("CREATE SCHEMA " + quoteTableOrSchemaName(schema));
+       connection.commit();
+     } catch (SQLException e) {
+       LOG.info("Couldn't create schema " + schema + " (is o.k. as long as "
+           + "the schema already exists).", e);
+       // Reset the aborted transaction before continuing.
+       connection.rollback();
+     }
+
+     String fullName = quoteTableOrSchemaName(schema)
+         + "." + quoteTableOrSchemaName(name);
+
+     try {
+       // Try to remove any existing object first. A plain DROP is used and
+       // a failure (typically: nothing to drop) is ignored.
+       st.executeUpdate("DROP " + type + " " + fullName);
+     } catch (SQLException e) {
+       LOG.info("Couldn't drop "
+           + type.toLowerCase()
+           + " " + fullName
+           + " (ok)",
+           e);
+       // Now we need to reset the transaction.
+       connection.rollback();
+     }
+
+     createIt.createIt(st, fullName, manager);
+
+     connection.commit();
+   } catch (SQLException sqlE) {
+     LOG.error("Encountered SQL Exception: " + sqlE, sqlE);
+     fail("SQLException when running test setUp(): " + sqlE);
+   } finally {
+     // Close each resource on its own so one failure does not skip the other.
+     try {
+       if (null != st) {
+         st.close();
+       }
+     } catch (SQLException sqlE) {
+       LOG.warn("Got SQLException when closing statement: " + sqlE);
+     }
+     try {
+       if (null != manager) {
+         manager.close();
+       }
+     } catch (SQLException sqlE) {
+       LOG.warn("Got SQLException when closing connection: " + sqlE);
+     }
+   }
+
+   LOG.debug("Created " + type.toLowerCase() + " " + name + " in schema "
+       + schema + ".");
+ }
+
+ private String [] getArgv(boolean useTable,
+ String... extraArgs) {
+ ArrayList<String> args = new ArrayList<String>();
+
+ CommonArgs.addHadoopFlags(args);
+
+ if (useTable) {
+ args.add("--table");
+ args.add(TABLE_NAME);
+ } else {
+ args.add("--call");
+ args.add(PROCEDURE_NAME);
+ }
+ args.add("--export-dir");
+ args.add(getWarehouseDir());
+ args.add("--fields-terminated-by");
+ args.add(",");
+ args.add("--lines-terminated-by");
+ args.add("\\n");
+ args.add("--connect");
+ args.add(CONNECT_STRING);
+ args.add("--username");
+ args.add(DATABASE_USER);
+ args.add("--password");
+ args.add(PASSWORD);
+ args.add("-m");
+ args.add("1");
+
+ for (String arg : extraArgs) {
+ args.add(arg);
+ }
+
+ return args.toArray(new String[0]);
+ }
+
+ /**
+  * Writes {@code lines} to a file in the warehouse directory, one per line,
+  * each terminated by {@code "\n"}.
+  *
+  * <p>The warehouse directory is created if needed. The writer is closed
+  * even when a write fails.
+  *
+  * @param filename name of the file, relative to the warehouse directory
+  * @param lines records to write, in order
+  * @throws IOException if the file cannot be opened, written or closed
+  */
+ protected void createTestFile(String filename,
+     String[] lines)
+     throws IOException {
+   new File(getWarehouseDir()).mkdirs();
+   File file = new File(getWarehouseDir() + "/" + filename);
+   Writer output = new BufferedWriter(new FileWriter(file));
+   try {
+     for (String line : lines) {
+       output.write(line);
+       output.write("\n");
+     }
+   } finally {
+     output.close();
+   }
+ }
+
+ @Test
+ public void testExport() throws IOException, SQLException {
+ createTestFile("inputFile", new String[] {
+ "2,Bob,2009-04-20,400,sales",
+ "3,Fred,2009-01-23,15,marketing",
+ });
+
+ runExport(getArgv(true));
+
+ assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection);
+ }
+
+ @Test
+ public void testExportUsingProcedure() throws IOException, SQLException {
+ createTestFile("inputFile", new String[] {
+ "2,Bob,2009-04-20,400,sales",
+ "3,Fred,2009-01-23,15,marketing",
+ });
+
+ runExport(getArgv(false));
+
+ assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection);
+ }
+
+ @Test
+ public void testExportStaging() throws IOException, SQLException {
+ createTestFile("inputFile", new String[] {
+ "2,Bob,2009-04-20,400,sales",
+ "3,Fred,2009-01-23,15,marketing",
+ });
+
+ String[] extra = new String[] {"--staging-table", STAGING_TABLE_NAME, };
+
+ runExport(getArgv(true, extra));
+
+ assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection);
+ }
+
+ @Test
+ public void testExportDirect() throws IOException, SQLException {
+ createTestFile("inputFile", new String[] {
+ "2,Bob,2009-04-20,400,sales",
+ "3,Fred,2009-01-23,15,marketing",
+ });
+
+ String[] extra = new String[] {"--direct"};
+
+ runExport(getArgv(true, extra));
+
+ assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection);
+ }
+
+ @Test
+ public void testExportCustomSchema() throws IOException, SQLException {
+ createTestFile("inputFile", new String[] {
+ "2,Bob,2009-04-20,400,sales",
+ "3,Fred,2009-01-23,15,marketing",
+ });
+
+ String[] extra = new String[] {"--",
+ "--schema",
+ SCHEMA_SPECIAL,
+ };
+
+ runExport(getArgv(true, extra));
+
+ assertRowCount(2,
+ quoteTableOrSchemaName(SCHEMA_SPECIAL)
+ + "." + quoteTableOrSchemaName(TABLE_NAME),
+ connection);
+ }
+
+ @Test
+ public void testExportCustomSchemaStaging() throws IOException, SQLException {
+ createTestFile("inputFile", new String[] {
+ "2,Bob,2009-04-20,400,sales",
+ "3,Fred,2009-01-23,15,marketing",
+ });
+
+ String[] extra = new String[] {
+ "--staging-table",
+ STAGING_TABLE_NAME,
+ "--",
+ "--schema",
+ SCHEMA_SPECIAL,
+ };
+
+ runExport(getArgv(true, extra));
+
+ assertRowCount(2,
+ quoteTableOrSchemaName(SCHEMA_SPECIAL)
+ + "." + quoteTableOrSchemaName(TABLE_NAME),
+ connection);
+ }
+
+ @Test
+ public void testExportCustomSchemaStagingClear()
+ throws IOException, SQLException {
+ createTestFile("inputFile", new String[] {
+ "2,Bob,2009-04-20,400,sales",
+ "3,Fred,2009-01-23,15,marketing",
+ });
+
+ String[] extra = new String[] {
+ "--staging-table",
+ STAGING_TABLE_NAME,
+ "--clear-staging-table",
+ "--",
+ "--schema",
+ SCHEMA_SPECIAL,
+ };
+
+ runExport(getArgv(true, extra));
+
+ assertRowCount(2,
+ quoteTableOrSchemaName(SCHEMA_SPECIAL)
+ + "." + quoteTableOrSchemaName(TABLE_NAME),
+ connection);
+ }
+
+ @Test
+ public void testExportCustomSchemaDirect() throws IOException, SQLException {
+ createTestFile("inputFile", new String[] {
+ "2,Bob,2009-04-20,400,sales",
+ "3,Fred,2009-01-23,15,marketing",
+ });
+
+ String[] extra = new String[] {
+ "--direct",
+ "--",
+ "--schema",
+ SCHEMA_SPECIAL,
+ };
+
+ runExport(getArgv(true, extra));
+
+ assertRowCount(2,
+ quoteTableOrSchemaName(SCHEMA_SPECIAL)
+ + "." + quoteTableOrSchemaName(TABLE_NAME),
+ connection);
+ }
+
+ /**
+  * Asserts that {@code tableName} contains exactly {@code expected} rows.
+  *
+  * <p>Runs {@code SELECT count(*)} on the supplied connection, fails the test
+  * on any {@link SQLException} or if the query yields no row, and always
+  * commits on the connection afterwards.
+  *
+  * @param expected expected number of rows
+  * @param tableName table name exactly as it should appear in the FROM
+  *     clause; it is concatenated into the query as-is
+  * @param connection open connection to run the query on
+  */
+ public static void assertRowCount(long expected,
+     String tableName,
+     Connection connection) {
+   Statement stmt = null;
+   ResultSet rs = null;
+   try {
+     stmt = connection.createStatement();
+     rs = stmt.executeQuery("SELECT count(*) FROM " + tableName);
+
+     if (!rs.next()) {
+       fail("Row count query on " + tableName + " returned no row");
+     }
+
+     assertEquals(expected, rs.getLong(1));
+   } catch (SQLException e) {
+     LOG.error("Can't verify number of rows", e);
+     fail("Can't verify number of rows in " + tableName + ": " + e);
+   } finally {
+     // Release in reverse order of acquisition, each step on its own so that
+     // one failure does not skip the remaining clean-up.
+     try {
+       if (rs != null) {
+         rs.close();
+       }
+     } catch (SQLException ex) {
+       LOG.info("Ignored exception in finally block.", ex);
+     }
+     try {
+       if (stmt != null) {
+         stmt.close();
+       }
+     } catch (SQLException ex) {
+       LOG.info("Ignored exception in finally block.", ex);
+     }
+     try {
+       connection.commit();
+     } catch (SQLException ex) {
+       LOG.info("Ignored exception in finally block.", ex);
+     }
+   }
+ }
+
+ public String quoteTableOrSchemaName(String tableName) {
+ return "\"" + tableName + "\"";
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExternalTableImportTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExternalTableImportTest.java b/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExternalTableImportTest.java
new file mode 100644
index 0000000..dd4cfb4
--- /dev/null
+++ b/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExternalTableImportTest.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.manager.postgresql;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.sqoop.manager.ConnManager;
+import org.apache.sqoop.manager.PostgresqlManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.testutil.CommonArgs;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.apache.sqoop.util.FileListing;
+
+/**
+ * Runs a PostgreSQL import with --external-table-dir and --hive-import and
+ * checks the data file written under EXTERNAL_TABLE_DIR.
+ *
+ * Connection settings are read from the sqoop.test.postgresql.* system
+ * properties.
+ */
+public class PostgresqlExternalTableImportTest extends ImportJobTestCase {
+
+ public static final Log LOG = LogFactory
+ .getLog(PostgresqlExternalTableImportTest.class.getName());
+ // JDBC URL prefix; the database name is appended to it in CONNECT_STRING.
+ static final String HOST_URL = System.getProperty("sqoop.test.postgresql.connectstring.host_url",
+ "jdbc:postgresql://localhost/");
+ static final String DATABASE_USER = System.getProperty(
+ "sqoop.test.postgresql.username", "sqooptest");
+ static final String DATABASE_NAME = System.getProperty(
+ "sqoop.test.postgresql.database", "sqooptest");
+ // No default is given, so this is null when the property is not set.
+ static final String PASSWORD = System.getProperty("sqoop.test.postgresql.password");
+
+ // Tables created by setUp(); DIFFERENT_TABLE_NAME is created in SCHEMA_SPECIAL,
+ // the others in SCHEMA_PUBLIC.
+ static final String TABLE_NAME = "EMPLOYEES_PG";
+ static final String NULL_TABLE_NAME = "NULL_EMPLOYEES_PG";
+ static final String SPECIAL_TABLE_NAME = "EMPLOYEES_PG's";
+ static final String DIFFERENT_TABLE_NAME = "DIFFERENT_TABLE";
+ static final String SCHEMA_PUBLIC = "public";
+ static final String SCHEMA_SPECIAL = "special";
+ static final String CONNECT_STRING = HOST_URL + DATABASE_NAME;
+ // Passed as --external-table-dir; the imported part file is read from here.
+ static final String EXTERNAL_TABLE_DIR = "/tmp/external/employees_pg";
+ // Reassigned by every setUpData() call and closed in tearDown().
+ protected Connection connection;
+
+ /**
+  * Always returns false.
+  * NOTE(review): presumably this tells the base test case not to use the
+  * HSQLDB test server because the test talks to PostgreSQL - confirm
+  * against the base class.
+  */
+ @Override
+ protected boolean useHsqldbTestServer() {
+ return false;
+ }
+
+ public String quoteTableOrSchemaName(String tableName) {
+ return "\"" + tableName + "\"";
+ }
+
+ /**
+  * Builds a DROP TABLE IF EXISTS statement for the double-quoted,
+  * schema-qualified table name.
+  */
+ private String getDropTableStatement(String tableName, String schema) {
+   String quotedSchema = quoteTableOrSchemaName(schema);
+   String quotedTable = quoteTableOrSchemaName(tableName);
+   return "DROP TABLE IF EXISTS " + quotedSchema + "." + quotedTable;
+ }
+
+ /**
+  * Runs the base-class setUp and then creates and populates the four test
+  * tables through setUpData().
+  */
+ @Before
+ public void setUp() {
+ super.setUp();
+
+ LOG.debug("Setting up another postgresql test: " + CONNECT_STRING);
+
+ // Each call reassigns the connection field, so after setUp() it refers to
+ // the connection obtained for DIFFERENT_TABLE_NAME.
+ setUpData(TABLE_NAME, SCHEMA_PUBLIC, false);
+ setUpData(NULL_TABLE_NAME, SCHEMA_PUBLIC, true);
+ setUpData(SPECIAL_TABLE_NAME, SCHEMA_PUBLIC, false);
+ setUpData(DIFFERENT_TABLE_NAME, SCHEMA_SPECIAL, false);
+
+ LOG.debug("setUp complete.");
+ }
+
+ /**
+  * Drops the four test tables, runs the base-class tearDown and closes the
+  * connection. Database cleanup is best effort: SQL errors are logged and
+  * never fail the test.
+  */
+ @After
+ public void tearDown() {
+   // connection is null when setUpData() failed before obtaining one; skip
+   // the cleanup then so that super.tearDown() below still runs.
+   if (connection != null) {
+     Statement stmt = null;
+     try {
+       // NOTE(review): setUpData() switches auto-commit off and nothing is
+       // committed here - confirm that these drops are actually persisted.
+       stmt = connection.createStatement();
+       stmt.executeUpdate(getDropTableStatement(TABLE_NAME, SCHEMA_PUBLIC));
+       stmt.executeUpdate(getDropTableStatement(NULL_TABLE_NAME, SCHEMA_PUBLIC));
+       stmt.executeUpdate(getDropTableStatement(SPECIAL_TABLE_NAME, SCHEMA_PUBLIC));
+       stmt.executeUpdate(getDropTableStatement(DIFFERENT_TABLE_NAME, SCHEMA_SPECIAL));
+     } catch (SQLException e) {
+       LOG.error("Can't clean up the database:", e);
+     } finally {
+       if (stmt != null) {
+         try {
+           stmt.close();
+         } catch (SQLException e) {
+           LOG.warn("Ignoring exception while closing statement in tearDown", e);
+         }
+       }
+     }
+   }
+
+   super.tearDown();
+
+   if (connection != null) {
+     try {
+       connection.close();
+     } catch (SQLException e) {
+       LOG.error("Ignoring exception in tearDown", e);
+     }
+   }
+ }
+
+ /**
+  * (Re)creates the table {@code schema.tableName} and fills it with three
+  * employee rows, plus a row with id 4 whose nullable columns are all NULL
+  * when {@code nullEntry} is true.
+  *
+  * The connection obtained from the manager is stored in the
+  * {@code connection} field. Any SQLException outside the tolerated
+  * "create schema" / "drop table" steps fails the test.
+  *
+  * @param tableName unescaped name of the table to create
+  * @param schema unescaped name of the schema holding the table; creation
+  *        is attempted and a failure is tolerated
+  * @param nullEntry whether to insert the additional row containing NULLs
+  */
+ public void setUpData(String tableName, String schema, boolean nullEntry) {
+   SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName);
+   options.setUsername(DATABASE_USER);
+   options.setPassword(PASSWORD);
+
+   ConnManager manager = null;
+   Statement st = null;
+
+   try {
+     manager = new PostgresqlManager(options);
+     // NOTE(review): manager.close() in the finally block may also close
+     // this connection, which tearDown() uses later - confirm.
+     connection = manager.getConnection();
+     connection.setAutoCommit(false);
+     st = connection.createStatement();
+
+     // Create the schema if it does not exist the simple way: always try to
+     // create it and ignore the error.
+     try {
+       st.executeUpdate("CREATE SCHEMA " + manager.escapeTableName(schema));
+       connection.commit();
+     } catch (SQLException e) {
+       LOG.info("Couldn't create schema " + schema + " (is o.k. as long as"
+           + " the schema already exists).");
+       connection.rollback();
+     }
+
+     String fullTableName = manager.escapeTableName(schema) + "."
+         + manager.escapeTableName(tableName);
+     LOG.info("Creating table: " + fullTableName);
+
+     try {
+       // Try to remove the table first. DROP TABLE IF EXISTS didn't
+       // get added until pg 8.3, so we just use "DROP TABLE" and ignore
+       // any exception here if one occurs.
+       st.executeUpdate("DROP TABLE " + fullTableName);
+     } catch (SQLException e) {
+       LOG.info("Couldn't drop table " + schema + "." + tableName + " (ok)");
+       // Now we need to reset the transaction.
+       connection.rollback();
+     }
+
+     st.executeUpdate("CREATE TABLE " + fullTableName + " ("
+         + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, "
+         + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, "
+         + manager.escapeColName("start_date") + " DATE, "
+         + manager.escapeColName("Salary") + " FLOAT, "
+         + manager.escapeColName("Fired") + " BOOL, "
+         + manager.escapeColName("dept") + " VARCHAR(32))");
+
+     st.executeUpdate("INSERT INTO " + fullTableName
+         + " VALUES(1,'Aaron','2009-05-14',1000000.00,TRUE,'engineering')");
+     st.executeUpdate("INSERT INTO " + fullTableName
+         + " VALUES(2,'Bob','2009-04-20',400.00,TRUE,'sales')");
+     st.executeUpdate("INSERT INTO " + fullTableName
+         + " VALUES(3,'Fred','2009-01-23',15.00,FALSE,'marketing')");
+     if (nullEntry) {
+       st.executeUpdate("INSERT INTO " + fullTableName
+           + " VALUES(4,'Mike',NULL,NULL,NULL,NULL)");
+     }
+     connection.commit();
+   } catch (SQLException sqlE) {
+     // Log through the logger, stack trace included, instead of stderr.
+     LOG.error("Encountered SQL Exception: " + sqlE, sqlE);
+     fail("SQLException when running test setUp(): " + sqlE);
+   } finally {
+     try {
+       // The inner finally makes sure the manager is closed even when
+       // closing the statement throws.
+       try {
+         if (null != st) {
+           st.close();
+         }
+       } finally {
+         if (null != manager) {
+           manager.close();
+         }
+       }
+     } catch (SQLException sqlE) {
+       LOG.warn("Got SQLException when closing connection: " + sqlE);
+     }
+   }
+
+   LOG.debug("setUp complete.");
+ }
+
+ /**
+  * Assembles the import command line: the common Hadoop flags, the fixed
+  * table / external-table / hive / connection options, optionally --direct,
+  * and finally the caller-supplied arguments in the order given.
+  *
+  * @param isDirect whether to add --direct
+  * @param tableName value passed with --table
+  * @param extraArgs arguments appended verbatim at the end
+  * @return the complete argument array
+  */
+ private String[] getArgv(boolean isDirect, String tableName, String... extraArgs) {
+   ArrayList<String> argList = new ArrayList<String>();
+   CommonArgs.addHadoopFlags(argList);
+
+   argList.addAll(Arrays.asList(
+       "--table", tableName,
+       "--external-table-dir", EXTERNAL_TABLE_DIR,
+       "--hive-import",
+       "--warehouse-dir", getWarehouseDir(),
+       "--connect", CONNECT_STRING,
+       "--username", DATABASE_USER,
+       "--password", PASSWORD,
+       "--where", "id > 1",
+       "-m", "1"));
+
+   if (isDirect) {
+     argList.add("--direct");
+   }
+
+   argList.addAll(Arrays.asList(extraArgs));
+
+   return argList.toArray(new String[argList.size()]);
+ }
+
+ /**
+  * Runs an import of {@code tableName} and compares the first lines of the
+  * part file under EXTERNAL_TABLE_DIR with {@code expectedResults}.
+  *
+  * Unless --merge-key is among {@code extraArgs}, an existing output
+  * directory is deleted first. Only as many lines as there are expected
+  * results are read; additional lines in the file are not checked.
+  *
+  * @param isDirect whether to run the import with --direct
+  * @param expectedResults expected leading lines of the output file
+  * @param tableName table to import
+  * @param extraArgs additional command line arguments
+  */
+ private void doImportAndVerify(boolean isDirect, String[] expectedResults, String tableName,
+     String... extraArgs) throws IOException {
+
+   Path outputDir = new Path(EXTERNAL_TABLE_DIR);
+
+   // A merge run keeps the existing directory and writes reducer output.
+   boolean mergeRun = Arrays.asList(extraArgs).contains("--merge-key");
+   String partFileName;
+   if (mergeRun) {
+     partFileName = "part-r-00000";
+   } else {
+     partFileName = "part-m-00000";
+   }
+   Path dataFilePath = new Path(outputDir, partFileName);
+
+   File outputDirFile = new File(outputDir.toString());
+   if (!mergeRun && outputDirFile.exists() && outputDirFile.isDirectory()) {
+     // Start from a clean directory.
+     FileListing.recursiveDeleteDir(outputDirFile);
+   }
+
+   try {
+     runImport(getArgv(isDirect, tableName, extraArgs));
+   } catch (IOException ioe) {
+     LOG.error("Got IOException during import: " + ioe.toString());
+     ioe.printStackTrace();
+     fail(ioe.toString());
+   }
+
+   File dataFile = new File(dataFilePath.toString());
+   assertTrue("Could not find imported data file, " + dataFile, dataFile.exists());
+
+   BufferedReader reader = null;
+   try {
+     // Compare the file line by line with the expectation.
+     reader = new BufferedReader(new InputStreamReader(new FileInputStream(dataFile)));
+     for (int i = 0; i < expectedResults.length; i++) {
+       String actualLine = reader.readLine();
+       assertEquals(expectedResults[i], actualLine);
+     }
+   } catch (IOException ioe) {
+     LOG.error("Got IOException verifying results: " + ioe.toString());
+     ioe.printStackTrace();
+     fail(ioe.toString());
+   } finally {
+     IOUtils.closeStream(reader);
+   }
+ }
+
+ /**
+  * Imports TABLE_NAME without --direct and expects the rows with id 2 and 3,
+  * with fields separated by the \001 character.
+  */
+ @Test
+ public void testJdbcBasedImport() throws IOException {
+   // separator is different to other tests
+   // because the CREATE EXTERNAL TABLE DDL is
+   // ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
+   char sep = '\001';
+   String[][] rows = {
+     {"2", "Bob", "2009-04-20", "400.0", "true", "sales"},
+     {"3", "Fred", "2009-01-23", "15.0", "false", "marketing"},
+   };
+
+   String[] expectedResults = new String[rows.length];
+   for (int r = 0; r < rows.length; r++) {
+     StringBuilder line = new StringBuilder();
+     for (int c = 0; c < rows[r].length; c++) {
+       if (c > 0) {
+         line.append(sep);
+       }
+       line.append(rows[r][c]);
+     }
+     expectedResults[r] = line.toString();
+   }
+
+   doImportAndVerify(false, expectedResults, TABLE_NAME);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/postgresql/PostgresqlImportTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/postgresql/PostgresqlImportTest.java b/src/test/org/apache/sqoop/manager/postgresql/PostgresqlImportTest.java
new file mode 100644
index 0000000..846228a
--- /dev/null
+++ b/src/test/org/apache/sqoop/manager/postgresql/PostgresqlImportTest.java
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.postgresql;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.FileInputStream;
+import java.io.File;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.sqoop.manager.ConnManager;
+import org.apache.sqoop.manager.PostgresqlManager;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.testutil.CommonArgs;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.apache.sqoop.util.FileListing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test the PostgresqlManager and DirectPostgresqlManager implementations.
+ * The former uses the postgres JDBC driver to perform an import;
+ * the latter uses pg_dump to facilitate it.
+ *
+ * Since this requires a Postgresql installation on your local machine to use,
+ * this class is named in such a way that Hadoop's default QA process does not
+ * run it. You need to run this manually with -Dtestcase=PostgresqlImportTest or
+ * -Dthirdparty=true.
+ *
+ * You need to put Postgresql's JDBC driver library into a location where
+ * Hadoop can access it (e.g., $HADOOP_HOME/lib).
+ *
+ * To configure a postgresql database to allow local connections, put the
+ * following in /etc/postgresql/8.3/main/pg_hba.conf:
+ * local all all trust
+ * host all all 127.0.0.1/32 trust
+ * host all all ::1/128 trust
+ *
+ * ... and comment out any other lines referencing 127.0.0.1 or ::1.
+ *
+ * Also in the file /etc/postgresql/8.3/main/postgresql.conf, uncomment
+ * the line that starts with listen_addresses and set its value to '*' as
+ * follows
+ * listen_addresses = '*'
+ *
+ * For postgresql 8.1, this may be in /var/lib/pgsql/data, instead. You may
+ * need to restart the postgresql service after modifying this file.
+ *
+ * You should also create a sqooptest user and database:
+ *
+ * $ sudo -u postgres psql -U postgres template1
+ * template1=> CREATE USER sqooptest;
+ * template1=> CREATE DATABASE sqooptest;
+ * template1=> GRANT ALL ON DATABASE sqooptest TO sqooptest;
+ * template1=> \q
+ *
+ */
+public class PostgresqlImportTest extends ImportJobTestCase {
+
+ public static final Log LOG = LogFactory.getLog(
+ PostgresqlImportTest.class.getName());
+
+ // JDBC URL prefix; the database name is appended to it in CONNECT_STRING.
+ static final String HOST_URL = System.getProperty(
+ "sqoop.test.postgresql.connectstring.host_url",
+ "jdbc:postgresql://localhost/");
+ static final String DATABASE_USER = System.getProperty(
+ "sqoop.test.postgresql.username",
+ "sqooptest");
+ static final String DATABASE_NAME = System.getProperty(
+ "sqoop.test.postgresql.database",
+ "sqooptest");
+ // No default is given, so this is null when the property is not set.
+ static final String PASSWORD = System.getProperty(
+ "sqoop.test.postgresql.password");
+
+ // Tables created by setUp(); DIFFERENT_TABLE_NAME is created in SCHEMA_SPECIAL,
+ // the others in SCHEMA_PUBLIC.
+ static final String TABLE_NAME = "EMPLOYEES_PG";
+ static final String NULL_TABLE_NAME = "NULL_EMPLOYEES_PG";
+ static final String SPECIAL_TABLE_NAME = "EMPLOYEES_PG's";
+ static final String DIFFERENT_TABLE_NAME = "DIFFERENT_TABLE";
+ static final String SCHEMA_PUBLIC = "public";
+ static final String SCHEMA_SPECIAL = "special";
+ static final String CONNECT_STRING = HOST_URL + DATABASE_NAME;
+
+ // Reassigned by every setUpData() call and closed in tearDown().
+ protected Connection connection;
+
+ /**
+  * Always returns false.
+  * NOTE(review): presumably this tells the base test case not to use the
+  * HSQLDB test server because the test talks to PostgreSQL - confirm
+  * against the base class.
+  */
+ @Override
+ protected boolean useHsqldbTestServer() {
+ return false;
+ }
+
+ public String quoteTableOrSchemaName(String tableName) {
+ return "\"" + tableName + "\"";
+ }
+
+ /**
+  * Builds a DROP TABLE IF EXISTS statement for the double-quoted,
+  * schema-qualified table name.
+  */
+ private String getDropTableStatement(String tableName, String schema) {
+   String quotedSchema = quoteTableOrSchemaName(schema);
+   String quotedTable = quoteTableOrSchemaName(tableName);
+   return "DROP TABLE IF EXISTS " + quotedSchema + "." + quotedTable;
+ }
+
+ /**
+  * Runs the base-class setUp and then creates and populates the four test
+  * tables through setUpData().
+  */
+ @Before
+ public void setUp() {
+ super.setUp();
+
+ LOG.debug("Setting up another postgresql test: " + CONNECT_STRING);
+
+ // Each call reassigns the connection field, so after setUp() it refers to
+ // the connection obtained for DIFFERENT_TABLE_NAME.
+ setUpData(TABLE_NAME, SCHEMA_PUBLIC, false);
+ setUpData(NULL_TABLE_NAME, SCHEMA_PUBLIC, true);
+ setUpData(SPECIAL_TABLE_NAME, SCHEMA_PUBLIC, false);
+ setUpData(DIFFERENT_TABLE_NAME, SCHEMA_SPECIAL, false);
+
+ LOG.debug("setUp complete.");
+ }
+
+ /**
+  * Drops the four test tables, runs the base-class tearDown and closes the
+  * connection. Database cleanup is best effort: SQL errors are logged and
+  * never fail the test.
+  */
+ @After
+ public void tearDown() {
+   // connection is null when setUpData() failed before obtaining one; skip
+   // the cleanup then so that super.tearDown() below still runs.
+   if (connection != null) {
+     Statement stmt = null;
+     try {
+       // NOTE(review): setUpData() switches auto-commit off and nothing is
+       // committed here - confirm that these drops are actually persisted.
+       stmt = connection.createStatement();
+       stmt.executeUpdate(getDropTableStatement(TABLE_NAME, SCHEMA_PUBLIC));
+       stmt.executeUpdate(getDropTableStatement(NULL_TABLE_NAME, SCHEMA_PUBLIC));
+       stmt.executeUpdate(getDropTableStatement(SPECIAL_TABLE_NAME, SCHEMA_PUBLIC));
+       stmt.executeUpdate(getDropTableStatement(DIFFERENT_TABLE_NAME, SCHEMA_SPECIAL));
+     } catch (SQLException e) {
+       LOG.error("Can't clean up the database:", e);
+     } finally {
+       if (stmt != null) {
+         try {
+           stmt.close();
+         } catch (SQLException e) {
+           LOG.warn("Ignoring exception while closing statement in tearDown", e);
+         }
+       }
+     }
+   }
+
+   super.tearDown();
+
+   if (connection != null) {
+     try {
+       connection.close();
+     } catch (SQLException e) {
+       LOG.error("Ignoring exception in tearDown", e);
+     }
+   }
+ }
+
+
+
+ /**
+  * (Re)creates the table {@code schema.tableName} and fills it with three
+  * employee rows, plus a row with id 4 whose nullable columns are all NULL
+  * when {@code nullEntry} is true.
+  *
+  * The connection obtained from the manager is stored in the
+  * {@code connection} field. Any SQLException outside the tolerated
+  * "create schema" / "drop table" steps fails the test.
+  *
+  * @param tableName unescaped name of the table to create
+  * @param schema unescaped name of the schema holding the table; creation
+  *        is attempted and a failure is tolerated
+  * @param nullEntry whether to insert the additional row containing NULLs
+  */
+ public void setUpData(String tableName, String schema, boolean nullEntry) {
+   SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName);
+   options.setUsername(DATABASE_USER);
+   options.setPassword(PASSWORD);
+
+   ConnManager manager = null;
+   Statement st = null;
+
+   try {
+     manager = new PostgresqlManager(options);
+     // NOTE(review): manager.close() in the finally block may also close
+     // this connection, which tearDown() uses later - confirm.
+     connection = manager.getConnection();
+     connection.setAutoCommit(false);
+     st = connection.createStatement();
+
+     // Create the schema if it does not exist the simple way: always try to
+     // create it and ignore the error.
+     try {
+       st.executeUpdate("CREATE SCHEMA " + manager.escapeTableName(schema));
+       connection.commit();
+     } catch (SQLException e) {
+       LOG.info("Couldn't create schema " + schema + " (is o.k. as long as"
+           + " the schema already exists).");
+       connection.rollback();
+     }
+
+     String fullTableName = manager.escapeTableName(schema)
+         + "." + manager.escapeTableName(tableName);
+     LOG.info("Creating table: " + fullTableName);
+
+     try {
+       // Try to remove the table first. DROP TABLE IF EXISTS didn't
+       // get added until pg 8.3, so we just use "DROP TABLE" and ignore
+       // any exception here if one occurs.
+       st.executeUpdate("DROP TABLE " + fullTableName);
+     } catch (SQLException e) {
+       LOG.info("Couldn't drop table " + schema + "." + tableName + " (ok)");
+       // Now we need to reset the transaction.
+       connection.rollback();
+     }
+
+     st.executeUpdate("CREATE TABLE " + fullTableName + " ("
+         + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, "
+         + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, "
+         + manager.escapeColName("start_date") + " DATE, "
+         + manager.escapeColName("Salary") + " FLOAT, "
+         + manager.escapeColName("Fired") + " BOOL, "
+         + manager.escapeColName("dept") + " VARCHAR(32))");
+
+     st.executeUpdate("INSERT INTO " + fullTableName
+         + " VALUES(1,'Aaron','2009-05-14',1000000.00,TRUE,'engineering')");
+     st.executeUpdate("INSERT INTO " + fullTableName
+         + " VALUES(2,'Bob','2009-04-20',400.00,TRUE,'sales')");
+     st.executeUpdate("INSERT INTO " + fullTableName
+         + " VALUES(3,'Fred','2009-01-23',15.00,FALSE,'marketing')");
+     if (nullEntry) {
+       st.executeUpdate("INSERT INTO " + fullTableName
+           + " VALUES(4,'Mike',NULL,NULL,NULL,NULL)");
+     }
+     connection.commit();
+   } catch (SQLException sqlE) {
+     // Log through the logger, stack trace included, instead of stderr.
+     LOG.error("Encountered SQL Exception: " + sqlE, sqlE);
+     fail("SQLException when running test setUp(): " + sqlE);
+   } finally {
+     try {
+       // The inner finally makes sure the manager is closed even when
+       // closing the statement throws.
+       try {
+         if (null != st) {
+           st.close();
+         }
+       } finally {
+         if (null != manager) {
+           manager.close();
+         }
+       }
+     } catch (SQLException sqlE) {
+       LOG.warn("Got SQLException when closing connection: " + sqlE);
+     }
+   }
+
+   LOG.debug("setUp complete.");
+ }
+
+
+ /**
+  * Assembles the import command line: the common Hadoop flags, the fixed
+  * table / warehouse / connection options, optionally --direct, and finally
+  * the caller-supplied arguments in the order given.
+  *
+  * @param isDirect whether to add --direct
+  * @param tableName value passed with --table
+  * @param extraArgs arguments appended verbatim at the end
+  * @return the complete argument array
+  */
+ private String [] getArgv(boolean isDirect, String tableName,
+     String... extraArgs) {
+   ArrayList<String> argList = new ArrayList<String>();
+   CommonArgs.addHadoopFlags(argList);
+
+   argList.addAll(Arrays.asList(
+       "--table", tableName,
+       "--warehouse-dir", getWarehouseDir(),
+       "--connect", CONNECT_STRING,
+       "--username", DATABASE_USER,
+       "--password", PASSWORD,
+       "--where", "id > 1",
+       "-m", "1"));
+
+   if (isDirect) {
+     argList.add("--direct");
+   }
+
+   argList.addAll(Arrays.asList(extraArgs));
+
+   return argList.toArray(new String[argList.size()]);
+ }
+
+ /**
+  * Runs an import of {@code tableName} and compares the first lines of the
+  * part file in the table's warehouse directory with
+  * {@code expectedResults}.
+  *
+  * Unless --merge-key is among {@code extraArgs}, an existing table
+  * directory is deleted first. Only as many lines as there are expected
+  * results are read; additional lines in the file are not checked.
+  *
+  * @param isDirect whether to run the import with --direct
+  * @param expectedResults expected leading lines of the output file
+  * @param tableName table to import
+  * @param extraArgs additional command line arguments
+  */
+ private void doImportAndVerify(boolean isDirect, String[] expectedResults,
+     String tableName, String... extraArgs) throws IOException {
+
+   Path tableDir = new Path(new Path(this.getWarehouseDir()), tableName);
+
+   // A merge run keeps the existing directory and writes reducer output.
+   boolean mergeRun = Arrays.asList(extraArgs).contains("--merge-key");
+   String partFileName;
+   if (mergeRun) {
+     partFileName = "part-r-00000";
+   } else {
+     partFileName = "part-m-00000";
+   }
+   Path dataFilePath = new Path(tableDir, partFileName);
+
+   File tableDirFile = new File(tableDir.toString());
+   if (!mergeRun && tableDirFile.exists() && tableDirFile.isDirectory()) {
+     // Start from a clean directory.
+     FileListing.recursiveDeleteDir(tableDirFile);
+   }
+
+   try {
+     runImport(getArgv(isDirect, tableName, extraArgs));
+   } catch (IOException ioe) {
+     LOG.error("Got IOException during import: " + ioe.toString());
+     ioe.printStackTrace();
+     fail(ioe.toString());
+   }
+
+   File dataFile = new File(dataFilePath.toString());
+   assertTrue("Could not find imported data file, " + dataFile, dataFile.exists());
+
+   BufferedReader reader = null;
+   try {
+     // Compare the file line by line with the expectation.
+     reader = new BufferedReader(new InputStreamReader(new FileInputStream(dataFile)));
+     for (int i = 0; i < expectedResults.length; i++) {
+       String actualLine = reader.readLine();
+       assertEquals(expectedResults[i], actualLine);
+     }
+   } catch (IOException ioe) {
+     LOG.error("Got IOException verifying results: " + ioe.toString());
+     ioe.printStackTrace();
+     fail(ioe.toString());
+   } finally {
+     IOUtils.closeStream(reader);
+   }
+ }
+
+ /** Imports TABLE_NAME without --direct and expects the rows with id 2 and 3. */
+ @Test
+ public void testJdbcBasedImport() throws IOException {
+   doImportAndVerify(false,
+       new String[] {
+         "2,Bob,2009-04-20,400.0,true,sales",
+         "3,Fred,2009-01-23,15.0,false,marketing",
+       },
+       TABLE_NAME);
+ }
+
+ /**
+  * Imports TABLE_NAME with --direct; numbers and booleans are expected in
+  * the form 400 / TRUE rather than 400.0 / true.
+  */
+ @Test
+ public void testDirectImport() throws IOException {
+   doImportAndVerify(true,
+       new String[] {
+         "2,Bob,2009-04-20,400,TRUE,sales",
+         "3,Fred,2009-01-23,15,FALSE,marketing",
+       },
+       TABLE_NAME);
+ }
+
+ /**
+  * Checks that listTables() of a PostgresqlManager reports TABLE_NAME.
+  * The manager is closed afterwards, whether or not the assertion holds.
+  */
+ @Test
+ public void testListTables() throws IOException {
+   SqoopOptions options = new SqoopOptions(new Configuration());
+   options.setConnectString(CONNECT_STRING);
+   options.setUsername(DATABASE_USER);
+   options.setPassword(PASSWORD);
+
+   ConnManager mgr = new PostgresqlManager(options);
+   try {
+     String[] tables = mgr.listTables();
+     // binarySearch requires a sorted array.
+     Arrays.sort(tables);
+     assertTrue(TABLE_NAME + " is not found!",
+         Arrays.binarySearch(tables, TABLE_NAME) >= 0);
+   } finally {
+     try {
+       mgr.close();
+     } catch (SQLException e) {
+       LOG.warn("Got SQLException when closing connection: " + e);
+     }
+   }
+ }
+
+ /** Imports SPECIAL_TABLE_NAME, whose name contains a single quote. */
+ @Test
+ public void testTableNameWithSpecialCharacter() throws IOException {
+   doImportAndVerify(false,
+       new String[] {
+         "2,Bob,2009-04-20,400.0,true,sales",
+         "3,Fred,2009-01-23,15.0,false,marketing",
+       },
+       SPECIAL_TABLE_NAME);
+ }
+
+ /**
+  * Runs a lastmodified incremental import on start_date. No expected lines
+  * are given, so only the existence of the output file is verified.
+  */
+ @Test
+ public void testIncrementalImport() throws IOException {
+   doImportAndVerify(false, new String[0], TABLE_NAME,
+       "--incremental", "lastmodified",
+       "--check-column", "start_date");
+ }
+
+ /**
+  * Runs a lastmodified incremental import on start_date with --direct. No
+  * expected lines are given, so only the existence of the output file is
+  * verified.
+  */
+ @Test
+ public void testDirectIncrementalImport() throws IOException {
+   doImportAndVerify(true, new String[0], TABLE_NAME,
+       "--incremental", "lastmodified",
+       "--check-column", "start_date");
+ }
+
+ /**
+  * Runs a direct lastmodified incremental import and then a second one with
+  * --merge-key and --last-value on top of the first run's output. No
+  * expected lines are given, so each run only verifies that its output file
+  * exists.
+  */
+ @Test
+ public void testDirectIncrementalImportMerge() throws IOException {
+   String [] noExpectedLines = new String[0];
+
+   // First run: plain incremental import.
+   doImportAndVerify(true, noExpectedLines, TABLE_NAME,
+       "--incremental", "lastmodified",
+       "--check-column", "start_date");
+
+   // Second run: merge on id, starting from the given last value.
+   doImportAndVerify(true, noExpectedLines, TABLE_NAME,
+       "--incremental", "lastmodified",
+       "--check-column", "start_date",
+       "--merge-key", "id",
+       "--last-value", "2009-04-20");
+ }
+
+ /**
+  * Imports DIFFERENT_TABLE_NAME, passing "--schema SCHEMA_SPECIAL" after the
+  * "--" separator.
+  */
+ @Test
+ public void testDifferentSchemaImport() throws IOException {
+   String [] expectedRows = new String[] {
+     "2,Bob,2009-04-20,400.0,true,sales",
+     "3,Fred,2009-01-23,15.0,false,marketing",
+   };
+
+   doImportAndVerify(false, expectedRows, DIFFERENT_TABLE_NAME,
+       "--", "--schema", SCHEMA_SPECIAL);
+ }
+
+ /**
+  * Imports DIFFERENT_TABLE_NAME with --direct, passing
+  * "--schema SCHEMA_SPECIAL" after the "--" separator.
+  */
+ @Test
+ public void testDifferentSchemaImportDirect() throws IOException {
+   String [] expectedRows = new String[] {
+     "2,Bob,2009-04-20,400,TRUE,sales",
+     "3,Fred,2009-01-23,15,FALSE,marketing",
+   };
+
+   doImportAndVerify(true, expectedRows, DIFFERENT_TABLE_NAME,
+       "--", "--schema", SCHEMA_SPECIAL);
+ }
+
+ /**
+  * Imports NULL_TABLE_NAME with --direct and custom --null-string /
+  * --null-non-string values; the NULL columns of row 4 are expected to
+  * appear as \N in the output.
+  */
+ @Test
+ public void testNullEscapeCharacters() throws Exception {
+   String [] expectedRows = new String[] {
+     "2,Bob,2009-04-20,400,TRUE,sales",
+     "3,Fred,2009-01-23,15,FALSE,marketing",
+     "4,Mike,\\N,\\N,\\N,\\N",
+   };
+
+   doImportAndVerify(true, expectedRows, NULL_TABLE_NAME,
+       "--null-string", "\\\\\\\\N",
+       "--null-non-string", "\\\\\\\\N");
+ }
+
+ /**
+  * Imports TABLE_NAME with --direct and custom boolean strings, passed after
+  * the "--" separator; the Fired column is expected to show them.
+  */
+ @Test
+ public void testDifferentBooleanValues() throws Exception {
+   String [] expectedRows = new String[] {
+     "2,Bob,2009-04-20,400,REAL_TRUE,sales",
+     "3,Fred,2009-01-23,15,REAL_FALSE,marketing",
+   };
+
+   doImportAndVerify(true, expectedRows, TABLE_NAME,
+       "--",
+       "--boolean-true-string", "REAL_TRUE",
+       "--boolean-false-string", "REAL_FALSE");
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/sqlserver/ManagerCompatExport.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/sqlserver/ManagerCompatExport.java b/src/test/org/apache/sqoop/manager/sqlserver/ManagerCompatExport.java
index be2b22c..15672b1 100644
--- a/src/test/org/apache/sqoop/manager/sqlserver/ManagerCompatExport.java
+++ b/src/test/org/apache/sqoop/manager/sqlserver/ManagerCompatExport.java
@@ -34,10 +34,10 @@ import org.apache.sqoop.manager.sqlserver.MSSQLTestData.KEY_STRINGS;
import org.apache.sqoop.manager.sqlserver.MSSQLTestDataFileParser.DATATYPES;
import org.junit.Before;
import org.junit.Test;
-import com.cloudera.sqoop.Sqoop;
-import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.testutil.ExportJobTestCase;
-import com.cloudera.sqoop.tool.ExportTool;
+import org.apache.sqoop.Sqoop;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.testutil.ExportJobTestCase;
+import org.apache.sqoop.tool.ExportTool;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeExportSequenceFileTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeExportSequenceFileTest.java b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeExportSequenceFileTest.java
index a68ed30..293da00 100644
--- a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeExportSequenceFileTest.java
+++ b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeExportSequenceFileTest.java
@@ -31,11 +31,11 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.sqoop.manager.sqlserver.MSSQLTestDataFileParser.DATATYPES;
-import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.lib.RecordParser;
-import com.cloudera.sqoop.lib.SqoopRecord;
-import com.cloudera.sqoop.tool.CodeGenTool;
-import com.cloudera.sqoop.util.ClassLoaderStack;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.lib.RecordParser;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.tool.CodeGenTool;
+import org.apache.sqoop.util.ClassLoaderStack;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportDelimitedFileTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportDelimitedFileTest.java b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportDelimitedFileTest.java
index a4d1822..520c4ac 100644
--- a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportDelimitedFileTest.java
+++ b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportDelimitedFileTest.java
@@ -31,12 +31,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.sqoop.manager.sqlserver.MSSQLTestDataFileParser.DATATYPES;
-import com.cloudera.sqoop.Sqoop;
-import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.orm.CompilationManager;
-import com.cloudera.sqoop.testutil.CommonArgs;
-import com.cloudera.sqoop.tool.ImportTool;
-import com.cloudera.sqoop.util.ClassLoaderStack;
+import org.apache.sqoop.Sqoop;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.orm.CompilationManager;
+import org.apache.sqoop.testutil.CommonArgs;
+import org.apache.sqoop.tool.ImportTool;
+import org.apache.sqoop.util.ClassLoaderStack;
import org.junit.Ignore;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportSequenceFileTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportSequenceFileTest.java b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportSequenceFileTest.java
index 409c4ad..592a78f 100644
--- a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportSequenceFileTest.java
+++ b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportSequenceFileTest.java
@@ -33,8 +33,8 @@ import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.testutil.ManagerCompatTestCase;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.testutil.ManagerCompatTestCase;
import org.apache.sqoop.manager.sqlserver.MSSQLTestDataFileParser.DATATYPES;
import org.apache.sqoop.manager.sqlserver.MSSQLTestData.KEY_STRINGS;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/sqlserver/SQLServerHiveImportTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerHiveImportTest.java b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerHiveImportTest.java
index 535e599..e6b0865 100644
--- a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerHiveImportTest.java
+++ b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerHiveImportTest.java
@@ -26,10 +26,10 @@ import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
-import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.hive.TestHiveImport;
-import com.cloudera.sqoop.testutil.CommonArgs;
-import com.cloudera.sqoop.tool.SqoopTool;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.hive.TestHiveImport;
+import org.apache.sqoop.testutil.CommonArgs;
+import org.apache.sqoop.tool.SqoopTool;
import org.junit.After;
import org.junit.Before;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/sqlserver/SQLServerManagerExportTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerManagerExportTest.java b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerManagerExportTest.java
new file mode 100644
index 0000000..b7c2b75
--- /dev/null
+++ b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerManagerExportTest.java
@@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.manager.sqlserver;
+
+import org.apache.sqoop.ConnFactory;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.testutil.CommonArgs;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sqoop.manager.SQLServerManager;
+import org.apache.sqoop.testutil.ExportJobTestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Export tests that run against a Microsoft SQL Server database reached
+ * through the connect string, database name and credentials supplied by the
+ * system properties read below.
+ *
+ * Please see instructions in SQLServerManagerImportTest.
+ */
+public class SQLServerManagerExportTest extends ExportJobTestCase {
+
+  public static final Log LOG = LogFactory.getLog(
+      SQLServerManagerExportTest.class.getName());
+
+  static final String HOST_URL = System.getProperty(
+      "sqoop.test.sqlserver.connectstring.host_url",
+      "jdbc:sqlserver://sqlserverhost:1433");
+  static final String DATABASE_NAME = System.getProperty(
+      "sqoop.test.sqlserver.database",
+      "sqooptest");
+  static final String DATABASE_USER = System.getProperty(
+      "ms.sqlserver.username",
+      "sqoopuser");
+  static final String DATABASE_PASSWORD = System.getProperty(
+      "ms.sqlserver.password",
+      "password");
+
+  static final String SCHEMA_DBO = "dbo";
+  static final String DBO_TABLE_NAME = "EMPLOYEES_MSSQL";
+  static final String DBO_BINARY_TABLE_NAME = "BINARYTYPE_MSSQL";
+  static final String SCHEMA_SCH = "sch";
+  static final String SCH_TABLE_NAME = "PRIVATE_TABLE";
+  static final String CONNECT_STRING = HOST_URL
+      + ";databaseName=" + DATABASE_NAME;
+
+  static final String CONNECTOR_FACTORY = System.getProperty(
+      "sqoop.test.msserver.connector.factory",
+      ConnFactory.DEFAULT_FACTORY_CLASS_NAMES);
+
+  /** Name of the file every test writes into the warehouse directory. */
+  private static final String INPUT_FILE_NAME = "inputFile";
+
+  /** The two employee records exported by most tests. */
+  private static final String[] EMPLOYEE_LINES = {
+    "2,Bob,400,sales",
+    "3,Fred,15,marketing",
+  };
+
+  // instance variables populated during setUp, used during tests
+  private SQLServerManager manager;
+  private final Configuration conf = new Configuration();
+  private Connection conn = null;
+
+  @Override
+  protected Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  protected boolean useHsqldbTestServer() {
+    return false;
+  }
+
+  /**
+   * Builds the schema-qualified table name, escaping both parts with the
+   * connection manager.
+   */
+  private String qualifiedName(String schema, String tableName) {
+    return manager.escapeObjectName(schema)
+        + "." + manager.escapeObjectName(tableName);
+  }
+
+  private String getDropTableStatement(String schema, String tableName) {
+    return "DROP TABLE IF EXISTS " + qualifiedName(schema, tableName);
+  }
+
+  /**
+   * Closes the statement if there is one. Failures are logged and never
+   * propagated, so this is safe to call from a finally block.
+   */
+  private static void closeQuietly(Statement stmt) {
+    if (null == stmt) {
+      return;
+    }
+    try {
+      stmt.close();
+    } catch (Exception ex) {
+      LOG.warn("Exception while closing stmt", ex);
+    }
+  }
+
+  @Before
+  public void setUp() {
+    super.setUp();
+
+    SqoopOptions options = new SqoopOptions(CONNECT_STRING,
+        DBO_TABLE_NAME);
+    options.setUsername(DATABASE_USER);
+    options.setPassword(DATABASE_PASSWORD);
+
+    manager = new SQLServerManager(options);
+
+    createTableAndPopulateData(SCHEMA_DBO, DBO_TABLE_NAME);
+    createTableAndPopulateData(SCHEMA_SCH, SCH_TABLE_NAME);
+
+    // To test with Microsoft SQL server connector, copy the connector jar to
+    // sqoop.thirdparty.lib.dir and set sqoop.test.msserver.connector.factory
+    // to com.microsoft.sqoop.SqlServer.MSSQLServerManagerFactory. By default,
+    // the built-in SQL server connector is used.
+    conf.setStrings(ConnFactory.FACTORY_CLASS_NAMES_KEY, CONNECTOR_FACTORY);
+  }
+
+  /**
+   * Runs one statement and commits it. A SQLException is logged at info
+   * level, prefixed with failureMessage, and otherwise ignored: the callers
+   * use this for steps that are expected to fail on some runs.
+   */
+  private void executeBestEffort(String sql, String failureMessage) {
+    Statement stmt = null;
+    try {
+      conn = manager.getConnection();
+      stmt = conn.createStatement();
+      stmt.execute(sql);
+      conn.commit();
+    } catch (SQLException sqlE) {
+      LOG.info(failureMessage + sqlE.getMessage());
+    } finally {
+      closeQuietly(stmt);
+    }
+  }
+
+  /**
+   * Creates the table with the given column definitions and commits.
+   * Any SQLException fails the running test.
+   */
+  private void createTable(String fulltableName, String columnDefinitions) {
+    Statement stmt = null;
+    try {
+      conn = manager.getConnection();
+      conn.setAutoCommit(false);
+      stmt = conn.createStatement();
+      stmt.executeUpdate("CREATE TABLE " + fulltableName + " ("
+          + columnDefinitions + ")");
+      conn.commit();
+    } catch (SQLException sqlE) {
+      LOG.error("Encountered SQL Exception: ", sqlE);
+      fail("SQLException when running test setUp(): " + sqlE);
+    } finally {
+      closeQuietly(stmt);
+    }
+  }
+
+  /**
+   * Creates the schema (best effort), drops any existing table of that name
+   * (best effort) and then creates the table afresh.
+   */
+  private void recreateTable(String schema, String table,
+      String columnDefinitions) {
+    String fulltableName = qualifiedName(schema, table);
+
+    // Create schema if needed
+    executeBestEffort("CREATE SCHEMA " + schema, "Can't create schema: ");
+
+    // Drop the existing table, if there is one.
+    executeBestEffort("DROP TABLE " + fulltableName,
+        "Table was not dropped: ");
+
+    createTable(fulltableName, columnDefinitions);
+  }
+
+  /**
+   * (Re)creates the empty employee table that the export tests write into.
+   * Despite the method name, no rows are inserted here.
+   *
+   * @param schema schema that should contain the table
+   * @param table unescaped table name
+   */
+  public void createTableAndPopulateData(String schema, String table) {
+    recreateTable(schema, table,
+        "id INT NOT NULL, "
+        + "name VARCHAR(24) NOT NULL, "
+        + "salary FLOAT, "
+        + "dept VARCHAR(32), "
+        + "PRIMARY KEY (id)");
+  }
+
+  /**
+   * (Re)creates the empty table with one BINARY and one VARBINARY column
+   * used by the binary type export test.
+   *
+   * @param schema schema that should contain the table
+   * @param table unescaped table name
+   */
+  public void createSQLServerBinaryTypeTable(String schema, String table) {
+    recreateTable(schema, table,
+        "id INT PRIMARY KEY, "
+        + "b1 BINARY(10), "
+        + "b2 VARBINARY(10)");
+  }
+
+  @After
+  public void tearDown() {
+    // conn stays null when setUp() could not obtain a connection; skip the
+    // cleanup then so that the original failure is not masked by an NPE.
+    if (null != conn) {
+      Statement stmt = null;
+      try {
+        stmt = conn.createStatement();
+        stmt.executeUpdate(getDropTableStatement(SCHEMA_DBO, DBO_TABLE_NAME));
+        stmt.executeUpdate(getDropTableStatement(SCHEMA_SCH, SCH_TABLE_NAME));
+        // Only testSQLServerBinaryType creates this table; IF EXISTS makes
+        // the statement harmless for every other test.
+        stmt.executeUpdate(
+            getDropTableStatement(SCHEMA_DBO, DBO_BINARY_TABLE_NAME));
+        // createTable() switched the connection to manual commit, so the
+        // drops have to be committed explicitly before it is closed.
+        conn.commit();
+      } catch (SQLException e) {
+        LOG.error("Can't clean up the database:", e);
+      } finally {
+        closeQuietly(stmt);
+      }
+    }
+
+    super.tearDown();
+    try {
+      if (null != conn) {
+        conn.close();
+      }
+      if (null != manager) {
+        manager.close();
+      }
+    } catch (SQLException sqlE) {
+      LOG.error("Got SQLException: " + sqlE.toString());
+      fail("Got SQLException: " + sqlE.toString());
+    }
+  }
+
+  /**
+   * Builds the export command line for the given table.
+   *
+   * @param tableName table to export into
+   * @param extraArgs arguments appended verbatim after the common ones
+   * @return the complete argument array
+   */
+  private String [] getArgv(String tableName,
+                            String... extraArgs) {
+    ArrayList<String> args = new ArrayList<String>();
+
+    CommonArgs.addHadoopFlags(args);
+
+    args.add("--table");
+    args.add(tableName);
+    args.add("--export-dir");
+    args.add(getWarehouseDir());
+    args.add("--fields-terminated-by");
+    args.add(",");
+    args.add("--lines-terminated-by");
+    args.add("\\n");
+    args.add("--connect");
+    args.add(CONNECT_STRING);
+    args.add("--username");
+    args.add(DATABASE_USER);
+    args.add("--password");
+    args.add(DATABASE_PASSWORD);
+    args.add("-m");
+    args.add("1");
+
+    for (String arg : extraArgs) {
+      args.add(arg);
+    }
+
+    return args.toArray(new String[0]);
+  }
+
+  /**
+   * Writes the given lines, each followed by a newline, to a file in the
+   * warehouse directory, creating the directory first if necessary.
+   *
+   * @param filename name of the file inside the warehouse directory
+   * @param lines records to write, one per line
+   * @throws IOException if the file cannot be opened or written
+   */
+  protected void createTestFile(String filename,
+                                String[] lines)
+                                throws IOException {
+    new File(getWarehouseDir()).mkdirs();
+    File file = new File(getWarehouseDir() + "/" + filename);
+    Writer output = new BufferedWriter(new FileWriter(file));
+    try {
+      for (String line : lines) {
+        output.write(line);
+        output.write("\n");
+      }
+    } finally {
+      // Close even when a write fails so the file handle is not leaked.
+      output.close();
+    }
+  }
+
+  /** Writes the two standard employee records as the export input. */
+  private void createEmployeeInputFile() throws IOException {
+    createTestFile(INPUT_FILE_NAME, EMPLOYEE_LINES);
+  }
+
+  @Test
+  public void testExport() throws IOException, SQLException {
+    createEmployeeInputFile();
+
+    runExport(getArgv(DBO_TABLE_NAME));
+
+    assertRowCount(2, escapeObjectName(DBO_TABLE_NAME), conn);
+  }
+
+  @Test
+  public void testExportCustomSchema() throws IOException, SQLException {
+    createEmployeeInputFile();
+
+    String[] extra = new String[] {"--",
+      "--schema",
+      SCHEMA_SCH,
+    };
+
+    runExport(getArgv(SCH_TABLE_NAME, extra));
+
+    assertRowCount(
+        2,
+        escapeObjectName(SCHEMA_SCH) + "." + escapeObjectName(SCH_TABLE_NAME),
+        conn
+    );
+  }
+
+  @Test
+  public void testExportTableHints() throws IOException, SQLException {
+    createEmployeeInputFile();
+
+    String[] extra = new String[] {"--", "--table-hints",
+      "ROWLOCK",
+    };
+    runExport(getArgv(DBO_TABLE_NAME, extra));
+    assertRowCount(2, escapeObjectName(DBO_TABLE_NAME), conn);
+  }
+
+  @Test
+  public void testExportTableHintsMultiple() throws IOException, SQLException {
+    createEmployeeInputFile();
+
+    String[] extra = new String[] {"--", "--table-hints",
+      "ROWLOCK,NOWAIT",
+    };
+    runExport(getArgv(DBO_TABLE_NAME, extra));
+    assertRowCount(2, escapeObjectName(DBO_TABLE_NAME), conn);
+  }
+
+  @Test
+  public void testSQLServerBinaryType() throws IOException, SQLException {
+    createSQLServerBinaryTypeTable(SCHEMA_DBO, DBO_BINARY_TABLE_NAME);
+    createTestFile(INPUT_FILE_NAME, new String[] {
+      "1,73 65 63 72 65 74 00 00 00 00,73 65 63 72 65 74"
+    });
+    String[] expectedContent = {"73656372657400000000", "736563726574"};
+    String binaryTable = escapeObjectName(DBO_BINARY_TABLE_NAME);
+
+    runExport(getArgv(DBO_BINARY_TABLE_NAME));
+
+    assertRowCount(1, binaryTable, conn);
+    checkSQLBinaryTableContent(expectedContent, binaryTable, conn);
+  }
+
+  /** Make sure mixed update/insert export work correctly. */
+  @Test
+  public void testUpsertTextExport() throws IOException, SQLException {
+    createEmployeeInputFile();
+    // first time will be insert.
+    runExport(getArgv(SCH_TABLE_NAME, "--update-key", "id",
+        "--update-mode", "allowinsert", "--", "--schema", SCHEMA_SCH));
+    // second time will be update.
+    runExport(getArgv(SCH_TABLE_NAME, "--update-key", "id",
+        "--update-mode", "allowinsert", "--", "--schema", SCHEMA_SCH));
+    assertRowCount(
+        2,
+        escapeObjectName(SCHEMA_SCH) + "." + escapeObjectName(SCH_TABLE_NAME),
+        conn);
+  }
+
+  /**
+   * Releases what a verification query used: closes the result set, then the
+   * statement, then commits the connection. Each step is attempted even if
+   * an earlier one fails; failures are logged and ignored.
+   */
+  private static void releaseQuery(ResultSet rs, Statement stmt,
+      Connection connection) {
+    try {
+      if (rs != null) {
+        rs.close();
+      }
+    } catch (SQLException ex) {
+      LOG.info("Ignored exception while closing result set.", ex);
+    }
+    closeQuietly(stmt);
+    try {
+      connection.commit();
+    } catch (SQLException ex) {
+      LOG.info("Ignored exception while committing.", ex);
+    }
+  }
+
+  /**
+   * Asserts that the b1 and b2 columns of one row of the table, read as
+   * strings, equal expected[0] and expected[1] respectively.
+   *
+   * @param expected expected string values of b1 and b2, in that order
+   * @param tableName already escaped name of the table to query
+   * @param connection connection used for the query
+   */
+  public static void checkSQLBinaryTableContent(String[] expected,
+      String tableName, Connection connection) {
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      stmt = connection.createStatement();
+      rs = stmt.executeQuery("SELECT TOP 1 [b1], [b2] FROM " + tableName);
+      if (!rs.next()) {
+        fail("No rows found in " + tableName);
+      }
+      assertEquals(expected[0], rs.getString("b1"));
+      assertEquals(expected[1], rs.getString("b2"));
+    } catch (SQLException e) {
+      LOG.error("Can't verify table content", e);
+      fail("Can't verify table content: " + e);
+    } finally {
+      releaseQuery(rs, stmt, connection);
+    }
+  }
+
+  /**
+   * Asserts that the table holds exactly the expected number of rows.
+   *
+   * @param expected expected row count
+   * @param tableName already escaped name of the table to count
+   * @param connection connection used for the query
+   */
+  public static void assertRowCount(long expected,
+                                    String tableName,
+                                    Connection connection) {
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      stmt = connection.createStatement();
+      rs = stmt.executeQuery("SELECT count(*) FROM " + tableName);
+
+      if (!rs.next()) {
+        fail("Row count query returned no result for " + tableName);
+      }
+
+      assertEquals(expected, rs.getLong(1));
+    } catch (SQLException e) {
+      LOG.error("Can't verify number of rows", e);
+      fail("Can't verify number of rows: " + e);
+    } finally {
+      releaseQuery(rs, stmt, connection);
+    }
+  }
+
+  /**
+   * Wraps the name in square brackets.
+   *
+   * @param objectName unescaped object name
+   * @return the name enclosed in [ and ]
+   */
+  public String escapeObjectName(String objectName) {
+    return "[" + objectName + "]";
+  }
+}