You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2015/12/16 20:35:46 UTC

sqoop git commit: SQOOP-2727: Sqoop2: Use PreparedStatements for constructing GenericJDBCPartitions

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 0ab7c05e1 -> 6e33d6afb


SQOOP-2727: Sqoop2: Use PreparedStatements for constructing GenericJDBCPartitions

(Abraham Fine via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/6e33d6af
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/6e33d6af
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/6e33d6af

Branch: refs/heads/sqoop2
Commit: 6e33d6afb43a2e6c2b6ac4efa001b3a0380dec6e
Parents: 0ab7c05
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Dec 16 20:35:17 2015 +0100
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Dec 16 20:35:17 2015 +0100

----------------------------------------------------------------------
 .../sqoop/common/test/db/DatabaseProvider.java  |   4 +-
 .../connector/jdbc/GenericJdbcExecutor.java     |   2 +-
 .../connector/jdbc/GenericJdbcExtractor.java    |  33 ++-
 .../jdbc/GenericJdbcFromInitializer.java        |  26 +-
 .../connector/jdbc/GenericJdbcPartition.java    | 217 ++++++++++++++-
 .../connector/jdbc/GenericJdbcPartitioner.java  | 270 ++++++++-----------
 .../sqoop/connector/jdbc/TestExtractor.java     |  58 ++--
 .../jdbc/TestGenericJdbcPartition.java          |  78 ++++++
 .../sqoop/connector/jdbc/TestPartitioner.java   |  68 ++++-
 .../jdbc/generic/TableStagedRDBMSTest.java      |   6 +
 10 files changed, 544 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java b/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
index 3b0b568..afc5016 100644
--- a/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
@@ -371,9 +371,9 @@ abstract public class DatabaseProvider {
     } else if (value instanceof Character) {
       preparedStatement.setString(parameterIndex, value.toString());
     } else if (value instanceof Timestamp) {
-      preparedStatement.setString(parameterIndex, value.toString());
+      preparedStatement.setTimestamp(parameterIndex, (Timestamp) value);
     } else if (value instanceof BigDecimal) {
-      preparedStatement.setString(parameterIndex, value.toString());
+      preparedStatement.setBigDecimal(parameterIndex, (BigDecimal) value);
     } else {
       preparedStatement.setObject(parameterIndex, value);
     }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
index ff33a4b..7c943c2 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
@@ -50,7 +50,7 @@ import java.util.Set;
  */
 @edu.umd.cs.findbugs.annotations.SuppressWarnings({
         "SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING", "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE"})
-public class GenericJdbcExecutor {
+public class GenericJdbcExecutor implements AutoCloseable {
 
   private static final Logger LOG = Logger.getLogger(GenericJdbcExecutor.class);
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
index edb2754..d56abec 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.sqoop.connector.jdbc;
 
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -35,7 +36,8 @@ import org.joda.time.LocalDate;
 import org.joda.time.LocalDateTime;
 import org.joda.time.LocalTime;
 
-@edu.umd.cs.findbugs.annotations.SuppressWarnings("SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE")
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(
+  {"SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING", "OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE"})
 public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobConfiguration, GenericJdbcPartition> {
 
  public static final Logger LOG = Logger.getLogger(GenericJdbcExtractor.class);
@@ -43,19 +45,14 @@ public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobCo
  private long rowsRead = 0;
   @Override
   public void extract(ExtractorContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig, GenericJdbcPartition partition) {
-    GenericJdbcExecutor executor = new GenericJdbcExecutor(linkConfig);
-
-    String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL);
-    String conditions = partition.getConditions();
-    query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions);
-    LOG.info("Using query: " + query);
-
     rowsRead = 0;
     Schema schema = context.getSchema();
     Column[] schemaColumns = schema.getColumnsArray();
-    try (Statement statement = executor.getConnection().createStatement(
-            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-         ResultSet resultSet = statement.executeQuery(query);) {
+    try (
+      GenericJdbcExecutor executor = new GenericJdbcExecutor(linkConfig);
+      PreparedStatement preparedStatement = createPreparedStatement(context, executor, partition);
+      ResultSet resultSet = preparedStatement.executeQuery()
+    ) {
       ResultSetMetaData metaData = resultSet.getMetaData();
       int columnCount = metaData.getColumnCount();
       if (schemaColumns.length != columnCount) {
@@ -96,8 +93,6 @@ public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobCo
       throw new SqoopException(
           GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0004, e);
 
-    } finally {
-      executor.close();
     }
   }
 
@@ -106,4 +101,16 @@ public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobCo
     return rowsRead;
   }
 
+  private PreparedStatement createPreparedStatement(ExtractorContext context, GenericJdbcExecutor executor, GenericJdbcPartition partition) throws SQLException {
+    String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL);
+    String condition = partition.getCondition();
+    query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, condition);
+
+    LOG.info("Creating PreparedStatement with query: " + query);
+
+    PreparedStatement preparedStatement = executor.getConnection().prepareStatement(query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    partition.addParamsToPreparedStatement(preparedStatement);
+
+    return preparedStatement;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
index 41be8bb..13715a0 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
@@ -259,10 +259,6 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F
         throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006);
       }
 
-      // Boundaries for the job
-      String min = rs.getString(1);
-      String max = rs.getString(2);
-
       // Type of the partition column
       ResultSetMetaData rsmd = rs.getMetaData();
       if (rsmd.getColumnCount() != 2) {
@@ -270,6 +266,28 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F
       }
       int columnType = rsmd.getColumnType(1);
 
+      String min;
+      String max;
+      // Boundaries for the job
+      switch (columnType) {
+        case Types.DATE:
+          min = String.valueOf(rs.getDate(1).getTime());
+          max = String.valueOf(rs.getDate(2).getTime());
+          break;
+        case Types.TIMESTAMP:
+          min = String.valueOf(rs.getTimestamp(1).getTime());
+          max = String.valueOf(rs.getTimestamp(2).getTime());
+          break;
+        case Types.TIME:
+          min = String.valueOf(rs.getTime(1).getTime());
+          max = String.valueOf(rs.getTime(2).getTime());
+          break;
+        default:
+          min = rs.getString(1);
+          max = rs.getString(2);
+      }
+
+
       LOG.info("Boundaries for the job: min=" + min + ", max=" + max + ", columnType=" + columnType);
 
       context.setInteger(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, columnType);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java
index 65400ef..924bbb2 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java
@@ -20,34 +20,231 @@ package org.apache.sqoop.connector.jdbc;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TimeZone;
 
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.error.code.GenericJdbcConnectorError;
 import org.apache.sqoop.job.etl.Partition;
 
+@edu.umd.cs.findbugs.annotations.SuppressWarnings("DB_DUPLICATE_SWITCH_CLAUSES")
 public class GenericJdbcPartition extends Partition {
 
-  private String conditions;
+  private String condition;
+  private List<Integer> sqlTypes;
+  private List<Object> params;
 
-  public void setConditions(String conditions) {
-    this.conditions = conditions;
+
+  public GenericJdbcPartition() {
+    sqlTypes = new ArrayList<>();
+    params = new ArrayList<>();
+  }
+
+  public void setCondition(String condition) {
+    this.condition = condition;
+  }
+
+  public String getCondition() {
+    return condition;
+  }
+
+  public void addParamsToPreparedStatement(PreparedStatement ps) throws SQLException {
+    for (int i = 0; i < params.size(); i++) {
+      ps.setObject(i + 1, params.get(i));
+    }
   }
 
-  public String getConditions() {
-    return conditions;
+  public void addParam(int sqlType, Object param) {
+    sqlTypes.add(sqlType);
+    params.add(param);
+  }
+
+
+  public List<Object> getParams() {
+    return params;
+  }
+
+  public List<Integer> getSqlTypes() {
+    return sqlTypes;
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    conditions = in.readUTF();
+    int numParams = in.readInt();
+    condition = in.readUTF();
+    for (int i = 0; i < numParams; i++) {
+      int type = in.readInt();
+      sqlTypes.add(type);
+      switch (type) {
+        case Types.TINYINT:
+        case Types.SMALLINT:
+        case Types.INTEGER:
+        case Types.BIGINT:
+          params.add(in.readLong());
+          break;
+        case Types.REAL:
+        case Types.FLOAT:
+          params.add(in.readDouble());
+          break;
+        case Types.DOUBLE:
+        case Types.NUMERIC:
+        case Types.DECIMAL:
+          params.add(new BigDecimal(in.readUTF()));
+          break;
+        case Types.BIT:
+        case Types.BOOLEAN:
+          params.add(in.readBoolean());
+          break;
+        case Types.DATE:
+          params.add(new java.sql.Date(in.readLong()));
+          break;
+        case Types.TIME:
+          params.add(new java.sql.Time(in.readLong()));
+          break;
+        case Types.TIMESTAMP:
+          params.add(new java.sql.Timestamp(in.readLong()));
+          break;
+        case Types.CHAR:
+        case Types.VARCHAR:
+        case Types.LONGVARCHAR:
+          params.add(in.readUTF());
+          break;
+        default:
+          throw new SqoopException(
+            GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
+            String.valueOf(type));
+      }
+    }
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeUTF(conditions);
+    out.writeInt(sqlTypes.size());
+    out.writeUTF(condition);
+    for (int i = 0; i < sqlTypes.size(); i++) {
+      int type = sqlTypes.get(i);
+      out.writeInt(type);
+      switch (type) {
+        case Types.TINYINT:
+        case Types.SMALLINT:
+        case Types.INTEGER:
+        case Types.BIGINT:
+          out.writeLong((Long) params.get(i));
+          break;
+        case Types.REAL:
+        case Types.FLOAT:
+          out.writeDouble((Double) params.get(i));
+          break;
+        case Types.DOUBLE:
+        case Types.NUMERIC:
+        case Types.DECIMAL:
+          out.writeUTF(params.get(i).toString());
+          break;
+        case Types.BIT:
+        case Types.BOOLEAN:
+          out.writeBoolean((Boolean) params.get(i));
+          break;
+        case Types.DATE:
+          out.writeLong(((java.sql.Date) params.get(i)).getTime());
+          break;
+        case Types.TIME:
+          out.writeLong(((java.sql.Time) params.get(i)).getTime());
+          break;
+        case Types.TIMESTAMP:
+          out.writeLong(((java.sql.Timestamp) params.get(i)).getTime());
+          break;
+        case Types.CHAR:
+        case Types.VARCHAR:
+        case Types.LONGVARCHAR:
+          out.writeUTF((String) params.get(i));
+          break;
+        default:
+          throw new SqoopException(
+            GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
+            String.valueOf(type));
+      }
+    }
   }
 
   @Override
-  public String toString() {
-    return conditions;
+  public String toString(){
+    return asStringWithTimezone(TimeZone.getTimeZone("UTC"));
   }
 
-}
+  public String asStringWithTimezone(TimeZone timeZone) {
+    DateFormat dateDateFormat = new SimpleDateFormat("yyyy-MM-dd");
+
+    DateFormat timeDateFormat = new SimpleDateFormat("HH:mm:ss");
+    timeDateFormat.setTimeZone(timeZone);
+
+    DateFormat timestampDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+    timestampDateFormat.setTimeZone(timeZone);
+
+    String condition = getCondition();
+    for (int j = 0; j < getParams().size(); j++) {
+      Object param = getParams().get(j);
+      int sqlType = getSqlTypes().get(j);
+      switch (sqlType) {
+        case Types.TINYINT:
+        case Types.SMALLINT:
+        case Types.INTEGER:
+        case Types.BIGINT:
+        case Types.REAL:
+        case Types.FLOAT:
+        case Types.DOUBLE:
+        case Types.NUMERIC:
+        case Types.DECIMAL:
+        case Types.BIT:
+        case Types.BOOLEAN:
+          condition = condition.replaceFirst("\\?", param.toString());
+          break;
+        case Types.DATE:
+          condition = condition.replaceFirst("\\?", "'" + dateDateFormat.format(param) + "'");
+          break;
+        case Types.TIME:
+          condition = condition.replaceFirst("\\?", "'" + timeDateFormat.format(param) + "'");
+          break;
+        case Types.TIMESTAMP:
+          condition = condition.replaceFirst("\\?", "'" + timestampDateFormat.format(param) + "'");
+          break;
+        case Types.CHAR:
+        case Types.VARCHAR:
+        case Types.LONGVARCHAR:
+        default:
+          condition = condition.replaceFirst("\\?", "'" + param.toString() + "'");
+          break;
+      }
+    }
+    return condition;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    GenericJdbcPartition that = (GenericJdbcPartition) o;
+
+    if (getCondition() != null ? !getCondition().equals(that.getCondition())
+      : that.getCondition() != null)
+      return false;
+    if (!getSqlTypes().equals(that.getSqlTypes())) return false;
+    return getParams().equals(that.getParams());
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = getCondition() != null ? getCondition().hashCode() : 0;
+    result = 31 * result + getSqlTypes().hashCode();
+    result = 31 * result + getParams().hashCode();
+    return result;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java
index 2a42ed4..77b486c 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java
@@ -49,7 +49,7 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
 
   @Override
   public List<Partition> getPartitions(PartitionerContext context, LinkConfiguration linkConfig,
-      FromJobConfiguration fromJobConfig) {
+                                       FromJobConfiguration fromJobConfig) {
     List<Partition> partitions = new LinkedList<Partition>();
 
     numberPartitions = context.getMaxPartitions();
@@ -65,14 +65,14 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
 
     if (partitionMinValue == null && partitionMaxValue == null) {
       GenericJdbcPartition partition = new GenericJdbcPartition();
-      partition.setConditions(partitionColumnName + " IS NULL");
+      partition.setCondition(partitionColumnName + " IS NULL");
       partitions.add(partition);
       return partitions;
     }
 
     if (allowNullValueInPartitionColumn) {
       GenericJdbcPartition partition = new GenericJdbcPartition();
-      partition.setConditions(partitionColumnName + " IS NULL");
+      partition.setCondition(partitionColumnName + " IS NULL");
       partitions.add(partition);
       if (numberPartitions > 1) {
         numberPartitions -= 1;
@@ -83,85 +83,60 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
     }
 
     switch (partitionColumnType) {
-    case Types.TINYINT:
-    case Types.SMALLINT:
-    case Types.INTEGER:
-    case Types.BIGINT:
-      // Integer column
-      partitions.addAll(partitionIntegerColumn());
-      break;
-
-    case Types.REAL:
-    case Types.FLOAT:
-    case Types.DOUBLE:
-      // Floating point column
-      partitions.addAll(partitionFloatingPointColumn());
-      break;
-
-    case Types.NUMERIC:
-    case Types.DECIMAL:
-      // Decimal column
-      partitions.addAll(partitionNumericColumn());
-      break;
-
-    case Types.BIT:
-    case Types.BOOLEAN:
-      // Boolean column
-      return partitionBooleanColumn();
-
-    case Types.DATE:
-    case Types.TIME:
-    case Types.TIMESTAMP:
-      // Date time column
-      partitions.addAll(partitionDateTimeColumn());
-      break;
-
-    case Types.CHAR:
-    case Types.VARCHAR:
-    case Types.LONGVARCHAR:
-      // Text column
-      partitions.addAll(partitionTextColumn());
-      break;
-
-    default:
-      throw new SqoopException(
-          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
-          String.valueOf(partitionColumnType));
-    }
+      case Types.TINYINT:
+      case Types.SMALLINT:
+      case Types.INTEGER:
+      case Types.BIGINT:
+        // Integer column
+        partitions.addAll(partitionIntegerColumn(partitionColumnType));
+        break;
 
-    return partitions;
-  }
+      case Types.REAL:
+      case Types.FLOAT:
+      case Types.DOUBLE:
+        // Floating point column
+        partitions.addAll(partitionFloatingPointColumn(partitionColumnType));
+        break;
 
-  protected List<Partition> partitionDateTimeColumn() {
-    List<Partition> partitions = new LinkedList<Partition>();
+      case Types.NUMERIC:
+      case Types.DECIMAL:
+        // Decimal column
+        partitions.addAll(partitionNumericColumn(partitionColumnType));
+        break;
+
+      case Types.BIT:
+      case Types.BOOLEAN:
+        // Boolean column
+        return partitionBooleanColumn();
 
-    long minDateValue = 0;
-    long maxDateValue = 0;
-    SimpleDateFormat sdf = null;
-    switch(partitionColumnType) {
       case Types.DATE:
-        sdf = new SimpleDateFormat("yyyy-MM-dd");
-        minDateValue = Date.valueOf(partitionMinValue).getTime();
-        maxDateValue = Date.valueOf(partitionMaxValue).getTime();
-        break;
       case Types.TIME:
-        sdf = new SimpleDateFormat("HH:mm:ss");
-        minDateValue = Time.valueOf(partitionMinValue).getTime();
-        maxDateValue = Time.valueOf(partitionMaxValue).getTime();
+      case Types.TIMESTAMP:
+        // Date time column
+        partitions.addAll(partitionDateTimeColumn());
         break;
-      // Here should be the type of Types.TIMESTAMP:
-      default:
-        sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-        minDateValue = Timestamp.valueOf(partitionMinValue).getTime();
-        maxDateValue = Timestamp.valueOf(partitionMaxValue).getTime();
+
+      case Types.CHAR:
+      case Types.VARCHAR:
+      case Types.LONGVARCHAR:
+        // Text column
+        partitions.addAll(partitionTextColumn());
         break;
+
+      default:
+        throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
+          String.valueOf(partitionColumnType));
     }
 
+    return partitions;
+  }
 
-    minDateValue += TimeZone.getDefault().getOffset(minDateValue);
-    maxDateValue += TimeZone.getDefault().getOffset(maxDateValue);
+  protected List<Partition> partitionDateTimeColumn() {
+    List<Partition> partitions = new LinkedList<Partition>();
 
-    sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+    long minDateValue = Long.parseLong(partitionMinValue);
+    long maxDateValue = Long.parseLong(partitionMaxValue);
 
     long interval =  (maxDateValue - minDateValue) / numberPartitions;
     long remainder = (maxDateValue - minDateValue) % numberPartitions;
@@ -173,57 +148,41 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
     long lowerBound;
     long upperBound = minDateValue;
 
-    Object objLB = null;
-    Object objUB = null;
-
+    GenericJdbcPartition partition;
     for (int i = 1; i < numberPartitions; i++) {
       lowerBound = upperBound;
       upperBound = lowerBound + interval;
       upperBound += (i <= remainder) ? 1 : 0;
 
+      partition = new GenericJdbcPartition();
       switch(partitionColumnType) {
         case Types.DATE:
-          objLB = new Date(lowerBound);
-          objUB = new Date(upperBound);
+          constructDateConditions(partition, Types.DATE, new Date(lowerBound), new Date(upperBound), false);
           break;
         case Types.TIME:
-          objLB = new Time(lowerBound);
-          objUB = new Time(upperBound);
-
+          constructDateConditions(partition, Types.TIME, new Time(lowerBound), new Time(upperBound), false);
           break;
         // Here should be the type of Types.TIMESTAMP:
         default:
-          objLB = new Timestamp(lowerBound);
-          objUB = new Timestamp(upperBound);
+          constructDateConditions(partition, Types.TIMESTAMP, new Timestamp(lowerBound), new Timestamp(upperBound), false);
           break;
       }
-
-      GenericJdbcPartition partition = new GenericJdbcPartition();
-      partition.setConditions(
-          constructDateConditions(sdf, objLB, objUB, false));
       partitions.add(partition);
     }
 
+    partition = new GenericJdbcPartition();
     switch(partitionColumnType) {
       case Types.DATE:
-        objLB = new Date(upperBound);
-        objUB = new Date(maxDateValue);
+        constructDateConditions(partition, Types.DATE, new Date(upperBound), new Date(maxDateValue), true);
         break;
       case Types.TIME:
-        objLB = new Time(upperBound);
-        objUB = new Time(maxDateValue);
+        constructDateConditions(partition, Types.TIME, new Time(upperBound), new Time(maxDateValue), true);
         break;
       // Here should be the type of Types.TIMESTAMP:
       default:
-        objLB = new Timestamp(upperBound);
-        objUB = new Timestamp(maxDateValue);
+        constructDateConditions(partition, Types.TIMESTAMP, new Timestamp(upperBound), new Timestamp(maxDateValue), true);
         break;
     }
-
-
-    GenericJdbcPartition partition = new GenericJdbcPartition();
-    partition.setConditions(
-        constructDateConditions(sdf, objLB, objUB, true));
     partitions.add(partition);
     return partitions;
   }
@@ -236,7 +195,7 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
 
     // Remove common prefix if any as it does not affect outcome.
     int maxPrefixLen = Math.min(partitionMinValue.length(),
-        partitionMaxValue.length());
+      partitionMaxValue.length());
     // Calculate common prefix length
     int cpLen = 0;
 
@@ -259,8 +218,8 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
     // Having one single value means that we can create only one single split
     if(minStringBD.equals(maxStringBD)) {
       GenericJdbcPartition partition = new GenericJdbcPartition();
-      partition.setConditions(constructTextConditions(prefix, 0, 0,
-        partitionMinValue, partitionMaxValue, true, true));
+      constructTextConditions(partition, prefix, 0, 0,
+        partitionMinValue, partitionMaxValue, true, true);
       partitions.add(partition);
       return partitions;
     }
@@ -269,7 +228,7 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
     List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
 
     BigDecimal splitSize = divide(maxStringBD.subtract(minStringBD),
-        new BigDecimal(numberPartitions));
+      new BigDecimal(numberPartitions));
     if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
       splitSize = NUMERIC_MIN_INCREMENT;
     }
@@ -289,12 +248,12 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
     }
 
     if (splitPoints.size() == 0
-        || splitPoints.get(0).compareTo(minStringBD) != 0) {
+      || splitPoints.get(0).compareTo(minStringBD) != 0) {
       splitPoints.add(0, minStringBD);
     }
 
     if (splitPoints.get(splitPoints.size() - 1).compareTo(maxStringBD) != 0
-        || splitPoints.size() == 1) {
+      || splitPoints.size() == 1) {
       splitPoints.add(maxStringBD);
     }
 
@@ -304,8 +263,8 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
       BigDecimal end = splitPoints.get(i);
 
       GenericJdbcPartition partition = new GenericJdbcPartition();
-      partition.setConditions(constructTextConditions(prefix, start, end,
-        partitionMinValue, partitionMaxValue, i == 1, i == splitPoints.size() - 1));
+      constructTextConditions(partition, prefix, start, end,
+        partitionMinValue, partitionMaxValue, i == 1, i == splitPoints.size() - 1);
       partitions.add(partition);
 
       start = end;
@@ -315,7 +274,7 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
   }
 
 
-  protected List<Partition> partitionIntegerColumn() {
+  protected List<Partition> partitionIntegerColumn(int sqlType) {
     List<Partition> partitions = new LinkedList<Partition>();
 
     long minValue = partitionMinValue == null ? Long.MIN_VALUE
@@ -337,20 +296,18 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
       upperBound += (i <= remainder) ? 1 : 0;
 
       GenericJdbcPartition partition = new GenericJdbcPartition();
-      partition.setConditions(
-          constructConditions(lowerBound, upperBound, false));
+      constructConditions(partition, sqlType, lowerBound, upperBound, false);
       partitions.add(partition);
     }
 
     GenericJdbcPartition partition = new GenericJdbcPartition();
-    partition.setConditions(
-        constructConditions(upperBound, maxValue, true));
+    constructConditions(partition, sqlType, upperBound, maxValue, true);
     partitions.add(partition);
 
     return partitions;
   }
 
-  protected List<Partition> partitionFloatingPointColumn() {
+  protected List<Partition> partitionFloatingPointColumn(int sqlType) {
     List<Partition> partitions = new LinkedList<Partition>();
 
 
@@ -367,20 +324,18 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
       upperBound = lowerBound + interval;
 
       GenericJdbcPartition partition = new GenericJdbcPartition();
-      partition.setConditions(
-          constructConditions(lowerBound, upperBound, false));
+      constructConditions(partition, sqlType, lowerBound, upperBound, false);
       partitions.add(partition);
     }
 
     GenericJdbcPartition partition = new GenericJdbcPartition();
-    partition.setConditions(
-        constructConditions(upperBound, maxValue, true));
+    constructConditions(partition, sqlType, upperBound, maxValue, true);
     partitions.add(partition);
 
     return partitions;
   }
 
-  protected List<Partition> partitionNumericColumn() {
+  protected List<Partition> partitionNumericColumn(int sqlType) {
     List<Partition> partitions = new LinkedList<Partition>();
     // Having one end in null is not supported
     if (partitionMinValue == null || partitionMaxValue == null) {
@@ -393,7 +348,7 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
     // Having one single value means that we can create only one single split
     if(minValue.equals(maxValue)) {
       GenericJdbcPartition partition = new GenericJdbcPartition();
-      partition.setConditions(constructConditions(minValue));
+      constructConditions(partition, sqlType, minValue);
       partitions.add(partition);
       return partitions;
     }
@@ -420,17 +375,20 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
     }
 
     // Turn the split points into a set of intervals.
-    BigDecimal start = splitPoints.get(0);
-    for (int i = 1; i < splitPoints.size(); i++) {
-      BigDecimal end = splitPoints.get(i);
+    BigDecimal end = splitPoints.get(0);
+    for (int i = 0; i < numberPartitions - 1; i++) {
+      BigDecimal start = splitPoints.get(i);
+      end = splitPoints.get(i + 1);
 
       GenericJdbcPartition partition = new GenericJdbcPartition();
-      partition.setConditions(constructConditions(start, end, i == splitPoints.size() - 1));
+      constructConditions(partition, sqlType, start, end, false);
       partitions.add(partition);
-
-      start = end;
     }
 
+    GenericJdbcPartition partition = new GenericJdbcPartition();
+    constructConditions(partition, sqlType, end, maxValue, true);
+    partitions.add(partition);
+
     return partitions;
   }
 
@@ -447,9 +405,9 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
     if(minValue.equals(maxValue)) {
       GenericJdbcPartition partition = new GenericJdbcPartition();
 
-      conditions.append(partitionColumnName).append(" = ")
-          .append(maxValue);
-      partition.setConditions(conditions.toString());
+      conditions.append(partitionColumnName).append(" = ?");
+      partition.setCondition(conditions.toString());
+      partition.addParam(Types.BOOLEAN, maxValue);
       partitions.add(partition);
       return partitions;
     }
@@ -459,18 +417,18 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
     if (partitionMinValue == null) {
       conditions = new StringBuilder();
       conditions.append(partitionColumnName).append(" IS NULL");
-      partition.setConditions(conditions.toString());
+      partition.setCondition(conditions.toString());
       partitions.add(partition);
     }
     partition = new GenericJdbcPartition();
     conditions = new StringBuilder();
     conditions.append(partitionColumnName).append(" = TRUE");
-    partition.setConditions(conditions.toString());
+    partition.setCondition(conditions.toString());
     partitions.add(partition);
     partition = new GenericJdbcPartition();
     conditions = new StringBuilder();
     conditions.append(partitionColumnName).append(" = FALSE");
-    partition.setConditions(conditions.toString());
+    partition.setCondition(conditions.toString());
     partitions.add(partition);
     return partitions;
   }
@@ -493,54 +451,53 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
     }
   }
 
