You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/09/11 10:13:18 UTC

git commit: SQOOP-974: Sqoop2: Add staging table support to generic jdbc export job

Updated Branches:
  refs/heads/sqoop2 08a829fd6 -> e33d69347


SQOOP-974: Sqoop2: Add staging table support to generic jdbc export job

(Raghav Kumar Gautam via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/e33d6934
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/e33d6934
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/e33d6934

Branch: refs/heads/sqoop2
Commit: e33d6934761b66f24a7d197aacc3d40ea00f8155
Parents: 08a829f
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Sep 11 01:11:54 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Sep 11 01:11:54 2013 -0700

----------------------------------------------------------------------
 .../jdbc/GenericJdbcConnectorError.java         |   5 +
 .../connector/jdbc/GenericJdbcExecutor.java     |  75 +++++++++
 .../jdbc/GenericJdbcExportDestroyer.java        |  28 ++++
 .../jdbc/GenericJdbcExportInitializer.java      |  29 +++-
 .../connector/jdbc/GenericJdbcValidator.java    |  11 ++
 .../jdbc/configuration/ExportTableForm.java     |   2 +
 .../generic-jdbc-connector-resources.properties |   8 +
 .../connector/jdbc/GenericJdbcExecutorTest.java |  88 ++++++++++
 .../connector/jdbc/TestExportInitializer.java   | 165 ++++++++++++++++++-
 .../generic/exports/TableStagedExportTest.java  |  77 +++++++++
 10 files changed, 486 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
index 671bb4a..2b1a0ad 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
@@ -73,6 +73,11 @@ public enum GenericJdbcConnectorError implements ErrorCode {
   /** Can't fetch schema */
   GENERIC_JDBC_CONNECTOR_0016("Can't fetch schema"),
 
+  /** The stage table is not empty. */
+  GENERIC_JDBC_CONNECTOR_0017("The stage table is not empty."),
+
+  GENERIC_JDBC_CONNECTOR_0018("Error occurred while transferring data from " +
+    "stage table to destination table."),
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
index 75cf9d9..9fd2e4f 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
@@ -26,10 +26,14 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 
+import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
 
 public class GenericJdbcExecutor {
 
+  private static final Logger LOG =
+    Logger.getLogger(GenericJdbcExecutor.class);
+
   private Connection connection;
   private PreparedStatement preparedStatement;
 
@@ -69,6 +73,77 @@ public class GenericJdbcExecutor {
     }
   }
 
+  public void deleteTableData(String tableName) {
+    LOG.info("Deleting all the rows from: " + tableName);
+    executeUpdate("DELETE FROM " + tableName);
+  }
+
+  public void migrateData(String fromTable, String toTable) {
+    String insertQuery = "INSERT INTO " + toTable +
+      " ( SELECT * FROM " + fromTable + " )";
+    Statement stmt = null;
+    Boolean oldAutoCommit = null;
+    try {
+      final long expectedInsertCount = getTableRowCount(fromTable);
+      oldAutoCommit = connection.getAutoCommit();
+      connection.setAutoCommit(false);
+      stmt = connection.createStatement();
+      final int actualInsertCount = stmt.executeUpdate(insertQuery);
+      if(expectedInsertCount == actualInsertCount) {
+        LOG.info("Transferred " + actualInsertCount + " rows of staged data " +
+          "from: " + fromTable + " to: " + toTable);
+        connection.commit();
+        deleteTableData(fromTable);
+        connection.commit();
+      } else {
+        LOG.error("Rolling back as number of rows inserted into table: " +
+          toTable + " was: " + actualInsertCount + " expected: " +
+          expectedInsertCount);
+        connection.rollback();
+        throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0018);
+      }
+    } catch(SQLException e) {
+      LOG.error("Got SQLException while migrating data from: " + fromTable +
+        " to: " + toTable, e);
+      throw new SqoopException(
+        GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0018, e);
+    } finally {
+      if(stmt != null) {
+        try {
+          stmt.close();
+        } catch(SQLException e) {
+          LOG.warn("Got SQLException at the time of closing statement.", e);
+        }
+      }
+      if(oldAutoCommit != null) {
+        try {
+          connection.setAutoCommit(oldAutoCommit);
+        } catch(SQLException e) {
+          LOG.warn("Got SQLException while setting autoCommit mode.", e);
+        }
+      }
+    }
+  }
+
+  public long getTableRowCount(String tableName) {
+    ResultSet resultSet = executeQuery("SELECT COUNT(1) FROM " + tableName);
+    try {
+      resultSet.next();
+      return resultSet.getLong(1);
+    } catch(SQLException e) {
+      throw new SqoopException(
+        GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0004, e);
+    } finally {
+      try {
+        if(resultSet != null)
+          resultSet.close();
+      } catch(SQLException e) {
+        LOG.warn("Got SQLException while closing resultset.", e);
+      }
+    }
+  }
+
   public void executeUpdate(String sql) {
     try {
       Statement statement = connection.createStatement(

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
index 588e236..c5faa09 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
@@ -30,5 +30,33 @@ public class GenericJdbcExportDestroyer extends Destroyer<ConnectionConfiguratio
   @Override
   public void destroy(DestroyerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
     LOG.info("Running generic JDBC connector destroyer");
+
+    final String tableName = job.table.tableName;
+    final String stageTableName = job.table.stageTableName;
+    final boolean stageEnabled = stageTableName != null &&
+      stageTableName.length() > 0;
+    if(stageEnabled) {
+      moveDataToDestinationTable(connection,
+        context.isSuccess(), stageTableName, tableName);
+    }
   }
+
+  private void moveDataToDestinationTable(ConnectionConfiguration connectorConf,
+    boolean success, String stageTableName, String tableName) {
+    GenericJdbcExecutor executor =
+      new GenericJdbcExecutor(connectorConf.connection.jdbcDriver,
+        connectorConf.connection.connectionString,
+        connectorConf.connection.username,
+        connectorConf.connection.password);
+
+    if(success) {
+      LOG.info("Job completed, transferring data from stage table to " +
+        "destination table.");
+      executor.migrateData(stageTableName, tableName);
+    } else {
+      LOG.warn("Job failed, clearing stage table.");
+      executor.deleteTableData(stageTableName);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
index 7212843..ef39cdc 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
@@ -21,6 +21,7 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
 import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
@@ -33,6 +34,8 @@ import org.apache.sqoop.utils.ClassUtils;
 public class GenericJdbcExportInitializer extends Initializer<ConnectionConfiguration, ExportJobConfiguration> {
 
   private GenericJdbcExecutor executor;
+  private static final Logger LOG =
+    Logger.getLogger(GenericJdbcExportInitializer.class);
 
   @Override
   public void initialize(InitializerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
@@ -75,6 +78,11 @@ public class GenericJdbcExportInitializer extends Initializer<ConnectionConfigur
 
     String schemaName = jobConfig.table.schemaName;
     String tableName = jobConfig.table.tableName;
+    String stageTableName = jobConfig.table.stageTableName;
+    boolean clearStageTable = jobConfig.table.clearStageTable == null ?
+      false : jobConfig.table.clearStageTable;
+    final boolean stageEnabled =
+      stageTableName != null && stageTableName.length() > 0;
     String tableSql = jobConfig.table.sql;
     String tableColumns = jobConfig.table.columns;
 
@@ -85,9 +93,28 @@ public class GenericJdbcExportInitializer extends Initializer<ConnectionConfigur
 
     } else if (tableName != null) {
       // when table name is specified:
+      if(stageEnabled) {
+        LOG.info("Stage has been enabled.");
+        LOG.info("Use stageTable: " + stageTableName +
+          " with clearStageTable: " + clearStageTable);
+
+        if(clearStageTable) {
+          executor.deleteTableData(stageTableName);
+        } else {
+          long stageRowCount = executor.getTableRowCount(stageTableName);
+          if(stageRowCount > 0) {
+            throw new SqoopException(
+              GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0017);
+          }
+        }
+      }
 
       // For databases that support schemas (IE: postgresql).
-      String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+      final String tableInUse = stageEnabled ? stageTableName : tableName;
+      String fullTableName = (schemaName == null) ?
+        executor.delimitIdentifier(tableInUse) :
+        executor.delimitIdentifier(schemaName) +
+          "." + executor.delimitIdentifier(tableInUse);
 
       if (tableColumns == null) {
         String[] columns = executor.getQueryColumns("SELECT * FROM "

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
index 4e24517..0c5f6e1 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
@@ -88,6 +88,17 @@ public class GenericJdbcValidator extends Validator {
     if(configuration.table.tableName != null && configuration.table.sql != null) {
       validation.addMessage(Status.UNACCEPTABLE, "table", "Both table name and SQL cannot be specified");
     }
+    if(configuration.table.tableName == null &&
+        configuration.table.stageTableName != null) {
+      validation.addMessage(Status.UNACCEPTABLE, "table",
+        "Stage table name cannot be specified without specifying table name");
+    }
+    if(configuration.table.stageTableName == null &&
+        configuration.table.clearStageTable != null) {
+      validation.addMessage(Status.UNACCEPTABLE, "table",
+        "Clear stage table cannot be specified without specifying name of " +
+        "the stage table.");
+    }
 
     return validation;
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
index a311c06..14a7033 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
@@ -29,4 +29,6 @@ public class ExportTableForm {
   @Input(size = 2000) public String tableName;
   @Input(size = 50)   public String sql;
   @Input(size = 50)   public String columns;
+  @Input(size = 2000) public String stageTableName;
+  @Input              public Boolean clearStageTable;
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties
index 0950e32..fc805df 100644
--- a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties
+++ b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties
@@ -73,6 +73,14 @@ table.columns.help = Specific columns of a table name or a table SQL
 table.warehouse.label = Data warehouse
 table.warehouse.help = The root directory for data
 
+# Stage table name
+table.stageTableName.label = Stage table name
+table.stageTableName.help = Name of the stage table to use
+
+# Clear stage table
+table.clearStageTable.label = Clear stage table
+table.clearStageTable.help = Indicate if the stage table should be cleared
+
 # Table datadir
 table.dataDirectory.label = Data directory
 table.dataDirectory.help = The sub-directory under warehouse for data

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
new file mode 100644
index 0000000..e10a5b4
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import junit.framework.TestCase;
+
+public class GenericJdbcExecutorTest extends TestCase {
+  private final String table;
+  private final String emptyTable;
+  private final GenericJdbcExecutor executor;
+
+  private static final int START = -50;
+  private static final int NUMBER_OF_ROWS = 974;
+
+  public GenericJdbcExecutorTest() {
+    table = getClass().getSimpleName().toUpperCase();
+    emptyTable = table + "_EMPTY";
+    executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
+      GenericJdbcTestConstants.URL, null, null);
+  }
+
+  @Override
+  public void setUp() {
+    if(executor.existTable(emptyTable)) {
+      executor.executeUpdate("DROP TABLE " + emptyTable);
+    }
+    executor.executeUpdate("CREATE TABLE "
+      + emptyTable + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
+
+    if(executor.existTable(table)) {
+      executor.executeUpdate("DROP TABLE " + table);
+    }
+    executor.executeUpdate("CREATE TABLE "
+      + table + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
+
+    for (int i = 0; i < NUMBER_OF_ROWS; i++) {
+      int value = START + i;
+      String sql = "INSERT INTO " + table
+        + " VALUES(" + value + ", '" + value + "')";
+      executor.executeUpdate(sql);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testDeleteTableData() throws Exception {
+    executor.deleteTableData(table);
+    assertEquals("Table " + table + " is expected to be empty.",
+      0, executor.getTableRowCount(table));
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testMigrateData() throws Exception {
+    assertEquals("Table " + emptyTable + " is expected to be empty.",
+      0, executor.getTableRowCount(emptyTable));
+    assertEquals("Table " + table + " is expected to have " +
+      NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS,
+      executor.getTableRowCount(table));
+
+    executor.migrateData(table, emptyTable);
+
+    assertEquals("Table " + table + " is expected to be empty.", 0,
+      executor.getTableRowCount(table));
+    assertEquals("Table " + emptyTable + " is expected to have " +
+      NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS,
+      executor.getTableRowCount(emptyTable));
+  }
+
+  /** Checks that getTableRowCount reports every row inserted by setUp. */
+  public void testGetTableRowCount() throws Exception {
+    assertEquals("Table " + table + " is expected to have " + NUMBER_OF_ROWS +
+      " rows.", NUMBER_OF_ROWS, executor.getTableRowCount(table));
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
index f83aaa2..d55b0f1 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
@@ -18,19 +18,23 @@
 package org.apache.sqoop.connector.jdbc;
 
 import junit.framework.TestCase;
-
 import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.Validation;
 
 public class TestExportInitializer extends TestCase {
 
   private final String schemaName;
   private final String tableName;
   private final String schemalessTableName;
+  private final String stageTableName;
   private final String tableSql;
   private final String schemalessTableSql;
   private final String tableColumns;
@@ -41,6 +45,8 @@ public class TestExportInitializer extends TestCase {
     schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA";
     tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA";
     schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE";
+    stageTableName = getClass().getSimpleName().toUpperCase() +
+      "_STAGE_TABLE";
     tableSql = "INSERT INTO " + tableName + " VALUES (?,?,?)";
     schemalessTableSql = "INSERT INTO " + schemalessTableName + " VALUES (?,?,?)";
     tableColumns = "ICOL,VCOL";
@@ -199,4 +205,161 @@ public class TestExportInitializer extends TestCase {
     assertEquals(dataSql, context.getString(
         GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL));
   }
+
+  private void createTable(String tableName) {
+    try {
+      executor.executeUpdate("DROP TABLE " + tableName);
+    } catch(SqoopException e) {
+      //Ok to fail as the table might not exist
+    }
+    executor.executeUpdate("CREATE TABLE " + tableName +
+      "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
+  }
+
+  public void testNonExistingStageTable() throws Exception {
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ExportJobConfiguration jobConf = new ExportJobConfiguration();
+
+    connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+    jobConf.table.tableName = schemalessTableName;
+    jobConf.table.stageTableName = stageTableName;
+
+    MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
+
+    @SuppressWarnings("rawtypes")
+    Initializer initializer = new GenericJdbcExportInitializer();
+    try {
+      initializer.initialize(initializerContext, connConf, jobConf);
+      fail("Initialization should fail for non-existing stage table.");
+    } catch(SqoopException se) {
+      //expected
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testNonEmptyStageTable() throws Exception {
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ExportJobConfiguration jobConf = new ExportJobConfiguration();
+
+    String fullStageTableName = executor.delimitIdentifier(stageTableName);
+
+    connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+    jobConf.table.tableName = schemalessTableName;
+    jobConf.table.stageTableName = stageTableName;
+    createTable(fullStageTableName);
+    executor.executeUpdate("INSERT INTO " + fullStageTableName +
+      " VALUES(1, 1.1, 'one')");
+    MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
+
+    @SuppressWarnings("rawtypes")
+    Initializer initializer = new GenericJdbcExportInitializer();
+    try {
+      initializer.initialize(initializerContext, connConf, jobConf);
+      fail("Initialization should fail for non-empty stage table.");
+    } catch(SqoopException se) {
+      //expected
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testClearStageTableValidation() throws Exception {
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ExportJobConfiguration jobConf = new ExportJobConfiguration();
+
+    connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+    //specifying clear stage table flag without specifying name of
+    // the stage table
+    jobConf.table.tableName = schemalessTableName;
+    jobConf.table.clearStageTable = false;
+    GenericJdbcValidator validator = new GenericJdbcValidator();
+    Validation validation = validator.validateJob(MJob.Type.EXPORT, jobConf);
+    assertEquals("User should not specify clear stage table flag without " +
+      "specifying name of the stage table",
+      Status.UNACCEPTABLE,
+      validation.getStatus());
+    assertTrue(validation.getMessages().containsKey(
+      new Validation.FormInput("table")));
+
+    jobConf.table.clearStageTable = true;
+    validation = validator.validateJob(MJob.Type.EXPORT, jobConf);
+    assertEquals("User should not specify clear stage table flag without " +
+      "specifying name of the stage table",
+      Status.UNACCEPTABLE,
+      validation.getStatus());
+    assertTrue(validation.getMessages().containsKey(
+      new Validation.FormInput("table")));
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testStageTableWithoutTable() throws Exception {
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ExportJobConfiguration jobConf = new ExportJobConfiguration();
+
+    connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+    //specifying stage table without specifying table name
+    jobConf.table.stageTableName = stageTableName;
+    jobConf.table.sql = "";
+
+    GenericJdbcValidator validator = new GenericJdbcValidator();
+    Validation validation = validator.validateJob(MJob.Type.EXPORT, jobConf);
+    assertEquals("Stage table name cannot be specified without specifying " +
+      "table name", Status.UNACCEPTABLE, validation.getStatus());
+    assertTrue(validation.getMessages().containsKey(
+      new Validation.FormInput("table")));
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testClearStageTable() throws Exception {
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ExportJobConfiguration jobConf = new ExportJobConfiguration();
+
+    String fullStageTableName = executor.delimitIdentifier(stageTableName);
+
+    connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+    jobConf.table.tableName = schemalessTableName;
+    jobConf.table.stageTableName = stageTableName;
+    jobConf.table.clearStageTable = true;
+    createTable(fullStageTableName);
+    executor.executeUpdate("INSERT INTO " + fullStageTableName +
+      " VALUES(1, 1.1, 'one')");
+    MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
+
+    @SuppressWarnings("rawtypes")
+    Initializer initializer = new GenericJdbcExportInitializer();
+    initializer.initialize(initializerContext, connConf, jobConf);
+    assertEquals("Stage table should have been cleared", 0,
+      executor.getTableRowCount(stageTableName));
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testStageTable() throws Exception {
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ExportJobConfiguration jobConf = new ExportJobConfiguration();
+
+    String fullStageTableName = executor.delimitIdentifier(stageTableName);
+
+    connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+    jobConf.table.tableName = schemalessTableName;
+    jobConf.table.stageTableName = stageTableName;
+    createTable(fullStageTableName);
+    MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
+
+    @SuppressWarnings("rawtypes")
+    Initializer initializer = new GenericJdbcExportInitializer();
+    initializer.initialize(initializerContext, connConf, jobConf);
+
+    verifyResult(context, "INSERT INTO " + fullStageTableName +
+      " VALUES (?,?,?)");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/exports/TableStagedExportTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/exports/TableStagedExportTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/exports/TableStagedExportTest.java
new file mode 100644
index 0000000..e36437b
--- /dev/null
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/exports/TableStagedExportTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.jdbc.generic.exports;
+
+import org.apache.sqoop.model.MConnection;
+import org.apache.sqoop.model.MFormList;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.test.data.Cities;
+import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration test: exports data into a table by way of a stage table.
+ */
+public class TableStagedExportTest extends ConnectorTestCase {
+
+  @Test
+  public void testStagedExport() throws Exception {
+    final String stageTableName = "STAGE_" + getTableName();
+    createTableCities();
+    createInputMapreduceFile("input-0001",
+      "1,'USA','San Francisco'",
+      "2,'USA','Sunnyvale'",
+      "3,'Czech Republic','Brno'",
+      "4,'USA','Palo Alto'"
+    );
+    new Cities(provider, stageTableName).createTables();
+    // Connection creation
+    MConnection connection = getClient().newConnection("generic-jdbc-connector");
+    fillConnectionForm(connection);
+    createConnection(connection);
+
+    // Job creation
+    MJob job = getClient().newJob(connection.getPersistenceId(),
+      MJob.Type.EXPORT);
+
+    // Connector values
+    MFormList forms = job.getConnectorPart();
+    forms.getStringInput("table.tableName").setValue(
+      provider.escapeTableName(getTableName()));
+    forms.getStringInput("table.stageTableName").setValue(
+      provider.escapeTableName(stageTableName));
+    fillInputForm(job);
+    createJob(job);
+
+    runJob(job);
+
+    assertEquals(0L, provider.rowCount(stageTableName));
+    assertEquals(4L, rowCount());
+    assertRowInCities(1, "USA", "San Francisco");
+    assertRowInCities(2, "USA", "Sunnyvale");
+    assertRowInCities(3, "Czech Republic", "Brno");
+    assertRowInCities(4, "USA", "Palo Alto");
+
+    // Clean up testing table
+    provider.dropTable(stageTableName);
+    dropTable();
+  }
+
+}