Posted to commits@hive.apache.org by da...@apache.org on 2018/10/19 00:24:00 UTC

[2/2] hive git commit: HIVE-20720: Add partition column option to JDBC handler (Daniel Dai, reviewed by Jesus Camacho Rodriguez)

HIVE-20720: Add partition column option to JDBC handler (Daniel Dai, reviewed by Jesus Camacho Rodriguez)

Signed-off-by: Jesus Camacho Rodriguez <jc...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3e5e77d1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3e5e77d1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3e5e77d1

Branch: refs/heads/master
Commit: 3e5e77d1f12e52796ac34d32517b09e5887cd695
Parents: a2bbf79
Author: Daniel Dai <da...@gmail.com>
Authored: Thu Oct 18 17:18:05 2018 -0700
Committer: Daniel Dai <da...@gmail.com>
Committed: Thu Oct 18 17:18:47 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/Constants.java  |   4 +
 .../test/resources/testconfiguration.properties |   2 +
 .../hive/storage/jdbc/JdbcInputFormat.java      | 114 +++++--
 .../hive/storage/jdbc/JdbcInputSplit.java       |  54 ++++
 .../hive/storage/jdbc/JdbcRecordReader.java     |   3 +-
 .../org/apache/hive/storage/jdbc/JdbcSerDe.java |  63 ++--
 .../jdbc/conf/JdbcStorageConfigManager.java     |  16 +
 .../hive/storage/jdbc/dao/DatabaseAccessor.java |  12 +-
 .../jdbc/dao/GenericJdbcDatabaseAccessor.java   | 216 ++++++++-----
 .../storage/jdbc/dao/JdbcRecordIterator.java    |  92 +++++-
 .../jdbc/dao/JethroDatabaseAccessor.java        |   3 +-
 .../storage/jdbc/dao/MySqlDatabaseAccessor.java |  16 +-
 .../jdbc/spitter/DateIntervalSplitter.java      |  42 +++
 .../jdbc/spitter/DecimalIntervalSplitter.java   |  50 +++
 .../jdbc/spitter/DoubleIntervalSplitter.java    |  41 +++
 .../storage/jdbc/spitter/IntervalSplitter.java  |  24 ++
 .../jdbc/spitter/IntervalSplitterFactory.java   |  45 +++
 .../jdbc/spitter/LongIntervalSpitter.java       |  42 +++
 .../jdbc/spitter/TimestampIntervalSplitter.java |  43 +++
 .../hive/storage/jdbc/TestJdbcInputFormat.java  | 204 +++++++++++-
 .../dao/TestGenericJdbcDatabaseAccessor.java    |  18 +-
 .../external_jdbc_table_partition.q             | 135 ++++++++
 .../external_jdbc_table_typeconversion.q        | 119 +++++++
 .../llap/external_jdbc_table_partition.q.out    | 319 +++++++++++++++++++
 .../external_jdbc_table_typeconversion.q.out    | 280 ++++++++++++++++
 25 files changed, 1787 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/common/src/java/org/apache/hadoop/hive/conf/Constants.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 1190679..61bc9df 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -73,6 +73,10 @@ public class Constants {
   public static final String JDBC_QUERY_FIELD_NAMES = JDBC_CONFIG_PREFIX + ".query.fieldNames";
   public static final String JDBC_QUERY_FIELD_TYPES = JDBC_CONFIG_PREFIX + ".query.fieldTypes";
   public static final String JDBC_SPLIT_QUERY = JDBC_CONFIG_PREFIX + ".query.split";
+  public static final String JDBC_PARTITION_COLUMN = JDBC_CONFIG_PREFIX + ".partitionColumn";
+  public static final String JDBC_NUM_PARTITIONS = JDBC_CONFIG_PREFIX + ".numPartitions";
+  public static final String JDBC_LOW_BOUND = JDBC_CONFIG_PREFIX + ".lowerBound";
+  public static final String JDBC_UPPER_BOUND = JDBC_CONFIG_PREFIX + ".upperBound";
 
   public static final String HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR = "HIVE_JOB_CREDSTORE_PASSWORD";
   public static final String HADOOP_CREDENTIAL_PASSWORD_ENVVAR = "HADOOP_CREDSTORE_PASSWORD";

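As a usage sketch (not part of this patch): the four new keys can be supplied through the job configuration or as table properties. This assumes JDBC_CONFIG_PREFIX resolves to "hive.sql"; the column name and bounds are hypothetical.

    import org.apache.hadoop.mapred.JobConf;

    public class JdbcPartitionOptionsSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Keys mirror the constants above; the "hive.sql" prefix is an assumption.
        job.set("hive.sql.partitionColumn", "id");  // column used to range-partition the pushed query
        job.setInt("hive.sql.numPartitions", 4);    // number of input splits to generate
        job.set("hive.sql.lowerBound", "0");        // optional; MIN(id) is queried when absent
        job.set("hive.sql.upperBound", "1000");     // optional; MAX(id) is queried when absent
      }
    }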
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index b6d42c6..8349e3d 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -520,6 +520,8 @@ minillaplocal.query.files=\
   external_jdbc_auth.q,\
   external_jdbc_table.q,\
   external_jdbc_table2.q,\
+  external_jdbc_table_partition.q,\
+  external_jdbc_table_typeconversion.q,\
   fullouter_mapjoin_1_optimized.q,\
   groupby2.q,\
   groupby_groupingset_bug.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java
index 74999db..8a0a26a 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java
@@ -15,9 +15,15 @@
 
 package org.apache.hive.storage.jdbc;
 
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -25,6 +31,8 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hive.storage.jdbc.spitter.IntervalSplitter;
+import org.apache.hive.storage.jdbc.spitter.IntervalSplitterFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,6 +40,7 @@ import org.apache.hive.storage.jdbc.dao.DatabaseAccessor;
 import org.apache.hive.storage.jdbc.dao.DatabaseAccessorFactory;
 
 import java.io.IOException;
+import java.util.List;
 
 public class JdbcInputFormat extends HiveInputFormat<LongWritable, MapWritable> {
 
@@ -61,47 +70,100 @@ public class JdbcInputFormat extends HiveInputFormat<LongWritable, MapWritable>
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     try {
 
-      if (!job.getBoolean(Constants.JDBC_SPLIT_QUERY, true)) {
-        // We will not split this query
-        LOGGER.debug("Creating 1 input splits");
-        InputSplit[] splits = new InputSplit[1];
+      String partitionColumn = job.get(Constants.JDBC_PARTITION_COLUMN);
+      int numPartitions = job.getInt(Constants.JDBC_NUM_PARTITIONS, -1);
+      String lowerBound = job.get(Constants.JDBC_LOW_BOUND);
+      String upperBound = job.get(Constants.JDBC_UPPER_BOUND);
+
+      InputSplit[] splits;
+
+      if (!job.getBoolean(Constants.JDBC_SPLIT_QUERY, true) || numPartitions <= 1) {
+        // We will not split this query if:
+        // 1. hive.sql.query.split is set to false (either manually or automatically by Calcite)
+        // 2. numPartitions <= 1
+        splits = new InputSplit[1];
         splits[0] = new JdbcInputSplit(FileInputFormat.getInputPaths(job)[0]);
+        LOGGER.info("Creating 1 input split " + splits[0]);
         return splits;
       }
 
-      // We will split this query into n splits
-      LOGGER.debug("Creating {} input splits", numSplits);
       dbAccessor = DatabaseAccessorFactory.getAccessor(job);
+      Path[] tablePaths = FileInputFormat.getInputPaths(job);
 
-      int numRecords = numSplits <=1 ? Integer.MAX_VALUE : dbAccessor.getTotalNumberOfRecords(job);
+      // We will split this query into n splits
+      LOGGER.debug("Creating {} input splits", numPartitions);
 
-      if (numRecords < numSplits) {
-        numSplits = numRecords;
-      }
+      if (partitionColumn != null) {
+        List<String> columnNames = dbAccessor.getColumnNames(job);
+        if (!columnNames.contains(partitionColumn)) {
+          throw new IOException("Cannot find partitionColumn: " + partitionColumn + " in " + columnNames);
+        }
+        List<TypeInfo> hiveColumnTypesList = TypeInfoUtils.getTypeInfosFromTypeString(job.get(serdeConstants.LIST_COLUMN_TYPES));
+        TypeInfo typeInfo = hiveColumnTypesList.get(columnNames.indexOf(partitionColumn));
+        if (!(typeInfo instanceof PrimitiveTypeInfo)) {
+          throw new IOException(partitionColumn + " is a complex type, only a primitive type can be a partition column");
+        }
+        if (lowerBound == null || upperBound == null) {
+          Pair<String, String> boundary = dbAccessor.getBounds(job, partitionColumn, lowerBound == null,
+                  upperBound == null);
+          if (lowerBound == null) {
+            lowerBound = boundary.getLeft();
+          }
+          if (upperBound == null) {
+            upperBound = boundary.getRight();
+          }
+        }
+        if (lowerBound == null) {
+          throw new IOException("lowerBound of " + partitionColumn + " cannot be null");
+        }
+        if (upperBound == null) {
+          throw new IOException("upperBound of " + partitionColumn + " cannot be null");
+        }
+        IntervalSplitter intervalSplitter = IntervalSplitterFactory.newIntervalSpitter(typeInfo);
+        List<MutablePair<String, String>> intervals = intervalSplitter.getIntervals(lowerBound, upperBound, numPartitions,
+                typeInfo);
+        if (intervals.size() <= 1) {
+          LOGGER.debug("Creating 1 input split");
+          splits = new InputSplit[1];
+          splits[0] = new JdbcInputSplit(FileInputFormat.getInputPaths(job)[0]);
+          return splits;
+        }
+        intervals.get(0).setLeft(null);
+        intervals.get(intervals.size()-1).setRight(null);
+        splits = new InputSplit[intervals.size()];
+        for (int i = 0; i < intervals.size(); i++) {
+          splits[i] = new JdbcInputSplit(partitionColumn, intervals.get(i).getLeft(), intervals.get(i).getRight());
+        }
+      } else {
+        int numRecords = dbAccessor.getTotalNumberOfRecords(job);
 
-      if (numSplits <= 0) {
-        numSplits = 1;
-      }
+        if (numRecords < numPartitions) {
+          numPartitions = numRecords;
+        }
 
-      int numRecordsPerSplit = numRecords / numSplits;
-      int numSplitsWithExtraRecords = numRecords % numSplits;
+        int numRecordsPerSplit = numRecords / numPartitions;
+        int numSplitsWithExtraRecords = numRecords % numPartitions;
 
-      LOGGER.debug("Num records = {}", numRecords);
-      InputSplit[] splits = new InputSplit[numSplits];
-      Path[] tablePaths = FileInputFormat.getInputPaths(job);
+        LOGGER.debug("Num records = {}", numRecords);
+        splits = new InputSplit[numPartitions];
 
-      int offset = 0;
-      for (int i = 0; i < numSplits; i++) {
-        int numRecordsInThisSplit = numRecordsPerSplit;
-        if (i < numSplitsWithExtraRecords) {
-          numRecordsInThisSplit++;
-        }
+        int offset = 0;
+        for (int i = 0; i < numPartitions; i++) {
+          int numRecordsInThisSplit = numRecordsPerSplit;
+          if (i < numSplitsWithExtraRecords) {
+            numRecordsInThisSplit++;
+          }
 
-        splits[i] = new JdbcInputSplit(numRecordsInThisSplit, offset, tablePaths[0]);
-        offset += numRecordsInThisSplit;
+          splits[i] = new JdbcInputSplit(numRecordsInThisSplit, offset, tablePaths[0]);
+          offset += numRecordsInThisSplit;
+        }
       }
 
       dbAccessor = null;
+      LOGGER.info("Num input splits created {}", splits.length);
+      for (InputSplit split : splits) {
+        LOGGER.info("split:" + split.toString());
+      }
       return splits;
     }
     catch (Exception e) {

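To see the new partition-column branch of getSplits in isolation, here is a standalone sketch of the interval arithmetic for a long column, including the step that opens the first and last intervals so rows below lowerBound or above upperBound still land in a split. All values are illustrative.

    import org.apache.commons.lang3.tuple.MutablePair;

    import java.util.ArrayList;
    import java.util.List;

    public class IntervalSketch {
      public static void main(String[] args) {
        long lower = 0, upper = 10;
        int numPartitions = 3;
        double width = (upper - lower) / (double) numPartitions;
        List<MutablePair<String, String>> intervals = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
          long lo = Math.round(lower + width * i);
          long hi = Math.round(lower + width * (i + 1));
          if (hi > lo) {
            intervals.add(new MutablePair<>(Long.toString(lo), Long.toString(hi)));
          }
        }
        // As in getSplits: unbound both ends of the range.
        intervals.get(0).setLeft(null);
        intervals.get(intervals.size() - 1).setRight(null);
        System.out.println(intervals); // [(null,3), (3,7), (7,null)]
      }
    }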
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java
index 3a6ada8..a10ed75 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java
@@ -28,6 +28,9 @@ public class JdbcInputSplit extends FileSplit implements InputSplit {
 
   private int limit = 0;
   private int offset = 0;
+  private String partitionColumn = null;
+  private String lowerBound = null;
+  private String upperBound = null;
 
 
   public JdbcInputSplit() {
@@ -54,12 +57,26 @@ public class JdbcInputSplit extends FileSplit implements InputSplit {
     this.offset = offset;
   }
 
+  public JdbcInputSplit(String partitionColumn, String lowerBound, String upperBound) {
+    super(null, 0, 0, EMPTY_ARRAY);
+    this.partitionColumn = partitionColumn;
+    this.lowerBound = lowerBound;
+    this.upperBound = upperBound;
+  }
 
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
     out.writeInt(limit);
     out.writeInt(offset);
+    if (partitionColumn != null) {
+      out.writeBoolean(true);
+      out.writeUTF(partitionColumn);
+      out.writeUTF(lowerBound);
+      out.writeUTF(upperBound);
+    } else {
+      out.writeBoolean(false);
+    }
   }
 
 
@@ -68,6 +85,12 @@ public class JdbcInputSplit extends FileSplit implements InputSplit {
     super.readFields(in);
     limit = in.readInt();
     offset = in.readInt();
+    boolean partitionColumnExists = in.readBoolean();
+    if (partitionColumnExists) {
+      partitionColumn = in.readUTF();
+      lowerBound = in.readUTF();
+      upperBound = in.readUTF();
+    }
   }
 
 
@@ -102,4 +125,35 @@ public class JdbcInputSplit extends FileSplit implements InputSplit {
     this.offset = offset;
   }
 
+  public String getPartitionColumn() {
+    return this.partitionColumn;
+  }
+
+  public String getLowerBound() {
+    return this.lowerBound;
+  }
+
+  public String getUpperBound() {
+    return this.upperBound;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    if (partitionColumn != null) {
+      sb.append("interval:");
+      sb.append(partitionColumn).append("[");
+      if (lowerBound != null) {
+        sb.append(lowerBound);
+      }
+      sb.append(",");
+      if (upperBound != null) {
+        sb.append(upperBound);
+      }
+      sb.append(")");
+    } else {
+      sb.append("limit:" + limit + ", offset:" + offset);
+    }
+    return sb.toString();
+  }
 }

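The two toString() forms above now show up in the getSplits logs. A small sketch of how they render, assuming the classes in this patch are on the classpath (the limit/offset constructor signature is inferred from the getSplits call sites):

    import org.apache.hadoop.fs.Path;
    import org.apache.hive.storage.jdbc.JdbcInputSplit;

    public class SplitToStringSketch {
      public static void main(String[] args) {
        System.out.println(new JdbcInputSplit("id", "0", "100"));        // interval:id[0,100)
        System.out.println(new JdbcInputSplit("id", null, "100"));       // interval:id[,100)
        System.out.println(new JdbcInputSplit(10, 0, new Path("/tmp"))); // limit:10, offset:0
      }
    }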
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java
index 1da6213..a3248b4 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java
@@ -55,7 +55,8 @@ public class JdbcRecordReader implements RecordReader<LongWritable, MapWritable>
       LOGGER.trace("JdbcRecordReader.next called");
       if (dbAccessor == null) {
         dbAccessor = DatabaseAccessorFactory.getAccessor(conf);
-        iterator = dbAccessor.getRecordIterator(conf, split.getLimit(), split.getOffset());
+        iterator = dbAccessor.getRecordIterator(conf, split.getPartitionColumn(), split.getLowerBound(), split
+                        .getUpperBound(), split.getLimit(), split.getOffset());
       }
 
       if (iterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java
index 5947628..b68340c 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java
@@ -16,9 +16,10 @@ package org.apache.hive.storage.jdbc;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.common.type.Date;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -35,13 +37,13 @@ import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager;
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessor;
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessorFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig;
-import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager;
-import org.apache.hive.storage.jdbc.dao.DatabaseAccessor;
-import org.apache.hive.storage.jdbc.dao.DatabaseAccessorFactory;
 
 import java.math.BigDecimal;
 import java.util.ArrayList;
@@ -73,42 +75,33 @@ public class JdbcSerDe extends AbstractSerDe {
       if (properties.containsKey(JdbcStorageConfig.DATABASE_TYPE.getPropertyName())) {
         Configuration tableConfig = JdbcStorageConfigManager.convertPropertiesToConfiguration(properties);
         DatabaseAccessor dbAccessor = DatabaseAccessorFactory.getAccessor(tableConfig);
-
-        // Extract information from properties
-        String[] jdbcColumnNamesArray;
-        List<TypeInfo> hiveColumnTypesArray;
+        // Extract column names and types from properties
+        List<TypeInfo> hiveColumnTypesList;
         if (properties.containsKey(Constants.JDBC_TABLE) && properties.containsKey(Constants.JDBC_QUERY)) {
           // The query has been autogenerated by Hive, the column names are the
           // same in the query pushed and the list of hiveColumnNames
           String fieldNamesProperty =
-              Preconditions.checkNotNull(properties.getProperty(Constants.JDBC_QUERY_FIELD_NAMES, null));
+                  Preconditions.checkNotNull(properties.getProperty(Constants.JDBC_QUERY_FIELD_NAMES, null));
           String fieldTypesProperty =
-              Preconditions.checkNotNull(properties.getProperty(Constants.JDBC_QUERY_FIELD_TYPES, null));
+                  Preconditions.checkNotNull(properties.getProperty(Constants.JDBC_QUERY_FIELD_TYPES, null));
           hiveColumnNames = fieldNamesProperty.trim().split(",");
-          jdbcColumnNamesArray = hiveColumnNames;
-          hiveColumnTypesArray = TypeInfoUtils.getTypeInfosFromTypeString(fieldTypesProperty);
+          hiveColumnTypesList = TypeInfoUtils.getTypeInfosFromTypeString(fieldTypesProperty);
         } else {
-          // The query was hardcoded by user or we are creating the table.
-          // We obtain the column names with the db accessor.
-          List<String> columnNames = dbAccessor.getColumnNames(tableConfig);
-          hiveColumnNames = columnNames.toArray(new String[columnNames.size()]);
-          // These are the column names for the table defined with the JDBC storage handler.
-          jdbcColumnNamesArray = parseProperty(properties.getProperty(serdeConstants.LIST_COLUMNS), ",");
-          if (hiveColumnNames.length != jdbcColumnNamesArray.length) {
-            throw new SerDeException("Expected " + hiveColumnNames.length + " hiveColumnNames. Table definition has "
-                    + jdbcColumnNamesArray.length + " hiveColumnNames");
-          }
-          hiveColumnTypesArray = TypeInfoUtils.getTypeInfosFromTypeString(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES));
+          hiveColumnNames = properties.getProperty(serdeConstants.LIST_COLUMNS).split(",");
+          hiveColumnTypesList = TypeInfoUtils.getTypeInfosFromTypeString(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES));
+        }
+        if (hiveColumnNames.length == 0) {
+          throw new SerDeException("Received an empty Hive column name definition");
         }
-        if (hiveColumnTypesArray.size() == 0) {
+        if (hiveColumnTypesList.size() == 0) {
           throw new SerDeException("Received an empty Hive column type definition");
         }
 
         // Populate column types and inspector
-        hiveColumnTypes = new PrimitiveTypeInfo[hiveColumnTypesArray.size()];
+        hiveColumnTypes = new PrimitiveTypeInfo[hiveColumnTypesList.size()];
         List<ObjectInspector> fieldInspectors = new ArrayList<>(hiveColumnNames.length);
         for (int i = 0; i < hiveColumnNames.length; i++) {
-          TypeInfo ti = hiveColumnTypesArray.get(i);
+          TypeInfo ti = hiveColumnTypesList.get(i);
           if (ti.getCategory() != Category.PRIMITIVE) {
             throw new SerDeException("Non primitive types not supported yet");
           }
@@ -117,7 +110,7 @@ public class JdbcSerDe extends AbstractSerDe {
               PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(hiveColumnTypes[i]));
         }
         inspector =
-            ObjectInspectorFactory.getStandardStructObjectInspector(Arrays.asList(jdbcColumnNamesArray),
+            ObjectInspectorFactory.getStandardStructObjectInspector(Arrays.asList(hiveColumnNames),
                 fieldInspectors);
         row = new ArrayList<>(hiveColumnNames.length);
       }
@@ -128,16 +121,6 @@ public class JdbcSerDe extends AbstractSerDe {
     }
   }
 
-
-  private String[] parseProperty(String propertyValue, String delimiter) {
-    if ((propertyValue == null) || (propertyValue.trim().isEmpty())) {
-      return new String[] {};
-    }
-
-    return propertyValue.split(delimiter);
-  }
-
-
   @Override
   public Object deserialize(Writable blob) throws SerDeException {
     LOGGER.trace("Deserializing from SerDe");
@@ -195,9 +178,9 @@ public class JdbcSerDe extends AbstractSerDe {
           }
           break;
         case DECIMAL:
-          if (!(rowVal instanceof BigDecimal)) {
-            rowVal = new BigDecimal(rowVal.toString());
-          }
+          int scale = ((DecimalTypeInfo)hiveColumnTypes[i]).getScale();
+          rowVal = HiveDecimal.create(rowVal.toString())
+                  .setScale(scale, BigDecimal.ROUND_HALF_EVEN);
           break;
         case BOOLEAN:
           if (rowVal instanceof Number) {

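On the DECIMAL change above: HiveDecimal is immutable, so setScale returns a rounded copy rather than mutating the receiver, which is why the result must be assigned. A minimal sketch:

    import org.apache.hadoop.hive.common.type.HiveDecimal;

    import java.math.BigDecimal;

    public class DecimalScaleSketch {
      public static void main(String[] args) {
        HiveDecimal raw = HiveDecimal.create("3.14159");
        // setScale returns a new value; raw itself is unchanged.
        HiveDecimal scaled = raw.setScale(2, BigDecimal.ROUND_HALF_EVEN);
        System.out.println(scaled); // 3.14
      }
    }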
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java
index 18e2397..5679f1b 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java
@@ -124,6 +124,22 @@ public class JdbcStorageConfigManager {
     return config.get(key.getPropertyName());
   }
 
+  public static String getOrigQueryToExecute(Configuration config) {
+    String query;
+    String tableName = config.get(Constants.JDBC_TABLE);
+    if (tableName != null) {
+      // We generate query as select *
+      query = "select * from " + tableName;
+      String hiveFilterCondition = QueryConditionBuilder.getInstance().buildCondition(config);
+      if ((hiveFilterCondition != null) && (!hiveFilterCondition.trim().isEmpty())) {
+        query = query + " WHERE " + hiveFilterCondition;
+      }
+    } else {
+      query = config.get(Constants.JDBC_QUERY);
+    }
+
+    return query;
+  }
 
   public static String getQueryToExecute(Configuration config) {
     String query = config.get(Constants.JDBC_QUERY);

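Unlike getQueryToExecute just below, which prefers an explicit hive.sql.query, getOrigQueryToExecute reconstructs the original scan from the table name when one is configured; getColumnNames and getBounds use it so the result can be wrapped as a subquery. A sketch of both cases, assuming Constants.JDBC_TABLE and Constants.JDBC_QUERY resolve to "hive.sql.table" and "hive.sql.query":

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager;

    public class OrigQuerySketch {
      public static void main(String[] args) {
        Configuration withTable = new Configuration();
        withTable.set("hive.sql.table", "emp"); // hypothetical table
        // -> select * from emp  (plus " WHERE <condition>" when a filter was pushed down)
        System.out.println(JdbcStorageConfigManager.getOrigQueryToExecute(withTable));

        Configuration withQuery = new Configuration();
        withQuery.set("hive.sql.query", "select id, name from emp");
        // -> the configured query, returned verbatim
        System.out.println(JdbcStorageConfigManager.getOrigQueryToExecute(withQuery));
      }
    }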
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java
index fdaa794..a6d0306 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java
@@ -14,8 +14,10 @@
  */
 package org.apache.hive.storage.jdbc.dao;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hive.storage.jdbc.exception.HiveJdbcDatabaseAccessException;
 
 import java.util.List;
@@ -24,11 +26,15 @@ public interface DatabaseAccessor {
 
   List<String> getColumnNames(Configuration conf) throws HiveJdbcDatabaseAccessException;
 
-  List<String> getColumnTypes(Configuration conf) throws HiveJdbcDatabaseAccessException;
-
   int getTotalNumberOfRecords(Configuration conf) throws HiveJdbcDatabaseAccessException;
 
   JdbcRecordIterator
-    getRecordIterator(Configuration conf, int limit, int offset) throws HiveJdbcDatabaseAccessException;
+    getRecordIterator(Configuration conf, String partitionColumn, String lowerBound,
+          String upperBound, int limit, int offset)
+          throws HiveJdbcDatabaseAccessException;
+
+  Pair<String, String> getBounds(Configuration conf, String partitionColumn, boolean lower, boolean upper) throws
+          HiveJdbcDatabaseAccessException;
 
+  boolean needColumnQuote();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java
index abdc5f0..607c45c 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java
@@ -14,11 +14,15 @@
  */
 package org.apache.hive.storage.jdbc.dao;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.dbcp.BasicDataSourceFactory;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -43,6 +47,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * A data accessor that should in theory work with all JDBC compliant database drivers.
@@ -53,6 +59,7 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
   protected static final int DEFAULT_FETCH_SIZE = 1000;
   protected static final Logger LOGGER = LoggerFactory.getLogger(GenericJdbcDatabaseAccessor.class);
   protected DataSource dbcpDataSource = null;
+  static final Pattern fromPattern = Pattern.compile("(.*?\\sfrom\\s)(.*+)", Pattern.CASE_INSENSITIVE);
 
 
   public GenericJdbcDatabaseAccessor() {
@@ -67,7 +74,8 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
 
     try {
       initializeDatabaseConnection(conf);
-      String metadataQuery = getMetaDataQuery(conf);
+      String query = JdbcStorageConfigManager.getOrigQueryToExecute(conf);
+      String metadataQuery = getMetaDataQuery(query);
       LOGGER.debug("Query to execute is [{}]", metadataQuery);
 
       conn = dbcpDataSource.getConnection();
@@ -94,75 +102,11 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
   }
 
 
-  protected String getMetaDataQuery(Configuration conf) {
-    String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
-    String metadataQuery = addLimitToQuery(sql, 1);
-    return metadataQuery;
+  protected String getMetaDataQuery(String sql) {
+    return addLimitToQuery(sql, 1);
   }
 
   @Override
-  public List<String> getColumnTypes(Configuration conf) throws HiveJdbcDatabaseAccessException {
-    Connection conn = null;
-    PreparedStatement ps = null;
-    ResultSet rs = null;
-
-    try {
-      initializeDatabaseConnection(conf);
-      String metadataQuery = getMetaDataQuery(conf);
-      LOGGER.debug("Query to execute is [{}]", metadataQuery);
-
-      conn = dbcpDataSource.getConnection();
-      ps = conn.prepareStatement(metadataQuery);
-      rs = ps.executeQuery();
-
-      ResultSetMetaData metadata = rs.getMetaData();
-      int numColumns = metadata.getColumnCount();
-      List<String> columnTypes = new ArrayList<String>(numColumns);
-      for (int i = 0; i < numColumns; i++) {
-        switch (metadata.getColumnType(i + 1)) {
-        case Types.CHAR:
-          columnTypes.add(serdeConstants.STRING_TYPE_NAME);
-          break;
-        case Types.INTEGER:
-          columnTypes.add(serdeConstants.INT_TYPE_NAME);
-          break;
-        case Types.BIGINT:
-          columnTypes.add(serdeConstants.BIGINT_TYPE_NAME);
-          break;
-        case Types.DECIMAL:
-          columnTypes.add(serdeConstants.DECIMAL_TYPE_NAME);
-          break;
-        case Types.FLOAT:
-        case Types.REAL:
-          columnTypes.add(serdeConstants.FLOAT_TYPE_NAME);
-          break;
-        case Types.DOUBLE:
-          columnTypes.add(serdeConstants.DOUBLE_TYPE_NAME);
-          break;
-        case Types.DATE:
-          columnTypes.add(serdeConstants.DATE_TYPE_NAME);
-          break;
-        case Types.TIMESTAMP:
-          columnTypes.add(serdeConstants.TIMESTAMP_TYPE_NAME);
-          break;
-
-        default:
-          columnTypes.add(metadata.getColumnTypeName(i+1));
-          break;
-        }
-      }
-
-      return columnTypes;
-    } catch (Exception e) {
-      LOGGER.error("Error while trying to get column names.", e);
-      throw new HiveJdbcDatabaseAccessException("Error while trying to get column names: " + e.getMessage(), e);
-    } finally {
-      cleanupResources(conn, ps, rs);
-    }
-  }
-
-
-  @Override
   public int getTotalNumberOfRecords(Configuration conf) throws HiveJdbcDatabaseAccessException {
     Connection conn = null;
     PreparedStatement ps = null;
@@ -200,7 +144,9 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
 
   @Override
   public JdbcRecordIterator
-    getRecordIterator(Configuration conf, int limit, int offset) throws HiveJdbcDatabaseAccessException {
+    getRecordIterator(Configuration conf, String partitionColumn, String lowerBound,
+          String upperBound, int limit, int offset)
+          throws HiveJdbcDatabaseAccessException {
 
     Connection conn = null;
     PreparedStatement ps = null;
@@ -208,16 +154,22 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
 
     try {
       initializeDatabaseConnection(conf);
+      String tableName = conf.get(Constants.JDBC_TABLE);
       String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
-      String limitQuery = addLimitAndOffsetToQuery(sql, limit, offset);
-      LOGGER.info("Query to execute is [{}]", limitQuery);
+      String partitionQuery;
+      if (partitionColumn != null) {
+        partitionQuery = addBoundaryToQuery(tableName, sql, partitionColumn, lowerBound, upperBound);
+      } else {
+        partitionQuery = addLimitAndOffsetToQuery(sql, limit, offset);
+      }
+      LOGGER.info("Query to execute is [{}]", partitionQuery);
 
       conn = dbcpDataSource.getConnection();
-      ps = conn.prepareStatement(limitQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+      ps = conn.prepareStatement(partitionQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
       ps.setFetchSize(getFetchSize(conf));
       rs = ps.executeQuery();
 
-      return new JdbcRecordIterator(conn, ps, rs);
+      return new JdbcRecordIterator(conn, ps, rs, conf);
     }
     catch (Exception e) {
       LOGGER.error("Caught exception while trying to execute query", e);
@@ -245,7 +197,6 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
     }
   }
 
-
   /*
    * Uses generic JDBC escape functions to add a limit clause to a query string
    */
@@ -256,6 +207,54 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
     return sql + " {LIMIT " + limit + "}";
   }
 
+  protected String addBoundaryToQuery(String tableName, String sql, String partitionColumn, String lowerBound,
+          String upperBound) {
+    String boundaryQuery;
+    if (tableName != null) {
+      boundaryQuery = "SELECT * FROM " + tableName + " WHERE ";
+    } else {
+      boundaryQuery = "SELECT * FROM (" + sql + ") tmptable WHERE ";
+    }
+    if (lowerBound != null) {
+      boundaryQuery += quote() + partitionColumn + quote() + " >= " + lowerBound;
+    }
+    if (upperBound != null) {
+      if (lowerBound != null) {
+        boundaryQuery += " AND ";
+      }
+      boundaryQuery += quote() + partitionColumn + quote() + " < " + upperBound;
+    }
+    if (lowerBound == null && upperBound != null) {
+      boundaryQuery += " OR " + quote() + partitionColumn + quote() + " IS NULL";
+    }
+    String result;
+    if (tableName != null) {
+      // Looking for table name in from clause, replace with the boundary query
+      // TODO consolidate this
+      // Currently only use simple string match, this should be improved by looking
+      // for only table name in from clause
+      String tableString = null;
+      Matcher m = fromPattern.matcher(sql);
+      Preconditions.checkArgument(m.matches());
+      String queryBeforeFrom = m.group(1);
+      String queryAfterFrom = m.group(2);
+
+      Character[] possibleDelimits = new Character[] {'`', '\"', ' '};
+      for (Character possibleDelimit : possibleDelimits) {
+        if (queryAfterFrom.contains(possibleDelimit + tableName + possibleDelimit)) {
+          tableString = possibleDelimit + tableName + possibleDelimit;
+          break;
+        }
+      }
+      if (tableString == null) {
+        throw new RuntimeException("Cannot find " + tableName + " in sql query " + sql);
+      }
+      result = queryBeforeFrom + queryAfterFrom.replace(tableString, " (" + boundaryQuery + ") " + tableName + " ");
+    } else {
+      result = boundaryQuery;
+    }
+    return result;
+  }
 
   protected void cleanupResources(Connection conn, PreparedStatement ps, ResultSet rs) {
     try {
@@ -344,4 +343,75 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
   protected int getFetchSize(Configuration conf) {
     return conf.getInt(JdbcStorageConfig.JDBC_FETCH_SIZE.getPropertyName(), DEFAULT_FETCH_SIZE);
   }
+
+  @Override
+  public Pair<String, String> getBounds(Configuration conf, String partitionColumn,
+          boolean retrieveMin, boolean retrieveMax) throws HiveJdbcDatabaseAccessException {
+    Connection conn = null;
+    PreparedStatement ps = null;
+    ResultSet rs = null;
+
+    try {
+      Preconditions.checkArgument(retrieveMin || retrieveMax);
+      initializeDatabaseConnection(conf);
+      String sql = JdbcStorageConfigManager.getOrigQueryToExecute(conf);
+      String minClause = "MIN(" + quote() + partitionColumn  + quote() + ")";
+      String maxClause = "MAX(" + quote() + partitionColumn  + quote() + ")";
+      String countQuery = "SELECT ";
+      if (retrieveMin) {
+        countQuery += minClause;
+      }
+      if (retrieveMax) {
+        if (retrieveMin) {
+          countQuery += ",";
+        }
+        countQuery += maxClause;
+      }
+      countQuery += " FROM (" + sql + ") tmptable " + "WHERE " + quote() + partitionColumn + quote() + " IS NOT NULL";
+
+      LOGGER.debug("MIN/MAX Query to execute is [{}]", countQuery);
+
+      conn = dbcpDataSource.getConnection();
+      ps = conn.prepareStatement(countQuery);
+      rs = ps.executeQuery();
+      String lower = null, upper = null;
+      int pos = 1;
+      if (rs.next()) {
+        if (retrieveMin) {
+          lower = rs.getString(pos);
+          pos++;
+        }
+        if (retrieveMax) {
+          upper = rs.getString(pos);
+        }
+        return new ImmutablePair<>(lower, upper);
+      }
+      else {
+        LOGGER.warn("The MIN/MAX query [{}] did not return any results.", countQuery);
+        throw new HiveJdbcDatabaseAccessException("MIN/MAX query did not return any results.");
+      }
+    }
+    catch (HiveJdbcDatabaseAccessException he) {
+      throw he;
+    }
+    catch (Exception e) {
+      LOGGER.error("Caught exception while trying to get MIN/MAX of " + partitionColumn, e);
+      throw new HiveJdbcDatabaseAccessException(e);
+    }
+    finally {
+      cleanupResources(conn, ps, rs);
+    }
+  }
+
+  private String quote() {
+    if (needColumnQuote()) {
+      return "\"";
+    } else {
+      return "";
+    }
+  }
+  @Override
+  public boolean needColumnQuote() {
+    return true;
+  }
 }

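What addBoundaryToQuery and getBounds emit is easiest to see as strings. A sketch of the subquery path (tableName == null), runnable from the same package since the method is protected; the generic accessor double-quotes the column:

    package org.apache.hive.storage.jdbc.dao;

    public class BoundaryQuerySketch {
      public static void main(String[] args) {
        GenericJdbcDatabaseAccessor accessor = new GenericJdbcDatabaseAccessor();
        System.out.println(accessor.addBoundaryToQuery(null, "select * from emp", "id", "0", "10"));
        // SELECT * FROM (select * from emp) tmptable WHERE "id" >= 0 AND "id" < 10
        System.out.println(accessor.addBoundaryToQuery(null, "select * from emp", "id", null, "10"));
        // SELECT * FROM (select * from emp) tmptable WHERE "id" < 10 OR "id" IS NULL
        // For comparison, getBounds issues:
        //   SELECT MIN("id"),MAX("id") FROM (select * from emp) tmptable WHERE "id" IS NOT NULL
      }
    }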
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JdbcRecordIterator.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JdbcRecordIterator.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JdbcRecordIterator.java
index a95aca2..27538f7 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JdbcRecordIterator.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JdbcRecordIterator.java
@@ -14,15 +14,24 @@
  */
 package org.apache.hive.storage.jdbc.dao;
 
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
+import java.sql.SQLDataException;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -35,12 +44,24 @@ public class JdbcRecordIterator implements Iterator<Map<String, Object>> {
   private Connection conn;
   private PreparedStatement ps;
   private ResultSet rs;
+  private String[] hiveColumnNames;
+  List<TypeInfo> hiveColumnTypesList;
 
-
-  public JdbcRecordIterator(Connection conn, PreparedStatement ps, ResultSet rs) {
+  public JdbcRecordIterator(Connection conn, PreparedStatement ps, ResultSet rs, Configuration conf) {
     this.conn = conn;
     this.ps = ps;
     this.rs = rs;
+    String fieldNamesProperty;
+    String fieldTypesProperty;
+    if (conf.get(Constants.JDBC_TABLE) != null && conf.get(Constants.JDBC_QUERY) != null) {
+      fieldNamesProperty = Preconditions.checkNotNull(conf.get(Constants.JDBC_QUERY_FIELD_NAMES));
+      fieldTypesProperty = Preconditions.checkNotNull(conf.get(Constants.JDBC_QUERY_FIELD_TYPES));
+    } else {
+      fieldNamesProperty = Preconditions.checkNotNull(conf.get(serdeConstants.LIST_COLUMNS));
+      fieldTypesProperty = Preconditions.checkNotNull(conf.get(serdeConstants.LIST_COLUMN_TYPES));
+    }
+    hiveColumnNames = fieldNamesProperty.trim().split(",");
+    hiveColumnTypesList = TypeInfoUtils.getTypeInfosFromTypeString(fieldTypesProperty);
   }
 
 
@@ -59,14 +80,63 @@ public class JdbcRecordIterator implements Iterator<Map<String, Object>> {
   @Override
   public Map<String, Object> next() {
     try {
-      ResultSetMetaData metadata = rs.getMetaData();
-      int numColumns = metadata.getColumnCount();
-      Map<String, Object> record = new HashMap<String, Object>(numColumns);
-      for (int i = 0; i < numColumns; i++) {
-        String key = metadata.getColumnName(i + 1);
-        Object value = rs.getObject(i + 1);
-
-        record.put(key, value);
+      Map<String, Object> record = new HashMap<String, Object>(hiveColumnNames.length);
+      for (int i = 0; i < hiveColumnNames.length; i++) {
+        String key = hiveColumnNames[i];
+        Object value = null;
+        if (!(hiveColumnTypesList.get(i) instanceof PrimitiveTypeInfo)) {
+          throw new RuntimeException("data type of column " + hiveColumnNames[i] + ":" +
+                  hiveColumnTypesList.get(i).getTypeName() + " is not supported");
+        }
+        try {
+          switch (((PrimitiveTypeInfo) hiveColumnTypesList.get(i)).getPrimitiveCategory()) {
+            case INT:
+            case SHORT:
+            case BYTE:
+              value = rs.getInt(i + 1);
+              break;
+            case LONG:
+              value = rs.getLong(i + 1);
+              break;
+            case FLOAT:
+              value = rs.getFloat(i + 1);
+              break;
+            case DOUBLE:
+              value = rs.getDouble(i + 1);
+              break;
+            case DECIMAL:
+              value = rs.getBigDecimal(i + 1);
+              break;
+            case BOOLEAN:
+              value = rs.getBoolean(i + 1);
+              break;
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+              value = rs.getString(i + 1);
+              break;
+            case DATE:
+              value = rs.getDate(i + 1);
+              break;
+            case TIMESTAMP:
+              value = rs.getTimestamp(i + 1);
+              break;
+            default:
+              LOGGER.error("data type of column " + hiveColumnNames[i] + ":" +
+                      ((PrimitiveTypeInfo) hiveColumnTypesList.get(i)).getPrimitiveCategory() +
+                      " is not supported");
+              value = null;
+              break;
+          }
+          if (value != null && !rs.wasNull()) {
+            record.put(key, value);
+          } else {
+            record.put(key, null);
+          }
+        } catch (SQLDataException e) {
+          record.put(key, null);
+        }
+
       }
 
       return record;

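The iterator now drives extraction from the declared Hive schema instead of ResultSet metadata. How the two configuration properties are parsed into the name and type lists used above, as a standalone sketch with made-up columns:

    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

    import java.util.List;

    public class SchemaListsSketch {
      public static void main(String[] args) {
        String[] names = "id,name,ts".trim().split(",");
        List<TypeInfo> types = TypeInfoUtils.getTypeInfosFromTypeString("int,string,timestamp");
        for (int i = 0; i < names.length; i++) {
          // int -> rs.getInt, string -> rs.getString, timestamp -> rs.getTimestamp, per the switch above
          System.out.println(names[i] + " -> " + types.get(i).getTypeName());
        }
      }
    }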
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JethroDatabaseAccessor.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JethroDatabaseAccessor.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JethroDatabaseAccessor.java
index db0454e..db72a1b 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JethroDatabaseAccessor.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JethroDatabaseAccessor.java
@@ -43,8 +43,7 @@ public class JethroDatabaseAccessor extends GenericJdbcDatabaseAccessor {
   }
 
   @Override
-  protected String getMetaDataQuery(Configuration conf) {
-    String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
+  protected String getMetaDataQuery(String sql) {
     return addLimitToQuery(sql, 0);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java
index 86fde7c..405ca4c 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java
@@ -26,14 +26,26 @@ public class MySqlDatabaseAccessor extends GenericJdbcDatabaseAccessor {
       return addLimitToQuery(sql, limit);
     }
     else {
-      return sql + " LIMIT " + offset + "," + limit;
+      if (limit != -1) {
+        return sql + " LIMIT " + offset + "," + limit;
+      } else {
+        return sql;
+      }
     }
   }
 
 
   @Override
   protected String addLimitToQuery(String sql, int limit) {
-    return sql + " LIMIT " + limit;
+    if (limit != -1) {
+      return sql + " LIMIT " + limit;
+    } else {
+      return sql;
+    }
   }
 
+  @Override
+  public boolean needColumnQuote() {
+    return false;
+  }
 }

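The -1 guards let a limit of -1 pass the query through untouched, so the partitioned path (which reads whole ranges rather than limit/offset windows) is not decorated with a LIMIT clause. A same-package sketch of the resulting strings:

    package org.apache.hive.storage.jdbc.dao;

    public class MySqlLimitSketch {
      public static void main(String[] args) {
        MySqlDatabaseAccessor accessor = new MySqlDatabaseAccessor();
        System.out.println(accessor.addLimitToQuery("select * from emp", 5));
        // select * from emp LIMIT 5
        System.out.println(accessor.addLimitToQuery("select * from emp", -1));
        // select * from emp
      }
    }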
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DateIntervalSplitter.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DateIntervalSplitter.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DateIntervalSplitter.java
new file mode 100644
index 0000000..664e61b
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DateIntervalSplitter.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DateIntervalSplitter implements IntervalSplitter {
+  @Override
+  public List<MutablePair<String, String>> getIntervals(String lowerBound, String upperBound, int numPartitions, TypeInfo
+          typeInfo) {
+    List<MutablePair<String, String>> intervals = new ArrayList<>();
+    Date dateLower = Date.valueOf(lowerBound);
+    Date dateUpper = Date.valueOf(upperBound);
+    double dateInterval = (dateUpper.getTime() - dateLower.getTime())/(double)numPartitions;
+    Date splitDateLower, splitDateUpper;
+    for (int i=0;i<numPartitions;i++) {
+      splitDateLower = new Date(Math.round(dateLower.getTime() + dateInterval*i));
+      splitDateUpper = new Date(Math.round(dateLower.getTime() + dateInterval*(i+1)));
+      if (splitDateLower.compareTo(splitDateUpper) < 0) {
+        intervals.add(new MutablePair<String, String>(splitDateLower.toString(), splitDateUpper.toString()));
+      }
+    }
+    return intervals;
+  }
+}

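A quick check of the date splitter above; typeInfo is unused by this implementation, so null is passed, and the dates are hypothetical:

    import org.apache.commons.lang3.tuple.MutablePair;
    import org.apache.hive.storage.jdbc.spitter.DateIntervalSplitter;

    import java.util.List;

    public class DateSplitSketch {
      public static void main(String[] args) {
        List<MutablePair<String, String>> intervals =
            new DateIntervalSplitter().getIntervals("2018-01-01", "2018-01-04", 3, null);
        System.out.println(intervals);
        // [(2018-01-01,2018-01-02), (2018-01-02,2018-01-03), (2018-01-03,2018-01-04)]
      }
    }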
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DecimalIntervalSplitter.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DecimalIntervalSplitter.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DecimalIntervalSplitter.java
new file mode 100644
index 0000000..5636c7d
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DecimalIntervalSplitter.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DecimalIntervalSplitter implements IntervalSplitter {
+  @Override
+  public List<MutablePair<String, String>> getIntervals(String lowerBound, String upperBound, int numPartitions, TypeInfo
+          typeInfo) {
+    List<MutablePair<String, String>> intervals = new ArrayList<>();
+    DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo)typeInfo;
+    int scale = decimalTypeInfo.getScale();
+    BigDecimal decimalLower = new BigDecimal(lowerBound);
+    BigDecimal decimalUpper = new BigDecimal(upperBound);
+    BigDecimal decimalInterval = (decimalUpper.subtract(decimalLower)).divide(new BigDecimal(numPartitions),
+            MathContext.DECIMAL64);
+    BigDecimal splitDecimalLower, splitDecimalUpper;
+    for (int i=0;i<numPartitions;i++) {
+      splitDecimalLower = decimalLower.add(decimalInterval.multiply(new BigDecimal(i))).setScale(scale,
+              RoundingMode.HALF_EVEN);
+      splitDecimalUpper = decimalLower.add(decimalInterval.multiply(new BigDecimal(i+1))).setScale(scale,
+              RoundingMode.HALF_EVEN);
+      if (splitDecimalLower.compareTo(splitDecimalUpper) < 0) {
+        intervals.add(new MutablePair<String, String>(splitDecimalLower.toPlainString(), splitDecimalUpper.toPlainString()));
+      }
+    }
+    return intervals;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DoubleIntervalSplitter.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DoubleIntervalSplitter.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DoubleIntervalSplitter.java
new file mode 100644
index 0000000..aa955c2
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DoubleIntervalSplitter.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DoubleIntervalSplitter implements IntervalSplitter {
+  @Override
+  public List<MutablePair<String, String>> getIntervals(String lowerBound, String upperBound, int numPartitions, TypeInfo
+          typeInfo) {
+    List<MutablePair<String, String>> intervals = new ArrayList<>();
+    double doubleLower = Double.parseDouble(lowerBound);
+    double doubleUpper = Double.parseDouble(upperBound);
+    double doubleInterval = (doubleUpper - doubleLower)/(double)numPartitions;
+    double splitDoubleLower, splitDoubleUpper;
+    for (int i=0;i<numPartitions;i++) {
+      splitDoubleLower = doubleLower + doubleInterval*i;
+      splitDoubleUpper = doubleLower + doubleInterval*(i+1);
+      if (splitDoubleUpper > splitDoubleLower) {
+        intervals.add(new MutablePair<String, String>(Double.toString(splitDoubleLower), Double.toString(splitDoubleUpper)));
+      }
+    }
+    return intervals;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/IntervalSplitter.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/IntervalSplitter.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/IntervalSplitter.java
new file mode 100644
index 0000000..4f3455c
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/IntervalSplitter.java
@@ -0,0 +1,24 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.util.List;
+
+public interface IntervalSplitter {
+  List<MutablePair<String, String>> getIntervals(String lowerBound, String upperBound, int numPartitions, TypeInfo typeInfo);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/IntervalSplitterFactory.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/IntervalSplitterFactory.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/IntervalSplitterFactory.java
new file mode 100644
index 0000000..efa8c0c
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/IntervalSplitterFactory.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.io.IOException;
+
+public class IntervalSplitterFactory {
+  public static IntervalSplitter newIntervalSpitter(TypeInfo typeInfo) throws IOException {
+    PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
+    switch (primitiveTypeInfo.getPrimitiveCategory()) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        return new LongIntervalSpitter();
+      case FLOAT:
+      case DOUBLE:
+        return new DoubleIntervalSplitter();
+      case DECIMAL:
+        return new DecimalIntervalSplitter();
+      case TIMESTAMP:
+        return new TimestampIntervalSplitter();
+      case DATE:
+        return new DateIntervalSplitter();
+      default:
+        throw new IOException("partitionColumn is of type " + primitiveTypeInfo.getPrimitiveCategory() +
+                "; only numeric, date, and timestamp types can be used as a partition column");
+    }
+  }
+}
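
A short sketch of the dispatch above (not part of this commit; the demo class
name is hypothetical). Hive's TypeInfoFactory constants stand in here for the
column type that the input format derives from the table schema:

    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    import org.apache.hive.storage.jdbc.spitter.IntervalSplitter;
    import org.apache.hive.storage.jdbc.spitter.IntervalSplitterFactory;

    import java.io.IOException;

    public class FactoryDemo {
      public static void main(String[] args) throws IOException {
        // int falls into the BYTE/SHORT/INT/LONG group above.
        IntervalSplitter longSplitter =
            IntervalSplitterFactory.newIntervalSpitter(TypeInfoFactory.intTypeInfo);
        // string has no splitter; this call throws IOException.
        IntervalSplitter invalid =
            IntervalSplitterFactory.newIntervalSpitter(TypeInfoFactory.stringTypeInfo);
      }
    }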

http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/LongIntervalSpitter.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/LongIntervalSpitter.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/LongIntervalSpitter.java
new file mode 100644
index 0000000..e540fb8
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/LongIntervalSpitter.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class LongIntervalSpitter implements IntervalSplitter {
+
+  @Override
+  public List<MutablePair<String, String>> getIntervals(String lowerBound, String upperBound, int numPartitions, TypeInfo
+          typeInfo) {
+    List<MutablePair<String, String>> intervals = new ArrayList<>();
+    long longLower = Long.parseLong(lowerBound);
+    long longUpper = Long.parseLong(upperBound);
+    double longInterval = (longUpper - longLower) / (double) numPartitions;
+    long splitLongLower, splitLongUpper;
+    for (int i = 0; i < numPartitions; i++) {
+      splitLongLower = Math.round(longLower + longInterval * i);
+      splitLongUpper = Math.round(longLower + longInterval * (i + 1));
+      if (splitLongUpper > splitLongLower) {
+        intervals.add(new MutablePair<String, String>(Long.toString(splitLongLower), Long.toString(splitLongUpper)));
+      }
+    }
+    return intervals;
+  }
+}
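
A worked example of the rounding above (a sketch, not part of this commit;
demo class name hypothetical), matching testIntervalSplit_Long further down:
for the range [1, 10] with 3 partitions the interval is (10 - 1) / 3.0 = 3.0,
so the inner boundaries round to 4 and 7:

    import org.apache.commons.lang3.tuple.MutablePair;
    import org.apache.hive.storage.jdbc.spitter.LongIntervalSpitter;

    import java.util.List;

    public class LongSplitDemo {
      public static void main(String[] args) {
        // The TypeInfo argument is unused by this splitter, so null suffices.
        List<MutablePair<String, String>> intervals =
            new LongIntervalSpitter().getIntervals("1", "10", 3, null);
        System.out.println(intervals); // [(1,4), (4,7), (7,10)]
      }
    }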

http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/TimestampIntervalSplitter.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/TimestampIntervalSplitter.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/TimestampIntervalSplitter.java
new file mode 100644
index 0000000..e948a5f
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/TimestampIntervalSplitter.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TimestampIntervalSplitter implements IntervalSplitter {
+  @Override
+  public List<MutablePair<String, String>> getIntervals(String lowerBound, String upperBound, int numPartitions, TypeInfo
+          typeInfo) {
+    List<MutablePair<String, String>> intervals = new ArrayList<>();
+    Timestamp timestampLower = Timestamp.valueOf(lowerBound);
+    Timestamp timestampUpper = Timestamp.valueOf(upperBound);
+    // Note: nanosecond precision is not fully preserved; the interval math below operates on milliseconds.
+    double timestampInterval = (timestampUpper.getTime() - timestampLower.getTime()) / (double) numPartitions;
+    Timestamp splitTimestampLower, splitTimestampUpper;
+    for (int i = 0; i < numPartitions; i++) {
+      splitTimestampLower = new Timestamp(Math.round(timestampLower.getTime() + timestampInterval * i));
+      splitTimestampUpper = new Timestamp(Math.round(timestampLower.getTime() + timestampInterval * (i + 1)));
+      if (splitTimestampLower.compareTo(splitTimestampUpper) < 0) {
+        intervals.add(new MutablePair<String, String>(splitTimestampLower.toString(), splitTimestampUpper.toString()));
+      }
+    }
+    return intervals;
+  }
+}
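
A sketch of the millisecond-based arithmetic above (not part of this commit;
demo class name hypothetical), mirroring testIntervalSplit_Timestamp below,
where the midpoint of the eight-year range comes out as 2014-01-01 06:00:00.0:

    import org.apache.commons.lang3.tuple.MutablePair;
    import org.apache.hive.storage.jdbc.spitter.TimestampIntervalSplitter;

    import java.util.List;

    public class TimestampSplitDemo {
      public static void main(String[] args) {
        // The TypeInfo argument is unused by this splitter, so null suffices.
        List<MutablePair<String, String>> intervals = new TimestampIntervalSplitter()
            .getIntervals("2010-01-01 00:00:00.000000000",
                "2018-01-01 12:00:00.000000000", 2, null);
        System.out.println(intervals); // [(2010-01-01 00:00:00.0,2014-01-01 06:00:00.0), ...]
      }
    }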

http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/TestJdbcInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/TestJdbcInputFormat.java b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/TestJdbcInputFormat.java
index b146633..cde97d6 100644
--- a/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/TestJdbcInputFormat.java
+++ b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/TestJdbcInputFormat.java
@@ -14,7 +14,10 @@
  */
 package org.apache.hive.storage.jdbc;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hive.storage.jdbc.dao.DatabaseAccessor;
@@ -33,7 +36,11 @@ import java.io.IOException;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
@@ -45,7 +52,7 @@ public class TestJdbcInputFormat {
 
 
   @Test
-  public void testSplitLogic_noSpillOver() throws HiveJdbcDatabaseAccessException, IOException {
+  public void testLimitSplit_noSpillOver() throws HiveJdbcDatabaseAccessException, IOException {
     PowerMockito.mockStatic(DatabaseAccessorFactory.class);
     BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
     JdbcInputFormat f = new JdbcInputFormat();
@@ -53,7 +60,8 @@ public class TestJdbcInputFormat {
 
     JobConf conf = new JobConf();
     conf.set("mapred.input.dir", "/temp");
-    InputSplit[] splits = f.getSplits(conf, 3);
+    conf.set("hive.sql.numPartitions", "3");
+    InputSplit[] splits = f.getSplits(conf, -1);
 
     assertThat(splits, is(notNullValue()));
     assertThat(splits.length, is(3));
@@ -63,7 +71,7 @@ public class TestJdbcInputFormat {
 
 
   @Test
-  public void testSplitLogic_withSpillOver() throws HiveJdbcDatabaseAccessException, IOException {
+  public void testLimitSplit_withSpillOver() throws HiveJdbcDatabaseAccessException, IOException {
     PowerMockito.mockStatic(DatabaseAccessorFactory.class);
     BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
     JdbcInputFormat f = new JdbcInputFormat();
@@ -71,7 +79,8 @@ public class TestJdbcInputFormat {
 
     JobConf conf = new JobConf();
     conf.set("mapred.input.dir", "/temp");
-    InputSplit[] splits = f.getSplits(conf, 6);
+    conf.set("hive.sql.numPartitions", "6");
+    InputSplit[] splits = f.getSplits(conf, -1);
 
     assertThat(splits, is(notNullValue()));
     assertThat(splits.length, is(6));
@@ -84,4 +93,191 @@ public class TestJdbcInputFormat {
       assertThat(splits[i].getLength(), is(2L));
     }
   }
+
+  @Test
+  public void testIntervalSplit_Long() throws HiveJdbcDatabaseAccessException, IOException {
+    PowerMockito.mockStatic(DatabaseAccessorFactory.class);
+    BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
+    JdbcInputFormat f = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getColumnNames(any(Configuration.class))).thenReturn(Lists.newArrayList("a"));
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, "int");
+    conf.set("hive.sql.partitionColumn", "a");
+    conf.set("hive.sql.numPartitions", "3");
+    conf.set("hive.sql.lowerBound", "1");
+    conf.set("hive.sql.upperBound", "10");
+    InputSplit[] splits = f.getSplits(conf, -1);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(3));
+
+    assertNull(((JdbcInputSplit)splits[0]).getLowerBound());
+    assertEquals(((JdbcInputSplit)splits[0]).getUpperBound(), "4");
+    assertEquals(((JdbcInputSplit)splits[1]).getLowerBound(), "4");
+    assertEquals(((JdbcInputSplit)splits[1]).getUpperBound(), "7");
+    assertEquals(((JdbcInputSplit)splits[2]).getLowerBound(), "7");
+    assertNull(((JdbcInputSplit)splits[2]).getUpperBound());
+  }
+
+  @Test
+  public void testIntervalSplit_Double() throws HiveJdbcDatabaseAccessException, IOException {
+    PowerMockito.mockStatic(DatabaseAccessorFactory.class);
+    BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
+    JdbcInputFormat f = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getColumnNames(any(Configuration.class))).thenReturn(Lists.newArrayList("a"));
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, "double");
+    conf.set("hive.sql.partitionColumn", "a");
+    conf.set("hive.sql.numPartitions", "3");
+    conf.set("hive.sql.lowerBound", "0");
+    conf.set("hive.sql.upperBound", "10");
+    InputSplit[] splits = f.getSplits(conf, -1);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(3));
+
+    assertNull(((JdbcInputSplit)splits[0]).getLowerBound());
+    double split0Upper = Double.parseDouble(((JdbcInputSplit)splits[0]).getUpperBound());
+    assertTrue(split0Upper > 3.3 && split0Upper < 3.4);
+    double split1Lower = Double.parseDouble(((JdbcInputSplit)splits[1]).getLowerBound());
+    assertTrue(split1Lower > 3.3 && split1Lower < 3.4);
+    double split1Upper = Double.parseDouble(((JdbcInputSplit)splits[1]).getUpperBound());
+    assertTrue(split1Upper > 6.6 && split1Upper < 6.7);
+    double split2Lower = Double.parseDouble(((JdbcInputSplit)splits[2]).getLowerBound());
+    assertTrue(split2Lower > 6.6 && split2Lower < 6.7);
+    assertNull(((JdbcInputSplit)splits[2]).getUpperBound());
+  }
+
+  @Test
+  public void testIntervalSplit_Decimal() throws HiveJdbcDatabaseAccessException, IOException {
+    PowerMockito.mockStatic(DatabaseAccessorFactory.class);
+    BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
+    JdbcInputFormat f = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getColumnNames(any(Configuration.class))).thenReturn(Lists.newArrayList("a"));
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, "decimal(10,5)");
+    conf.set("hive.sql.partitionColumn", "a");
+    conf.set("hive.sql.numPartitions", "4");
+    conf.set("hive.sql.lowerBound", "5");
+    conf.set("hive.sql.upperBound", "1000");
+    InputSplit[] splits = f.getSplits(conf, -1);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(4));
+
+    assertNull(((JdbcInputSplit)splits[0]).getLowerBound());
+    assertEquals(((JdbcInputSplit)splits[0]).getUpperBound(), "253.75000");
+    assertEquals(((JdbcInputSplit)splits[1]).getLowerBound(), "253.75000");
+    assertEquals(((JdbcInputSplit)splits[1]).getUpperBound(), "502.50000");
+    assertEquals(((JdbcInputSplit)splits[2]).getLowerBound(), "502.50000");
+    assertEquals(((JdbcInputSplit)splits[2]).getUpperBound(), "751.25000");
+    assertEquals(((JdbcInputSplit)splits[3]).getLowerBound(), "751.25000");
+    assertNull(((JdbcInputSplit)splits[3]).getUpperBound());
+  }
+
+  @Test
+  public void testIntervalSplit_Timestamp() throws HiveJdbcDatabaseAccessException, IOException {
+    PowerMockito.mockStatic(DatabaseAccessorFactory.class);
+    BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
+    JdbcInputFormat f = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getColumnNames(any(Configuration.class))).thenReturn(Lists.newArrayList("a"));
+    when(mockDatabaseAccessor.getBounds(any(Configuration.class), any(String.class), anyBoolean(), anyBoolean()))
+            .thenReturn(new ImmutablePair<String, String>("2010-01-01 00:00:00.000000000", "2018-01-01 " +
+            "12:00:00.000000000"));
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, "timestamp");
+    conf.set("hive.sql.partitionColumn", "a");
+    conf.set("hive.sql.numPartitions", "2");
+    InputSplit[] splits = f.getSplits(conf, -1);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(2));
+
+    assertNull(((JdbcInputSplit)splits[0]).getLowerBound());
+    assertEquals(((JdbcInputSplit)splits[0]).getUpperBound(), "2014-01-01 06:00:00.0");
+    assertEquals(((JdbcInputSplit)splits[1]).getLowerBound(), "2014-01-01 06:00:00.0");
+    assertNull(((JdbcInputSplit)splits[1]).getUpperBound());
+  }
+
+  @Test
+  public void testIntervalSplit_Date() throws HiveJdbcDatabaseAccessException, IOException {
+    PowerMockito.mockStatic(DatabaseAccessorFactory.class);
+    BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
+    JdbcInputFormat f = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getColumnNames(any(Configuration.class))).thenReturn(Lists.newArrayList("a"));
+    when(mockDatabaseAccessor.getBounds(any(Configuration.class), any(String.class), anyBoolean(), anyBoolean()))
+            .thenReturn(new ImmutablePair<String, String>("2010-01-01", "2018-01-01"));
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, "date");
+    conf.set("hive.sql.partitionColumn", "a");
+    conf.set("hive.sql.numPartitions", "3");
+    InputSplit[] splits = f.getSplits(conf, -1);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(3));
+
+    assertNull(((JdbcInputSplit)splits[0]).getLowerBound());
+    assertEquals(((JdbcInputSplit)splits[0]).getUpperBound(), "2012-09-01");
+    assertEquals(((JdbcInputSplit)splits[1]).getLowerBound(), "2012-09-01");
+    assertEquals(((JdbcInputSplit)splits[1]).getUpperBound(), "2015-05-03");
+    assertEquals(((JdbcInputSplit)splits[2]).getLowerBound(), "2015-05-03");
+    assertNull(((JdbcInputSplit)splits[2]).getUpperBound());
+  }
+
+  @Test
+  public void testIntervalSplit_AutoShrink() throws HiveJdbcDatabaseAccessException, IOException {
+    PowerMockito.mockStatic(DatabaseAccessorFactory.class);
+    BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
+    JdbcInputFormat f = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getColumnNames(any(Configuration.class))).thenReturn(Lists.newArrayList("a"));
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, "int");
+    conf.set("hive.sql.partitionColumn", "a");
+    conf.set("hive.sql.numPartitions", "5");
+    conf.set("hive.sql.lowerBound", "2");
+    conf.set("hive.sql.upperBound", "4");
+    InputSplit[] splits = f.getSplits(conf, -1);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(2));
+
+    assertNull(((JdbcInputSplit)splits[0]).getLowerBound());
+    assertEquals(((JdbcInputSplit)splits[0]).getUpperBound(), "3");
+    assertEquals(((JdbcInputSplit)splits[1]).getLowerBound(), "3");
+    assertNull(((JdbcInputSplit)splits[1]).getUpperBound());
+  }
+
+  @Test
+  public void testIntervalSplit_NoSplit() throws HiveJdbcDatabaseAccessException, IOException {
+    PowerMockito.mockStatic(DatabaseAccessorFactory.class);
+    BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
+    JdbcInputFormat f = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getColumnNames(any(Configuration.class))).thenReturn(Lists.newArrayList("a"));
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, "int");
+    conf.set("hive.sql.partitionColumn", "a");
+    conf.set("hive.sql.numPartitions", "5");
+    conf.set("hive.sql.lowerBound", "1");
+    conf.set("hive.sql.upperBound", "2");
+    InputSplit[] splits = f.getSplits(conf, -1);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(1));
+
+    assertNull(((JdbcInputSplit)splits[0]).getPartitionColumn());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/dao/TestGenericJdbcDatabaseAccessor.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/dao/TestGenericJdbcDatabaseAccessor.java b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/dao/TestGenericJdbcDatabaseAccessor.java
index 34f061e..545a71f 100644
--- a/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/dao/TestGenericJdbcDatabaseAccessor.java
+++ b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/dao/TestGenericJdbcDatabaseAccessor.java
@@ -15,6 +15,7 @@
 package org.apache.hive.storage.jdbc.dao;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig;
 import org.apache.hive.storage.jdbc.exception.HiveJdbcDatabaseAccessException;
 import org.junit.Test;
@@ -111,7 +112,7 @@ public class TestGenericJdbcDatabaseAccessor {
   public void testGetRecordIterator() throws HiveJdbcDatabaseAccessException {
     Configuration conf = buildConfiguration();
     DatabaseAccessor accessor = DatabaseAccessorFactory.getAccessor(conf);
-    JdbcRecordIterator iterator = accessor.getRecordIterator(conf, 2, 0);
+    JdbcRecordIterator iterator = accessor.getRecordIterator(conf, null, null, null, 2, 0);
 
     assertThat(iterator, is(notNullValue()));
 
@@ -122,7 +123,7 @@ public class TestGenericJdbcDatabaseAccessor {
 
       assertThat(record, is(notNullValue()));
       assertThat(record.size(), is(equalTo(7)));
-      assertThat(record.get("STRATEGY_ID"), is(equalTo(count)));
+      assertThat(record.get("strategy_id"), is(equalTo(count)));
     }
 
     assertThat(count, is(equalTo(2)));
@@ -134,7 +135,7 @@ public class TestGenericJdbcDatabaseAccessor {
   public void testGetRecordIterator_offsets() throws HiveJdbcDatabaseAccessException {
     Configuration conf = buildConfiguration();
     DatabaseAccessor accessor = DatabaseAccessorFactory.getAccessor(conf);
-    JdbcRecordIterator iterator = accessor.getRecordIterator(conf, 2, 2);
+    JdbcRecordIterator iterator = accessor.getRecordIterator(conf, null, null, null, 2, 2);
 
     assertThat(iterator, is(notNullValue()));
 
@@ -145,7 +146,7 @@ public class TestGenericJdbcDatabaseAccessor {
 
       assertThat(record, is(notNullValue()));
       assertThat(record.size(), is(equalTo(7)));
-      assertThat(record.get("STRATEGY_ID"), is(equalTo(count + 2)));
+      assertThat(record.get("strategy_id"), is(equalTo(count + 2)));
     }
 
     assertThat(count, is(equalTo(2)));
@@ -158,7 +159,7 @@ public class TestGenericJdbcDatabaseAccessor {
     Configuration conf = buildConfiguration();
     conf.set(JdbcStorageConfig.QUERY.getPropertyName(), "select * from test_strategy where strategy_id = '25'");
     DatabaseAccessor accessor = DatabaseAccessorFactory.getAccessor(conf);
-    JdbcRecordIterator iterator = accessor.getRecordIterator(conf, 0, 2);
+    JdbcRecordIterator iterator = accessor.getRecordIterator(conf, null, null, null, 0, 2);
 
     assertThat(iterator, is(notNullValue()));
     assertThat(iterator.hasNext(), is(false));
@@ -170,7 +171,7 @@ public class TestGenericJdbcDatabaseAccessor {
   public void testGetRecordIterator_largeOffset() throws HiveJdbcDatabaseAccessException {
     Configuration conf = buildConfiguration();
     DatabaseAccessor accessor = DatabaseAccessorFactory.getAccessor(conf);
-    JdbcRecordIterator iterator = accessor.getRecordIterator(conf, 10, 25);
+    JdbcRecordIterator iterator = accessor.getRecordIterator(conf, null, null, null, 10, 25);
 
     assertThat(iterator, is(notNullValue()));
     assertThat(iterator.hasNext(), is(false));
@@ -184,7 +185,7 @@ public class TestGenericJdbcDatabaseAccessor {
     conf.set(JdbcStorageConfig.QUERY.getPropertyName(), "select * from strategyx");
     DatabaseAccessor accessor = DatabaseAccessorFactory.getAccessor(conf);
     @SuppressWarnings("unused")
-      JdbcRecordIterator iterator = accessor.getRecordIterator(conf, 0, 2);
+      JdbcRecordIterator iterator = accessor.getRecordIterator(conf, null, null, null, 0, 2);
   }
 
 
@@ -198,7 +199,8 @@ public class TestGenericJdbcDatabaseAccessor {
     config.set(JdbcStorageConfig.JDBC_URL.getPropertyName(), "jdbc:h2:mem:test;MODE=MySQL;INIT=runscript from '"
         + scriptPath + "'");
     config.set(JdbcStorageConfig.QUERY.getPropertyName(), "select * from test_strategy");
-
+    config.set(serdeConstants.LIST_COLUMNS, "strategy_id,name,referrer,landing,priority,implementation,last_modified");
+    config.set(serdeConstants.LIST_COLUMN_TYPES, "int,string,string,string,int,string,timestamp");
     return config;
   }
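
Judging from the call sites above and the split logic earlier in the patch,
the three new null arguments appear to be the partition column and its
lower/upper bounds, with null meaning an unpartitioned scan. Under that
assumption (inferred from the call sites, not the DatabaseAccessor diff), a
bounded read of the fixture table, reusing the test's conf and accessor,
would look like:

    // Assumed signature, inferred from the call sites above:
    // getRecordIterator(Configuration conf, String partitionColumn,
    //                   String lowerBound, String upperBound, int limit, int offset)
    JdbcRecordIterator iterator =
        accessor.getRecordIterator(conf, "strategy_id", "0", "10", 2, 0);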
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/ql/src/test/queries/clientpositive/external_jdbc_table_partition.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/external_jdbc_table_partition.q b/ql/src/test/queries/clientpositive/external_jdbc_table_partition.q
new file mode 100644
index 0000000..f285d17
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/external_jdbc_table_partition.q
@@ -0,0 +1,133 @@
+--! qt:dataset:src
+
+CREATE TEMPORARY FUNCTION dboutput AS 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput';
+
+FROM src
+SELECT
+dboutput ('jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2;create=true','user','passwd',
+'CREATE TABLE EXTERNAL_JDBC_PARTITION_TABLE1 ("ikey" INTEGER, "bkey" BIGINT, "fkey" REAL, "dkey" DOUBLE, "chkey" VARCHAR(20), "dekey" DECIMAL(6,4), "dtkey" DATE, "tkey" TIMESTAMP)' ),
+dboutput('jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2','user','passwd',
+'INSERT INTO EXTERNAL_JDBC_PARTITION_TABLE1 ("ikey","bkey","fkey","dkey","chkey","dekey","dtkey","tkey") VALUES (?,?,?,?,?,?,?,?)','1','1000','20.0','40.0','aaa','3.1415','2010-01-01','2018-01-01 12:00:00.000000000'),
+dboutput('jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2','user','passwd',
+'INSERT INTO EXTERNAL_JDBC_PARTITION_TABLE1 ("ikey","bkey","fkey","dkey","chkey","dekey","dtkey","tkey") VALUES (?,?,?,?,?,?,?,?)','5','9000',null,'10.0','bbb','2.7182','2018-01-01','2010-06-01 14:00:00.000000000'),
+dboutput('jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2','user','passwd',
+'INSERT INTO EXTERNAL_JDBC_PARTITION_TABLE1 ("ikey","bkey","fkey","dkey","chkey","dekey","dtkey","tkey") VALUES (?,?,?,?,?,?,?,?)','3','4000','120.0','25.4','hello','2.7182','2017-06-05','2011-11-10 18:00:08.000000000'),
+dboutput('jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2','user','passwd',
+'INSERT INTO EXTERNAL_JDBC_PARTITION_TABLE1 ("ikey","bkey","fkey","dkey","chkey","dekey","dtkey","tkey") VALUES (?,?,?,?,?,?,?,?)','8','3000','180.0','35.8','world','3.1415','2014-03-03','2016-07-04 13:00:00.000000000'),
+dboutput('jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2','user','passwd',
+'INSERT INTO EXTERNAL_JDBC_PARTITION_TABLE1 ("ikey","bkey","fkey","dkey","chkey","dekey","dtkey","tkey") VALUES (?,?,?,?,?,?,?,?)','4','8000','120.4','31.3','ccc',null,'2014-03-04','2018-07-08 11:00:00.000000000')
+limit 1;
+
+-- integer partition column
+-- lower/upper bound unset
+CREATE EXTERNAL TABLE jdbc_partition_table1
+(
+ ikey int,
+ bkey bigint,
+ fkey float,
+ dkey double,
+ chkey string,
+ dekey decimal(5,3),
+ dtkey date,
+ tkey timestamp
+)
+STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
+TBLPROPERTIES (
+                "hive.sql.database.type" = "DERBY",
+                "hive.sql.jdbc.driver" = "org.apache.derby.jdbc.EmbeddedDriver",
+                "hive.sql.jdbc.url" = "jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2;collation=TERRITORY_BASED:PRIMARY",
+                "hive.sql.dbcp.username" = "user",
+                "hive.sql.dbcp.password" = "passwd",
+                "hive.sql.table" = "EXTERNAL_JDBC_PARTITION_TABLE1",
+                "hive.sql.dbcp.maxActive" = "1",
+                "hive.sql.partitionColumn" = "ikey",
+                "hive.sql.numPartitions" = "2"
+);
+
+SELECT * FROM jdbc_partition_table1;
+
+-- decimal partition column
+-- lower/upper bound unset
+CREATE EXTERNAL TABLE jdbc_partition_table2
+(
+ ikey int,
+ bkey bigint,
+ fkey float,
+ dkey double,
+ chkey string,
+ dekey decimal(5,3),
+ dtkey date,
+ tkey timestamp
+)
+STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
+TBLPROPERTIES (
+                "hive.sql.database.type" = "DERBY",
+                "hive.sql.jdbc.driver" = "org.apache.derby.jdbc.EmbeddedDriver",
+                "hive.sql.jdbc.url" = "jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2;collation=TERRITORY_BASED:PRIMARY",
+                "hive.sql.dbcp.username" = "user",
+                "hive.sql.dbcp.password" = "passwd",
+                "hive.sql.table" = "EXTERNAL_JDBC_PARTITION_TABLE1",
+                "hive.sql.dbcp.maxActive" = "1",
+                "hive.sql.partitionColumn" = "dekey",
+                "hive.sql.numPartitions" = "2"
+);
+
+SELECT * FROM jdbc_partition_table2;
+
+-- float partition column
+-- lower/upper bound set
+CREATE EXTERNAL TABLE jdbc_partition_table3
+(
+ ikey int,
+ bkey bigint,
+ fkey float,
+ dkey double,
+ chkey string,
+ dekey decimal(5,3),
+ dtkey date,
+ tkey timestamp
+)
+STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
+TBLPROPERTIES (
+                "hive.sql.database.type" = "DERBY",
+                "hive.sql.jdbc.driver" = "org.apache.derby.jdbc.EmbeddedDriver",
+                "hive.sql.jdbc.url" = "jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2;collation=TERRITORY_BASED:PRIMARY",
+                "hive.sql.dbcp.username" = "user",
+                "hive.sql.dbcp.password" = "passwd",
+                "hive.sql.table" = "EXTERNAL_JDBC_PARTITION_TABLE1",
+                "hive.sql.dbcp.maxActive" = "1",
+                "hive.sql.partitionColumn" = "fkey",
+                "hive.sql.lowerBound" = "0",
+                "hive.sql.upperBound" = "200",
+                "hive.sql.partitionColumn" = "fkey",
+                "hive.sql.numPartitions" = "2"
+);
+
+SELECT * FROM jdbc_partition_table3;
+
+-- transform pushed down to the table
+SELECT ikey+1 FROM jdbc_partition_table3;
+
+-- partition column defined on a query rather than a table
+CREATE EXTERNAL TABLE jdbc_partition_table4
+(
+ ikey int,
+ bkey bigint,
+ fkey float,
+ dkey double
+)
+STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
+TBLPROPERTIES (
+                "hive.sql.database.type" = "DERBY",
+                "hive.sql.jdbc.driver" = "org.apache.derby.jdbc.EmbeddedDriver",
+                "hive.sql.jdbc.url" = "jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2;collation=TERRITORY_BASED:PRIMARY",
+                "hive.sql.dbcp.username" = "user",
+                "hive.sql.dbcp.password" = "passwd",
+                "hive.sql.query" = "SELECT \"ikey\",\"bkey\",\"fkey\",\"dkey\" FROM EXTERNAL_JDBC_PARTITION_TABLE1 WHERE \"ikey\">1",
+                "hive.sql.dbcp.maxActive" = "1",
+                "hive.sql.partitionColumn" = "fkey",
+                "hive.sql.lowerBound" = "0",
+                "hive.sql.upperBound" = "200",
+                "hive.sql.partitionColumn" = "fkey",
+                "hive.sql.numPartitions" = "2"
+);
+
+SELECT * FROM jdbc_partition_table4;