-  protected String constructConditions(
-      Object lowerBound, Object upperBound, boolean lastOne) {
+  protected void constructConditions(
+    GenericJdbcPartition partition, int sqlType, Object lowerBound, Object upperBound, boolean lastOne) {
     StringBuilder conditions = new StringBuilder();
-    conditions.append(lowerBound);
-    conditions.append(" <= ");
+    partition.addParam(sqlType, lowerBound);
+    conditions.append("? <= ");
     conditions.append(partitionColumnName);
     conditions.append(" AND ");
     conditions.append(partitionColumnName);
-    conditions.append(lastOne ? " <= " : " < ");
-    conditions.append(upperBound);
-    return conditions.toString();
+    conditions.append(lastOne ? " <= ?" : " < ?");
+    partition.addParam(sqlType, upperBound);
+
+    partition.setCondition(conditions.toString());
   }
 
-  protected String constructConditions(Object value) {
-    return new StringBuilder()
-      .append(partitionColumnName)
-      .append(" = ")
-      .append(value)
-      .toString()
-     ;
+  protected void constructConditions(GenericJdbcPartition partition, int sqlType, Object value) {
+    String condition = partitionColumnName + " = ?";
+    partition.addParam(sqlType, value);
+    partition.setCondition(condition);
   }
 
-  protected String constructDateConditions(SimpleDateFormat sdf,
-      Object lowerBound, Object upperBound, boolean lastOne) {
+  protected void constructDateConditions(GenericJdbcPartition partition, int dateType,
+                                         Object lowerBound, Object upperBound, boolean lastOne) {
     StringBuilder conditions = new StringBuilder();
-    conditions.append('\'').append(sdf.format((java.util.Date)lowerBound)).append('\'');
-    conditions.append(" <= ");
+    partition.addParam(dateType, lowerBound);
+    conditions.append("? <= ");
     conditions.append(partitionColumnName);
     conditions.append(" AND ");
     conditions.append(partitionColumnName);
-    conditions.append(lastOne ? " <= " : " < ");
-    conditions.append('\'').append(sdf.format((java.util.Date)upperBound)).append('\'');
-    return conditions.toString();
+    conditions.append(lastOne ? " <= ?" : " < ?");
+    partition.setCondition(conditions.toString());
+    partition.addParam(dateType, upperBound);
   }
 
-  protected String constructTextConditions(String prefix, Object lowerBound, Object upperBound,
-      String lowerStringBound, String upperStringBound, boolean firstOne, boolean lastOne) {
+  protected void constructTextConditions(GenericJdbcPartition partition, String prefix, Object lowerBound, Object upperBound,
+                                         String lowerStringBound, String upperStringBound, boolean firstOne, boolean lastOne) {
     StringBuilder conditions = new StringBuilder();
     String lbString = prefix + bigDecimalToText((BigDecimal)lowerBound);
     String ubString = prefix + bigDecimalToText((BigDecimal)upperBound);
-    conditions.append('\'').append(firstOne ? lowerStringBound : lbString).append('\'');
-    conditions.append(" <= ");
+    partition.addParam(Types.VARCHAR, firstOne ? lowerStringBound : lbString);
+    conditions.append("? <= ");
     conditions.append(partitionColumnName);
     conditions.append(" AND ");
     conditions.append(partitionColumnName);
-    conditions.append(lastOne ? " <= " : " < ");
-    conditions.append('\'').append(lastOne ? upperStringBound : ubString).append('\'');
-    return conditions.toString();
+    conditions.append(lastOne ? " <= ?" : " < ?");
+    partition.addParam(Types.VARCHAR, lastOne ? upperStringBound : ubString);
+
+    partition.setCondition(conditions.toString());
   }
 
   /**
@@ -607,5 +564,4 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
 
     return sb.toString();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
index 3b52128..9847d5e 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
@@ -36,6 +36,8 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.sql.Types;
+
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
 
@@ -54,7 +56,7 @@ public class TestExtractor {
 
   public TestExtractor() {
     tableName = getClass().getSimpleName().toUpperCase();
-   nullDataTableName = getClass().getSimpleName().toUpperCase() + "NULL";
+    nullDataTableName = getClass().getSimpleName().toUpperCase() + "NULL";
   }
 
   @BeforeMethod(alwaysRun = true)
@@ -63,13 +65,13 @@ public class TestExtractor {
 
     if (!executor.existTable(tableName)) {
       executor.executeUpdate("CREATE TABLE "
-          + executor.encloseIdentifier(tableName)
-          + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)");
+        + executor.encloseIdentifier(tableName)
+        + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)");
 
       for (int i = 0; i < NUMBER_OF_ROWS; i++) {
         int value = START + i;
         String sql = "INSERT INTO " + executor.encloseIdentifier(tableName)
-            + " VALUES(" + value + ", " + value + ", '" + value + "', '2004-10-19')";
+          + " VALUES(" + value + ", " + value + ", '" + value + "', '2004-10-19')";
         executor.executeUpdate(sql);
       }
     }
@@ -93,7 +95,7 @@ public class TestExtractor {
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
-        "SELECT * FROM " + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}");
+      "SELECT * FROM " + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}");
 
     GenericJdbcPartition partition;
 
@@ -107,15 +109,21 @@ public class TestExtractor {
     ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user");
 
     partition = new GenericJdbcPartition();
-    partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665");
+    partition.setCondition("? <= DCOL AND DCOL < ?");
+    partition.addParam(Types.NUMERIC, -50.0);
+    partition.addParam(Types.NUMERIC, -16.6666666666666665);
     extractor.extract(extractorContext, linkConfig, jobConfig, partition);
 
     partition = new GenericJdbcPartition();
-    partition.setConditions("-16.6666666666666665 <= DCOL AND DCOL < 16.666666666666667");
+    partition.setCondition("? <= DCOL AND DCOL < ?");
+    partition.addParam(Types.NUMERIC, -16.6666666666666665);
+    partition.addParam(Types.NUMERIC, 16.6666666666666667);
     extractor.extract(extractorContext, linkConfig, jobConfig, partition);
 
     partition = new GenericJdbcPartition();
-    partition.setConditions("16.666666666666667 <= DCOL AND DCOL <= 50.0");
+    partition.setCondition("? <= DCOL AND DCOL <= ?");
+    partition.addParam(Types.NUMERIC, 16.6666666666666667);
+    partition.addParam(Types.NUMERIC, 50.0);
     extractor.extract(extractorContext, linkConfig, jobConfig, partition);
   }
 
@@ -132,8 +140,8 @@ public class TestExtractor {
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
-        "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL,SQOOP_SUBQUERY_ALIAS.DATECOL FROM " + "(SELECT * FROM "
-            + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
+      "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL,SQOOP_SUBQUERY_ALIAS.DATECOL FROM " + "(SELECT * FROM "
+        + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
 
     GenericJdbcPartition partition;
 
@@ -147,15 +155,21 @@ public class TestExtractor {
     ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user");
 
     partition = new GenericJdbcPartition();
-    partition.setConditions("-50 <= ICOL AND ICOL < -16");
+    partition.setCondition("? <= ICOL AND ICOL < ?");
+    partition.addParam(Types.NUMERIC, -50);
+    partition.addParam(Types.NUMERIC, -16);
     extractor.extract(extractorContext, linkConfig, jobConfig, partition);
 
     partition = new GenericJdbcPartition();
-    partition.setConditions("-16 <= ICOL AND ICOL < 17");
+    partition.setCondition("? <= ICOL AND ICOL < ?");
+    partition.addParam(Types.NUMERIC, -16);
+    partition.addParam(Types.NUMERIC, 17);
     extractor.extract(extractorContext, linkConfig, jobConfig, partition);
 
     partition = new GenericJdbcPartition();
-    partition.setConditions("17 <= ICOL AND ICOL < 50");
+    partition.setCondition("? <= ICOL AND ICOL < ?");
+    partition.addParam(Types.NUMERIC, 17);
+    partition.addParam(Types.NUMERIC, 50);
     extractor.extract(extractorContext, linkConfig, jobConfig, partition);
   }
 
@@ -172,9 +186,9 @@ public class TestExtractor {
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     context.setString(
-        GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
-        "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " + "(SELECT * FROM "
-            + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
+      GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
+      "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " + "(SELECT * FROM "
+        + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
 
     GenericJdbcPartition partition = new GenericJdbcPartition();
 
@@ -183,7 +197,9 @@ public class TestExtractor {
     Schema schema = new Schema("TestIncorrectColumns");
     ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user");
 
-    partition.setConditions("-50 <= ICOL AND ICOL < -16");
+    partition.setCondition("? <= ICOL AND ICOL < ?");
+    partition.addParam(Types.NUMERIC, -50);
+    partition.addParam(Types.NUMERIC, -16);
     extractor.extract(extractorContext, linkConfig, jobConfig, partition);
 
   }
@@ -193,7 +209,7 @@ public class TestExtractor {
 
     if (!executor.existTable(nullDataTableName)) {
       executor.executeUpdate("CREATE TABLE " + executor.encloseIdentifier(nullDataTableName)
-          + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)");
+        + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)");
 
       for (int i = 0; i < NUMBER_OF_ROWS; i++) {
         int value = i;
@@ -210,7 +226,7 @@ public class TestExtractor {
 
     FromJobConfiguration jobConfig = new FromJobConfiguration();
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
-        "SELECT * FROM " + executor.encloseIdentifier(nullDataTableName) + " WHERE ${CONDITIONS}");
+      "SELECT * FROM " + executor.encloseIdentifier(nullDataTableName) + " WHERE ${CONDITIONS}");
 
     Extractor extractor = new GenericJdbcExtractor();
     DummyNullDataWriter writer = new DummyNullDataWriter();
@@ -220,7 +236,9 @@ public class TestExtractor {
     ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user");
 
     GenericJdbcPartition partition = new GenericJdbcPartition();
-    partition.setConditions("-50 <= ICOL AND ICOL < -16");
+    partition.setCondition("? <= ICOL AND ICOL < ?");
+    partition.addParam(Types.NUMERIC, -50);
+    partition.addParam(Types.NUMERIC, -16);
     extractor.extract(extractorContext, linkConfig, jobConfig, partition);
 
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcPartition.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcPartition.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcPartition.java
new file mode 100644
index 0000000..0a97939
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcPartition.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import static org.testng.Assert.*;
+
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.math.BigDecimal;
+import java.sql.Types;
+
+public class TestGenericJdbcPartition {
+
+  @Test
+  public void testSerialization() throws Exception {
+    GenericJdbcPartition expectedPartition = new GenericJdbcPartition();
+    expectedPartition.setCondition("TEST_CONDITION");
+
+    expectedPartition.addParam(Types.TINYINT, 1L);
+    expectedPartition.addParam(Types.SMALLINT, 2L);
+    expectedPartition.addParam(Types.INTEGER, 3L);
+    expectedPartition.addParam(Types.BIGINT, 4L);
+
+    expectedPartition.addParam(Types.REAL, 1.1);
+    expectedPartition.addParam(Types.FLOAT, 1.2);
+
+    expectedPartition.addParam(Types.DOUBLE, new BigDecimal(1.3));
+    expectedPartition.addParam(Types.NUMERIC, new BigDecimal(1.4));
+    expectedPartition.addParam(Types.DECIMAL, new BigDecimal(1.5));
+
+    expectedPartition.addParam(Types.BIT, true);
+    expectedPartition.addParam(Types.BOOLEAN, false);
+
+    expectedPartition.addParam(Types.DATE, java.sql.Date.valueOf("2015-12-14"));
+    expectedPartition.addParam(Types.TIME, java.sql.Time.valueOf("00:00:00"));
+    expectedPartition.addParam(Types.TIMESTAMP, java.sql.Timestamp.valueOf("2015-12-16 00:00:00"));
+
+    expectedPartition.addParam(Types.CHAR, "a");
+    expectedPartition.addParam(Types.VARCHAR, "aa");
+    expectedPartition.addParam(Types.LONGVARCHAR, "aaa");
+
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutput dataOutput = new DataOutputStream(byteArrayOutputStream);
+
+    expectedPartition.write(dataOutput);
+
+    GenericJdbcPartition actualPartition = new GenericJdbcPartition();
+
+    ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
+    DataInput dataInput = new DataInputStream(byteArrayInputStream);
+
+    actualPartition.readFields(dataInput);
+
+    assertEquals(actualPartition, expectedPartition);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
index 3a767ab..b250f94 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
@@ -22,8 +22,12 @@ import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.TimeZone;
 
 import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.MutableMapContext;
@@ -249,6 +253,37 @@ public class TestPartitioner {
     });
   }
 
+  // We may round the quotient when calculating splitsize, this tests ensure we catch those values in the final partition
+  @Test
+  public void testNumericInaccurateSplit() throws Exception {
+    MutableContext context = new MutableMapContext();
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL");
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC));
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(1)));
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(13)));
+
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
+
+    Partitioner partitioner = new GenericJdbcPartitioner();
+    PartitionerContext partitionerContext = new PartitionerContext(context, 11, null, "test_user");
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
+
+    verifyResult(partitions, new String[] {
+      "1 <= DCOL AND DCOL < 2",
+      "2 <= DCOL AND DCOL < 3",
+      "3 <= DCOL AND DCOL < 4",
+      "4 <= DCOL AND DCOL < 5",
+      "5 <= DCOL AND DCOL < 6",
+      "6 <= DCOL AND DCOL < 7",
+      "7 <= DCOL AND DCOL < 8",
+      "8 <= DCOL AND DCOL < 9",
+      "9 <= DCOL AND DCOL < 10",
+      "10 <= DCOL AND DCOL < 11",
+      "11 <= DCOL AND DCOL <= 13"
+    });
+  }
+
   @Test
   public void testNumericSinglePartition() throws Exception {
     MutableContext context = new MutableMapContext();
@@ -276,10 +311,9 @@ public class TestPartitioner {
     context.setString(GenericJdbcConnectorConstants
         .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.DATE));
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
-        Date.valueOf("2004-10-20").toString());
-    context.setString(GenericJdbcConnectorConstants
-        .CONNECTOR_JDBC_PARTITION_MAXVALUE, Date.valueOf("2013-10-17")
-        .toString());
+      String.valueOf(Date.valueOf("2004-10-20").getTime()));
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
+      String.valueOf(Date.valueOf("2013-10-17").getTime()));
 
 
     LinkConfiguration linkConfig = new LinkConfiguration();
@@ -300,15 +334,21 @@ public class TestPartitioner {
 
   @Test
   public void testTimePartition() throws Exception {
+    DateFormat timeDateFormat = new SimpleDateFormat("HH:mm:ss");
+    timeDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+    Long startTime = timeDateFormat.parse("01:01:01").getTime();
+    Long endTime = timeDateFormat.parse("10:40:50").getTime();
+
     MutableContext context = new MutableMapContext();
     context.setString(GenericJdbcConnectorConstants
         .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TCOL");
     context.setString(GenericJdbcConnectorConstants
         .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIME));
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
-        Time.valueOf("01:01:01").toString());
+        String.valueOf(startTime));
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
-        Time.valueOf("10:40:50").toString());
+        String.valueOf(endTime));
 
 
     LinkConfiguration linkConfig = new LinkConfiguration();
@@ -327,15 +367,21 @@ public class TestPartitioner {
 
   @Test
   public void testTimestampPartition() throws Exception {
+    DateFormat timestampDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+    timestampDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+    Long startTime = timestampDateFormat.parse("2013-01-01 01:01:01.123").getTime();
+    Long endTime = timestampDateFormat.parse("2013-12-31 10:40:50.654").getTime();
+
     MutableContext context = new MutableMapContext();
     context.setString(GenericJdbcConnectorConstants
         .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TSCOL");
     context.setString(GenericJdbcConnectorConstants
         .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIMESTAMP));
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
-        Timestamp.valueOf("2013-01-01 01:01:01.123").toString());
+        String.valueOf(startTime));
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
-        Timestamp.valueOf("2013-12-31 10:40:50.654").toString());
+        String.valueOf(endTime));
 
     LinkConfiguration linkConfig = new LinkConfiguration();
     FromJobConfiguration jobConfig = new FromJobConfiguration();
@@ -513,8 +559,8 @@ public class TestPartitioner {
 
     Iterator<Partition> iterator = partitions.iterator();
     for (int i = 0; i < expected.length; i++) {
-      assertEquals(expected[i],
-          ((GenericJdbcPartition)iterator.next()).getConditions());
+      GenericJdbcPartition partition = ((GenericJdbcPartition) iterator.next());
+      assertEquals(partition.toString(), expected[i]);
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
index 2565c3f..70b6eff 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
@@ -89,4 +89,10 @@ public class TableStagedRDBMSTest extends ConnectorTestCase {
     dropTable();
   }
 
+  // shorter name for oracle
+  @Override
+  public TableName getTableName() {
+    return new TableName("stagedrdbms");
+  }
+
 }