You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2012/11/04 17:04:07 UTC

git commit: SQOOP-655: Generic JDBC connector for export

Updated Branches:
  refs/heads/sqoop2 2481b7f8d -> 0976713f0


SQOOP-655: Generic JDBC connector for export

(Bilung Lee via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/0976713f
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/0976713f
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/0976713f

Branch: refs/heads/sqoop2
Commit: 0976713f0104709565b8c3a4c628c1abdca83569
Parents: 2481b7f
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Sun Nov 4 08:02:53 2012 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Sun Nov 4 08:02:53 2012 -0800

----------------------------------------------------------------------
 .../java/org/apache/sqoop/common/MapContext.java   |    1 -
 .../sqoop/connector/jdbc/GenericJdbcExecutor.java  |   48 ++++
 .../connector/jdbc/GenericJdbcExportDestroyer.java |    4 +-
 .../jdbc/GenericJdbcExportInitializer.java         |  170 ++++++++++++++-
 .../connector/jdbc/GenericJdbcExportLoader.java    |   51 +++++-
 .../connector/jdbc/GenericJdbcImportDestroyer.java |    4 +-
 .../jdbc/GenericJdbcImportInitializer.java         |   14 +-
 .../jdbc/GenericJdbcImportPartitioner.java         |    1 -
 .../connector/jdbc/TestExportInitializer.java      |  164 ++++++++++++++
 .../sqoop/connector/jdbc/TestExportLoader.java     |  140 ++++++++++++
 .../main/java/org/apache/sqoop/job/Constants.java  |    3 +
 .../java/org/apache/sqoop/job/etl/Destroyer.java   |    5 +-
 .../java/org/apache/sqoop/job/etl/Initializer.java |    8 +-
 13 files changed, 589 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/common/src/main/java/org/apache/sqoop/common/MapContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/common/MapContext.java b/common/src/main/java/org/apache/sqoop/common/MapContext.java
index c1d24ad..b245148 100644
--- a/common/src/main/java/org/apache/sqoop/common/MapContext.java
+++ b/common/src/main/java/org/apache/sqoop/common/MapContext.java
@@ -18,7 +18,6 @@
 package org.apache.sqoop.common;
 
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 
 /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
index 226fcd3..2dba8af 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.connector.jdbc;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -30,6 +31,7 @@ import org.apache.sqoop.common.SqoopException;
 public class GenericJdbcExecutor {
 
   private Connection connection;
+  private PreparedStatement preparedStatement;
 
   public GenericJdbcExecutor(String driver, String url,
       String username, String password) {
@@ -71,6 +73,52 @@ public class GenericJdbcExecutor {
     }
   }
 
+  public void beginBatch(String sql) {
+    try {
+      preparedStatement = connection.prepareStatement(sql,
+          ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+
+    } catch (SQLException e) {
+      throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
+    }
+  }
+
+  public void addBatch(Object[] array) {
+    try {
+      for (int i=0; i<array.length; i++) {
+        preparedStatement.setObject(i+1, array[i]);
+      }
+      preparedStatement.addBatch();
+    } catch (SQLException e) {
+      throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
+    }
+  }
+
+  public void executeBatch(boolean commit) {
+    try {
+      preparedStatement.executeBatch();
+      if (commit) {
+        connection.commit();
+      }
+    } catch (SQLException e) {
+      throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
+    }
+  }
+
+  public void endBatch() {
+    try {
+      if (preparedStatement != null) {
+        preparedStatement.close();
+      }
+    } catch (SQLException e) {
+      throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
+    }
+  }
+
   public String getPrimaryKey(String table) {
     try {
       String[] splitNames = dequalify(table);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
index c230f01..7f952ac 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
@@ -17,13 +17,13 @@
  */
 package org.apache.sqoop.connector.jdbc;
 
-import org.apache.sqoop.common.MapContext;
+import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.job.etl.Destroyer;
 
 public class GenericJdbcExportDestroyer extends Destroyer {
 
   @Override
-  public void run(MapContext context) {
+  public void run(ImmutableContext context) {
     // TODO Auto-generated method stub
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
index 0e91767..72b992c 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
@@ -17,14 +17,178 @@
  */
 package org.apache.sqoop.connector.jdbc;
 
-import org.apache.sqoop.common.MutableMapContext;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
+import org.apache.sqoop.job.Constants;
 import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.utils.ClassUtils;
 
 public class GenericJdbcExportInitializer extends Initializer {
 
+  private GenericJdbcExecutor executor;
+
+  @Override
+  public void initialize(MutableContext context, Object connectionConfiguration, Object jobConfiguration) {
+    ConnectionConfiguration connectionConfig = (ConnectionConfiguration)connectionConfiguration;
+    ExportJobConfiguration jobConfig = (ExportJobConfiguration)jobConfiguration;
+
+    configureJdbcProperties(context, connectionConfig, jobConfig);
+    try {
+      configureTableProperties(context, connectionConfig, jobConfig);
+
+    } finally {
+      executor.close();
+    }
+  }
+
   @Override
-  public void initialize(MutableMapContext context, Object connectionConfiguration, Object jobConfiguration) {
-    // TODO Auto-generated method stub
+  public List<String> getJars(ImmutableContext context, Object connectionConfiguration, Object jobConfiguration) {
+    List<String> jars = new LinkedList<String>();
+
+    ConnectionConfiguration connection = (ConnectionConfiguration) connectionConfiguration;
+    jars.add(ClassUtils.jarForClass(connection.jdbcDriver));
+
+    return jars;
+  }
+
+  private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) {
+    String driver = connectionConfig.jdbcDriver;
+    String url = connectionConfig.connectionString;
+    String username = connectionConfig.username;
+    String password = connectionConfig.password;
+
+    if (driver == null) {
+      throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0012,
+          GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER);
+    }
+    context.setString(
+        GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER,
+        driver);
+
+    if (url == null) {
+      throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0012,
+          GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING);
+    }
+    context.setString(
+        GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL,
+        url);
+
+    if (username != null) {
+      context.setString(
+          GenericJdbcConnectorConstants.CONNECTOR_JDBC_USERNAME,
+          username);
+    }
+
+    if (password != null) {
+      context.setString(
+          GenericJdbcConnectorConstants.CONNECTOR_JDBC_PASSWORD,
+          password);
+    }
+
+    executor = new GenericJdbcExecutor(driver, url, username, password);
+  }
+
+  private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) {
+    String dataSql;
+    String inputDirectory;
+
+    String tableName = connectionConfig.tableName;
+    String tableSql = connectionConfig.sql;
+    String tableColumns = connectionConfig.columns;
+
+    String datadir = connectionConfig.dataDirectory;
+    String warehouse = connectionConfig.warehouse;
+    if (warehouse == null) {
+      warehouse = GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE;
+    } else if (!warehouse.endsWith(GenericJdbcConnectorConstants.FILE_SEPARATOR)) {
+      warehouse += GenericJdbcConnectorConstants.FILE_SEPARATOR;
+    }
+
+    if (tableName != null && tableSql != null) {
+      // when both table name and table sql are specified:
+      throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
+
+    } else if (tableName != null) {
+      // when table name is specified:
+
+      if (tableColumns == null) {
+        String[] columns = executor.getQueryColumns("SELECT * FROM "
+            + executor.delimitIdentifier(tableName) + " WHERE 1 = 0");
+        StringBuilder builder = new StringBuilder();
+        builder.append("INSERT INTO ");
+        builder.append(executor.delimitIdentifier(tableName));
+        builder.append(" VALUES (?");
+        for (int i = 1; i < columns.length; i++) {
+          builder.append(",?");
+        }
+        builder.append(")");
+        dataSql = builder.toString();
+
+      } else {
+        String[] columns = StringUtils.split(tableColumns, ',');
+        StringBuilder builder = new StringBuilder();
+        builder.append("INSERT INTO ");
+        builder.append(executor.delimitIdentifier(tableName));
+        builder.append(" (");
+        builder.append(tableColumns);
+        builder.append(") VALUES (?");
+        for (int i = 1; i < columns.length; i++) {
+          builder.append(",?");
+        }
+        builder.append(")");
+        dataSql = builder.toString();
+      }
+
+      if (datadir == null) {
+        inputDirectory = warehouse + tableName;
+      } else {
+        inputDirectory = warehouse + datadir;
+      }
+
+    } else if (tableSql != null) {
+      // when table sql is specified:
+
+      if (tableSql.indexOf(
+          GenericJdbcConnectorConstants.SQL_PARAMETER_MARKER) == -1) {
+        // make sure parameter marker is in the specified sql
+        throw new SqoopException(
+            GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0013);
+      }
+
+      if (tableColumns == null) {
+        dataSql = tableSql;
+      } else {
+        throw new SqoopException(
+            GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0014);
+      }
+
+      if (datadir == null) {
+        inputDirectory =
+            warehouse + GenericJdbcConnectorConstants.DEFAULT_DATADIR;
+      } else {
+        inputDirectory = warehouse + datadir;
+      }
+
+    } else {
+      // when neither are specified:
+      throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
+    }
+
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL,
+        dataSql.toString());
+    context.setString(Constants.JOB_ETL_INPUT_DIRECTORY, inputDirectory);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
index 4cf0595..ff7384c 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
@@ -23,9 +23,58 @@ import org.apache.sqoop.job.io.DataReader;
 
 public class GenericJdbcExportLoader extends Loader {
 
+  public static final int DEFAULT_ROWS_PER_BATCH = 100;
+  public static final int DEFAULT_BATCHES_PER_TRANSACTION = 100;
+  private int rowsPerBatch = DEFAULT_ROWS_PER_BATCH;
+  private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION;
+
   @Override
   public void run(ImmutableContext context, DataReader reader) {
-    // TODO Auto-generated method stub
+    String driver = context.getString(
+        GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER);
+    String url = context.getString(
+        GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL);
+    String username = context.getString(
+        GenericJdbcConnectorConstants.CONNECTOR_JDBC_USERNAME);
+    String password = context.getString(
+        GenericJdbcConnectorConstants.CONNECTOR_JDBC_PASSWORD);
+    GenericJdbcExecutor executor = new GenericJdbcExecutor(
+        driver, url, username, password);
+
+    String sql = context.getString(
+        GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL);
+    executor.beginBatch(sql);
+    try {
+      int numberOfRows = 0;
+      int numberOfBatches = 0;
+      Object[] array;
+
+      while ((array = reader.readArrayRecord()) != null) {
+        numberOfRows++;
+        executor.addBatch(array);
+
+        if (numberOfRows == rowsPerBatch) {
+          numberOfBatches++;
+          if (numberOfBatches == batchesPerTransaction) {
+            executor.executeBatch(true);
+            numberOfBatches = 0;
+          } else {
+            executor.executeBatch(false);
+          }
+          numberOfRows = 0;
+        }
+      }
+
+      if (numberOfRows != 0) {
+        // execute and commit the remaining rows
+        executor.executeBatch(true);
+      }
+
+      executor.endBatch();
+
+    } finally {
+      executor.close();
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
index 3f6718d..a53fa59 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
@@ -17,13 +17,13 @@
  */
 package org.apache.sqoop.connector.jdbc;
 
-import org.apache.sqoop.common.MapContext;
+import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.job.etl.Destroyer;
 
 public class GenericJdbcImportDestroyer extends Destroyer {
 
   @Override
-  public void run(MapContext context) {
+  public void run(ImmutableContext context) {
     // TODO Auto-generated method stub
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
index 2075d99..f8e941c 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
@@ -25,8 +25,8 @@ import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
-import org.apache.sqoop.common.MapContext;
-import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
@@ -42,7 +42,7 @@ public class GenericJdbcImportInitializer extends Initializer {
   private GenericJdbcExecutor executor;
 
   @Override
-  public void initialize(MutableMapContext context, Object oConnectionConfig, Object oJobConfig) {
+  public void initialize(MutableContext context, Object oConnectionConfig, Object oJobConfig) {
     ConnectionConfiguration connectionConfig = (ConnectionConfiguration)oConnectionConfig;
     ImportJobConfiguration jobConfig = (ImportJobConfiguration)oJobConfig;
 
@@ -58,7 +58,7 @@ public class GenericJdbcImportInitializer extends Initializer {
   }
 
   @Override
-  public List<String> getJars(MapContext context, Object connectionConfiguration, Object jobConfiguration) {
+  public List<String> getJars(ImmutableContext context, Object connectionConfiguration, Object jobConfiguration) {
     List<String> jars = new LinkedList<String>();
 
     ConnectionConfiguration connection = (ConnectionConfiguration) connectionConfiguration;
@@ -67,7 +67,7 @@ public class GenericJdbcImportInitializer extends Initializer {
     return jars;
   }
 
-  private void configureJdbcProperties(MutableMapContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
+  private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
     String driver = connectionConfig.jdbcDriver;
     String url = connectionConfig.connectionString;
     String username = connectionConfig.username;
@@ -107,7 +107,7 @@ public class GenericJdbcImportInitializer extends Initializer {
     executor = new GenericJdbcExecutor(driver, url, username, password);
   }
 
-  private void configurePartitionProperties(MutableMapContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
+  private void configurePartitionProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
     // ----- configure column name -----
 
     String partitionColumnName = connectionConfig.partitionColumn;
@@ -207,7 +207,7 @@ public class GenericJdbcImportInitializer extends Initializer {
     }
   }
 
-  private void configureTableProperties(MutableMapContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
+  private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
     String dataSql;
     String fieldNames;
     String outputDirectory;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
index 5071471..a6d3b52 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
@@ -22,7 +22,6 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.sqoop.common.ImmutableContext;
-import org.apache.sqoop.common.MapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.job.Constants;
 import org.apache.sqoop.job.etl.Partition;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
new file mode 100644
index 0000000..532e6fd
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import java.util.Hashtable;
+
+import junit.framework.TestCase;
+
+import org.apache.sqoop.job.Constants;
+import org.apache.sqoop.job.etl.Initializer;
+//import org.apache.sqoop.job.etl.MutableContext;
+//import org.apache.sqoop.job.etl.Options;
+import org.junit.Test;
+
+public class TestExportInitializer extends TestCase {
+
+  private final String tableName;
+  private final String tableSql;
+  private final String tableColumns;
+
+  private GenericJdbcExecutor executor;
+
+  public TestExportInitializer() {
+    tableName = getClass().getSimpleName();
+    tableSql = "INSERT INTO \"" + tableName + "\" VALUES (?,?,?)";
+    tableColumns = "ICOL,VCOL";
+  }
+
+  public void testVoid() { }
+
+//  @Override
+//  public void setUp() {
+//    executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
+//        GenericJdbcTestConstants.URL, null, null);
+//
+//    if (!executor.existTable(tableName)) {
+//      executor.executeUpdate("CREATE TABLE "
+//          + executor.delimitIdentifier(tableName)
+//          + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
+//    }
+//  }
+//
+//  @Override
+//  public void tearDown() {
+//    executor.close();
+//  }
+//
+//  @Test
+//  public void testTableName() throws Exception {
+//    DummyOptions options = new DummyOptions();
+//    options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER,
+//        GenericJdbcTestConstants.DRIVER);
+//    options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING,
+//        GenericJdbcTestConstants.URL);
+//    options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_NAME,
+//        tableName);
+//
+//    DummyContext context = new DummyContext();
+//
+//    Initializer initializer = new GenericJdbcExportInitializer();
+//    initializer.run(context, options);
+//
+//    verifyResult(context,
+//        "INSERT INTO " + executor.delimitIdentifier(tableName)
+//            + " VALUES (?,?,?)",
+//        GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE + tableName);
+//  }
+//
+//  @Test
+//  public void testTableNameWithTableColumns() throws Exception {
+//    DummyOptions options = new DummyOptions();
+//    options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER,
+//        GenericJdbcTestConstants.DRIVER);
+//    options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING,
+//        GenericJdbcTestConstants.URL);
+//    options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_NAME,
+//        tableName);
+//    options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_COLUMNS,
+//        tableColumns);
+//
+//    DummyContext context = new DummyContext();
+//
+//    Initializer initializer = new GenericJdbcExportInitializer();
+//    initializer.run(context, options);
+//
+//    verifyResult(context,
+//        "INSERT INTO " + executor.delimitIdentifier(tableName)
+//            + " (" + tableColumns + ") VALUES (?,?)",
+//        GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE + tableName);
+//  }
+//
+//  @Test
+//  public void testTableSql() throws Exception {
+//    DummyOptions options = new DummyOptions();
+//    options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER,
+//        GenericJdbcTestConstants.DRIVER);
+//    options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING,
+//        GenericJdbcTestConstants.URL);
+//    options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_SQL,
+//        tableSql);
+//
+//    DummyContext context = new DummyContext();
+//
+//    Initializer initializer = new GenericJdbcExportInitializer();
+//    initializer.run(context, options);
+//
+//    verifyResult(context,
+//        "INSERT INTO " + executor.delimitIdentifier(tableName)
+//            + " VALUES (?,?,?)",
+//        GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE
+//            + GenericJdbcConnectorConstants.DEFAULT_DATADIR);
+//  }
+//
+//  private void verifyResult(DummyContext context,
+//      String dataSql, String inputDirectory) {
+//    assertEquals(dataSql, context.getString(
+//        GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL));
+//    assertEquals(inputDirectory, context.getString(
+//        Constants.JOB_ETL_INPUT_DIRECTORY));
+//  }
+//
+//  public class DummyOptions implements Options {
+//    Hashtable<String, String> store = new Hashtable<String, String>();
+//
+//    public void setOption(String key, String value) {
+//      store.put(key, value);
+//    }
+//
+//    @Override
+//    public String getOption(String key) {
+//      return store.get(key);
+//    }
+//  }
+//
+//  public class DummyContext implements MutableContext {
+//    Hashtable<String, String> store = new Hashtable<String, String>();
+//
+//    @Override
+//    public String getString(String key) {
+//      return store.get(key);
+//    }
+//
+//    @Override
+//    public void setString(String key, String value) {
+//      store.put(key, value);
+//    }
+//  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
new file mode 100644
index 0000000..649808d
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import java.sql.ResultSet;
+import java.util.HashMap;
+
+import junit.framework.TestCase;
+
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.io.DataReader;
+import org.junit.Test;
+
+public class TestExportLoader extends TestCase {
+
+  private final String tableName;
+
+  private GenericJdbcExecutor executor;
+
+  private static final int START = -50;
+  private static final int NUMBER_OF_ROWS = 101;
+
+  public TestExportLoader() {
+    tableName = getClass().getSimpleName();
+  }
+
+  public void testVoid() { }
+
+//  @Override
+//  public void setUp() {
+//    executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
+//        GenericJdbcTestConstants.URL, null, null);
+//
+//    if (!executor.existTable(tableName)) {
+//      executor.executeUpdate("CREATE TABLE "
+//          + executor.delimitIdentifier(tableName)
+//          + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
+//    }
+//  }
+//
+//  @Override
+//  public void tearDown() {
+//    executor.close();
+//  }
+//
+//  @Test
+//  public void testInsert() throws Exception {
+//    DummyContext context = new DummyContext();
+//    context.setString(
+//        GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER,
+//        GenericJdbcTestConstants.DRIVER);
+//    context.setString(
+//        GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL,
+//        GenericJdbcTestConstants.URL);
+//    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL,
+//        "INSERT INTO " + executor.delimitIdentifier(tableName)
+//            + " VALUES (?,?,?)");
+//
+//    Loader loader = new GenericJdbcExportLoader();
+//    DummyReader reader = new DummyReader();
+//
+//    loader.run(context, reader);
+//
+//    int index = START;
+//    ResultSet rs = executor.executeQuery("SELECT * FROM "
+//        + executor.delimitIdentifier(tableName) + " ORDER BY ICOL");
+//    while (rs.next()) {
+//      assertEquals(Integer.valueOf(index), rs.getObject(1));
+//      assertEquals(Double.valueOf(index), rs.getObject(2));
+//      assertEquals(String.valueOf(index), rs.getObject(3));
+//      index++;
+//    }
+//    assertEquals(NUMBER_OF_ROWS, index-START);
+//  }
+//
+//  public class DummyContext implements MutableContext {
+//    HashMap<String, String> store = new HashMap<String, String>();
+//
+//    @Override
+//    public String getString(String key) {
+//      return store.get(key);
+//    }
+//
+//    @Override
+//    public void setString(String key, String value) {
+//      store.put(key, value);
+//    }
+//  }
+//
+//  public class DummyReader extends DataReader {
+//    int index = 0;
+//
+//    @Override
+//    public void setFieldDelimiter(char fieldDelimiter) {
+//      // do nothing and use default delimiter
+//    }
+//
+//    @Override
+//    public Object[] readArrayRecord() {
+//      if (index < NUMBER_OF_ROWS) {
+//        Object[] array = new Object[] {
+//            new Integer(START+index),
+//            new Double(START+index),
+//            String.valueOf(START+index) };
+//        index++;
+//        return array;
+//      } else {
+//        return null;
+//      }
+//    }
+//
+//    @Override
+//    public String readCsvRecord() {
+//      fail("This method should not be invoked.");
+//      return null;
+//    }
+//
+//    @Override
+//    public Object readContent(int type) {
+//      fail("This method should not be invoked.");
+//      return null;
+//    }
+//  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/spi/src/main/java/org/apache/sqoop/job/Constants.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/Constants.java b/spi/src/main/java/org/apache/sqoop/job/Constants.java
index 927950d..90935cf 100644
--- a/spi/src/main/java/org/apache/sqoop/job/Constants.java
+++ b/spi/src/main/java/org/apache/sqoop/job/Constants.java
@@ -34,6 +34,9 @@ public class Constants {
   public static final String JOB_ETL_OUTPUT_DIRECTORY = PREFIX_CONFIG
       + "etl.output.directory";
 
+  public static final String JOB_ETL_INPUT_DIRECTORY = PREFIX_CONFIG
+      + "etl.input.directory";
+
   protected Constants() {
     // Disable explicit object creation
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
index 37b9f1b..c8dc7c3 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
@@ -17,7 +17,7 @@
  */
 package org.apache.sqoop.job.etl;
 
-import org.apache.sqoop.common.MapContext;
+import org.apache.sqoop.common.ImmutableContext;
 
 /**
  * This allows connector to define work to complete execution, for example,
@@ -25,7 +25,6 @@ import org.apache.sqoop.common.MapContext;
  */
 public abstract class Destroyer {
 
-  // TODO(Jarcec): This should be called with ImmutableContext
-  public abstract void run(MapContext context);
+  public abstract void run(ImmutableContext context);
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
index 2092815..685378f 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
@@ -17,8 +17,8 @@
  */
 package org.apache.sqoop.job.etl;
 
-import org.apache.sqoop.common.MapContext;
-import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.common.MutableContext;
 
 import java.util.LinkedList;
 import java.util.List;
@@ -38,7 +38,7 @@ public abstract class Initializer {
    * @param connectionConfiguration Connector's connection configuration object
    * @param jobConfiguration Connector's job configuration object
    */
-  public abstract void initialize(MutableMapContext context,
+  public abstract void initialize(MutableContext context,
                                   Object connectionConfiguration,
                                   Object jobConfiguration);
 
@@ -49,7 +49,7 @@ public abstract class Initializer {
    *
    * @return
    */
-  public List<String> getJars(MapContext context,
+  public List<String> getJars(ImmutableContext context,
                               Object connectionConfiguration,
                               Object jobConfiguration) {
     return new LinkedList<String>();