You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by da...@apache.org on 2018/10/19 00:24:00 UTC
[2/2] hive git commit: HIVE-20720: Add partition column option to
JDBC handler (Daniel Dai, reviewed by Jesus Camacho Rodriguez)
HIVE-20720: Add partition column option to JDBC handler (Daniel Dai, reviewed by Jesus Camacho Rodriguez)
Signed-off-by: Jesus Camacho Rodriguez <jc...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3e5e77d1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3e5e77d1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3e5e77d1
Branch: refs/heads/master
Commit: 3e5e77d1f12e52796ac34d32517b09e5887cd695
Parents: a2bbf79
Author: Daniel Dai <da...@gmail.com>
Authored: Thu Oct 18 17:18:05 2018 -0700
Committer: Daniel Dai <da...@gmail.com>
Committed: Thu Oct 18 17:18:47 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/Constants.java | 4 +
.../test/resources/testconfiguration.properties | 2 +
.../hive/storage/jdbc/JdbcInputFormat.java | 114 +++++--
.../hive/storage/jdbc/JdbcInputSplit.java | 54 ++++
.../hive/storage/jdbc/JdbcRecordReader.java | 3 +-
.../org/apache/hive/storage/jdbc/JdbcSerDe.java | 63 ++--
.../jdbc/conf/JdbcStorageConfigManager.java | 16 +
.../hive/storage/jdbc/dao/DatabaseAccessor.java | 12 +-
.../jdbc/dao/GenericJdbcDatabaseAccessor.java | 216 ++++++++-----
.../storage/jdbc/dao/JdbcRecordIterator.java | 92 +++++-
.../jdbc/dao/JethroDatabaseAccessor.java | 3 +-
.../storage/jdbc/dao/MySqlDatabaseAccessor.java | 16 +-
.../jdbc/spitter/DateIntervalSplitter.java | 42 +++
.../jdbc/spitter/DecimalIntervalSplitter.java | 50 +++
.../jdbc/spitter/DoubleIntervalSplitter.java | 41 +++
.../storage/jdbc/spitter/IntervalSplitter.java | 24 ++
.../jdbc/spitter/IntervalSplitterFactory.java | 45 +++
.../jdbc/spitter/LongIntervalSpitter.java | 42 +++
.../jdbc/spitter/TimestampIntervalSplitter.java | 43 +++
.../hive/storage/jdbc/TestJdbcInputFormat.java | 204 +++++++++++-
.../dao/TestGenericJdbcDatabaseAccessor.java | 18 +-
.../external_jdbc_table_partition.q | 135 ++++++++
.../external_jdbc_table_typeconversion.q | 119 +++++++
.../llap/external_jdbc_table_partition.q.out | 319 +++++++++++++++++++
.../external_jdbc_table_typeconversion.q.out | 280 ++++++++++++++++
25 files changed, 1787 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/common/src/java/org/apache/hadoop/hive/conf/Constants.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 1190679..61bc9df 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -73,6 +73,10 @@ public class Constants {
public static final String JDBC_QUERY_FIELD_NAMES = JDBC_CONFIG_PREFIX + ".query.fieldNames";
public static final String JDBC_QUERY_FIELD_TYPES = JDBC_CONFIG_PREFIX + ".query.fieldTypes";
public static final String JDBC_SPLIT_QUERY = JDBC_CONFIG_PREFIX + ".query.split";
+ public static final String JDBC_PARTITION_COLUMN = JDBC_CONFIG_PREFIX + ".partitionColumn";
+ public static final String JDBC_NUM_PARTITIONS = JDBC_CONFIG_PREFIX + ".numPartitions";
+ public static final String JDBC_LOW_BOUND = JDBC_CONFIG_PREFIX + ".lowerBound";
+ public static final String JDBC_UPPER_BOUND = JDBC_CONFIG_PREFIX + ".upperBound";
public static final String HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR = "HIVE_JOB_CREDSTORE_PASSWORD";
public static final String HADOOP_CREDENTIAL_PASSWORD_ENVVAR = "HADOOP_CREDSTORE_PASSWORD";
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index b6d42c6..8349e3d 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -520,6 +520,8 @@ minillaplocal.query.files=\
external_jdbc_auth.q,\
external_jdbc_table.q,\
external_jdbc_table2.q,\
+ external_jdbc_table_partition.q,\
+ external_jdbc_table_typeconversion.q,\
fullouter_mapjoin_1_optimized.q,\
groupby2.q,\
groupby_groupingset_bug.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java
index 74999db..8a0a26a 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java
@@ -15,9 +15,15 @@
package org.apache.hive.storage.jdbc;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -25,6 +31,8 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hive.storage.jdbc.spitter.IntervalSplitter;
+import org.apache.hive.storage.jdbc.spitter.IntervalSplitterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +40,7 @@ import org.apache.hive.storage.jdbc.dao.DatabaseAccessor;
import org.apache.hive.storage.jdbc.dao.DatabaseAccessorFactory;
import java.io.IOException;
+import java.util.List;
public class JdbcInputFormat extends HiveInputFormat<LongWritable, MapWritable> {
@@ -61,47 +70,100 @@ public class JdbcInputFormat extends HiveInputFormat<LongWritable, MapWritable>
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
try {
- if (!job.getBoolean(Constants.JDBC_SPLIT_QUERY, true)) {
- // We will not split this query
- LOGGER.debug("Creating 1 input splits");
- InputSplit[] splits = new InputSplit[1];
+ String partitionColumn = job.get(Constants.JDBC_PARTITION_COLUMN);
+ int numPartitions = job.getInt(Constants.JDBC_NUM_PARTITIONS, -1);
+ String lowerBound = job.get(Constants.JDBC_LOW_BOUND);
+ String upperBound = job.get(Constants.JDBC_UPPER_BOUND);
+
+ InputSplit[] splits;
+
+ if (!job.getBoolean(Constants.JDBC_SPLIT_QUERY, true) || numPartitions <= 1) {
+ // We will not split this query if:
+ // 1. hive.sql.query.split is set to false (either manually or automatically by calcite)
+ // 2. numPartitions <= 1 (this includes the case where it is not set and defaults to -1)
+ splits = new InputSplit[1];
splits[0] = new JdbcInputSplit(FileInputFormat.getInputPaths(job)[0]);
+ LOGGER.info("Creating 1 input split " + splits[0]);
return splits;
}
- // We will split this query into n splits
- LOGGER.debug("Creating {} input splits", numSplits);
dbAccessor = DatabaseAccessorFactory.getAccessor(job);
+ Path[] tablePaths = FileInputFormat.getInputPaths(job);
- int numRecords = numSplits <=1 ? Integer.MAX_VALUE : dbAccessor.getTotalNumberOfRecords(job);
+ // We will split this query into n splits
+ LOGGER.debug("Creating {} input splits", numPartitions);
- if (numRecords < numSplits) {
- numSplits = numRecords;
- }
+ if (partitionColumn != null) {
+ List<String> columnNames = dbAccessor.getColumnNames(job);
+ if (!columnNames.contains(partitionColumn)) {
+ throw new IOException("Cannot find partitionColumn:" + partitionColumn + " in " + columnNames);
+ }
+ List<TypeInfo> hiveColumnTypesList = TypeInfoUtils.getTypeInfosFromTypeString(job.get(serdeConstants.LIST_COLUMN_TYPES));
+ TypeInfo typeInfo = hiveColumnTypesList.get(columnNames.indexOf(partitionColumn));
+ if (!(typeInfo instanceof PrimitiveTypeInfo)) {
+ throw new IOException(partitionColumn + " is a complex type, only primitive type can be a partition column");
+ }
+ if (lowerBound == null || upperBound == null) {
+ Pair<String, String> boundary = dbAccessor.getBounds(job, partitionColumn, lowerBound == null,
+ upperBound == null);
+ if (lowerBound == null) {
+ lowerBound = boundary.getLeft();
+ }
+ if (upperBound == null) {
+ upperBound = boundary.getRight();
+ }
+ }
+ if (lowerBound == null) {
+ throw new IOException("lowerBound of " + partitionColumn + " cannot be null");
+ }
+ if (upperBound == null) {
+ throw new IOException("upperBound of " + partitionColumn + " cannot be null");
+ }
+ IntervalSplitter intervalSplitter = IntervalSplitterFactory.newIntervalSpitter(typeInfo);
+ List<MutablePair<String, String>> intervals = intervalSplitter.getIntervals(lowerBound, upperBound, numPartitions,
+ typeInfo);
+ if (intervals.size()<=1) {
+ LOGGER.debug("Creating 1 input splits");
+ splits = new InputSplit[1];
+ splits[0] = new JdbcInputSplit(FileInputFormat.getInputPaths(job)[0]);
+ return splits;
+ }
+ intervals.get(0).setLeft(null);
+ intervals.get(intervals.size()-1).setRight(null);
+ splits = new InputSplit[intervals.size()];
+ for (int i = 0; i < intervals.size(); i++) {
+ splits[i] = new JdbcInputSplit(partitionColumn, intervals.get(i).getLeft(), intervals.get(i).getRight());
+ }
+ } else {
+ int numRecords = dbAccessor.getTotalNumberOfRecords(job);
- if (numSplits <= 0) {
- numSplits = 1;
- }
+ if (numRecords < numPartitions) {
+ numPartitions = numRecords;
+ }
- int numRecordsPerSplit = numRecords / numSplits;
- int numSplitsWithExtraRecords = numRecords % numSplits;
+ int numRecordsPerSplit = numRecords / numPartitions;
+ int numSplitsWithExtraRecords = numRecords % numPartitions;
- LOGGER.debug("Num records = {}", numRecords);
- InputSplit[] splits = new InputSplit[numSplits];
- Path[] tablePaths = FileInputFormat.getInputPaths(job);
+ LOGGER.debug("Num records = {}", numRecords);
+ splits = new InputSplit[numPartitions];
- int offset = 0;
- for (int i = 0; i < numSplits; i++) {
- int numRecordsInThisSplit = numRecordsPerSplit;
- if (i < numSplitsWithExtraRecords) {
- numRecordsInThisSplit++;
- }
+ int offset = 0;
+ for (int i = 0; i < numPartitions; i++) {
+ int numRecordsInThisSplit = numRecordsPerSplit;
+ if (i < numSplitsWithExtraRecords) {
+ numRecordsInThisSplit++;
+ }
- splits[i] = new JdbcInputSplit(numRecordsInThisSplit, offset, tablePaths[0]);
- offset += numRecordsInThisSplit;
+ splits[i] = new JdbcInputSplit(numRecordsInThisSplit, offset, tablePaths[0]);
+ offset += numRecordsInThisSplit;
+ }
}
dbAccessor = null;
+ LOGGER.info("Num input splits created {}", splits.length);
+ for (InputSplit split : splits) {
+ LOGGER.info("split:" + split.toString());
+ }
return splits;
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java
index 3a6ada8..a10ed75 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java
@@ -28,6 +28,9 @@ public class JdbcInputSplit extends FileSplit implements InputSplit {
private int limit = 0;
private int offset = 0;
+ private String partitionColumn = null;
+ private String lowerBound = null;
+ private String upperBound = null;
public JdbcInputSplit() {
@@ -54,12 +57,26 @@ public class JdbcInputSplit extends FileSplit implements InputSplit {
this.offset = offset;
}
+ public JdbcInputSplit(String partitionColumn, String lowerBound, String upperBound) {
+ super(null, 0, 0, EMPTY_ARRAY);
+ this.partitionColumn = partitionColumn;
+ this.lowerBound = lowerBound;
+ this.upperBound = upperBound;
+ }
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeInt(limit);
out.writeInt(offset);
+ if (partitionColumn != null) {
+ out.writeBoolean(true);
+ out.writeUTF(partitionColumn);
+ out.writeUTF(lowerBound);
+ out.writeUTF(upperBound);
+ } else {
+ out.writeBoolean(false);
+ }
}
@@ -68,6 +85,12 @@ public class JdbcInputSplit extends FileSplit implements InputSplit {
super.readFields(in);
limit = in.readInt();
offset = in.readInt();
+ boolean partitionColumnExists = in.readBoolean();
+ if (partitionColumnExists) {
+ partitionColumn = in.readUTF();
+ lowerBound = in.readUTF();
+ upperBound = in.readUTF();
+ }
}
@@ -102,4 +125,35 @@ public class JdbcInputSplit extends FileSplit implements InputSplit {
this.offset = offset;
}
+ public String getPartitionColumn() {
+ return this.partitionColumn;
+ }
+
+ public String getLowerBound() {
+ return this.lowerBound;
+ }
+
+ public String getUpperBound() {
+ return this.upperBound;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ if (partitionColumn != null) {
+ sb.append("interval:");
+ sb.append(partitionColumn).append("[");
+ if (lowerBound != null) {
+ sb.append(lowerBound);
+ }
+ sb.append(",");
+ if (upperBound != null) {
+ sb.append(upperBound);
+ }
+ sb.append(")");
+ } else {
+ sb.append("limit:" + limit + ", offset:" + offset);
+ }
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java
index 1da6213..a3248b4 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java
@@ -55,7 +55,8 @@ public class JdbcRecordReader implements RecordReader<LongWritable, MapWritable>
LOGGER.trace("JdbcRecordReader.next called");
if (dbAccessor == null) {
dbAccessor = DatabaseAccessorFactory.getAccessor(conf);
- iterator = dbAccessor.getRecordIterator(conf, split.getLimit(), split.getOffset());
+ iterator = dbAccessor.getRecordIterator(conf, split.getPartitionColumn(), split.getLowerBound(), split
+ .getUpperBound(), split.getLimit(), split.getOffset());
}
if (iterator.hasNext()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java
index 5947628..b68340c 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java
@@ -16,9 +16,10 @@ package org.apache.hive.storage.jdbc;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.common.type.Date;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -35,13 +37,13 @@ import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager;
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessor;
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig;
-import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager;
-import org.apache.hive.storage.jdbc.dao.DatabaseAccessor;
-import org.apache.hive.storage.jdbc.dao.DatabaseAccessorFactory;
import java.math.BigDecimal;
import java.util.ArrayList;
@@ -73,42 +75,33 @@ public class JdbcSerDe extends AbstractSerDe {
if (properties.containsKey(JdbcStorageConfig.DATABASE_TYPE.getPropertyName())) {
Configuration tableConfig = JdbcStorageConfigManager.convertPropertiesToConfiguration(properties);
DatabaseAccessor dbAccessor = DatabaseAccessorFactory.getAccessor(tableConfig);
-
- // Extract information from properties
- String[] jdbcColumnNamesArray;
- List<TypeInfo> hiveColumnTypesArray;
+ // Extract column names and types from properties
+ List<TypeInfo> hiveColumnTypesList;
if (properties.containsKey(Constants.JDBC_TABLE) && properties.containsKey(Constants.JDBC_QUERY)) {
// The query has been autogenerated by Hive, the column names are the
// same in the query pushed and the list of hiveColumnNames
String fieldNamesProperty =
- Preconditions.checkNotNull(properties.getProperty(Constants.JDBC_QUERY_FIELD_NAMES, null));
+ Preconditions.checkNotNull(properties.getProperty(Constants.JDBC_QUERY_FIELD_NAMES, null));
String fieldTypesProperty =
- Preconditions.checkNotNull(properties.getProperty(Constants.JDBC_QUERY_FIELD_TYPES, null));
+ Preconditions.checkNotNull(properties.getProperty(Constants.JDBC_QUERY_FIELD_TYPES, null));
hiveColumnNames = fieldNamesProperty.trim().split(",");
- jdbcColumnNamesArray = hiveColumnNames;
- hiveColumnTypesArray = TypeInfoUtils.getTypeInfosFromTypeString(fieldTypesProperty);
+ hiveColumnTypesList = TypeInfoUtils.getTypeInfosFromTypeString(fieldTypesProperty);
} else {
- // The query was hardcoded by user or we are creating the table.
- // We obtain the column names with the db accessor.
- List<String> columnNames = dbAccessor.getColumnNames(tableConfig);
- hiveColumnNames = columnNames.toArray(new String[columnNames.size()]);
- // These are the column names for the table defined with the JDBC storage handler.
- jdbcColumnNamesArray = parseProperty(properties.getProperty(serdeConstants.LIST_COLUMNS), ",");
- if (hiveColumnNames.length != jdbcColumnNamesArray.length) {
- throw new SerDeException("Expected " + hiveColumnNames.length + " hiveColumnNames. Table definition has "
- + jdbcColumnNamesArray.length + " hiveColumnNames");
- }
- hiveColumnTypesArray = TypeInfoUtils.getTypeInfosFromTypeString(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES));
+ hiveColumnNames = properties.getProperty(serdeConstants.LIST_COLUMNS).split(",");
+ hiveColumnTypesList = TypeInfoUtils.getTypeInfosFromTypeString(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES));
+ }
+ if (hiveColumnNames.length == 0) {
+ throw new SerDeException("Received an empty Hive column name definition");
}
- if (hiveColumnTypesArray.size() == 0) {
+ if (hiveColumnTypesList.size() == 0) {
throw new SerDeException("Received an empty Hive column type definition");
}
// Populate column types and inspector
- hiveColumnTypes = new PrimitiveTypeInfo[hiveColumnTypesArray.size()];
+ hiveColumnTypes = new PrimitiveTypeInfo[hiveColumnTypesList.size()];
List<ObjectInspector> fieldInspectors = new ArrayList<>(hiveColumnNames.length);
for (int i = 0; i < hiveColumnNames.length; i++) {
- TypeInfo ti = hiveColumnTypesArray.get(i);
+ TypeInfo ti = hiveColumnTypesList.get(i);
if (ti.getCategory() != Category.PRIMITIVE) {
throw new SerDeException("Non primitive types not supported yet");
}
@@ -117,7 +110,7 @@ public class JdbcSerDe extends AbstractSerDe {
PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(hiveColumnTypes[i]));
}
inspector =
- ObjectInspectorFactory.getStandardStructObjectInspector(Arrays.asList(jdbcColumnNamesArray),
+ ObjectInspectorFactory.getStandardStructObjectInspector(Arrays.asList(hiveColumnNames),
fieldInspectors);
row = new ArrayList<>(hiveColumnNames.length);
}
@@ -128,16 +121,6 @@ public class JdbcSerDe extends AbstractSerDe {
}
}
-
- private String[] parseProperty(String propertyValue, String delimiter) {
- if ((propertyValue == null) || (propertyValue.trim().isEmpty())) {
- return new String[] {};
- }
-
- return propertyValue.split(delimiter);
- }
-
-
@Override
public Object deserialize(Writable blob) throws SerDeException {
LOGGER.trace("Deserializing from SerDe");
@@ -195,9 +178,9 @@ public class JdbcSerDe extends AbstractSerDe {
}
break;
case DECIMAL:
- if (!(rowVal instanceof BigDecimal)) {
- rowVal = new BigDecimal(rowVal.toString());
- }
+ int scale = ((DecimalTypeInfo)hiveColumnTypes[i]).getScale();
+ rowVal = HiveDecimal.create(rowVal.toString());
+ ((HiveDecimal)rowVal).setScale(scale, BigDecimal.ROUND_HALF_EVEN);
break;
case BOOLEAN:
if (rowVal instanceof Number) {
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java
index 18e2397..5679f1b 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java
@@ -124,6 +124,22 @@ public class JdbcStorageConfigManager {
return config.get(key.getPropertyName());
}
+ public static String getOrigQueryToExecute(Configuration config) {
+ String query;
+ String tableName = config.get(Constants.JDBC_TABLE);
+ if (tableName != null) {
+ // We generate query as select *
+ query = "select * from " + tableName;
+ String hiveFilterCondition = QueryConditionBuilder.getInstance().buildCondition(config);
+ if ((hiveFilterCondition != null) && (!hiveFilterCondition.trim().isEmpty())) {
+ query = query + " WHERE " + hiveFilterCondition;
+ }
+ } else {
+ query = config.get(Constants.JDBC_QUERY);
+ }
+
+ return query;
+ }
public static String getQueryToExecute(Configuration config) {
String query = config.get(Constants.JDBC_QUERY);
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java
index fdaa794..a6d0306 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java
@@ -14,8 +14,10 @@
*/
package org.apache.hive.storage.jdbc.dao;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hive.storage.jdbc.exception.HiveJdbcDatabaseAccessException;
import java.util.List;
@@ -24,11 +26,15 @@ public interface DatabaseAccessor {
List<String> getColumnNames(Configuration conf) throws HiveJdbcDatabaseAccessException;
- List<String> getColumnTypes(Configuration conf) throws HiveJdbcDatabaseAccessException;
-
int getTotalNumberOfRecords(Configuration conf) throws HiveJdbcDatabaseAccessException;
JdbcRecordIterator
- getRecordIterator(Configuration conf, int limit, int offset) throws HiveJdbcDatabaseAccessException;
+ getRecordIterator(Configuration conf, String partitionColumn, String lowerBound, String upperBound, int limit, int
+ offset) throws
+ HiveJdbcDatabaseAccessException;
+
+ Pair<String, String> getBounds(Configuration conf, String partitionColumn, boolean lower, boolean upper) throws
+ HiveJdbcDatabaseAccessException;
+ boolean needColumnQuote();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java
index abdc5f0..607c45c 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java
@@ -14,11 +14,15 @@
*/
package org.apache.hive.storage.jdbc.dao;
+import com.google.common.base.Preconditions;
import org.apache.commons.dbcp.BasicDataSourceFactory;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -43,6 +47,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
* A data accessor that should in theory work with all JDBC compliant database drivers.
@@ -53,6 +59,7 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
protected static final int DEFAULT_FETCH_SIZE = 1000;
protected static final Logger LOGGER = LoggerFactory.getLogger(GenericJdbcDatabaseAccessor.class);
protected DataSource dbcpDataSource = null;
+ static final Pattern fromPattern = Pattern.compile("(.*?\\sfrom\\s)(.*+)", Pattern.CASE_INSENSITIVE);
public GenericJdbcDatabaseAccessor() {
@@ -67,7 +74,8 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
try {
initializeDatabaseConnection(conf);
- String metadataQuery = getMetaDataQuery(conf);
+ String query = JdbcStorageConfigManager.getOrigQueryToExecute(conf);
+ String metadataQuery = getMetaDataQuery(query);
LOGGER.debug("Query to execute is [{}]", metadataQuery);
conn = dbcpDataSource.getConnection();
@@ -94,75 +102,11 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
}
- protected String getMetaDataQuery(Configuration conf) {
- String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
- String metadataQuery = addLimitToQuery(sql, 1);
- return metadataQuery;
+ protected String getMetaDataQuery(String sql) {
+ return addLimitToQuery(sql, 1);
}
@Override
- public List<String> getColumnTypes(Configuration conf) throws HiveJdbcDatabaseAccessException {
- Connection conn = null;
- PreparedStatement ps = null;
- ResultSet rs = null;
-
- try {
- initializeDatabaseConnection(conf);
- String metadataQuery = getMetaDataQuery(conf);
- LOGGER.debug("Query to execute is [{}]", metadataQuery);
-
- conn = dbcpDataSource.getConnection();
- ps = conn.prepareStatement(metadataQuery);
- rs = ps.executeQuery();
-
- ResultSetMetaData metadata = rs.getMetaData();
- int numColumns = metadata.getColumnCount();
- List<String> columnTypes = new ArrayList<String>(numColumns);
- for (int i = 0; i < numColumns; i++) {
- switch (metadata.getColumnType(i + 1)) {
- case Types.CHAR:
- columnTypes.add(serdeConstants.STRING_TYPE_NAME);
- break;
- case Types.INTEGER:
- columnTypes.add(serdeConstants.INT_TYPE_NAME);
- break;
- case Types.BIGINT:
- columnTypes.add(serdeConstants.BIGINT_TYPE_NAME);
- break;
- case Types.DECIMAL:
- columnTypes.add(serdeConstants.DECIMAL_TYPE_NAME);
- break;
- case Types.FLOAT:
- case Types.REAL:
- columnTypes.add(serdeConstants.FLOAT_TYPE_NAME);
- break;
- case Types.DOUBLE:
- columnTypes.add(serdeConstants.DOUBLE_TYPE_NAME);
- break;
- case Types.DATE:
- columnTypes.add(serdeConstants.DATE_TYPE_NAME);
- break;
- case Types.TIMESTAMP:
- columnTypes.add(serdeConstants.TIMESTAMP_TYPE_NAME);
- break;
-
- default:
- columnTypes.add(metadata.getColumnTypeName(i+1));
- break;
- }
- }
-
- return columnTypes;
- } catch (Exception e) {
- LOGGER.error("Error while trying to get column names.", e);
- throw new HiveJdbcDatabaseAccessException("Error while trying to get column names: " + e.getMessage(), e);
- } finally {
- cleanupResources(conn, ps, rs);
- }
- }
-
-
- @Override
public int getTotalNumberOfRecords(Configuration conf) throws HiveJdbcDatabaseAccessException {
Connection conn = null;
PreparedStatement ps = null;
@@ -200,7 +144,9 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
@Override
public JdbcRecordIterator
- getRecordIterator(Configuration conf, int limit, int offset) throws HiveJdbcDatabaseAccessException {
+ getRecordIterator(Configuration conf, String partitionColumn, String lowerBound, String upperBound, int limit, int
+ offset) throws
+ HiveJdbcDatabaseAccessException {
Connection conn = null;
PreparedStatement ps = null;
@@ -208,16 +154,22 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
try {
initializeDatabaseConnection(conf);
+ String tableName = conf.get(Constants.JDBC_TABLE);
String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
- String limitQuery = addLimitAndOffsetToQuery(sql, limit, offset);
- LOGGER.info("Query to execute is [{}]", limitQuery);
+ String partitionQuery;
+ if (partitionColumn != null) {
+ partitionQuery = addBoundaryToQuery(tableName, sql, partitionColumn, lowerBound, upperBound);
+ } else {
+ partitionQuery = addLimitAndOffsetToQuery(sql, limit, offset);
+ }
+ LOGGER.info("Query to execute is [{}]", partitionQuery);
conn = dbcpDataSource.getConnection();
- ps = conn.prepareStatement(limitQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ ps = conn.prepareStatement(partitionQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(getFetchSize(conf));
rs = ps.executeQuery();
- return new JdbcRecordIterator(conn, ps, rs);
+ return new JdbcRecordIterator(conn, ps, rs, conf);
}
catch (Exception e) {
LOGGER.error("Caught exception while trying to execute query", e);
@@ -245,7 +197,6 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
}
}
-
/*
* Uses generic JDBC escape functions to add a limit clause to a query string
*/
@@ -256,6 +207,54 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
return sql + " {LIMIT " + limit + "}";
}
+ /**
+  * Restricts a query to the rows of one partition-column interval.
+  *
+  * <p>The filter is {@code column >= lowerBound AND column < upperBound}; either bound may be
+  * {@code null} to leave that side open. When only an upper bound is given, rows whose partition
+  * column IS NULL are selected as well. When both bounds are {@code null} no WHERE clause is
+  * emitted at all, so the generated statement stays valid SQL and selects every row.
+  *
+  * <p>If {@code tableName} is set, the filter is applied to that table and the resulting
+  * sub-query is substituted for the table reference found after FROM in {@code sql}. Otherwise
+  * {@code sql} itself is wrapped as a sub-query aliased {@code tmptable}.
+  *
+  * <p>NOTE(review): the column name and both bounds are concatenated into the SQL text verbatim,
+  * so they must come from trusted configuration.
+  *
+  * @param tableName table to filter, or {@code null} to filter the whole of {@code sql}
+  * @param sql query that would be run without partitioning
+  * @param partitionColumn column the bounds apply to
+  * @param lowerBound inclusive lower bound as a SQL literal, or {@code null}
+  * @param upperBound exclusive upper bound as a SQL literal, or {@code null}
+  * @return the rewritten query
+  * @throws IllegalArgumentException if {@code tableName} is set and {@code sql} does not match fromPattern
+  * @throws RuntimeException if {@code tableName} is set but cannot be found after FROM in {@code sql}
+  */
+ protected String addBoundaryToQuery(String tableName, String sql, String partitionColumn, String lowerBound,
+     String upperBound) {
+   String quotedColumn = quote() + partitionColumn + quote();
+   String condition = "";
+   if (lowerBound != null) {
+     condition += quotedColumn + " >= " + lowerBound;
+   }
+   if (upperBound != null) {
+     if (lowerBound != null) {
+       condition += " AND ";
+     }
+     condition += quotedColumn + " < " + upperBound;
+   }
+   if (lowerBound == null && upperBound != null) {
+     condition += " OR " + quotedColumn + " IS NULL";
+   }
+
+   String source = tableName != null ? tableName : "(" + sql + ") tmptable";
+   String boundaryQuery = "SELECT * FROM " + source;
+   // Without any bound there is nothing to filter on; a dangling "WHERE " would be invalid SQL.
+   if (!condition.isEmpty()) {
+     boundaryQuery += " WHERE " + condition;
+   }
+   if (tableName == null) {
+     return boundaryQuery;
+   }
+
+   // Looking for table name in from clause, replace with the boundary query
+   // TODO consolidate this
+   // Currently only use simple string match, this should be improved by looking
+   // for only table name in from clause
+   Matcher m = fromPattern.matcher(sql);
+   Preconditions.checkArgument(m.matches(), "sql query does not match the expected FROM pattern: %s", sql);
+   String queryBeforeFrom = m.group(1);
+   String queryAfterFrom = m.group(2);
+
+   String tableString = null;
+   for (char delimiter : new char[] {'`', '"', ' '}) {
+     String candidate = delimiter + tableName + delimiter;
+     if (queryAfterFrom.contains(candidate)) {
+       tableString = candidate;
+       break;
+     }
+   }
+   if (tableString == null) {
+     throw new RuntimeException("Cannot find " + tableName + " in sql query " + sql);
+   }
+   return queryBeforeFrom + queryAfterFrom.replace(tableString, " (" + boundaryQuery + ") " + tableName + " ");
+ }
protected void cleanupResources(Connection conn, PreparedStatement ps, ResultSet rs) {
try {
@@ -344,4 +343,75 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
protected int getFetchSize(Configuration conf) {
return conf.getInt(JdbcStorageConfig.JDBC_FETCH_SIZE.getPropertyName(), DEFAULT_FETCH_SIZE);
}
+
+ /**
+  * Queries the source database for the smallest and/or largest non-NULL value of a column.
+  *
+  * <p>The original query is wrapped as a sub-query aliased {@code tmptable}, and rows whose
+  * {@code partitionColumn} IS NULL are excluded before MIN/MAX are applied.
+  *
+  * @param conf job configuration used to initialise the connection and look up the original query
+  * @param partitionColumn column whose bounds are wanted
+  * @param retrieveMin whether MIN should be fetched
+  * @param retrieveMax whether MAX should be fetched
+  * @return pair of (min, max) read as strings; a side that was not requested is {@code null}
+  * @throws HiveJdbcDatabaseAccessException if neither flag is set, the query returns no row,
+  *         or any other failure occurs while running it
+  */
+ @Override
+ public Pair<String, String> getBounds(Configuration conf, String partitionColumn, boolean retrieveMin, boolean
+     retrieveMax) throws HiveJdbcDatabaseAccessException {
+   Connection conn = null;
+   PreparedStatement ps = null;
+   ResultSet rs = null;
+
+   try {
+     Preconditions.checkArgument(retrieveMin || retrieveMax);
+     initializeDatabaseConnection(conf);
+     String sql = JdbcStorageConfigManager.getOrigQueryToExecute(conf);
+     String quotedColumn = quote() + partitionColumn + quote();
+     String boundsQuery = "SELECT ";
+     if (retrieveMin) {
+       boundsQuery += "MIN(" + quotedColumn + ")";
+     }
+     if (retrieveMax) {
+       if (retrieveMin) {
+         boundsQuery += ",";
+       }
+       boundsQuery += "MAX(" + quotedColumn + ")";
+     }
+     boundsQuery += " FROM (" + sql + ") tmptable " + "WHERE " + quotedColumn + " IS NOT NULL";
+
+     LOGGER.debug("MIN/MAX Query to execute is [{}]", boundsQuery);
+
+     conn = dbcpDataSource.getConnection();
+     ps = conn.prepareStatement(boundsQuery);
+     rs = ps.executeQuery();
+     if (!rs.next()) {
+       // The placeholder is required; without it the query argument is silently dropped.
+       LOGGER.warn("The MIN/MAX query [{}] did not return any results.", boundsQuery);
+       throw new HiveJdbcDatabaseAccessException("MIN/MAX query did not return any results.");
+     }
+
+     // MIN, when requested, is always the first result column; MAX follows it.
+     String lower = null;
+     String upper = null;
+     int pos = 1;
+     if (retrieveMin) {
+       lower = rs.getString(pos);
+       pos++;
+     }
+     if (retrieveMax) {
+       upper = rs.getString(pos);
+     }
+     return new ImmutablePair<>(lower, upper);
+   }
+   catch (HiveJdbcDatabaseAccessException he) {
+     throw he;
+   }
+   catch (Exception e) {
+     LOGGER.error("Caught exception while trying to get MIN/MAX of " + partitionColumn, e);
+     throw new HiveJdbcDatabaseAccessException(e);
+   }
+   finally {
+     cleanupResources(conn, ps, rs);
+   }
+ }
+
+ /**
+  * Returns the text placed on each side of a column name in generated SQL: a double quote
+  * when {@link #needColumnQuote()} is true, otherwise the empty string.
+  */
+ private String quote() {
+   return needColumnQuote() ? "\"" : "";
+ }
+ /**
+  * Tells whether column names are wrapped in double quotes when this accessor builds SQL.
+  * This generic implementation always answers {@code true}.
+  */
+ @Override
+ public boolean needColumnQuote() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JdbcRecordIterator.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JdbcRecordIterator.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JdbcRecordIterator.java
index a95aca2..27538f7 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JdbcRecordIterator.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JdbcRecordIterator.java
@@ -14,15 +14,24 @@
*/
package org.apache.hive.storage.jdbc.dao;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
+import java.sql.SQLDataException;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
/**
@@ -35,12 +44,24 @@ public class JdbcRecordIterator implements Iterator<Map<String, Object>> {
private Connection conn;
private PreparedStatement ps;
private ResultSet rs;
+ private String[] hiveColumnNames;
+ List<TypeInfo> hiveColumnTypesList;
-
- public JdbcRecordIterator(Connection conn, PreparedStatement ps, ResultSet rs) {
+ /**
+  * Creates an iterator over an open result set and resolves the Hive column names and types
+  * that are used to read each row.
+  *
+  * @param conn connection the result set belongs to
+  * @param ps statement that produced the result set
+  * @param rs result set to iterate
+  * @param conf configuration supplying the column name and column type properties
+  * @throws NullPointerException if the selected name or type property is absent from {@code conf}
+  */
+ public JdbcRecordIterator(Connection conn, PreparedStatement ps, ResultSet rs, Configuration conf) {
this.conn = conn;
this.ps = ps;
this.rs = rs;
+ String fieldNamesProperty;
+ String fieldTypesProperty;
+ // When both a table and a query are configured, the query-specific field names/types are used;
+ // otherwise the serde column list and column types are used.
+ if (conf.get(Constants.JDBC_TABLE) != null && conf.get(Constants.JDBC_QUERY) != null) {
+ fieldNamesProperty = Preconditions.checkNotNull(conf.get(Constants.JDBC_QUERY_FIELD_NAMES));
+ fieldTypesProperty = Preconditions.checkNotNull(conf.get(Constants.JDBC_QUERY_FIELD_TYPES));
+ } else {
+ fieldNamesProperty = Preconditions.checkNotNull(conf.get(serdeConstants.LIST_COLUMNS));
+ fieldTypesProperty = Preconditions.checkNotNull(conf.get(serdeConstants.LIST_COLUMN_TYPES));
+ }
+ // Names are a comma-separated list; types are parsed from the Hive type string.
+ hiveColumnNames = fieldNamesProperty.trim().split(",");
+ hiveColumnTypesList = TypeInfoUtils.getTypeInfosFromTypeString(fieldTypesProperty);
}
@@ -59,14 +80,63 @@ public class JdbcRecordIterator implements Iterator<Map<String, Object>> {
@Override
public Map<String, Object> next() {
try {
- ResultSetMetaData metadata = rs.getMetaData();
- int numColumns = metadata.getColumnCount();
- Map<String, Object> record = new HashMap<String, Object>(numColumns);
- for (int i = 0; i < numColumns; i++) {
- String key = metadata.getColumnName(i + 1);
- Object value = rs.getObject(i + 1);
-
- record.put(key, value);
+ Map<String, Object> record = new HashMap<String, Object>(hiveColumnNames.length);
+ for (int i = 0; i < hiveColumnNames.length; i++) {
+ String key = hiveColumnNames[i];
+ Object value = null;
+ if (!(hiveColumnTypesList.get(i) instanceof PrimitiveTypeInfo)) {
+ throw new RuntimeException("date type of column " + hiveColumnNames[i] + ":" +
+ hiveColumnTypesList.get(i).getTypeName() + " is not supported");
+ }
+ try {
+ switch (((PrimitiveTypeInfo) hiveColumnTypesList.get(i)).getPrimitiveCategory()) {
+ case INT:
+ case SHORT:
+ case BYTE:
+ value = rs.getInt(i + 1);
+ break;
+ case LONG:
+ value = rs.getLong(i + 1);
+ break;
+ case FLOAT:
+ value = rs.getFloat(i + 1);
+ break;
+ case DOUBLE:
+ value = rs.getDouble(i + 1);
+ break;
+ case DECIMAL:
+ value = rs.getBigDecimal(i + 1);
+ break;
+ case BOOLEAN:
+ value = rs.getBoolean(i + 1);
+ break;
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ value = rs.getString(i + 1);
+ break;
+ case DATE:
+ value = rs.getDate(i + 1);
+ break;
+ case TIMESTAMP:
+ value = rs.getTimestamp(i + 1);
+ break;
+ default:
+ LOGGER.error("date type of column " + hiveColumnNames[i] + ":" +
+ ((PrimitiveTypeInfo) hiveColumnTypesList.get(i)).getPrimitiveCategory() +
+ " is not supported");
+ value = null;
+ break;
+ }
+ if (value != null && !rs.wasNull()) {
+ record.put(key, value);
+ } else {
+ record.put(key, null);
+ }
+ } catch (SQLDataException e) {
+ record.put(key, null);
+ }
+
}
return record;
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JethroDatabaseAccessor.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JethroDatabaseAccessor.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JethroDatabaseAccessor.java
index db0454e..db72a1b 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JethroDatabaseAccessor.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JethroDatabaseAccessor.java
@@ -43,8 +43,7 @@ public class JethroDatabaseAccessor extends GenericJdbcDatabaseAccessor {
}
+ /**
+  * Builds the query used to read metadata by applying {@code addLimitToQuery(sql, 0)} to the
+  * supplied query text.
+  * NOTE(review): presumably a limit of 0 lets the database describe the columns without
+  * returning rows - confirm against addLimitToQuery.
+  *
+  * @param sql query whose metadata is wanted
+  * @return {@code sql} with a limit of 0 added
+  */
@Override
- protected String getMetaDataQuery(Configuration conf) {
- String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
+ protected String getMetaDataQuery(String sql) {
return addLimitToQuery(sql, 0);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java
index 86fde7c..405ca4c 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java
@@ -26,14 +26,26 @@ public class MySqlDatabaseAccessor extends GenericJdbcDatabaseAccessor {
return addLimitToQuery(sql, limit);
}
else {
- return sql + " LIMIT " + offset + "," + limit;
+ if (limit != -1) {
+ return sql + " LIMIT " + offset + "," + limit;
+ } else {
+ return sql;
+ }
}
}
+ /**
+  * Appends a {@code LIMIT} clause to the query.
+  *
+  * @param sql query text
+  * @param limit maximum number of rows; -1 means "no limit" and leaves {@code sql} unchanged
+  * @return {@code sql} followed by {@code " LIMIT " + limit}, or {@code sql} itself when limit is -1
+  */
@Override
protected String addLimitToQuery(String sql, int limit) {
- return sql + " LIMIT " + limit;
+ if (limit != -1) {
+ return sql + " LIMIT " + limit;
+ } else {
+ return sql;
+ }
}
+ /**
+  * Always answers {@code false}: this accessor does not ask for column names to be quoted.
+  * NOTE(review): presumably because MySQL does not treat double quotes as identifier quotes
+  * by default - confirm.
+  */
+ @Override
+ public boolean needColumnQuote() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DateIntervalSplitter.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DateIntervalSplitter.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DateIntervalSplitter.java
new file mode 100644
index 0000000..664e61b
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DateIntervalSplitter.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Splits a date range into contiguous sub-ranges of roughly equal length.
+ */
+public class DateIntervalSplitter implements IntervalSplitter {
+  /**
+   * Divides {@code [lowerBound, upperBound)} into at most {@code numPartitions} intervals.
+   *
+   * <p>Boundaries are computed in milliseconds but emitted at day granularity
+   * ({@code yyyy-mm-dd}). Two distinct millisecond boundaries can therefore fall on the same day;
+   * such an interval would be empty once rendered as text, so it is skipped. Because consecutive
+   * intervals share their boundary, dropping these empty intervals leaves no gap.
+   *
+   * @param lowerBound first date, in the format accepted by {@link Date#valueOf(String)}
+   * @param upperBound last date, in the same format
+   * @param numPartitions desired number of intervals
+   * @param typeInfo not used by this splitter
+   * @return (lower, upper) pairs as date strings; may hold fewer than {@code numPartitions} entries
+   * @throws IllegalArgumentException if a bound is not a valid date string
+   */
+  @Override
+  public List<MutablePair<String, String>> getIntervals(String lowerBound, String upperBound, int numPartitions,
+      TypeInfo typeInfo) {
+    List<MutablePair<String, String>> intervals = new ArrayList<>();
+    long lowerMillis = Date.valueOf(lowerBound).getTime();
+    long upperMillis = Date.valueOf(upperBound).getTime();
+    double dateInterval = (upperMillis - lowerMillis) / (double) numPartitions;
+    for (int i = 0; i < numPartitions; i++) {
+      Date splitDateLower = new Date(Math.round(lowerMillis + dateInterval * i));
+      Date splitDateUpper = new Date(Math.round(lowerMillis + dateInterval * (i + 1)));
+      String lowerText = splitDateLower.toString();
+      String upperText = splitDateUpper.toString();
+      // Require progress both in time and in the day-granular text that is actually emitted.
+      if (splitDateLower.compareTo(splitDateUpper) < 0 && !lowerText.equals(upperText)) {
+        intervals.add(new MutablePair<String, String>(lowerText, upperText));
+      }
+    }
+    return intervals;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DecimalIntervalSplitter.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DecimalIntervalSplitter.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DecimalIntervalSplitter.java
new file mode 100644
index 0000000..5636c7d
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DecimalIntervalSplitter.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Splits a decimal range into contiguous sub-ranges, rounding every boundary to the scale of
+ * the column's decimal type.
+ */
+public class DecimalIntervalSplitter implements IntervalSplitter {
+  /**
+   * Divides {@code [lowerBound, upperBound)} into at most {@code numPartitions} intervals.
+   *
+   * @param lowerBound lower end of the range, parseable by {@link BigDecimal#BigDecimal(String)}
+   * @param upperBound upper end of the range, parseable the same way
+   * @param numPartitions desired number of intervals
+   * @param typeInfo must be a {@code DecimalTypeInfo}; its scale is applied to every boundary
+   * @return (lower, upper) pairs in plain-string form; intervals that collapse after rounding are omitted
+   */
+  @Override
+  public List<MutablePair<String, String>> getIntervals(String lowerBound, String upperBound, int numPartitions,
+      TypeInfo typeInfo) {
+    List<MutablePair<String, String>> intervals = new ArrayList<>();
+    int scale = ((DecimalTypeInfo) typeInfo).getScale();
+    BigDecimal start = new BigDecimal(lowerBound);
+    BigDecimal end = new BigDecimal(upperBound);
+    BigDecimal step = end.subtract(start).divide(new BigDecimal(numPartitions), MathContext.DECIMAL64);
+
+    // Each boundary is shared by two neighbouring intervals, so carry it over instead of recomputing.
+    BigDecimal previous = boundaryAt(start, step, 0, scale);
+    for (int i = 0; i < numPartitions; i++) {
+      BigDecimal current = boundaryAt(start, step, i + 1, scale);
+      if (previous.compareTo(current) < 0) {
+        intervals.add(new MutablePair<String, String>(previous.toPlainString(), current.toPlainString()));
+      }
+      previous = current;
+    }
+    return intervals;
+  }
+
+  /** Returns {@code start + step * index}, rounded half-even to {@code scale} digits. */
+  private static BigDecimal boundaryAt(BigDecimal start, BigDecimal step, int index, int scale) {
+    return start.add(step.multiply(new BigDecimal(index))).setScale(scale, RoundingMode.HALF_EVEN);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DoubleIntervalSplitter.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DoubleIntervalSplitter.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DoubleIntervalSplitter.java
new file mode 100644
index 0000000..aa955c2
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DoubleIntervalSplitter.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Splits a floating-point range into contiguous sub-ranges of equal width.
+ */
+public class DoubleIntervalSplitter implements IntervalSplitter {
+  /**
+   * Divides {@code [lowerBound, upperBound)} into at most {@code numPartitions} intervals.
+   *
+   * @param lowerBound lower end of the range, parseable by {@link Double#parseDouble(String)}
+   * @param upperBound upper end of the range, parseable the same way
+   * @param numPartitions desired number of intervals
+   * @param typeInfo not used by this splitter
+   * @return (lower, upper) pairs rendered with {@link Double#toString(double)}; intervals whose
+   *         upper boundary is not greater than the lower one are omitted
+   */
+  @Override
+  public List<MutablePair<String, String>> getIntervals(String lowerBound, String upperBound, int numPartitions,
+      TypeInfo typeInfo) {
+    List<MutablePair<String, String>> intervals = new ArrayList<>();
+    double start = Double.parseDouble(lowerBound);
+    double end = Double.parseDouble(upperBound);
+    double width = (end - start) / (double) numPartitions;
+
+    // Neighbouring intervals share a boundary; keep the last one instead of recomputing it.
+    double previous = start + width * 0;
+    for (int i = 0; i < numPartitions; i++) {
+      double current = start + width * (i + 1);
+      if (current > previous) {
+        intervals.add(new MutablePair<String, String>(Double.toString(previous), Double.toString(current)));
+      }
+      previous = current;
+    }
+    return intervals;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/IntervalSplitter.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/IntervalSplitter.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/IntervalSplitter.java
new file mode 100644
index 0000000..4f3455c
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/IntervalSplitter.java
@@ -0,0 +1,24 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.util.List;
+
+/**
+ * Divides the value range of a partition column into sub-ranges, one per split.
+ */
+public interface IntervalSplitter {
+ /**
+  * Splits the range between two bounds into sub-ranges.
+  *
+  * @param lowerBound lower end of the range, in the textual form of the column type
+  * @param upperBound upper end of the range, in the same textual form
+  * @param numPartitions desired number of sub-ranges
+  * @param typeInfo Hive type of the partition column
+  * @return (lower, upper) pairs as strings. The implementations added alongside this interface
+  *         omit a sub-range whose lower boundary is not strictly below its upper boundary, so
+  *         fewer than {@code numPartitions} pairs may be returned.
+  */
+ List<MutablePair<String, String>> getIntervals(String lowerBound, String upperBound, int numPartitions, TypeInfo typeInfo);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/IntervalSplitterFactory.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/IntervalSplitterFactory.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/IntervalSplitterFactory.java
new file mode 100644
index 0000000..efa8c0c
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/IntervalSplitterFactory.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.io.IOException;
+
+/**
+ * Chooses the {@link IntervalSplitter} that matches the Hive type of a partition column.
+ */
+public class IntervalSplitterFactory {
+  /**
+   * Returns a splitter for the given column type.
+   *
+   * @param typeInfo Hive type of the partition column
+   * @return a splitter for integral, floating-point, decimal, timestamp or date columns
+   * @throws IOException if the type is {@code null}, not primitive, or a primitive category
+   *         that cannot be used as a partition column
+   */
+  public static IntervalSplitter newIntervalSpitter(TypeInfo typeInfo) throws IOException {
+    // Check before casting so that complex types are reported through the declared exception
+    // instead of escaping as a ClassCastException.
+    if (!(typeInfo instanceof PrimitiveTypeInfo)) {
+      throw new IOException("partitionColumn is " + (typeInfo == null ? "null" : typeInfo.getTypeName()) +
+          ", only numeric/date/timestamp type can be a partition column");
+    }
+    PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
+    switch (primitiveTypeInfo.getPrimitiveCategory()) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        return new LongIntervalSpitter();
+      case FLOAT:
+      case DOUBLE:
+        return new DoubleIntervalSplitter();
+      case DECIMAL:
+        return new DecimalIntervalSplitter();
+      case TIMESTAMP:
+        return new TimestampIntervalSplitter();
+      case DATE:
+        return new DateIntervalSplitter();
+      default:
+        throw new IOException("partitionColumn is " + primitiveTypeInfo.getPrimitiveCategory() +
+            ", only numeric/date/timestamp type can be a partition column");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/LongIntervalSpitter.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/LongIntervalSpitter.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/LongIntervalSpitter.java
new file mode 100644
index 0000000..e540fb8
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/LongIntervalSpitter.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Splits an integral range into contiguous sub-ranges of roughly equal size.
+ */
+public class LongIntervalSpitter implements IntervalSplitter {
+
+  /**
+   * Divides {@code [lowerBound, upperBound)} into at most {@code numPartitions} intervals.
+   *
+   * <p>Boundary {@code k} is {@code lower + round((upper - lower) * k / numPartitions)}, with
+   * halves rounded up. The arithmetic is done with {@link BigInteger}, so a range wider than
+   * {@code Long.MAX_VALUE} does not overflow and values beyond 2^53 keep full precision.
+   *
+   * @param lowerBound lower end of the range, parseable by {@link Long#parseLong(String)}
+   * @param upperBound upper end of the range, parseable the same way
+   * @param numPartitions desired number of intervals
+   * @param typeInfo not used by this splitter
+   * @return (lower, upper) pairs as decimal strings; empty when {@code upperBound <= lowerBound}
+   *         or {@code numPartitions <= 0}; intervals that collapse after rounding are omitted
+   * @throws NumberFormatException if a bound is not a valid long
+   */
+  @Override
+  public List<MutablePair<String, String>> getIntervals(String lowerBound, String upperBound, int numPartitions,
+      TypeInfo typeInfo) {
+    List<MutablePair<String, String>> intervals = new ArrayList<>();
+    long longLower = Long.parseLong(lowerBound);
+    long longUpper = Long.parseLong(upperBound);
+    if (longUpper <= longLower) {
+      return intervals;
+    }
+    BigInteger lower = BigInteger.valueOf(longLower);
+    BigInteger range = BigInteger.valueOf(longUpper).subtract(lower);
+    BigInteger partitions = BigInteger.valueOf(numPartitions);
+    BigInteger twicePartitions = partitions.shiftLeft(1);
+
+    long previous = longLower;
+    for (int i = 0; i < numPartitions; i++) {
+      // floor((2 * range * k + n) / (2 * n)) == round-half-up(range * k / n) for non-negative terms.
+      BigInteger offset = range.multiply(BigInteger.valueOf(2L * (i + 1L))).add(partitions).divide(twicePartitions);
+      // The boundary lies within [lower, upper], so it always fits in a long.
+      long boundary = lower.add(offset).longValue();
+      if (boundary > previous) {
+        intervals.add(new MutablePair<String, String>(Long.toString(previous), Long.toString(boundary)));
+      }
+      previous = boundary;
+    }
+    return intervals;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/TimestampIntervalSplitter.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/TimestampIntervalSplitter.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/TimestampIntervalSplitter.java
new file mode 100644
index 0000000..e948a5f
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/TimestampIntervalSplitter.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Splits a timestamp range into contiguous sub-ranges of roughly equal duration.
+ */
+public class TimestampIntervalSplitter implements IntervalSplitter {
+  /**
+   * Divides {@code [lowerBound, upperBound)} into at most {@code numPartitions} intervals.
+   *
+   * @param lowerBound lower end of the range, in the format accepted by {@link Timestamp#valueOf(String)}
+   * @param upperBound upper end of the range, in the same format
+   * @param numPartitions desired number of intervals
+   * @param typeInfo not used by this splitter
+   * @return (lower, upper) pairs rendered with {@link Timestamp#toString()}; intervals whose
+   *         lower boundary is not before the upper one are omitted
+   */
+  @Override
+  public List<MutablePair<String, String>> getIntervals(String lowerBound, String upperBound, int numPartitions,
+      TypeInfo typeInfo) {
+    List<MutablePair<String, String>> intervals = new ArrayList<>();
+    long startMillis = Timestamp.valueOf(lowerBound).getTime();
+    long endMillis = Timestamp.valueOf(upperBound).getTime();
+    // Boundaries are computed from millisecond values, so nanosecond precision is not preserved.
+    double stepMillis = (endMillis - startMillis) / (double) numPartitions;
+
+    // Neighbouring intervals share a boundary; keep the last one instead of recomputing it.
+    Timestamp previous = new Timestamp(Math.round(startMillis + stepMillis * 0));
+    for (int i = 0; i < numPartitions; i++) {
+      Timestamp current = new Timestamp(Math.round(startMillis + stepMillis * (i + 1)));
+      if (previous.compareTo(current) < 0) {
+        intervals.add(new MutablePair<String, String>(previous.toString(), current.toString()));
+      }
+      previous = current;
+    }
+    return intervals;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/TestJdbcInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/TestJdbcInputFormat.java b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/TestJdbcInputFormat.java
index b146633..cde97d6 100644
--- a/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/TestJdbcInputFormat.java
+++ b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/TestJdbcInputFormat.java
@@ -14,7 +14,10 @@
*/
package org.apache.hive.storage.jdbc;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hive.storage.jdbc.dao.DatabaseAccessor;
@@ -33,7 +36,11 @@ import java.io.IOException;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@@ -45,7 +52,7 @@ public class TestJdbcInputFormat {
@Test
- public void testSplitLogic_noSpillOver() throws HiveJdbcDatabaseAccessException, IOException {
+ public void testLimitSplit_noSpillOver() throws HiveJdbcDatabaseAccessException, IOException {
PowerMockito.mockStatic(DatabaseAccessorFactory.class);
BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
JdbcInputFormat f = new JdbcInputFormat();
@@ -53,7 +60,8 @@ public class TestJdbcInputFormat {
JobConf conf = new JobConf();
conf.set("mapred.input.dir", "/temp");
- InputSplit[] splits = f.getSplits(conf, 3);
+ conf.set("hive.sql.numPartitions", "3");
+ InputSplit[] splits = f.getSplits(conf, -1);
assertThat(splits, is(notNullValue()));
assertThat(splits.length, is(3));
@@ -63,7 +71,7 @@ public class TestJdbcInputFormat {
@Test
- public void testSplitLogic_withSpillOver() throws HiveJdbcDatabaseAccessException, IOException {
+ public void testLimitSplit_withSpillOver() throws HiveJdbcDatabaseAccessException, IOException {
PowerMockito.mockStatic(DatabaseAccessorFactory.class);
BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
JdbcInputFormat f = new JdbcInputFormat();
@@ -71,7 +79,8 @@ public class TestJdbcInputFormat {
JobConf conf = new JobConf();
conf.set("mapred.input.dir", "/temp");
- InputSplit[] splits = f.getSplits(conf, 6);
+ conf.set("hive.sql.numPartitions", "6");
+ InputSplit[] splits = f.getSplits(conf, -1);
assertThat(splits, is(notNullValue()));
assertThat(splits.length, is(6));
@@ -84,4 +93,191 @@ public class TestJdbcInputFormat {
assertThat(splits[i].getLength(), is(2L));
}
}
+
+  /**
+   * Interval splitting on an {@code int} partition column.
+   *
+   * <p>With bounds 1..10 and three partitions requested, three splits are expected with split
+   * points at "4" and "7". The first split has no lower bound and the last split has no upper
+   * bound (both {@code null}).
+   */
+  @Test
+  public void testIntervalSplit_Long() throws HiveJdbcDatabaseAccessException, IOException {
+    PowerMockito.mockStatic(DatabaseAccessorFactory.class);
+    BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
+    JdbcInputFormat f = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getColumnNames(any(Configuration.class))).thenReturn(Lists.newArrayList("a"));
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, "int");
+    conf.set("hive.sql.partitionColumn", "a");
+    conf.set("hive.sql.numPartitions", "3");
+    conf.set("hive.sql.lowerBound", "1");
+    conf.set("hive.sql.upperBound", "10");
+    InputSplit[] splits = f.getSplits(conf, -1);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(3));
+
+    // assertEquals takes (expected, actual); this order keeps failure messages the right way round.
+    assertNull(((JdbcInputSplit)splits[0]).getLowerBound());
+    assertEquals("4", ((JdbcInputSplit)splits[0]).getUpperBound());
+    assertEquals("4", ((JdbcInputSplit)splits[1]).getLowerBound());
+    assertEquals("7", ((JdbcInputSplit)splits[1]).getUpperBound());
+    assertEquals("7", ((JdbcInputSplit)splits[2]).getLowerBound());
+    assertNull(((JdbcInputSplit)splits[2]).getUpperBound());
+  }
+
+  /**
+   * Interval splitting on a {@code double} partition column.
+   *
+   * <p>With bounds 0..10 and three partitions requested, three splits are expected. The two
+   * split points must fall strictly inside (3.3, 3.4) and (6.6, 6.7); the outermost bounds are
+   * open ({@code null}).
+   */
+  @Test
+  public void testIntervalSplit_Double() throws HiveJdbcDatabaseAccessException, IOException {
+    PowerMockito.mockStatic(DatabaseAccessorFactory.class);
+    BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
+    JdbcInputFormat inputFormat = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getColumnNames(any(Configuration.class))).thenReturn(Lists.newArrayList("a"));
+
+    JobConf jobConf = new JobConf();
+    jobConf.set("mapred.input.dir", "/temp");
+    jobConf.set(serdeConstants.LIST_COLUMN_TYPES, "double");
+    jobConf.set("hive.sql.partitionColumn", "a");
+    jobConf.set("hive.sql.numPartitions", "3");
+    jobConf.set("hive.sql.lowerBound", "0");
+    jobConf.set("hive.sql.upperBound", "10");
+    InputSplit[] splits = inputFormat.getSplits(jobConf, -1);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(3));
+
+    // First split: open lower bound, upper bound at roughly one third of the range.
+    JdbcInputSplit first = (JdbcInputSplit) splits[0];
+    assertNull(first.getLowerBound());
+    double firstUpper = Double.parseDouble(first.getUpperBound());
+    assertTrue(firstUpper > 3.3 && firstUpper < 3.4);
+
+    // Middle split: bounded on both sides.
+    JdbcInputSplit second = (JdbcInputSplit) splits[1];
+    double secondLower = Double.parseDouble(second.getLowerBound());
+    assertTrue(secondLower > 3.3 && secondLower < 3.4);
+    double secondUpper = Double.parseDouble(second.getUpperBound());
+    assertTrue(secondUpper > 6.6 && secondUpper < 6.7);
+
+    // Last split: lower bound at roughly two thirds of the range, open upper bound.
+    JdbcInputSplit third = (JdbcInputSplit) splits[2];
+    double thirdLower = Double.parseDouble(third.getLowerBound());
+    assertTrue(thirdLower > 6.6 && thirdLower < 6.7);
+    assertNull(third.getUpperBound());
+  }
+
+  /**
+   * Interval splitting on a {@code decimal(10,5)} partition column.
+   *
+   * <p>With bounds 5..1000 and four partitions requested, four splits are expected with split
+   * points "253.75000", "502.50000" and "751.25000". The first split has no lower bound and the
+   * last split has no upper bound (both {@code null}).
+   */
+  @Test
+  public void testIntervalSplit_Decimal() throws HiveJdbcDatabaseAccessException, IOException {
+    PowerMockito.mockStatic(DatabaseAccessorFactory.class);
+    BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
+    JdbcInputFormat f = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getColumnNames(any(Configuration.class))).thenReturn(Lists.newArrayList("a"));
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, "decimal(10,5)");
+    conf.set("hive.sql.partitionColumn", "a");
+    conf.set("hive.sql.numPartitions", "4");
+    conf.set("hive.sql.lowerBound", "5");
+    conf.set("hive.sql.upperBound", "1000");
+    InputSplit[] splits = f.getSplits(conf, -1);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(4));
+
+    // assertEquals takes (expected, actual); this order keeps failure messages the right way round.
+    assertNull(((JdbcInputSplit)splits[0]).getLowerBound());
+    assertEquals("253.75000", ((JdbcInputSplit)splits[0]).getUpperBound());
+    assertEquals("253.75000", ((JdbcInputSplit)splits[1]).getLowerBound());
+    assertEquals("502.50000", ((JdbcInputSplit)splits[1]).getUpperBound());
+    assertEquals("502.50000", ((JdbcInputSplit)splits[2]).getLowerBound());
+    assertEquals("751.25000", ((JdbcInputSplit)splits[2]).getUpperBound());
+    assertEquals("751.25000", ((JdbcInputSplit)splits[3]).getLowerBound());
+    assertNull(((JdbcInputSplit)splits[3]).getUpperBound());
+  }
+
+  /**
+   * Interval splitting on a {@code timestamp} partition column.
+   *
+   * <p>No lower/upper bound is configured. The accessor's {@code getBounds} is stubbed to return
+   * 2010-01-01 00:00:00 .. 2018-01-01 12:00:00. With two partitions requested, two splits are
+   * expected with the single split point "2014-01-01 06:00:00.0"; the outermost bounds are open
+   * ({@code null}).
+   */
+  @Test
+  public void testIntervalSplit_Timestamp() throws HiveJdbcDatabaseAccessException, IOException {
+    PowerMockito.mockStatic(DatabaseAccessorFactory.class);
+    BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
+    JdbcInputFormat f = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getColumnNames(any(Configuration.class))).thenReturn(Lists.newArrayList("a"));
+    when(mockDatabaseAccessor.getBounds(any(Configuration.class), any(String.class), anyBoolean(), anyBoolean()))
+        .thenReturn(new ImmutablePair<String, String>("2010-01-01 00:00:00.000000000", "2018-01-01 " +
+        "12:00:00.000000000"));
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, "timestamp");
+    conf.set("hive.sql.partitionColumn", "a");
+    conf.set("hive.sql.numPartitions", "2");
+    InputSplit[] splits = f.getSplits(conf, -1);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(2));
+
+    // assertEquals takes (expected, actual); this order keeps failure messages the right way round.
+    assertNull(((JdbcInputSplit)splits[0]).getLowerBound());
+    assertEquals("2014-01-01 06:00:00.0", ((JdbcInputSplit)splits[0]).getUpperBound());
+    assertEquals("2014-01-01 06:00:00.0", ((JdbcInputSplit)splits[1]).getLowerBound());
+    assertNull(((JdbcInputSplit)splits[1]).getUpperBound());
+  }
+
+  /**
+   * Interval splitting on a {@code date} partition column.
+   *
+   * <p>No lower/upper bound is configured. The accessor's {@code getBounds} is stubbed to return
+   * 2010-01-01 .. 2018-01-01. With three partitions requested, three splits are expected with
+   * split points "2012-09-01" and "2015-05-03"; the outermost bounds are open ({@code null}).
+   */
+  @Test
+  public void testIntervalSplit_Date() throws HiveJdbcDatabaseAccessException, IOException {
+    PowerMockito.mockStatic(DatabaseAccessorFactory.class);
+    BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
+    JdbcInputFormat f = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getColumnNames(any(Configuration.class))).thenReturn(Lists.newArrayList("a"));
+    when(mockDatabaseAccessor.getBounds(any(Configuration.class), any(String.class), anyBoolean(), anyBoolean()))
+        .thenReturn(new ImmutablePair<String, String>("2010-01-01", "2018-01-01"));
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, "date");
+    conf.set("hive.sql.partitionColumn", "a");
+    conf.set("hive.sql.numPartitions", "3");
+    InputSplit[] splits = f.getSplits(conf, -1);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(3));
+
+    // assertEquals takes (expected, actual); this order keeps failure messages the right way round.
+    assertNull(((JdbcInputSplit)splits[0]).getLowerBound());
+    assertEquals("2012-09-01", ((JdbcInputSplit)splits[0]).getUpperBound());
+    assertEquals("2012-09-01", ((JdbcInputSplit)splits[1]).getLowerBound());
+    assertEquals("2015-05-03", ((JdbcInputSplit)splits[1]).getUpperBound());
+    assertEquals("2015-05-03", ((JdbcInputSplit)splits[2]).getLowerBound());
+    assertNull(((JdbcInputSplit)splits[2]).getUpperBound());
+  }
+
+  /**
+   * Interval splitting when more partitions are requested than the range can hold.
+   *
+   * <p>An {@code int} column with bounds 2..4 and five partitions requested is expected to yield
+   * only two splits, with the single split point "3"; the outermost bounds are open
+   * ({@code null}).
+   */
+  @Test
+  public void testIntervalSplit_AutoShrink() throws HiveJdbcDatabaseAccessException, IOException {
+    PowerMockito.mockStatic(DatabaseAccessorFactory.class);
+    BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
+    JdbcInputFormat f = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getColumnNames(any(Configuration.class))).thenReturn(Lists.newArrayList("a"));
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, "int");
+    conf.set("hive.sql.partitionColumn", "a");
+    conf.set("hive.sql.numPartitions", "5");
+    conf.set("hive.sql.lowerBound", "2");
+    conf.set("hive.sql.upperBound", "4");
+    InputSplit[] splits = f.getSplits(conf, -1);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(2));
+
+    // assertEquals takes (expected, actual); this order keeps failure messages the right way round.
+    assertNull(((JdbcInputSplit)splits[0]).getLowerBound());
+    assertEquals("3", ((JdbcInputSplit)splits[0]).getUpperBound());
+    assertEquals("3", ((JdbcInputSplit)splits[1]).getLowerBound());
+    assertNull(((JdbcInputSplit)splits[1]).getUpperBound());
+  }
+
+  /**
+   * Interval splitting when the range is too narrow to split at all.
+   *
+   * <p>An {@code int} column with bounds 1..2 and five partitions requested is expected to yield
+   * a single split whose partition column is {@code null}.
+   */
+  @Test
+  public void testIntervalSplit_NoSplit() throws HiveJdbcDatabaseAccessException, IOException {
+    PowerMockito.mockStatic(DatabaseAccessorFactory.class);
+    BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
+    JdbcInputFormat inputFormat = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getColumnNames(any(Configuration.class))).thenReturn(Lists.newArrayList("a"));
+
+    JobConf jobConf = new JobConf();
+    jobConf.set("mapred.input.dir", "/temp");
+    jobConf.set(serdeConstants.LIST_COLUMN_TYPES, "int");
+    jobConf.set("hive.sql.partitionColumn", "a");
+    jobConf.set("hive.sql.numPartitions", "5");
+    jobConf.set("hive.sql.lowerBound", "1");
+    jobConf.set("hive.sql.upperBound", "2");
+    InputSplit[] splits = inputFormat.getSplits(jobConf, -1);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(1));
+
+    // The lone split must not carry a partition column.
+    JdbcInputSplit onlySplit = (JdbcInputSplit) splits[0];
+    assertNull(onlySplit.getPartitionColumn());
+  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/dao/TestGenericJdbcDatabaseAccessor.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/dao/TestGenericJdbcDatabaseAccessor.java b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/dao/TestGenericJdbcDatabaseAccessor.java
index 34f061e..545a71f 100644
--- a/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/dao/TestGenericJdbcDatabaseAccessor.java
+++ b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/dao/TestGenericJdbcDatabaseAccessor.java
@@ -15,6 +15,7 @@
package org.apache.hive.storage.jdbc.dao;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig;
import org.apache.hive.storage.jdbc.exception.HiveJdbcDatabaseAccessException;
import org.junit.Test;
@@ -111,7 +112,7 @@ public class TestGenericJdbcDatabaseAccessor {
public void testGetRecordIterator() throws HiveJdbcDatabaseAccessException {
Configuration conf = buildConfiguration();
DatabaseAccessor accessor = DatabaseAccessorFactory.getAccessor(conf);
- JdbcRecordIterator iterator = accessor.getRecordIterator(conf, 2, 0);
+ JdbcRecordIterator iterator = accessor.getRecordIterator(conf, null, null, null,2, 0);
assertThat(iterator, is(notNullValue()));
@@ -122,7 +123,7 @@ public class TestGenericJdbcDatabaseAccessor {
assertThat(record, is(notNullValue()));
assertThat(record.size(), is(equalTo(7)));
- assertThat(record.get("STRATEGY_ID"), is(equalTo(count)));
+ assertThat(record.get("strategy_id"), is(equalTo(count)));
}
assertThat(count, is(equalTo(2)));
@@ -134,7 +135,7 @@ public class TestGenericJdbcDatabaseAccessor {
public void testGetRecordIterator_offsets() throws HiveJdbcDatabaseAccessException {
Configuration conf = buildConfiguration();
DatabaseAccessor accessor = DatabaseAccessorFactory.getAccessor(conf);
- JdbcRecordIterator iterator = accessor.getRecordIterator(conf, 2, 2);
+ JdbcRecordIterator iterator = accessor.getRecordIterator(conf, null, null, null, 2, 2);
assertThat(iterator, is(notNullValue()));
@@ -145,7 +146,7 @@ public class TestGenericJdbcDatabaseAccessor {
assertThat(record, is(notNullValue()));
assertThat(record.size(), is(equalTo(7)));
- assertThat(record.get("STRATEGY_ID"), is(equalTo(count + 2)));
+ assertThat(record.get("strategy_id"), is(equalTo(count + 2)));
}
assertThat(count, is(equalTo(2)));
@@ -158,7 +159,7 @@ public class TestGenericJdbcDatabaseAccessor {
Configuration conf = buildConfiguration();
conf.set(JdbcStorageConfig.QUERY.getPropertyName(), "select * from test_strategy where strategy_id = '25'");
DatabaseAccessor accessor = DatabaseAccessorFactory.getAccessor(conf);
- JdbcRecordIterator iterator = accessor.getRecordIterator(conf, 0, 2);
+ JdbcRecordIterator iterator = accessor.getRecordIterator(conf, null, null, null, 0, 2);
assertThat(iterator, is(notNullValue()));
assertThat(iterator.hasNext(), is(false));
@@ -170,7 +171,7 @@ public class TestGenericJdbcDatabaseAccessor {
public void testGetRecordIterator_largeOffset() throws HiveJdbcDatabaseAccessException {
Configuration conf = buildConfiguration();
DatabaseAccessor accessor = DatabaseAccessorFactory.getAccessor(conf);
- JdbcRecordIterator iterator = accessor.getRecordIterator(conf, 10, 25);
+ JdbcRecordIterator iterator = accessor.getRecordIterator(conf, null, null, null, 10, 25);
assertThat(iterator, is(notNullValue()));
assertThat(iterator.hasNext(), is(false));
@@ -184,7 +185,7 @@ public class TestGenericJdbcDatabaseAccessor {
conf.set(JdbcStorageConfig.QUERY.getPropertyName(), "select * from strategyx");
DatabaseAccessor accessor = DatabaseAccessorFactory.getAccessor(conf);
@SuppressWarnings("unused")
- JdbcRecordIterator iterator = accessor.getRecordIterator(conf, 0, 2);
+ JdbcRecordIterator iterator = accessor.getRecordIterator(conf, null, null, null, 0, 2);
}
@@ -198,7 +199,8 @@ public class TestGenericJdbcDatabaseAccessor {
config.set(JdbcStorageConfig.JDBC_URL.getPropertyName(), "jdbc:h2:mem:test;MODE=MySQL;INIT=runscript from '"
+ scriptPath + "'");
config.set(JdbcStorageConfig.QUERY.getPropertyName(), "select * from test_strategy");
-
+ config.set(serdeConstants.LIST_COLUMNS, "strategy_id,name,referrer,landing,priority,implementation,last_modified");
+ config.set(serdeConstants.LIST_COLUMN_TYPES, "int,string,string,string,int,string,timestamp");
return config;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e5e77d1/ql/src/test/queries/clientpositive/external_jdbc_table_partition.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/external_jdbc_table_partition.q b/ql/src/test/queries/clientpositive/external_jdbc_table_partition.q
new file mode 100644
index 0000000..f285d17
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/external_jdbc_table_partition.q
@@ -0,0 +1,135 @@
+--! qt:dataset:src
+
+-- Register the contrib GenericUDFDBOutput class as the temporary function "dboutput".
+CREATE TEMPORARY FUNCTION dboutput AS 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput';
+
+-- Set up the backing Derby database (test_derby2 under test.tmp.dir) through dboutput calls:
+-- one CREATE TABLE followed by five single-row INSERTs into EXTERNAL_JDBC_PARTITION_TABLE1.
+-- The second inserted row has a NULL "fkey" and the fifth a NULL "dekey", so the data contains
+-- NULLs in columns that later tables use as partition columns.
+-- NOTE(review): "limit 1" presumably keeps the dboutput calls from running once per src row; confirm.
+FROM src
+SELECT
+dboutput ('jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2;create=true','user','passwd',
+'CREATE TABLE EXTERNAL_JDBC_PARTITION_TABLE1 ("ikey" INTEGER, "bkey" BIGINT, "fkey" REAL, "dkey" DOUBLE, "chkey" VARCHAR(20), "dekey" DECIMAL(6,4), "dtkey" DATE, "tkey" TIMESTAMP)' ),
+dboutput('jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2','user','passwd',
+'INSERT INTO EXTERNAL_JDBC_PARTITION_TABLE1 ("ikey","bkey","fkey","dkey","chkey","dekey","dtkey","tkey") VALUES (?,?,?,?,?,?,?,?)','1','1000','20.0','40.0','aaa','3.1415','2010-01-01','2018-01-01 12:00:00.000000000'),
+dboutput('jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2','user','passwd',
+'INSERT INTO EXTERNAL_JDBC_PARTITION_TABLE1 ("ikey","bkey","fkey","dkey","chkey","dekey","dtkey","tkey") VALUES (?,?,?,?,?,?,?,?)','5','9000',null,'10.0','bbb','2.7182','2018-01-01','2010-06-01 14:00:00.000000000'),
+dboutput('jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2','user','passwd',
+'INSERT INTO EXTERNAL_JDBC_PARTITION_TABLE1 ("ikey","bkey","fkey","dkey","chkey","dekey","dtkey","tkey") VALUES (?,?,?,?,?,?,?,?)','3','4000','120.0','25.4','hello','2.7182','2017-06-05','2011-11-10 18:00:08.000000000'),
+dboutput('jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2','user','passwd',
+'INSERT INTO EXTERNAL_JDBC_PARTITION_TABLE1 ("ikey","bkey","fkey","dkey","chkey","dekey","dtkey","tkey") VALUES (?,?,?,?,?,?,?,?)','8','3000','180.0','35.8','world','3.1415','2014-03-03','2016-07-04 13:00:00.000000000'),
+dboutput('jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2','user','passwd',
+'INSERT INTO EXTERNAL_JDBC_PARTITION_TABLE1 ("ikey","bkey","fkey","dkey","chkey","dekey","dtkey","tkey") VALUES (?,?,?,?,?,?,?,?)','4','8000','120.4','31.3','ccc',null,'2014-03-04','2018-07-08 11:00:00.000000000')
+limit 1;
+
+-- integer partition column
+-- lower/upper bound unset
+-- Maps EXTERNAL_JDBC_PARTITION_TABLE1 through JdbcStorageHandler, requesting 2 partitions on "ikey",
+-- then reads the whole table back.
+-- NOTE(review): the Derby column "dekey" is DECIMAL(6,4) but is declared decimal(5,3) here; confirm this is intended.
+CREATE EXTERNAL TABLE jdbc_partition_table1
+(
+ ikey int,
+ bkey bigint,
+ fkey float,
+ dkey double,
+ chkey string,
+ dekey decimal(5,3),
+ dtkey date,
+ tkey timestamp
+)
+STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
+TBLPROPERTIES (
+ "hive.sql.database.type" = "DERBY",
+ "hive.sql.jdbc.driver" = "org.apache.derby.jdbc.EmbeddedDriver",
+ "hive.sql.jdbc.url" = "jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2;collation=TERRITORY_BASED:PRIMARY",
+ "hive.sql.dbcp.username" = "user",
+ "hive.sql.dbcp.password" = "passwd",
+ "hive.sql.table" = "EXTERNAL_JDBC_PARTITION_TABLE1",
+ "hive.sql.dbcp.maxActive" = "1",
+ "hive.sql.partitionColumn" = "ikey",
+ "hive.sql.numPartitions" = "2"
+);
+
+SELECT * FROM jdbc_partition_table1;
+
+-- decimal partition column
+-- lower/upper bound unset
+-- Same Derby table and column layout as jdbc_partition_table1, but requesting 2 partitions on "dekey",
+-- a column for which one inserted row holds NULL.
+CREATE EXTERNAL TABLE jdbc_partition_table2
+(
+ ikey int,
+ bkey bigint,
+ fkey float,
+ dkey double,
+ chkey string,
+ dekey decimal(5,3),
+ dtkey date,
+ tkey timestamp
+)
+STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
+TBLPROPERTIES (
+ "hive.sql.database.type" = "DERBY",
+ "hive.sql.jdbc.driver" = "org.apache.derby.jdbc.EmbeddedDriver",
+ "hive.sql.jdbc.url" = "jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2;collation=TERRITORY_BASED:PRIMARY",
+ "hive.sql.dbcp.username" = "user",
+ "hive.sql.dbcp.password" = "passwd",
+ "hive.sql.table" = "EXTERNAL_JDBC_PARTITION_TABLE1",
+ "hive.sql.dbcp.maxActive" = "1",
+ "hive.sql.partitionColumn" = "dekey",
+ "hive.sql.numPartitions" = "2"
+);
+
+SELECT * FROM jdbc_partition_table2;
+
+-- float partition column
+-- lower/upper bound set
+-- Requests 2 partitions on "fkey" with explicit bounds 0..200; one inserted row has a NULL "fkey".
+-- NOTE(review): "hive.sql.partitionColumn" = "fkey" is listed twice in TBLPROPERTIES below;
+-- both entries carry the same value, so the second one is redundant.
+CREATE EXTERNAL TABLE jdbc_partition_table3
+(
+ ikey int,
+ bkey bigint,
+ fkey float,
+ dkey double,
+ chkey string,
+ dekey decimal(5,3),
+ dtkey date,
+ tkey timestamp
+)
+STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
+TBLPROPERTIES (
+ "hive.sql.database.type" = "DERBY",
+ "hive.sql.jdbc.driver" = "org.apache.derby.jdbc.EmbeddedDriver",
+ "hive.sql.jdbc.url" = "jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2;collation=TERRITORY_BASED:PRIMARY",
+ "hive.sql.dbcp.username" = "user",
+ "hive.sql.dbcp.password" = "passwd",
+ "hive.sql.table" = "EXTERNAL_JDBC_PARTITION_TABLE1",
+ "hive.sql.dbcp.maxActive" = "1",
+ "hive.sql.partitionColumn" = "fkey",
+ "hive.sql.lowerBound" = "0",
+ "hive.sql.upperBound" = "200",
+ "hive.sql.partitionColumn" = "fkey",
+ "hive.sql.numPartitions" = "2"
+);
+
+SELECT * FROM jdbc_partition_table3;
+
+-- transform push to table
+-- Selects an expression (ikey+1) rather than bare columns from the partitioned JDBC table.
+SELECT ikey+1 FROM jdbc_partition_table3;
+
+-- partition column in query not table
+-- The source is given as "hive.sql.query" (four columns, filtered on "ikey">1) instead of
+-- "hive.sql.table"; 2 partitions are requested on "fkey" with explicit bounds 0..200.
+-- NOTE(review): "hive.sql.partitionColumn" = "fkey" is listed twice in TBLPROPERTIES below;
+-- both entries carry the same value, so the second one is redundant.
+CREATE EXTERNAL TABLE jdbc_partition_table4
+(
+ ikey int,
+ bkey bigint,
+ fkey float,
+ dkey double
+)
+STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
+TBLPROPERTIES (
+ "hive.sql.database.type" = "DERBY",
+ "hive.sql.jdbc.driver" = "org.apache.derby.jdbc.EmbeddedDriver",
+ "hive.sql.jdbc.url" = "jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby2;collation=TERRITORY_BASED:PRIMARY",
+ "hive.sql.dbcp.username" = "user",
+ "hive.sql.dbcp.password" = "passwd",
+ "hive.sql.query" = "SELECT \"ikey\",\"bkey\",\"fkey\",\"dkey\" FROM EXTERNAL_JDBC_PARTITION_TABLE1 WHERE \"ikey\">1",
+ "hive.sql.dbcp.maxActive" = "1",
+ "hive.sql.partitionColumn" = "fkey",
+ "hive.sql.lowerBound" = "0",
+ "hive.sql.upperBound" = "200",
+ "hive.sql.partitionColumn" = "fkey",
+ "hive.sql.numPartitions" = "2"
+);
+
+SELECT * FROM jdbc_partition_table4;