You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/09/11 10:13:18 UTC
git commit: SQOOP-974: Sqoop2: Add staging table support to generic
jdbc export job
Updated Branches:
refs/heads/sqoop2 08a829fd6 -> e33d69347
SQOOP-974: Sqoop2: Add staging table support to generic jdbc export job
(Raghav Kumar Gautam via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/e33d6934
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/e33d6934
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/e33d6934
Branch: refs/heads/sqoop2
Commit: e33d6934761b66f24a7d197aacc3d40ea00f8155
Parents: 08a829f
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Sep 11 01:11:54 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Sep 11 01:11:54 2013 -0700
----------------------------------------------------------------------
.../jdbc/GenericJdbcConnectorError.java | 5 +
.../connector/jdbc/GenericJdbcExecutor.java | 75 +++++++++
.../jdbc/GenericJdbcExportDestroyer.java | 28 ++++
.../jdbc/GenericJdbcExportInitializer.java | 29 +++-
.../connector/jdbc/GenericJdbcValidator.java | 11 ++
.../jdbc/configuration/ExportTableForm.java | 2 +
.../generic-jdbc-connector-resources.properties | 8 +
.../connector/jdbc/GenericJdbcExecutorTest.java | 88 ++++++++++
.../connector/jdbc/TestExportInitializer.java | 165 ++++++++++++++++++-
.../generic/exports/TableStagedExportTest.java | 77 +++++++++
10 files changed, 486 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
index 671bb4a..2b1a0ad 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
@@ -73,6 +73,11 @@ public enum GenericJdbcConnectorError implements ErrorCode {
/** Can't fetch schema */
GENERIC_JDBC_CONNECTOR_0016("Can't fetch schema"),
+ /** The stage table is not empty. */
+ GENERIC_JDBC_CONNECTOR_0017("The stage table is not empty."),
+
+ GENERIC_JDBC_CONNECTOR_0018("Error occurred while transferring data from " +
+ "stage table to destination table."),
;
private final String message;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
index 75cf9d9..9fd2e4f 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
@@ -26,10 +26,14 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
+import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
public class GenericJdbcExecutor {
+ private static final Logger LOG =
+ Logger.getLogger(GenericJdbcExecutor.class);
+
private Connection connection;
private PreparedStatement preparedStatement;
@@ -69,6 +73,77 @@ public class GenericJdbcExecutor {
}
}
+ public void deleteTableData(String tableName) {
+ LOG.info("Deleting all the rows from: " + tableName);
+ executeUpdate("DELETE FROM " + tableName);
+ }
+
+ public void migrateData(String fromTable, String toTable) {
+ String insertQuery = "INSERT INTO " + toTable +
+ " ( SELECT * FROM " + fromTable + " )";
+ Statement stmt = null;
+ Boolean oldAutoCommit = null;
+ try {
+ final long expectedInsertCount = getTableRowCount(fromTable);
+ oldAutoCommit = connection.getAutoCommit();
+ connection.setAutoCommit(false);
+ stmt = connection.createStatement();
+ final int actualInsertCount = stmt.executeUpdate(insertQuery);
+ if(expectedInsertCount == actualInsertCount) {
+ LOG.info("Transferred " + actualInsertCount + " rows of staged data " +
+ "from: " + fromTable + " to: " + toTable);
+ connection.commit();
+ deleteTableData(fromTable);
+ connection.commit();
+ } else {
+ LOG.error("Rolling back as number of rows inserted into table: " +
+ toTable + " was: " + actualInsertCount + " expected: " +
+ expectedInsertCount);
+ connection.rollback();
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0018);
+ }
+ } catch(SQLException e) {
+ LOG.error("Got SQLException while migrating data from: " + fromTable +
+ " to: " + toTable, e);
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0018, e);
+ } finally {
+ if(stmt != null) {
+ try {
+ stmt.close();
+ } catch(SQLException e) {
+ LOG.warn("Got SQLException at the time of closing statement.", e);
+ }
+ }
+ if(oldAutoCommit != null) {
+ try {
+ connection.setAutoCommit(oldAutoCommit);
+ } catch(SQLException e) {
+ LOG.warn("Got SQLException while setting autoCommit mode.", e);
+ }
+ }
+ }
+ }
+
+ public long getTableRowCount(String tableName) {
+ ResultSet resultSet = executeQuery("SELECT COUNT(1) FROM " + tableName);
+ try {
+ resultSet.next();
+ return resultSet.getLong(1);
+ } catch(SQLException e) {
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0004, e);
+ } finally {
+ try {
+ if(resultSet != null)
+ resultSet.close();
+ } catch(SQLException e) {
+ LOG.warn("Got SQLException while closing resultset.", e);
+ }
+ }
+ }
+
public void executeUpdate(String sql) {
try {
Statement statement = connection.createStatement(
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
index 588e236..c5faa09 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
@@ -30,5 +30,33 @@ public class GenericJdbcExportDestroyer extends Destroyer<ConnectionConfiguratio
@Override
public void destroy(DestroyerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
LOG.info("Running generic JDBC connector destroyer");
+
+ final String tableName = job.table.tableName;
+ final String stageTableName = job.table.stageTableName;
+ final boolean stageEnabled = stageTableName != null &&
+ stageTableName.length() > 0;
+ if(stageEnabled) {
+ moveDataToDestinationTable(connection,
+ context.isSuccess(), stageTableName, tableName);
+ }
}
+
+ private void moveDataToDestinationTable(ConnectionConfiguration connectorConf,
+ boolean success, String stageTableName, String tableName) {
+ GenericJdbcExecutor executor =
+ new GenericJdbcExecutor(connectorConf.connection.jdbcDriver,
+ connectorConf.connection.connectionString,
+ connectorConf.connection.username,
+ connectorConf.connection.password);
+
+ if(success) {
+ LOG.info("Job completed, transferring data from stage table to " +
+ "destination table.");
+ executor.migrateData(stageTableName, tableName);
+ } else {
+ LOG.warn("Job failed, clearing stage table.");
+ executor.deleteTableData(stageTableName);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
index 7212843..ef39cdc 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
@@ -21,6 +21,7 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
@@ -33,6 +34,8 @@ import org.apache.sqoop.utils.ClassUtils;
public class GenericJdbcExportInitializer extends Initializer<ConnectionConfiguration, ExportJobConfiguration> {
private GenericJdbcExecutor executor;
+ private static final Logger LOG =
+ Logger.getLogger(GenericJdbcExportInitializer.class);
@Override
public void initialize(InitializerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
@@ -75,6 +78,11 @@ public class GenericJdbcExportInitializer extends Initializer<ConnectionConfigur
String schemaName = jobConfig.table.schemaName;
String tableName = jobConfig.table.tableName;
+ String stageTableName = jobConfig.table.stageTableName;
+ boolean clearStageTable = jobConfig.table.clearStageTable == null ?
+ false : jobConfig.table.clearStageTable;
+ final boolean stageEnabled =
+ stageTableName != null && stageTableName.length() > 0;
String tableSql = jobConfig.table.sql;
String tableColumns = jobConfig.table.columns;
@@ -85,9 +93,28 @@ public class GenericJdbcExportInitializer extends Initializer<ConnectionConfigur
} else if (tableName != null) {
// when table name is specified:
+ if(stageEnabled) {
+ LOG.info("Stage has been enabled.");
+ LOG.info("Use stageTable: " + stageTableName +
+ " with clearStageTable: " + clearStageTable);
+
+ if(clearStageTable) {
+ executor.deleteTableData(stageTableName);
+ } else {
+ long stageRowCount = executor.getTableRowCount(stageTableName);
+ if(stageRowCount > 0) {
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0017);
+ }
+ }
+ }
// For databases that support schemas (IE: postgresql).
- String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+ final String tableInUse = stageEnabled ? stageTableName : tableName;
+ String fullTableName = (schemaName == null) ?
+ executor.delimitIdentifier(tableInUse) :
+ executor.delimitIdentifier(schemaName) +
+ "." + executor.delimitIdentifier(tableInUse);
if (tableColumns == null) {
String[] columns = executor.getQueryColumns("SELECT * FROM "
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
index 4e24517..0c5f6e1 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
@@ -88,6 +88,17 @@ public class GenericJdbcValidator extends Validator {
if(configuration.table.tableName != null && configuration.table.sql != null) {
validation.addMessage(Status.UNACCEPTABLE, "table", "Both table name and SQL cannot be specified");
}
+ if(configuration.table.tableName == null &&
+ configuration.table.stageTableName != null) {
+ validation.addMessage(Status.UNACCEPTABLE, "table",
+ "Stage table name cannot be specified without specifying table name");
+ }
+ if(configuration.table.stageTableName == null &&
+ configuration.table.clearStageTable != null) {
+ validation.addMessage(Status.UNACCEPTABLE, "table",
+ "Clear stage table cannot be specified without specifying name of " +
+ "the stage table.");
+ }
return validation;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
index a311c06..14a7033 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
@@ -29,4 +29,6 @@ public class ExportTableForm {
@Input(size = 2000) public String tableName;
@Input(size = 50) public String sql;
@Input(size = 50) public String columns;
+ @Input(size = 2000) public String stageTableName;
+ @Input public Boolean clearStageTable;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties
index 0950e32..fc805df 100644
--- a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties
+++ b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties
@@ -73,6 +73,14 @@ table.columns.help = Specific columns of a table name or a table SQL
table.warehouse.label = Data warehouse
table.warehouse.help = The root directory for data
+# Stage table name
+table.stageTableName.label = Stage table name
+table.stageTableName.help = Name of the stage table to use
+
+# Clear stage table
+table.clearStageTable.label = Clear stage table
+table.clearStageTable.help = Indicate if the stage table should be cleared
+
# Table datadir
table.dataDirectory.label = Data directory
table.dataDirectory.help = The sub-directory under warehouse for data
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
new file mode 100644
index 0000000..e10a5b4
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import junit.framework.TestCase;
+
+public class GenericJdbcExecutorTest extends TestCase {
+ private final String table;
+ private final String emptyTable;
+ private final GenericJdbcExecutor executor;
+
+ private static final int START = -50;
+ private static final int NUMBER_OF_ROWS = 974;
+
+ public GenericJdbcExecutorTest() {
+ table = getClass().getSimpleName().toUpperCase();
+ emptyTable = table + "_EMPTY";
+ executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
+ GenericJdbcTestConstants.URL, null, null);
+ }
+
+ @Override
+ public void setUp() {
+ if(executor.existTable(emptyTable)) {
+ executor.executeUpdate("DROP TABLE " + emptyTable);
+ }
+ executor.executeUpdate("CREATE TABLE "
+ + emptyTable + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
+
+ if(executor.existTable(table)) {
+ executor.executeUpdate("DROP TABLE " + table);
+ }
+ executor.executeUpdate("CREATE TABLE "
+ + table + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
+
+ for (int i = 0; i < NUMBER_OF_ROWS; i++) {
+ int value = START + i;
+ String sql = "INSERT INTO " + table
+ + " VALUES(" + value + ", '" + value + "')";
+ executor.executeUpdate(sql);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testDeleteTableData() throws Exception {
+ executor.deleteTableData(table);
+ assertEquals("Table " + table + " is expected to be empty.",
+ 0, executor.getTableRowCount(table));
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testMigrateData() throws Exception {
+ assertEquals("Table " + emptyTable + " is expected to be empty.",
+ 0, executor.getTableRowCount(emptyTable));
+ assertEquals("Table " + table + " is expected to have " +
+ NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS,
+ executor.getTableRowCount(table));
+
+ executor.migrateData(table, emptyTable);
+
+ assertEquals("Table " + table + " is expected to be empty.", 0,
+ executor.getTableRowCount(table));
+ assertEquals("Table " + emptyTable + " is expected to have " +
+ NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS,
+ executor.getTableRowCount(emptyTable));
+ }
+
+ // setUp() fills the table with NUMBER_OF_ROWS rows; the reported count must match.
+ public void testGetTableRowCount() throws Exception {
+ assertEquals("Table " + table + " is expected to have " +
+ NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS, executor.getTableRowCount(table));
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
index f83aaa2..d55b0f1 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
@@ -18,19 +18,23 @@
package org.apache.sqoop.connector.jdbc;
import junit.framework.TestCase;
-
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.Validation;
public class TestExportInitializer extends TestCase {
private final String schemaName;
private final String tableName;
private final String schemalessTableName;
+ private final String stageTableName;
private final String tableSql;
private final String schemalessTableSql;
private final String tableColumns;
@@ -41,6 +45,8 @@ public class TestExportInitializer extends TestCase {
schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA";
tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA";
schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE";
+ stageTableName = getClass().getSimpleName().toUpperCase() +
+ "_STAGE_TABLE";
tableSql = "INSERT INTO " + tableName + " VALUES (?,?,?)";
schemalessTableSql = "INSERT INTO " + schemalessTableName + " VALUES (?,?,?)";
tableColumns = "ICOL,VCOL";
@@ -199,4 +205,161 @@ public class TestExportInitializer extends TestCase {
assertEquals(dataSql, context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL));
}
+
+ private void createTable(String tableName) {
+ try {
+ executor.executeUpdate("DROP TABLE " + tableName);
+ } catch(SqoopException e) {
+ //Ok to fail as the table might not exist
+ }
+ executor.executeUpdate("CREATE TABLE " + tableName +
+ "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
+ }
+
+ public void testNonExistingStageTable() throws Exception {
+ ConnectionConfiguration connConf = new ConnectionConfiguration();
+ ExportJobConfiguration jobConf = new ExportJobConfiguration();
+
+ connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+ connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+ jobConf.table.tableName = schemalessTableName;
+ jobConf.table.stageTableName = stageTableName;
+
+ MutableContext context = new MutableMapContext();
+ InitializerContext initializerContext = new InitializerContext(context);
+
+ @SuppressWarnings("rawtypes")
+ Initializer initializer = new GenericJdbcExportInitializer();
+ try {
+ initializer.initialize(initializerContext, connConf, jobConf);
+ fail("Initialization should fail for non-existing stage table.");
+ } catch(SqoopException se) {
+ //expected
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testNonEmptyStageTable() throws Exception {
+ ConnectionConfiguration connConf = new ConnectionConfiguration();
+ ExportJobConfiguration jobConf = new ExportJobConfiguration();
+
+ String fullStageTableName = executor.delimitIdentifier(stageTableName);
+
+ connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+ connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+ jobConf.table.tableName = schemalessTableName;
+ jobConf.table.stageTableName = stageTableName;
+ createTable(fullStageTableName);
+ executor.executeUpdate("INSERT INTO " + fullStageTableName +
+ " VALUES(1, 1.1, 'one')");
+ MutableContext context = new MutableMapContext();
+ InitializerContext initializerContext = new InitializerContext(context);
+
+ @SuppressWarnings("rawtypes")
+ Initializer initializer = new GenericJdbcExportInitializer();
+ try {
+ initializer.initialize(initializerContext, connConf, jobConf);
+ fail("Initialization should fail for non-empty stage table.");
+ } catch(SqoopException se) {
+ //expected
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testClearStageTableValidation() throws Exception {
+ ConnectionConfiguration connConf = new ConnectionConfiguration();
+ ExportJobConfiguration jobConf = new ExportJobConfiguration();
+
+ connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+ connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+ //specifying clear stage table flag without specifying name of
+ // the stage table
+ jobConf.table.tableName = schemalessTableName;
+ jobConf.table.clearStageTable = false;
+ GenericJdbcValidator validator = new GenericJdbcValidator();
+ Validation validation = validator.validateJob(MJob.Type.EXPORT, jobConf);
+ assertEquals("User should not specify clear stage table flag without " +
+ "specifying name of the stage table",
+ Status.UNACCEPTABLE,
+ validation.getStatus());
+ assertTrue(validation.getMessages().containsKey(
+ new Validation.FormInput("table")));
+
+ jobConf.table.clearStageTable = true;
+ validation = validator.validateJob(MJob.Type.EXPORT, jobConf);
+ assertEquals("User should not specify clear stage table flag without " +
+ "specifying name of the stage table",
+ Status.UNACCEPTABLE,
+ validation.getStatus());
+ assertTrue(validation.getMessages().containsKey(
+ new Validation.FormInput("table")));
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testStageTableWithoutTable() throws Exception {
+ ConnectionConfiguration connConf = new ConnectionConfiguration();
+ ExportJobConfiguration jobConf = new ExportJobConfiguration();
+
+ connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+ connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+ //specifying stage table without specifying table name
+ jobConf.table.stageTableName = stageTableName;
+ jobConf.table.sql = "";
+
+ GenericJdbcValidator validator = new GenericJdbcValidator();
+ Validation validation = validator.validateJob(MJob.Type.EXPORT, jobConf);
+ assertEquals("Stage table name cannot be specified without specifying " +
+ "table name", Status.UNACCEPTABLE, validation.getStatus());
+ assertTrue(validation.getMessages().containsKey(
+ new Validation.FormInput("table")));
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testClearStageTable() throws Exception {
+ ConnectionConfiguration connConf = new ConnectionConfiguration();
+ ExportJobConfiguration jobConf = new ExportJobConfiguration();
+
+ String fullStageTableName = executor.delimitIdentifier(stageTableName);
+
+ connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+ connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+ jobConf.table.tableName = schemalessTableName;
+ jobConf.table.stageTableName = stageTableName;
+ jobConf.table.clearStageTable = true;
+ createTable(fullStageTableName);
+ executor.executeUpdate("INSERT INTO " + fullStageTableName +
+ " VALUES(1, 1.1, 'one')");
+ MutableContext context = new MutableMapContext();
+ InitializerContext initializerContext = new InitializerContext(context);
+
+ @SuppressWarnings("rawtypes")
+ Initializer initializer = new GenericJdbcExportInitializer();
+ initializer.initialize(initializerContext, connConf, jobConf);
+ assertEquals("Stage table should have been cleared", 0,
+ executor.getTableRowCount(stageTableName));
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testStageTable() throws Exception {
+ ConnectionConfiguration connConf = new ConnectionConfiguration();
+ ExportJobConfiguration jobConf = new ExportJobConfiguration();
+
+ String fullStageTableName = executor.delimitIdentifier(stageTableName);
+
+ connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+ connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+ jobConf.table.tableName = schemalessTableName;
+ jobConf.table.stageTableName = stageTableName;
+ createTable(fullStageTableName);
+ MutableContext context = new MutableMapContext();
+ InitializerContext initializerContext = new InitializerContext(context);
+
+ @SuppressWarnings("rawtypes")
+ Initializer initializer = new GenericJdbcExportInitializer();
+ initializer.initialize(initializerContext, connConf, jobConf);
+
+ verifyResult(context, "INSERT INTO " + fullStageTableName +
+ " VALUES (?,?,?)");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/exports/TableStagedExportTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/exports/TableStagedExportTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/exports/TableStagedExportTest.java
new file mode 100644
index 0000000..e36437b
--- /dev/null
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/exports/TableStagedExportTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.jdbc.generic.exports;
+
+import org.apache.sqoop.model.MConnection;
+import org.apache.sqoop.model.MFormList;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.test.data.Cities;
+import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
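+ * Runs an export job with a stage table configured and checks that the rows end up in the destination table while the stage table is left empty.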
+ */
+public class TableStagedExportTest extends ConnectorTestCase {
+
+ @Test
+ public void testStagedExport() throws Exception {
+ final String stageTableName = "STAGE_" + getTableName();
+ createTableCities();
+ createInputMapreduceFile("input-0001",
+ "1,'USA','San Francisco'",
+ "2,'USA','Sunnyvale'",
+ "3,'Czech Republic','Brno'",
+ "4,'USA','Palo Alto'"
+ );
+ new Cities(provider, stageTableName).createTables();
+ // Connection creation
+ MConnection connection = getClient().newConnection("generic-jdbc-connector");
+ fillConnectionForm(connection);
+ createConnection(connection);
+
+ // Job creation
+ MJob job = getClient().newJob(connection.getPersistenceId(),
+ MJob.Type.EXPORT);
+
+ // Connector values
+ MFormList forms = job.getConnectorPart();
+ forms.getStringInput("table.tableName").setValue(
+ provider.escapeTableName(getTableName()));
+ forms.getStringInput("table.stageTableName").setValue(
+ provider.escapeTableName(stageTableName));
+ fillInputForm(job);
+ createJob(job);
+
+ runJob(job);
+
+ assertEquals(0L, provider.rowCount(stageTableName));
+ assertEquals(4L, rowCount());
+ assertRowInCities(1, "USA", "San Francisco");
+ assertRowInCities(2, "USA", "Sunnyvale");
+ assertRowInCities(3, "Czech Republic", "Brno");
+ assertRowInCities(4, "USA", "Palo Alto");
+
+ // Clean up testing table
+ provider.dropTable(stageTableName);
+ dropTable();
+ }
+
+}