Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/08/08 22:12:22 UTC

[4/5] SQOOP-1376: Sqoop2: From/To: Refactor connector interface

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
new file mode 100644
index 0000000..55d386b
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.jdbc.util.SqlTypesUtils;
+import org.apache.sqoop.job.Constants;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.utils.ClassUtils;
+
+public class GenericJdbcFromInitializer extends Initializer<ConnectionConfiguration, FromJobConfiguration> {
+
+  private static final Logger LOG =
+    Logger.getLogger(GenericJdbcFromInitializer.class);
+
+  private GenericJdbcExecutor executor;
+
+  @Override
+  public void initialize(InitializerContext context, ConnectionConfiguration connection, FromJobConfiguration job) {
+    configureJdbcProperties(context.getContext(), connection, job);
+    try {
+      configurePartitionProperties(context.getContext(), connection, job);
+      configureTableProperties(context.getContext(), connection, job);
+    } finally {
+      executor.close();
+    }
+  }
+
+  @Override
+  public List<String> getJars(InitializerContext context, ConnectionConfiguration connection, FromJobConfiguration job) {
+    List<String> jars = new LinkedList<String>();
+
+    jars.add(ClassUtils.jarForClass(connection.connection.jdbcDriver));
+
+    return jars;
+  }
+
+  @Override
+  public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, FromJobConfiguration fromJobConfiguration) {
+    configureJdbcProperties(context.getContext(), connectionConfiguration, fromJobConfiguration);
+
+    String schemaName = fromJobConfiguration.table.tableName;
+    if(schemaName == null) {
+      schemaName = "Query";
+    }
+
+    Schema schema = new Schema(schemaName);
+
+    ResultSet rs = null;
+    ResultSetMetaData rsmt = null;
+    try {
+      rs = executor.executeQuery(
+        context.getString(GenericJdbcConnectorConstants.CONNECTOR_FROM_JDBC_DATA_SQL)
+          .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0")
+      );
+
+      rsmt = rs.getMetaData();
+      for (int i = 1; i <= rsmt.getColumnCount(); i++) {
+        Column column = SqlTypesUtils.sqlTypeToAbstractType(rsmt.getColumnType(i));
+
+        String columnName = rsmt.getColumnName(i);
+        if (columnName == null || columnName.equals("")) {
+          columnName = rsmt.getColumnLabel(i);
+          if (columnName == null || columnName.equals("")) {
+            columnName = "Column " + i;
+          }
+        }
+
+        column.setName(columnName);
+        schema.addColumn(column);
+      }
+
+      return schema;
+    } catch (SQLException e) {
+      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e);
+    } finally {
+      if(rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          LOG.info("Ignoring exception while closing ResultSet", e);
+        }
+      }
+    }
+  }
+
+  private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, FromJobConfiguration jobConfig) {
+    String driver = connectionConfig.connection.jdbcDriver;
+    String url = connectionConfig.connection.connectionString;
+    String username = connectionConfig.connection.username;
+    String password = connectionConfig.connection.password;
+
+    assert driver != null;
+    assert url != null;
+
+    executor = new GenericJdbcExecutor(driver, url, username, password);
+  }
+
+  private void configurePartitionProperties(MutableContext context, ConnectionConfiguration connectionConfig, FromJobConfiguration jobConfig) {
+    // ----- configure column name -----
+
+    String partitionColumnName = jobConfig.table.partitionColumn;
+
+    if (partitionColumnName == null) {
+      // if column is not specified by the user,
+      // find the primary key of the table (when there is a table).
+      String tableName = jobConfig.table.tableName;
+      if (tableName != null) {
+        partitionColumnName = executor.getPrimaryKey(tableName);
+      }
+    }
+
+    if (partitionColumnName != null) {
+      context.setString(
+          GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
+          partitionColumnName);
+
+    } else {
+      throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0005);
+    }
+
+    // ----- configure column type, min value, and max value -----
+
+    String minMaxQuery = jobConfig.table.boundaryQuery;
+
+    if (minMaxQuery == null) {
+      StringBuilder builder = new StringBuilder();
+
+      String schemaName = jobConfig.table.schemaName;
+      String tableName = jobConfig.table.tableName;
+      String tableSql = jobConfig.table.sql;
+
+      if (tableName != null && tableSql != null) {
+        // when both table name and table sql are specified:
+        throw new SqoopException(
+            GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
+
+      } else if (tableName != null) {
+        // when table name is specified:
+
+      // For databases that support schemas (e.g., PostgreSQL).
+        String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
+        String column = partitionColumnName;
+        builder.append("SELECT MIN(");
+        builder.append(column);
+        builder.append("), MAX(");
+        builder.append(column);
+        builder.append(") FROM ");
+        builder.append(fullTableName);
+
+      } else if (tableSql != null) {
+        String column = executor.qualify(
+            partitionColumnName, GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
+        builder.append("SELECT MIN(");
+        builder.append(column);
+        builder.append("), MAX(");
+        builder.append(column);
+        builder.append(") FROM ");
+        builder.append("(");
+        builder.append(tableSql.replace(
+            GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 1"));
+        builder.append(") ");
+        builder.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
+
+      } else {
+        // when neither are specified:
+        throw new SqoopException(
+            GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
+      }
+
+      minMaxQuery = builder.toString();
+    }
+
+
+    LOG.debug("Using minMaxQuery: " + minMaxQuery);
+    ResultSet rs = executor.executeQuery(minMaxQuery);
+    try {
+      ResultSetMetaData rsmd = rs.getMetaData();
+      if (rsmd.getColumnCount() != 2) {
+        throw new SqoopException(
+            GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006);
+      }
+
+      rs.next();
+
+      int columnType = rsmd.getColumnType(1);
+      String min = rs.getString(1);
+      String max = rs.getString(2);
+
+      LOG.info("Boundaries: min=" + min + ", max=" + max + ", columnType=" + columnType);
+
+      context.setInteger(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, columnType);
+      context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, min);
+      context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, max);
+
+    } catch (SQLException e) {
+      throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006, e);
+    }
+  }
+
+  private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, FromJobConfiguration jobConfig) {
+    String dataSql;
+    String fieldNames;
+
+    String schemaName = jobConfig.table.schemaName;
+    String tableName = jobConfig.table.tableName;
+    String tableSql = jobConfig.table.sql;
+    String tableColumns = jobConfig.table.columns;
+
+    if (tableName != null && tableSql != null) {
+      // when both table name and table sql are specified:
+      throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
+
+    } else if (tableName != null) {
+      // when table name is specified:
+
+      // For databases that support schemas (e.g., PostgreSQL).
+      String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
+      if (tableColumns == null) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("SELECT * FROM ");
+        builder.append(fullTableName);
+        builder.append(" WHERE ");
+        builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
+        dataSql = builder.toString();
+
+        String[] queryColumns = executor.getQueryColumns(dataSql.replace(
+            GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0"));
+        fieldNames = StringUtils.join(queryColumns, ',');
+
+      } else {
+        StringBuilder builder = new StringBuilder();
+        builder.append("SELECT ");
+        builder.append(tableColumns);
+        builder.append(" FROM ");
+        builder.append(fullTableName);
+        builder.append(" WHERE ");
+        builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
+        dataSql = builder.toString();
+
+        fieldNames = tableColumns;
+      }
+    } else if (tableSql != null) {
+      // when table sql is specified:
+
+      assert tableSql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
+
+      if (tableColumns == null) {
+        dataSql = tableSql;
+
+        String[] queryColumns = executor.getQueryColumns(dataSql.replace(
+            GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0"));
+        fieldNames = StringUtils.join(queryColumns, ',');
+
+      } else {
+        String[] columns = StringUtils.split(tableColumns, ',');
+        StringBuilder builder = new StringBuilder();
+        builder.append("SELECT ");
+        builder.append(executor.qualify(
+            columns[0], GenericJdbcConnectorConstants.SUBQUERY_ALIAS));
+        for (int i = 1; i < columns.length; i++) {
+          builder.append(",");
+          builder.append(executor.qualify(
+              columns[i], GenericJdbcConnectorConstants.SUBQUERY_ALIAS));
+        }
+        builder.append(" FROM ");
+        builder.append("(");
+        builder.append(tableSql);
+        builder.append(") ");
+        builder.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
+        dataSql = builder.toString();
+
+        fieldNames = tableColumns;
+      }
+    } else {
+      // when neither are specified:
+      throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
+    }
+
+    LOG.info("Using dataSql: " + dataSql);
+    LOG.info("Field names: " + fieldNames);
+
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_FROM_JDBC_DATA_SQL, dataSql);
+    context.setString(Constants.JOB_ETL_FIELD_NAMES, fieldNames);
+  }
+}
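
A minimal sketch of the boundary query that configurePartitionProperties() builds when only a table name is configured. The delimitIdentifier() below is a hypothetical stand-in for the GenericJdbcExecutor method of the same name (real quoting rules depend on the database), and the schema, table, and column names are invented for illustration:

    public class BoundaryQuerySketch {
      // Stand-in for GenericJdbcExecutor.delimitIdentifier(); actual quoting
      // is database specific.
      static String delimitIdentifier(String name) {
        return "\"" + name + "\"";
      }

      public static void main(String[] args) {
        String schemaName = "public";      // hypothetical
        String tableName = "employees";    // hypothetical
        String partitionColumn = "id";     // hypothetical

        // Same branching as the initializer: qualify with the schema when present.
        String fullTableName = (schemaName == null)
            ? delimitIdentifier(tableName)
            : delimitIdentifier(schemaName) + "." + delimitIdentifier(tableName);

        // Same shape as the StringBuilder logic above.
        String minMaxQuery = "SELECT MIN(" + partitionColumn + "), MAX("
            + partitionColumn + ") FROM " + fullTableName;

        // Prints: SELECT MIN(id), MAX(id) FROM "public"."employees"
        System.out.println(minMaxQuery);
      }
    }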

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
deleted file mode 100644
index 2cf07fe..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
-import org.apache.sqoop.job.etl.Destroyer;
-import org.apache.sqoop.job.etl.DestroyerContext;
-
-public class GenericJdbcImportDestroyer extends Destroyer<ConnectionConfiguration, ImportJobConfiguration> {
-
-  private static final Logger LOG =
-    Logger.getLogger(GenericJdbcImportDestroyer.class);
-
-  @Override
-  public void destroy(DestroyerContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
-    LOG.info("Running generic JDBC connector destroyer");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
deleted file mode 100644
index 3f9aa9b..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc;
-
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
-import org.apache.sqoop.job.etl.ExtractorContext;
-import org.apache.sqoop.job.etl.Extractor;
-
-public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguration, ImportJobConfiguration, GenericJdbcImportPartition> {
-
- public static final Logger LOG = Logger.getLogger(GenericJdbcImportExtractor.class);
-
- private long rowsRead = 0;
-  @Override
-  public void extract(ExtractorContext context, ConnectionConfiguration connection, ImportJobConfiguration job, GenericJdbcImportPartition partition) {
-    String driver = connection.connection.jdbcDriver;
-    String url = connection.connection.connectionString;
-    String username = connection.connection.username;
-    String password = connection.connection.password;
-    GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
-
-    String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL);
-    String conditions = partition.getConditions();
-    query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions);
-    LOG.info("Using query: " + query);
-
-    rowsRead = 0;
-    ResultSet resultSet = executor.executeQuery(query);
-
-    try {
-      ResultSetMetaData metaData = resultSet.getMetaData();
-      int column = metaData.getColumnCount();
-      while (resultSet.next()) {
-        Object[] array = new Object[column];
-        for (int i = 0; i< column; i++) {
-          array[i] = resultSet.getObject(i + 1) == null ? GenericJdbcConnectorConstants.SQL_NULL_VALUE
-              : resultSet.getObject(i + 1);
-        }
-        context.getDataWriter().writeArrayRecord(array);
-        rowsRead++;
-      }
-    } catch (SQLException e) {
-      throw new SqoopException(
-          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0004, e);
-
-    } finally {
-      executor.close();
-    }
-  }
-
-  @Override
-  public long getRowsRead() {
-    return rowsRead;
-  }
-
-}
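
The heart of the extractor being deleted (and carried forward by its From-side successor) is substituting each partition's conditions into the shared data SQL. A self-contained sketch, assuming the conditions token is the literal "${CONDITIONS}" (the actual value is whatever GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN defines) and using invented table and column names:

    public class ConditionsTokenSketch {
      public static void main(String[] args) {
        String token = "${CONDITIONS}";  // assumed token value

        // Data SQL as the initializer would stage it (hypothetical table).
        String query = "SELECT * FROM employees WHERE " + token;

        // Conditions produced by the partitioner for one slice.
        String conditions = "0 <= id AND id < 4";

        // Mirrors query.replace(SQL_CONDITIONS_TOKEN, conditions) in extract().
        System.out.println(query.replace(token, conditions));
        // Prints: SELECT * FROM employees WHERE 0 <= id AND id < 4
      }
    }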

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
deleted file mode 100644
index 96818ba..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc;
-
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.MutableContext;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
-import org.apache.sqoop.connector.jdbc.util.SqlTypesUtils;
-import org.apache.sqoop.job.Constants;
-import org.apache.sqoop.job.etl.Initializer;
-import org.apache.sqoop.job.etl.InitializerContext;
-import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.type.Column;
-import org.apache.sqoop.utils.ClassUtils;
-
-public class GenericJdbcImportInitializer extends Initializer<ConnectionConfiguration, ImportJobConfiguration> {
-
-  private static final Logger LOG =
-    Logger.getLogger(GenericJdbcImportInitializer.class);
-
-  private GenericJdbcExecutor executor;
-
-  @Override
-  public void initialize(InitializerContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
-    configureJdbcProperties(context.getContext(), connection, job);
-    try {
-      configurePartitionProperties(context.getContext(), connection, job);
-      configureTableProperties(context.getContext(), connection, job);
-    } finally {
-      executor.close();
-    }
-  }
-
-  @Override
-  public List<String> getJars(InitializerContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
-    List<String> jars = new LinkedList<String>();
-
-    jars.add(ClassUtils.jarForClass(connection.connection.jdbcDriver));
-
-    return jars;
-  }
-
-  @Override
-  public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ImportJobConfiguration importJobConfiguration) {
-    configureJdbcProperties(context.getContext(), connectionConfiguration, importJobConfiguration);
-
-    String schemaName = importJobConfiguration.table.tableName;
-    if(schemaName == null) {
-      schemaName = "Query";
-    }
-
-    Schema schema = new Schema(schemaName);
-
-    ResultSet rs = null;
-    ResultSetMetaData rsmt = null;
-    try {
-      rs = executor.executeQuery(
-        context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL)
-          .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0")
-      );
-
-      rsmt = rs.getMetaData();
-      for (int i = 1 ; i <= rsmt.getColumnCount(); i++) {
-        Column column = SqlTypesUtils.sqlTypeToAbstractType(rsmt.getColumnType(i));
-
-        String columnName = rsmt.getColumnName(i);
-        if (columnName == null || columnName.equals("")) {
-          columnName = rsmt.getColumnLabel(i);
-          if (null == columnName) {
-            columnName = "Column " + i;
-          }
-        }
-
-        column.setName(columnName);
-        schema.addColumn(column);
-      }
-
-      return schema;
-    } catch (SQLException e) {
-      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e);
-    } finally {
-      if(rs != null) {
-        try {
-          rs.close();
-        } catch (SQLException e) {
-          LOG.info("Ignoring exception while closing ResultSet", e);
-        }
-      }
-    }
-  }
-
-  private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
-    String driver = connectionConfig.connection.jdbcDriver;
-    String url = connectionConfig.connection.connectionString;
-    String username = connectionConfig.connection.username;
-    String password = connectionConfig.connection.password;
-
-    assert driver != null;
-    assert url != null;
-
-    executor = new GenericJdbcExecutor(driver, url, username, password);
-  }
-
-  private void configurePartitionProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
-    // ----- configure column name -----
-
-    String partitionColumnName = jobConfig.table.partitionColumn;
-
-    if (partitionColumnName == null) {
-      // if column is not specified by the user,
-      // find the primary key of the table (when there is a table).
-      String tableName = jobConfig.table.tableName;
-      if (tableName != null) {
-        partitionColumnName = executor.getPrimaryKey(tableName);
-      }
-    }
-
-    if (partitionColumnName != null) {
-      context.setString(
-          GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
-          partitionColumnName);
-
-    } else {
-      throw new SqoopException(
-          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0005);
-    }
-
-    // ----- configure column type, min value, and max value -----
-
-    String minMaxQuery = jobConfig.table.boundaryQuery;
-
-    if (minMaxQuery == null) {
-      StringBuilder builder = new StringBuilder();
-
-      String schemaName = jobConfig.table.schemaName;
-      String tableName = jobConfig.table.tableName;
-      String tableSql = jobConfig.table.sql;
-
-      if (tableName != null && tableSql != null) {
-        // when both table name and table sql are specified:
-        throw new SqoopException(
-            GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
-
-      } else if (tableName != null) {
-        // when table name is specified:
-
-        // For databases that support schemas (IE: postgresql).
-        String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
-
-        String column = partitionColumnName;
-        builder.append("SELECT MIN(");
-        builder.append(column);
-        builder.append("), MAX(");
-        builder.append(column);
-        builder.append(") FROM ");
-        builder.append(fullTableName);
-
-      } else if (tableSql != null) {
-        String column = executor.qualify(
-            partitionColumnName, GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
-        builder.append("SELECT MIN(");
-        builder.append(column);
-        builder.append("), MAX(");
-        builder.append(column);
-        builder.append(") FROM ");
-        builder.append("(");
-        builder.append(tableSql.replace(
-            GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 1"));
-        builder.append(") ");
-        builder.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
-
-      } else {
-        // when neither are specified:
-        throw new SqoopException(
-            GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
-      }
-
-      minMaxQuery = builder.toString();
-    }
-
-
-    LOG.debug("Using minMaxQuery: " + minMaxQuery);
-    ResultSet rs = executor.executeQuery(minMaxQuery);
-    try {
-      ResultSetMetaData rsmd = rs.getMetaData();
-      if (rsmd.getColumnCount() != 2) {
-        throw new SqoopException(
-            GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006);
-      }
-
-      rs.next();
-
-      int columnType = rsmd.getColumnType(1);
-      String min = rs.getString(1);
-      String max = rs.getString(2);
-
-      LOG.info("Boundaries: min=" + min + ", max=" + max + ", columnType=" + columnType);
-
-      context.setInteger(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, columnType);
-      context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, min);
-      context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, max);
-
-    } catch (SQLException e) {
-      throw new SqoopException(
-          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006, e);
-    }
-  }
-
-  private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
-    String dataSql;
-    String fieldNames;
-
-    String schemaName = jobConfig.table.schemaName;
-    String tableName = jobConfig.table.tableName;
-    String tableSql = jobConfig.table.sql;
-    String tableColumns = jobConfig.table.columns;
-
-    if (tableName != null && tableSql != null) {
-      // when both table name and table sql are specified:
-      throw new SqoopException(
-          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
-
-    } else if (tableName != null) {
-      // when table name is specified:
-
-      // For databases that support schemas (IE: postgresql).
-      String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
-
-      if (tableColumns == null) {
-        StringBuilder builder = new StringBuilder();
-        builder.append("SELECT * FROM ");
-        builder.append(fullTableName);
-        builder.append(" WHERE ");
-        builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
-        dataSql = builder.toString();
-
-        String[] queryColumns = executor.getQueryColumns(dataSql.replace(
-            GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0"));
-        fieldNames = StringUtils.join(queryColumns, ',');
-
-      } else {
-        StringBuilder builder = new StringBuilder();
-        builder.append("SELECT ");
-        builder.append(tableColumns);
-        builder.append(" FROM ");
-        builder.append(fullTableName);
-        builder.append(" WHERE ");
-        builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
-        dataSql = builder.toString();
-
-        fieldNames = tableColumns;
-      }
-    } else if (tableSql != null) {
-      // when table sql is specified:
-
-      assert tableSql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
-
-      if (tableColumns == null) {
-        dataSql = tableSql;
-
-        String[] queryColumns = executor.getQueryColumns(dataSql.replace(
-            GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0"));
-        fieldNames = StringUtils.join(queryColumns, ',');
-
-      } else {
-        String[] columns = StringUtils.split(tableColumns, ',');
-        StringBuilder builder = new StringBuilder();
-        builder.append("SELECT ");
-        builder.append(executor.qualify(
-            columns[0], GenericJdbcConnectorConstants.SUBQUERY_ALIAS));
-        for (int i = 1; i < columns.length; i++) {
-          builder.append(",");
-          builder.append(executor.qualify(
-              columns[i], GenericJdbcConnectorConstants.SUBQUERY_ALIAS));
-        }
-        builder.append(" FROM ");
-        builder.append("(");
-        builder.append(tableSql);
-        builder.append(") ");
-        builder.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
-        dataSql = builder.toString();
-
-        fieldNames = tableColumns;
-      }
-    } else {
-      // when neither are specified:
-      throw new SqoopException(
-          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
-    }
-
-    LOG.info("Using dataSql: " + dataSql);
-    LOG.info("Field names: " + fieldNames);
-
-    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL, dataSql);
-    context.setString(Constants.JOB_ETL_FIELD_NAMES, fieldNames);
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java
deleted file mode 100644
index 66ed556..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.sqoop.job.etl.Partition;
-
-public class GenericJdbcImportPartition extends Partition {
-
-  private String conditions;
-
-  public void setConditions(String conditions) {
-    this.conditions = conditions;
-  }
-
-  public String getConditions() {
-    return conditions;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    conditions = in.readUTF();
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeUTF(conditions);
-  }
-
-  @Override
-  public String toString() {
-    return conditions;
-  }
-
-}
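
The partition's entire state is one UTF-encoded string, so write() and readFields() reduce to a single writeUTF()/readUTF() pair. A round-trip sketch of that contract using plain java.io streams (the conditions string is invented):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class PartitionRoundTripSketch {
      public static void main(String[] args) throws IOException {
        String conditions = "0 <= id AND id < 4";  // hypothetical

        // write(DataOutput): a single writeUTF of the conditions string.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        new DataOutputStream(buf).writeUTF(conditions);

        // readFields(DataInput): a single readUTF restores it.
        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(in.readUTF());  // prints the original conditions
      }
    }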

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
deleted file mode 100644
index d103223..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
+++ /dev/null
@@ -1,605 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc;
-
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.sql.Types;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.TimeZone;
-
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
-import org.apache.sqoop.job.etl.Partition;
-import org.apache.sqoop.job.etl.Partitioner;
-import org.apache.sqoop.job.etl.PartitionerContext;
-
-public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfiguration, ImportJobConfiguration> {
-
-  private static final BigDecimal NUMERIC_MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE);
-
-
-  private long numberPartitions;
-  private String partitionColumnName;
-  private int partitionColumnType;
-  private String partitionMinValue;
-  private String partitionMaxValue;
-  private Boolean partitionColumnNull;
-
-  @Override
-  public List<Partition> getPartitions(PartitionerContext context,ConnectionConfiguration connection, ImportJobConfiguration job) {
-    List<Partition> partitions = new LinkedList<Partition>();
-
-    numberPartitions = context.getMaxPartitions();
-    partitionColumnName = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME);
-    partitionColumnType = context.getInt(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -1);
-    partitionMinValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE);
-    partitionMaxValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE);
-
-    partitionColumnNull = job.table.partitionColumnNull;
-    if (partitionColumnNull == null) {
-      partitionColumnNull = false;
-    }
-
-    if (partitionMinValue == null && partitionMaxValue == null) {
-      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-      partition.setConditions(partitionColumnName + " IS NULL");
-      partitions.add(partition);
-      return partitions;
-    }
-
-    if (partitionColumnNull) {
-      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-      partition.setConditions(partitionColumnName + " IS NULL");
-      partitions.add(partition);
-      numberPartitions -= 1;
-    }
-
-    switch (partitionColumnType) {
-    case Types.TINYINT:
-    case Types.SMALLINT:
-    case Types.INTEGER:
-    case Types.BIGINT:
-      // Integer column
-      partitions.addAll(partitionIntegerColumn());
-      break;
-
-    case Types.REAL:
-    case Types.FLOAT:
-    case Types.DOUBLE:
-      // Floating point column
-      partitions.addAll(partitionFloatingPointColumn());
-      break;
-
-    case Types.NUMERIC:
-    case Types.DECIMAL:
-      // Decimal column
-      partitions.addAll(partitionNumericColumn());
-      break;
-
-    case Types.BIT:
-    case Types.BOOLEAN:
-      // Boolean column
-      return partitionBooleanColumn();
-
-    case Types.DATE:
-    case Types.TIME:
-    case Types.TIMESTAMP:
-      // Date time column
-      partitions.addAll(partitionDateTimeColumn());
-      break;
-
-    case Types.CHAR:
-    case Types.VARCHAR:
-    case Types.LONGVARCHAR:
-      // Text column
-      partitions.addAll(partitionTextColumn());
-      break;
-
-    default:
-      throw new SqoopException(
-          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
-          String.valueOf(partitionColumnType));
-    }
-
-    return partitions;
-  }
-
-  protected List<Partition> partitionDateTimeColumn() {
-    List<Partition> partitions = new LinkedList<Partition>();
-
-    long minDateValue = 0;
-    long maxDateValue = 0;
-    SimpleDateFormat sdf = null;
-    switch(partitionColumnType) {
-      case Types.DATE:
-        sdf = new SimpleDateFormat("yyyy-MM-dd");
-        minDateValue = Date.valueOf(partitionMinValue).getTime();
-        maxDateValue = Date.valueOf(partitionMaxValue).getTime();
-        break;
-      case Types.TIME:
-        sdf = new SimpleDateFormat("HH:mm:ss");
-        minDateValue = Time.valueOf(partitionMinValue).getTime();
-        maxDateValue = Time.valueOf(partitionMaxValue).getTime();
-        break;
-      case Types.TIMESTAMP:
-        sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-        minDateValue = Timestamp.valueOf(partitionMinValue).getTime();
-        maxDateValue = Timestamp.valueOf(partitionMaxValue).getTime();
-        break;
-    }
-
-
-    minDateValue += TimeZone.getDefault().getOffset(minDateValue);
-    maxDateValue += TimeZone.getDefault().getOffset(maxDateValue);
-
-    sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-
-    long interval =  (maxDateValue - minDateValue) / numberPartitions;
-    long remainder = (maxDateValue - minDateValue) % numberPartitions;
-
-    if (interval == 0) {
-      numberPartitions = (int)remainder;
-    }
-
-    long lowerBound;
-    long upperBound = minDateValue;
-
-    Object objLB = null;
-    Object objUB = null;
-
-    for (int i = 1; i < numberPartitions; i++) {
-      lowerBound = upperBound;
-      upperBound = lowerBound + interval;
-      upperBound += (i <= remainder) ? 1 : 0;
-
-      switch(partitionColumnType) {
-        case Types.DATE:
-          objLB = new Date(lowerBound);
-          objUB = new Date(upperBound);
-          break;
-        case Types.TIME:
-          objLB = new Time(lowerBound);
-          objUB = new Time(upperBound);
-
-          break;
-        case Types.TIMESTAMP:
-          objLB = new Timestamp(lowerBound);
-          objUB = new Timestamp(upperBound);
-          break;
-      }
-
-      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-      partition.setConditions(
-          constructDateConditions(sdf, objLB, objUB, false));
-      partitions.add(partition);
-    }
-
-    switch(partitionColumnType) {
-      case Types.DATE:
-        objLB = new Date(upperBound);
-        objUB = new Date(maxDateValue);
-        break;
-      case Types.TIME:
-        objLB = new Time(upperBound);
-        objUB = new Time(maxDateValue);
-        break;
-      case Types.TIMESTAMP:
-        objLB = new Timestamp(upperBound);
-        objUB = new Timestamp(maxDateValue);
-        break;
-    }
-
-
-    GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-    partition.setConditions(
-        constructDateConditions(sdf, objLB, objUB, true));
-    partitions.add(partition);
-    return partitions;
-  }
-
-  protected List<Partition> partitionTextColumn() {
-    List<Partition> partitions = new LinkedList<Partition>();
-
-    String minStringValue = null;
-    String maxStringValue = null;
-
-    // Remove common prefix if any as it does not affect outcome.
-    int maxPrefixLen = Math.min(partitionMinValue.length(),
-        partitionMaxValue.length());
-    // Calculate common prefix length
-    int cpLen = 0;
-
-    for (cpLen = 0; cpLen < maxPrefixLen; cpLen++) {
-      char c1 = partitionMinValue.charAt(cpLen);
-      char c2 = partitionMaxValue.charAt(cpLen);
-      if (c1 != c2) {
-        break;
-      }
-    }
-
-    // The common prefix has length 'sharedLen'. Extract it from both.
-    String prefix = partitionMinValue.substring(0, cpLen);
-    minStringValue = partitionMinValue.substring(cpLen);
-    maxStringValue = partitionMaxValue.substring(cpLen);
-
-    BigDecimal minStringBD = textToBigDecimal(minStringValue);
-    BigDecimal maxStringBD = textToBigDecimal(maxStringValue);
-
-    // Having one single value means that we can create only one single split
-    if(minStringBD.equals(maxStringBD)) {
-      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-      partition.setConditions(constructTextConditions(prefix, 0, 0,
-        partitionMinValue, partitionMaxValue, true, true));
-      partitions.add(partition);
-      return partitions;
-    }
-
-    // Get all the split points together.
-    List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
-
-    BigDecimal splitSize = divide(maxStringBD.subtract(minStringBD),
-        new BigDecimal(numberPartitions));
-    if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
-      splitSize = NUMERIC_MIN_INCREMENT;
-    }
-
-    BigDecimal curVal = minStringBD;
-
-    int parts = 0;
-
-    while (curVal.compareTo(maxStringBD) <= 0 && parts < numberPartitions) {
-      splitPoints.add(curVal);
-      curVal = curVal.add(splitSize);
-      // bigDecimalToText approximates to next comparison location.
-      // Make sure we are still in range
-      String text = bigDecimalToText(curVal);
-      curVal = textToBigDecimal(text);
-      ++parts;
-    }
-
-    if (splitPoints.size() == 0
-        || splitPoints.get(0).compareTo(minStringBD) != 0) {
-      splitPoints.add(0, minStringBD);
-    }
-
-    if (splitPoints.get(splitPoints.size() - 1).compareTo(maxStringBD) != 0
-        || splitPoints.size() == 1) {
-      splitPoints.add(maxStringBD);
-    }
-
-    // Turn the split points into a set of string intervals.
-    BigDecimal start = splitPoints.get(0);
-    for (int i = 1; i < splitPoints.size(); i++) {
-      BigDecimal end = splitPoints.get(i);
-
-      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-      partition.setConditions(constructTextConditions(prefix, start, end,
-        partitionMinValue, partitionMaxValue, i == 1, i == splitPoints.size() - 1));
-      partitions.add(partition);
-
-      start = end;
-    }
-
-    return partitions;
-  }
-
-
-  protected List<Partition> partitionIntegerColumn() {
-    List<Partition> partitions = new LinkedList<Partition>();
-
-    long minValue = partitionMinValue == null ? Long.MIN_VALUE
-      : Long.parseLong(partitionMinValue);
-    long maxValue = Long.parseLong(partitionMaxValue);
-
-    long interval =  (maxValue - minValue) / numberPartitions;
-    long remainder = (maxValue - minValue) % numberPartitions;
-
-    if (interval == 0) {
-      numberPartitions = (int)remainder;
-    }
-
-    long lowerBound;
-    long upperBound = minValue;
-    for (int i = 1; i < numberPartitions; i++) {
-      lowerBound = upperBound;
-      upperBound = lowerBound + interval;
-      upperBound += (i <= remainder) ? 1 : 0;
-
-      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-      partition.setConditions(
-          constructConditions(lowerBound, upperBound, false));
-      partitions.add(partition);
-    }
-
-    GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-    partition.setConditions(
-        constructConditions(upperBound, maxValue, true));
-    partitions.add(partition);
-
-    return partitions;
-  }
-
-  protected List<Partition> partitionFloatingPointColumn() {
-    List<Partition> partitions = new LinkedList<Partition>();
-
-
-    double minValue = partitionMinValue == null ? Double.MIN_VALUE
-      : Double.parseDouble(partitionMinValue);
-    double maxValue = Double.parseDouble(partitionMaxValue);
-
-    double interval =  (maxValue - minValue) / numberPartitions;
-
-    double lowerBound;
-    double upperBound = minValue;
-    for (int i = 1; i < numberPartitions; i++) {
-      lowerBound = upperBound;
-      upperBound = lowerBound + interval;
-
-      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-      partition.setConditions(
-          constructConditions(lowerBound, upperBound, false));
-      partitions.add(partition);
-    }
-
-    GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-    partition.setConditions(
-        constructConditions(upperBound, maxValue, true));
-    partitions.add(partition);
-
-    return partitions;
-  }
-
-  protected List<Partition> partitionNumericColumn() {
-    List<Partition> partitions = new LinkedList<Partition>();
-    // Having one end in null is not supported
-    if (partitionMinValue == null || partitionMaxValue == null) {
-      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015);
-    }
-
-    BigDecimal minValue = new BigDecimal(partitionMinValue);
-    BigDecimal maxValue = new BigDecimal(partitionMaxValue);
-
-    // Having one single value means that we can create only one single split
-    if(minValue.equals(maxValue)) {
-      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-      partition.setConditions(constructConditions(minValue));
-      partitions.add(partition);
-      return partitions;
-    }
-
-    // Get all the split points together.
-    List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
-
-    BigDecimal splitSize = divide(maxValue.subtract(minValue), new BigDecimal(numberPartitions));
-
-    if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
-      splitSize = NUMERIC_MIN_INCREMENT;
-    }
-
-    BigDecimal curVal = minValue;
-
-    while (curVal.compareTo(maxValue) <= 0) {
-      splitPoints.add(curVal);
-      curVal = curVal.add(splitSize);
-    }
-
-    if (splitPoints.get(splitPoints.size() - 1).compareTo(maxValue) != 0 || splitPoints.size() == 1) {
-      splitPoints.remove(splitPoints.size() - 1);
-      splitPoints.add(maxValue);
-    }
-
-    // Turn the split points into a set of intervals.
-    BigDecimal start = splitPoints.get(0);
-    for (int i = 1; i < splitPoints.size(); i++) {
-      BigDecimal end = splitPoints.get(i);
-
-      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-      partition.setConditions(constructConditions(start, end, i == splitPoints.size() - 1));
-      partitions.add(partition);
-
-      start = end;
-    }
-
-    return partitions;
-  }
-
-  protected  List<Partition> partitionBooleanColumn() {
-    List<Partition> partitions = new LinkedList<Partition>();
-
-
-    Boolean minValue = parseBooleanValue(partitionMinValue);
-    Boolean maxValue = parseBooleanValue(partitionMaxValue);
-
-    StringBuilder conditions = new StringBuilder();
-
-    // Having one single value means that we can create only one single split
-    if(minValue.equals(maxValue)) {
-      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-
-      conditions.append(partitionColumnName).append(" = ")
-          .append(maxValue);
-      partition.setConditions(conditions.toString());
-      partitions.add(partition);
-      return partitions;
-    }
-
-    GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-
-    if (partitionMinValue == null) {
-      conditions = new StringBuilder();
-      conditions.append(partitionColumnName).append(" IS NULL");
-      partition.setConditions(conditions.toString());
-      partitions.add(partition);
-    }
-    partition = new GenericJdbcImportPartition();
-    conditions = new StringBuilder();
-    conditions.append(partitionColumnName).append(" = TRUE");
-    partition.setConditions(conditions.toString());
-    partitions.add(partition);
-    partition = new GenericJdbcImportPartition();
-    conditions = new StringBuilder();
-    conditions.append(partitionColumnName).append(" = FALSE");
-    partition.setConditions(conditions.toString());
-    partitions.add(partition);
-    return partitions;
-  }
-
-  private Boolean parseBooleanValue(String value) {
-    if (value == null) {
-      return null;
-    }
-    if (value.equals("1")) {
-      return Boolean.TRUE;
-    } else if (value.equals("0")) {
-      return Boolean.FALSE;
-    } else {
-      return Boolean.parseBoolean(value);
-    }
-  }
-
-  protected BigDecimal divide(BigDecimal numerator, BigDecimal denominator) {
-    try {
-      return numerator.divide(denominator);
-    } catch (ArithmeticException ae) {
-      return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP);
-    }
-  }
-
-  protected String constructConditions(
-      Object lowerBound, Object upperBound, boolean lastOne) {
-    StringBuilder conditions = new StringBuilder();
-    conditions.append(lowerBound);
-    conditions.append(" <= ");
-    conditions.append(partitionColumnName);
-    conditions.append(" AND ");
-    conditions.append(partitionColumnName);
-    conditions.append(lastOne ? " <= " : " < ");
-    conditions.append(upperBound);
-    return conditions.toString();
-  }
-
-  protected String constructConditions(Object value) {
-    return new StringBuilder()
-      .append(partitionColumnName)
-      .append(" = ")
-      .append(value)
-      .toString()
-     ;
-  }
-
-  protected String constructDateConditions(SimpleDateFormat sdf,
-      Object lowerBound, Object upperBound, boolean lastOne) {
-    StringBuilder conditions = new StringBuilder();
-    conditions.append('\'').append(sdf.format((java.util.Date)lowerBound)).append('\'');
-    conditions.append(" <= ");
-    conditions.append(partitionColumnName);
-    conditions.append(" AND ");
-    conditions.append(partitionColumnName);
-    conditions.append(lastOne ? " <= " : " < ");
-    conditions.append('\'').append(sdf.format((java.util.Date)upperBound)).append('\'');
-    return conditions.toString();
-  }
-
-  protected String constructTextConditions(String prefix, Object lowerBound, Object upperBound,
-      String lowerStringBound, String upperStringBound, boolean firstOne, boolean lastOne) {
-    StringBuilder conditions = new StringBuilder();
-    String lbString = prefix + bigDecimalToText((BigDecimal)lowerBound);
-    String ubString = prefix + bigDecimalToText((BigDecimal)upperBound);
-    conditions.append('\'').append(firstOne ? lowerStringBound : lbString).append('\'');
-    conditions.append(" <= ");
-    conditions.append(partitionColumnName);
-    conditions.append(" AND ");
-    conditions.append(partitionColumnName);
-    conditions.append(lastOne ? " <= " : " < ");
-    conditions.append('\'').append(lastOne ? upperStringBound : ubString).append('\'');
-    return conditions.toString();
-  }
-
-  /**
-   *  Converts a string to a BigDecimal representation in Base 2^21 format.
-   *  The maximum Unicode code point value defined is 10FFFF.  Although
-   *  not all database system support UTF16 and mostly we expect UCS2
-   *  characters only, for completeness, we assume that all the unicode
-   *  characters are supported.
-   *  Given a string 's' containing characters s_0, s_1,..s_n,
-   *  the string is interpreted as the number: 0.s_0 s_1 s_2 s_3 s_4 (base 2^21).
-   *  This can be split and each split point can be converted back to
-   *  a string value for comparison purposes.   The number of characters
-   *  is restricted to prevent repeating fractions and rounding errors
-   *  towards the higher fraction positions.
-   */
-  private static final BigDecimal UNITS_BASE = new BigDecimal(0x200000);
-  private static final int MAX_CHARS_TO_CONVERT = 4;
-
-  private BigDecimal textToBigDecimal(String str) {
-    BigDecimal result = BigDecimal.ZERO;
-    BigDecimal divisor = UNITS_BASE;
-
-    int len = Math.min(str.length(), MAX_CHARS_TO_CONVERT);
-
-    for (int n = 0; n < len; ) {
-      int codePoint = str.codePointAt(n);
-      n += Character.charCount(codePoint);
-      BigDecimal val = divide(new BigDecimal(codePoint), divisor);
-      result = result.add(val);
-      divisor = divisor.multiply(UNITS_BASE);
-    }
-
-    return result;
-  }
-
-  private String bigDecimalToText(BigDecimal bd) {
-    BigDecimal curVal = bd.stripTrailingZeros();
-    StringBuilder sb = new StringBuilder();
-
-    for (int n = 0; n < MAX_CHARS_TO_CONVERT; ++n) {
-      curVal = curVal.multiply(UNITS_BASE);
-      int cp = curVal.intValue();
-      if (0 >= cp) {
-        break;
-      }
-
-      if (!Character.isDefined(cp)) {
-        int t_cp = Character.MAX_CODE_POINT < cp ? 1 : cp;
-        // We are guaranteed to find at least one character
-        while(!Character.isDefined(t_cp)) {
-          ++t_cp;
-          if (t_cp == cp) {
-            break;
-          }
-          if (t_cp >= Character.MAX_CODE_POINT || t_cp <= 0)  {
-            t_cp = 1;
-          }
-        }
-        cp = t_cp;
-      }
-      curVal = curVal.subtract(new BigDecimal(cp));
-      sb.append(Character.toChars(cp));
-    }
-
-    return sb.toString();
-  }
-
-}
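
To make the interval/remainder arithmetic in partitionIntegerColumn() concrete: the first (range % partitions) slices get one extra unit, every slice is half-open, and only the last is closed on both ends. A standalone sketch splitting the range [0, 10] into three slices the same way (the column name is invented):

    public class IntegerPartitionSketch {
      public static void main(String[] args) {
        long minValue = 0, maxValue = 10;
        int numberPartitions = 3;
        String col = "id";  // hypothetical partition column

        long interval = (maxValue - minValue) / numberPartitions;   // 3
        long remainder = (maxValue - minValue) % numberPartitions;  // 1

        long lowerBound;
        long upperBound = minValue;
        for (int i = 1; i < numberPartitions; i++) {
          lowerBound = upperBound;
          upperBound = lowerBound + interval + ((i <= remainder) ? 1 : 0);
          // Half-open slice, as in constructConditions(..., false).
          System.out.println(lowerBound + " <= " + col + " AND " + col + " < " + upperBound);
        }
        // Last slice is closed on both ends (lastOne == true).
        System.out.println(upperBound + " <= " + col + " AND " + col + " <= " + maxValue);
        // Prints:
        //   0 <= id AND id < 4
        //   4 <= id AND id < 7
        //   7 <= id AND id <= 10
      }
    }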

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java
new file mode 100644
index 0000000..7d583c5
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.LoaderContext;
+
+public class GenericJdbcLoader extends Loader<ConnectionConfiguration, ToJobConfiguration> {
+
+  public static final int DEFAULT_ROWS_PER_BATCH = 100;
+  public static final int DEFAULT_BATCHES_PER_TRANSACTION = 100;
+  private int rowsPerBatch = DEFAULT_ROWS_PER_BATCH;
+  private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION;
+
+  @Override
+  public void load(LoaderContext context, ConnectionConfiguration connection, ToJobConfiguration job) throws Exception {
+    String driver = connection.connection.jdbcDriver;
+    String url = connection.connection.connectionString;
+    String username = connection.connection.username;
+    String password = connection.connection.password;
+    GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
+    executor.setAutoCommit(false);
+
+    String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_TO_JDBC_DATA_SQL);
+    executor.beginBatch(sql);
+    try {
+      int numberOfRows = 0;
+      int numberOfBatches = 0;
+      Object[] array;
+
+      while ((array = context.getDataReader().readArrayRecord()) != null) {
+        numberOfRows++;
+        executor.addBatch(array);
+
+        if (numberOfRows == rowsPerBatch) {
+          numberOfBatches++;
+          if (numberOfBatches == batchesPerTransaction) {
+            executor.executeBatch(true);
+            numberOfBatches = 0;
+          } else {
+            executor.executeBatch(false);
+          }
+          numberOfRows = 0;
+        }
+      }
+
+      if (numberOfRows != 0 || numberOfBatches != 0) {
+        // flush any pending rows and commit the final, partial transaction
+        executor.executeBatch(true);
+      }
+
+      executor.endBatch();
+
+    } finally {
+      executor.close();
+    }
+  }
+
+}
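
    The loader above flushes a JDBC batch every rowsPerBatch rows and commits
    every batchesPerTransaction batches, i.e. every 10,000 rows with the
    defaults. A minimal sketch of the same cadence in plain JDBC follows; it is
    not part of the patch and assumes an in-memory H2 driver on the classpath,
    with a hypothetical table T and input loop. Batching amortizes statement
    round-trips while the transaction boundary caps how much work a failure
    can roll back.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    public class BatchCadenceSketch {
      public static void main(String[] args) throws Exception {
        final int rowsPerBatch = 100;          // mirrors DEFAULT_ROWS_PER_BATCH
        final int batchesPerTransaction = 100; // mirrors DEFAULT_BATCHES_PER_TRANSACTION

        try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:demo")) {
          conn.setAutoCommit(false);
          conn.createStatement().execute("CREATE TABLE T (ID INT)");
          try (PreparedStatement ps = conn.prepareStatement("INSERT INTO T VALUES (?)")) {
            int rows = 0, batches = 0;
            for (int id = 0; id < 25000; id++) { // hypothetical record stream
              ps.setInt(1, id);
              ps.addBatch();
              if (++rows == rowsPerBatch) {      // flush a batch every 100 rows
                ps.executeBatch();
                rows = 0;
                if (++batches == batchesPerTransaction) {
                  conn.commit();                 // commit every 100 batches = 10,000 rows
                  batches = 0;
                }
              }
            }
            if (rows != 0 || batches != 0) {     // flush and commit the tail
              ps.executeBatch();
              conn.commit();
            }
          }
        }
      }
    }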

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java
new file mode 100644
index 0000000..65400ef
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.sqoop.job.etl.Partition;
+
+public class GenericJdbcPartition extends Partition {
+
+  private String conditions;
+
+  public void setConditions(String conditions) {
+    this.conditions = conditions;
+  }
+
+  public String getConditions() {
+    return conditions;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    conditions = in.readUTF();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(conditions);
+  }
+
+  @Override
+  public String toString() {
+    return conditions;
+  }
+
+}
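
    The partition's only state is its WHERE-clause fragment, serialized with
    writeUTF/readUTF. The round trip can be sketched as below, assuming the
    class above is compiled on the classpath; note that write() presumes
    setConditions() was called first, since writeUTF(null) would throw a
    NullPointerException.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    public class PartitionRoundTrip {
      public static void main(String[] args) throws Exception {
        GenericJdbcPartition original = new GenericJdbcPartition();
        original.setConditions("-5 <= ICOL AND ICOL < 0"); // hypothetical conditions

        // Serialize exactly as the framework would, via write(DataOutput).
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize on the other side, via readFields(DataInput).
        GenericJdbcPartition copy = new GenericJdbcPartition();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy.getConditions()); // -5 <= ICOL AND ICOL < 0
      }
    }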

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java
new file mode 100644
index 0000000..bf84445
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java
@@ -0,0 +1,604 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.text.SimpleDateFormat;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.PartitionerContext;
+
+public class GenericJdbcPartitioner extends Partitioner<ConnectionConfiguration, FromJobConfiguration> {
+
+  private static final BigDecimal NUMERIC_MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE);
+
+
+  private long numberPartitions;
+  private String partitionColumnName;
+  private int partitionColumnType;
+  private String partitionMinValue;
+  private String partitionMaxValue;
+  private Boolean partitionColumnNull;
+
+  @Override
+  public List<Partition> getPartitions(PartitionerContext context, ConnectionConfiguration connection, FromJobConfiguration job) {
+    List<Partition> partitions = new LinkedList<Partition>();
+
+    numberPartitions = context.getMaxPartitions();
+    partitionColumnName = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME);
+    partitionColumnType = context.getInt(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -1);
+    partitionMinValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE);
+    partitionMaxValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE);
+
+    partitionColumnNull = job.table.partitionColumnNull;
+    if (partitionColumnNull == null) {
+      partitionColumnNull = false;
+    }
+
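+    // Both bounds are NULL only when the column has no non-NULL values,
+    // so a single IS NULL partition covers the whole table.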
+    if (partitionMinValue == null && partitionMaxValue == null) {
+      GenericJdbcPartition partition = new GenericJdbcPartition();
+      partition.setConditions(partitionColumnName + " IS NULL");
+      partitions.add(partition);
+      return partitions;
+    }
+
+    if (partitionColumnNull) {
+      GenericJdbcPartition partition = new GenericJdbcPartition();
+      partition.setConditions(partitionColumnName + " IS NULL");
+      partitions.add(partition);
+      numberPartitions -= 1;
+    }
+
+    switch (partitionColumnType) {
+    case Types.TINYINT:
+    case Types.SMALLINT:
+    case Types.INTEGER:
+    case Types.BIGINT:
+      // Integer column
+      partitions.addAll(partitionIntegerColumn());
+      break;
+
+    case Types.REAL:
+    case Types.FLOAT:
+    case Types.DOUBLE:
+      // Floating point column
+      partitions.addAll(partitionFloatingPointColumn());
+      break;
+
+    case Types.NUMERIC:
+    case Types.DECIMAL:
+      // Decimal column
+      partitions.addAll(partitionNumericColumn());
+      break;
+
+    case Types.BIT:
+    case Types.BOOLEAN:
+      // Boolean column; use addAll like the other cases so that a NULL
+      // partition added above is not discarded.
+      partitions.addAll(partitionBooleanColumn());
+      break;
+
+    case Types.DATE:
+    case Types.TIME:
+    case Types.TIMESTAMP:
+      // Date time column
+      partitions.addAll(partitionDateTimeColumn());
+      break;
+
+    case Types.CHAR:
+    case Types.VARCHAR:
+    case Types.LONGVARCHAR:
+      // Text column
+      partitions.addAll(partitionTextColumn());
+      break;
+
+    default:
+      throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
+          String.valueOf(partitionColumnType));
+    }
+
+    return partitions;
+  }
+
+  protected List<Partition> partitionDateTimeColumn() {
+    List<Partition> partitions = new LinkedList<Partition>();
+
+    long minDateValue = 0;
+    long maxDateValue = 0;
+    SimpleDateFormat sdf = null;
+    switch(partitionColumnType) {
+      case Types.DATE:
+        sdf = new SimpleDateFormat("yyyy-MM-dd");
+        minDateValue = Date.valueOf(partitionMinValue).getTime();
+        maxDateValue = Date.valueOf(partitionMaxValue).getTime();
+        break;
+      case Types.TIME:
+        sdf = new SimpleDateFormat("HH:mm:ss");
+        minDateValue = Time.valueOf(partitionMinValue).getTime();
+        maxDateValue = Time.valueOf(partitionMaxValue).getTime();
+        break;
+      case Types.TIMESTAMP:
+        sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+        minDateValue = Timestamp.valueOf(partitionMinValue).getTime();
+        maxDateValue = Timestamp.valueOf(partitionMaxValue).getTime();
+        break;
+    }
+
+
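+    // Shift the epoch values by the local UTC offset so that formatting them
+    // with the GMT calendar below reproduces the original local date strings.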
+    minDateValue += TimeZone.getDefault().getOffset(minDateValue);
+    maxDateValue += TimeZone.getDefault().getOffset(maxDateValue);
+
+    sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+    long interval =  (maxDateValue - minDateValue) / numberPartitions;
+    long remainder = (maxDateValue - minDateValue) % numberPartitions;
+
+    if (interval == 0) {
+      numberPartitions = (int)remainder;
+    }
+
+    long lowerBound;
+    long upperBound = minDateValue;
+
+    Object objLB = null;
+    Object objUB = null;
+
+    for (int i = 1; i < numberPartitions; i++) {
+      lowerBound = upperBound;
+      upperBound = lowerBound + interval;
+      upperBound += (i <= remainder) ? 1 : 0;
+
+      switch(partitionColumnType) {
+        case Types.DATE:
+          objLB = new Date(lowerBound);
+          objUB = new Date(upperBound);
+          break;
+        case Types.TIME:
+          objLB = new Time(lowerBound);
+          objUB = new Time(upperBound);
+
+          break;
+        case Types.TIMESTAMP:
+          objLB = new Timestamp(lowerBound);
+          objUB = new Timestamp(upperBound);
+          break;
+      }
+
+      GenericJdbcPartition partition = new GenericJdbcPartition();
+      partition.setConditions(
+          constructDateConditions(sdf, objLB, objUB, false));
+      partitions.add(partition);
+    }
+
+    switch(partitionColumnType) {
+      case Types.DATE:
+        objLB = new Date(upperBound);
+        objUB = new Date(maxDateValue);
+        break;
+      case Types.TIME:
+        objLB = new Time(upperBound);
+        objUB = new Time(maxDateValue);
+        break;
+      case Types.TIMESTAMP:
+        objLB = new Timestamp(upperBound);
+        objUB = new Timestamp(maxDateValue);
+        break;
+    }
+
+
+    GenericJdbcPartition partition = new GenericJdbcPartition();
+    partition.setConditions(
+        constructDateConditions(sdf, objLB, objUB, true));
+    partitions.add(partition);
+    return partitions;
+  }
+
+  protected List<Partition> partitionTextColumn() {
+    List<Partition> partitions = new LinkedList<Partition>();
+
+    String minStringValue = null;
+    String maxStringValue = null;
+
+    // Remove the common prefix, if any, as it does not affect the outcome.
+    int maxPrefixLen = Math.min(partitionMinValue.length(),
+        partitionMaxValue.length());
+    // Calculate common prefix length
+    int cpLen = 0;
+
+    for (cpLen = 0; cpLen < maxPrefixLen; cpLen++) {
+      char c1 = partitionMinValue.charAt(cpLen);
+      char c2 = partitionMaxValue.charAt(cpLen);
+      if (c1 != c2) {
+        break;
+      }
+    }
+
+    // The common prefix has length 'cpLen'. Extract it from both.
+    String prefix = partitionMinValue.substring(0, cpLen);
+    minStringValue = partitionMinValue.substring(cpLen);
+    maxStringValue = partitionMaxValue.substring(cpLen);
+
+    BigDecimal minStringBD = textToBigDecimal(minStringValue);
+    BigDecimal maxStringBD = textToBigDecimal(maxStringValue);
+
+    // A single value means that we can create only a single split
+    if (minStringBD.equals(maxStringBD)) {
+      GenericJdbcPartition partition = new GenericJdbcPartition();
+      partition.setConditions(constructTextConditions(prefix, 0, 0,
+        partitionMinValue, partitionMaxValue, true, true));
+      partitions.add(partition);
+      return partitions;
+    }
+
+    // Get all the split points together.
+    List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
+
+    BigDecimal splitSize = divide(maxStringBD.subtract(minStringBD),
+        new BigDecimal(numberPartitions));
+    if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
+      splitSize = NUMERIC_MIN_INCREMENT;
+    }
+
+    BigDecimal curVal = minStringBD;
+
+    int parts = 0;
+
+    while (curVal.compareTo(maxStringBD) <= 0 && parts < numberPartitions) {
+      splitPoints.add(curVal);
+      curVal = curVal.add(splitSize);
+      // bigDecimalToText only approximates the next split point, so round-trip
+      // through the text form to stay on a representable, in-range value.
+      String text = bigDecimalToText(curVal);
+      curVal = textToBigDecimal(text);
+      ++parts;
+    }
+
+    if (splitPoints.size() == 0
+        || splitPoints.get(0).compareTo(minStringBD) != 0) {
+      splitPoints.add(0, minStringBD);
+    }
+
+    if (splitPoints.get(splitPoints.size() - 1).compareTo(maxStringBD) != 0
+        || splitPoints.size() == 1) {
+      splitPoints.add(maxStringBD);
+    }
+
+    // Turn the split points into a set of string intervals.
+    BigDecimal start = splitPoints.get(0);
+    for (int i = 1; i < splitPoints.size(); i++) {
+      BigDecimal end = splitPoints.get(i);
+
+      GenericJdbcPartition partition = new GenericJdbcPartition();
+      partition.setConditions(constructTextConditions(prefix, start, end,
+        partitionMinValue, partitionMaxValue, i == 1, i == splitPoints.size() - 1));
+      partitions.add(partition);
+
+      start = end;
+    }
+
+    return partitions;
+  }
+
+
+  protected List<Partition> partitionIntegerColumn() {
+    List<Partition> partitions = new LinkedList<Partition>();
+
+    long minValue = partitionMinValue == null ? Long.MIN_VALUE
+      : Long.parseLong(partitionMinValue);
+    long maxValue = Long.parseLong(partitionMaxValue);
+
+    long interval =  (maxValue - minValue) / numberPartitions;
+    long remainder = (maxValue - minValue) % numberPartitions;
+
+    if (interval == 0) {
+      numberPartitions = (int)remainder;
+    }
+
+    long lowerBound;
+    long upperBound = minValue;
+    for (int i = 1; i < numberPartitions; i++) {
+      lowerBound = upperBound;
+      upperBound = lowerBound + interval;
+      upperBound += (i <= remainder) ? 1 : 0;
+
+      GenericJdbcPartition partition = new GenericJdbcPartition();
+      partition.setConditions(
+          constructConditions(lowerBound, upperBound, false));
+      partitions.add(partition);
+    }
+
+    GenericJdbcPartition partition = new GenericJdbcPartition();
+    partition.setConditions(
+        constructConditions(upperBound, maxValue, true));
+    partitions.add(partition);
+
+    return partitions;
+  }
+
+  protected List<Partition> partitionFloatingPointColumn() {
+    List<Partition> partitions = new LinkedList<Partition>();
+
+
+    double minValue = partitionMinValue == null ? Double.MIN_VALUE
+      : Double.parseDouble(partitionMinValue);
+    double maxValue = Double.parseDouble(partitionMaxValue);
+
+    double interval =  (maxValue - minValue) / numberPartitions;
+
+    double lowerBound;
+    double upperBound = minValue;
+    for (int i = 1; i < numberPartitions; i++) {
+      lowerBound = upperBound;
+      upperBound = lowerBound + interval;
+
+      GenericJdbcPartition partition = new GenericJdbcPartition();
+      partition.setConditions(
+          constructConditions(lowerBound, upperBound, false));
+      partitions.add(partition);
+    }
+
+    GenericJdbcPartition partition = new GenericJdbcPartition();
+    partition.setConditions(
+        constructConditions(upperBound, maxValue, true));
+    partitions.add(partition);
+
+    return partitions;
+  }
+
+  protected List<Partition> partitionNumericColumn() {
+    List<Partition> partitions = new LinkedList<Partition>();
+    // Having a null bound at either end is not supported
+    if (partitionMinValue == null || partitionMaxValue == null) {
+      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015);
+    }
+
+    BigDecimal minValue = new BigDecimal(partitionMinValue);
+    BigDecimal maxValue = new BigDecimal(partitionMaxValue);
+
+    // A single value means that we can create only a single split
+    if (minValue.equals(maxValue)) {
+      GenericJdbcPartition partition = new GenericJdbcPartition();
+      partition.setConditions(constructConditions(minValue));
+      partitions.add(partition);
+      return partitions;
+    }
+
+    // Get all the split points together.
+    List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
+
+    BigDecimal splitSize = divide(maxValue.subtract(minValue), new BigDecimal(numberPartitions));
+
+    if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
+      splitSize = NUMERIC_MIN_INCREMENT;
+    }
+
+    BigDecimal curVal = minValue;
+
+    while (curVal.compareTo(maxValue) <= 0) {
+      splitPoints.add(curVal);
+      curVal = curVal.add(splitSize);
+    }
+
+    if (splitPoints.get(splitPoints.size() - 1).compareTo(maxValue) != 0 || splitPoints.size() == 1) {
+      splitPoints.remove(splitPoints.size() - 1);
+      splitPoints.add(maxValue);
+    }
+
+    // Turn the split points into a set of intervals.
+    BigDecimal start = splitPoints.get(0);
+    for (int i = 1; i < splitPoints.size(); i++) {
+      BigDecimal end = splitPoints.get(i);
+
+      GenericJdbcPartition partition = new GenericJdbcPartition();
+      partition.setConditions(constructConditions(start, end, i == splitPoints.size() - 1));
+      partitions.add(partition);
+
+      start = end;
+    }
+
+    return partitions;
+  }
+
+  protected List<Partition> partitionBooleanColumn() {
+    List<Partition> partitions = new LinkedList<Partition>();
+
+
+    Boolean minValue = parseBooleanValue(partitionMinValue);
+    Boolean maxValue = parseBooleanValue(partitionMaxValue);
+
+    StringBuilder conditions = new StringBuilder();
+
+    // A single value means that we can create only a single split; guard
+    // against a null lower bound, which parseBooleanValue maps to null.
+    if (minValue != null && minValue.equals(maxValue)) {
+      GenericJdbcPartition partition = new GenericJdbcPartition();
+
+      conditions.append(partitionColumnName).append(" = ")
+          .append(maxValue);
+      partition.setConditions(conditions.toString());
+      partitions.add(partition);
+      return partitions;
+    }
+
+    GenericJdbcPartition partition = new GenericJdbcPartition();
+
+    if (partitionMinValue == null) {
+      conditions = new StringBuilder();
+      conditions.append(partitionColumnName).append(" IS NULL");
+      partition.setConditions(conditions.toString());
+      partitions.add(partition);
+    }
+    partition = new GenericJdbcPartition();
+    conditions = new StringBuilder();
+    conditions.append(partitionColumnName).append(" = TRUE");
+    partition.setConditions(conditions.toString());
+    partitions.add(partition);
+    partition = new GenericJdbcPartition();
+    conditions = new StringBuilder();
+    conditions.append(partitionColumnName).append(" = FALSE");
+    partition.setConditions(conditions.toString());
+    partitions.add(partition);
+    return partitions;
+  }
+
+  private Boolean parseBooleanValue(String value) {
+    if (value == null) {
+      return null;
+    }
+    if (value.equals("1")) {
+      return Boolean.TRUE;
+    } else if (value.equals("0")) {
+      return Boolean.FALSE;
+    } else {
+      return Boolean.parseBoolean(value);
+    }
+  }
+
+  protected BigDecimal divide(BigDecimal numerator, BigDecimal denominator) {
+    try {
+      return numerator.divide(denominator);
+    } catch (ArithmeticException ae) {
+      return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP);
+    }
+  }
+
+  protected String constructConditions(
+      Object lowerBound, Object upperBound, boolean lastOne) {
+    StringBuilder conditions = new StringBuilder();
+    conditions.append(lowerBound);
+    conditions.append(" <= ");
+    conditions.append(partitionColumnName);
+    conditions.append(" AND ");
+    conditions.append(partitionColumnName);
+    conditions.append(lastOne ? " <= " : " < ");
+    conditions.append(upperBound);
+    return conditions.toString();
+  }
+
+  protected String constructConditions(Object value) {
+    return new StringBuilder()
+      .append(partitionColumnName)
+      .append(" = ")
+      .append(value)
+      .toString();
+  }
+
+  protected String constructDateConditions(SimpleDateFormat sdf,
+      Object lowerBound, Object upperBound, boolean lastOne) {
+    StringBuilder conditions = new StringBuilder();
+    conditions.append('\'').append(sdf.format((java.util.Date)lowerBound)).append('\'');
+    conditions.append(" <= ");
+    conditions.append(partitionColumnName);
+    conditions.append(" AND ");
+    conditions.append(partitionColumnName);
+    conditions.append(lastOne ? " <= " : " < ");
+    conditions.append('\'').append(sdf.format((java.util.Date)upperBound)).append('\'');
+    return conditions.toString();
+  }
+
+  protected String constructTextConditions(String prefix, Object lowerBound, Object upperBound,
+      String lowerStringBound, String upperStringBound, boolean firstOne, boolean lastOne) {
+    StringBuilder conditions = new StringBuilder();
+    String lbString = prefix + bigDecimalToText((BigDecimal)lowerBound);
+    String ubString = prefix + bigDecimalToText((BigDecimal)upperBound);
+    conditions.append('\'').append(firstOne ? lowerStringBound : lbString).append('\'');
+    conditions.append(" <= ");
+    conditions.append(partitionColumnName);
+    conditions.append(" AND ");
+    conditions.append(partitionColumnName);
+    conditions.append(lastOne ? " <= " : " < ");
+    conditions.append('\'').append(lastOne ? upperStringBound : ubString).append('\'');
+    return conditions.toString();
+  }
+
+  /**
+   *  Converts a string to a BigDecimal representation in base 2^21.
+   *  The maximum Unicode code point defined is 10FFFF.  Although not
+   *  all database systems support UTF-16, and we mostly expect UCS-2
+   *  characters only, for completeness we assume that all Unicode
+   *  characters are supported.
+   *  Given a string 's' containing the characters s_0, s_1, ..., s_n,
+   *  the string is interpreted as the base-2^21 fraction
+   *  0.s_0 s_1 s_2 s_3 (truncated to MAX_CHARS_TO_CONVERT digits).
+   *  Such a number can be split, and each split point converted back
+   *  to a string value for comparison purposes.  The number of characters
+   *  is restricted to prevent repeating fractions and rounding errors
+   *  towards the higher fraction positions.
+   */
+  private static final BigDecimal UNITS_BASE = new BigDecimal(0x200000);
+  private static final int MAX_CHARS_TO_CONVERT = 4;
+
+  private BigDecimal textToBigDecimal(String str) {
+    BigDecimal result = BigDecimal.ZERO;
+    BigDecimal divisor = UNITS_BASE;
+
+    int len = Math.min(str.length(), MAX_CHARS_TO_CONVERT);
+
+    for (int n = 0; n < len; ) {
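+      // Each code point contributes one base-2^21 fractional digit:
+      // result += codePoint / (2^21)^position.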
+      int codePoint = str.codePointAt(n);
+      n += Character.charCount(codePoint);
+      BigDecimal val = divide(new BigDecimal(codePoint), divisor);
+      result = result.add(val);
+      divisor = divisor.multiply(UNITS_BASE);
+    }
+
+    return result;
+  }
+
+  private String bigDecimalToText(BigDecimal bd) {
+    BigDecimal curVal = bd.stripTrailingZeros();
+    StringBuilder sb = new StringBuilder();
+
+    for (int n = 0; n < MAX_CHARS_TO_CONVERT; ++n) {
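+      // Multiplying by 2^21 shifts the next fractional digit into the integer
+      // part; that digit is the next code point of the decoded string.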
+      curVal = curVal.multiply(UNITS_BASE);
+      int cp = curVal.intValue();
+      if (0 >= cp) {
+        break;
+      }
+
+      if (!Character.isDefined(cp)) {
+        int t_cp = Character.MAX_CODE_POINT < cp ? 1 : cp;
+        // We are guaranteed to find at least one character
+        while(!Character.isDefined(t_cp)) {
+          ++t_cp;
+          if (t_cp == cp) {
+            break;
+          }
+          if (t_cp >= Character.MAX_CODE_POINT || t_cp <= 0)  {
+            t_cp = 1;
+          }
+        }
+        cp = t_cp;
+      }
+      curVal = curVal.subtract(new BigDecimal(cp));
+      sb.append(Character.toChars(cp));
+    }
+
+    return sb.toString();
+  }
+
+}
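
    The split arithmetic shared by the integer and date/time paths can be
    traced with a small standalone sketch; it is not part of the patch, and
    the column name ICOL and the bounds are hypothetical. With min = -5,
    max = 10 and 3 partitions: interval = 5, remainder = 0, so the first two
    ranges are half-open and the last one is closed to cover max itself.

    public class IntegerSplitSketch {
      public static void main(String[] args) {
        long min = -5, max = 10, parts = 3;
        long interval = (max - min) / parts;
        long remainder = (max - min) % parts;

        long upper = min;
        for (int i = 1; i < parts; i++) {
          long lower = upper;
          upper = lower + interval + ((i <= remainder) ? 1 : 0);
          System.out.println(lower + " <= ICOL AND ICOL < " + upper);
        }
        // Last partition is inclusive on both ends.
        System.out.println(upper + " <= ICOL AND ICOL <= " + max);
        // Prints: -5 <= ICOL AND ICOL < 0
        //          0 <= ICOL AND ICOL < 5
        //          5 <= ICOL AND ICOL <= 10
      }
    }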

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToDestroyer.java
new file mode 100644
index 0000000..6be3e12
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToDestroyer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+
+public class GenericJdbcToDestroyer extends Destroyer<ConnectionConfiguration, ToJobConfiguration> {
+
+  private static final Logger LOG = Logger.getLogger(GenericJdbcToDestroyer.class);
+
+  @Override
+  public void destroy(DestroyerContext context, ConnectionConfiguration connection, ToJobConfiguration job) {
+    LOG.info("Running generic JDBC connector destroyer");
+
+    final String tableName = job.table.tableName;
+    final String stageTableName = job.table.stageTableName;
+    final boolean stageEnabled = stageTableName != null &&
+      stageTableName.length() > 0;
+    if(stageEnabled) {
+      moveDataToDestinationTable(connection,
+        context.isSuccess(), stageTableName, tableName);
+    }
+  }
+
+  private void moveDataToDestinationTable(ConnectionConfiguration connectorConf,
+    boolean success, String stageTableName, String tableName) {
+    GenericJdbcExecutor executor =
+      new GenericJdbcExecutor(connectorConf.connection.jdbcDriver,
+        connectorConf.connection.connectionString,
+        connectorConf.connection.username,
+        connectorConf.connection.password);
+
+    if(success) {
+      LOG.info("Job completed, transferring data from stage table to " +
+        "destination table.");
+      executor.migrateData(stageTableName, tableName);
+    } else {
+      LOG.warn("Job failed, clearing stage table.");
+      executor.deleteTableData(stageTableName);
+    }
+  }
+
+}
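
    The executor's migrateData and deleteTableData calls are defined elsewhere
    in the patch; as a rough sketch only, and assuming they reduce to an
    INSERT ... SELECT publish on success and a stage-table DELETE on failure
    (table names hypothetical), the destroyer's hand-off amounts to:

    import java.sql.Connection;
    import java.sql.Statement;

    public class StageTableSketch {
      static void onJobEnd(Connection conn, boolean success) throws Exception {
        try (Statement st = conn.createStatement()) {
          if (success) {
            // Publish the fully staged rows into the destination table.
            st.executeUpdate("INSERT INTO DEST_TABLE SELECT * FROM STAGE_TABLE");
          } else {
            // Discard the partial load so the next run starts from a clean stage.
            st.executeUpdate("DELETE FROM STAGE_TABLE");
          }
        }
      }
    }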