Posted to commits@hive.apache.org by ha...@apache.org on 2018/05/30 18:40:33 UTC
hive git commit: HIVE-19675 : Cast to timestamps on Druid time column leads to an exception (Slim B via Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master 16e6b837d -> 15bf4eb98
HIVE-19675 : Cast to timestamps on Druid time column leads to an exception (Slim B via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/15bf4eb9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/15bf4eb9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/15bf4eb9
Branch: refs/heads/master
Commit: 15bf4eb989ed0c69fb4fb76f0e6190e38f22ab80
Parents: 16e6b83
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Wed May 30 11:40:02 2018 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Wed May 30 11:40:02 2018 -0700
----------------------------------------------------------------------
.../serde/DruidGroupByQueryRecordReader.java | 77 +----
.../hadoop/hive/druid/serde/DruidSerDe.java | 331 +++++++++----------
.../hive/druid/serde/DruidSerDeUtils.java | 5 +-
.../clientpositive/druidmini_extractTime.q | 15 +
.../druid/druidmini_extractTime.q.out | 80 +++++
5 files changed, 272 insertions(+), 236 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/15bf4eb9/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
index 00a4b72..82c6653 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
@@ -20,113 +20,60 @@ package org.apache.hadoop.hive.druid.serde;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.metamx.http.client.HttpClient;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
-import io.druid.query.dimension.DimensionSpec;
-import io.druid.query.dimension.ExtractionDimensionSpec;
-import io.druid.query.extraction.TimeFormatExtractionFn;
import io.druid.query.groupby.GroupByQuery;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.joda.time.format.ISODateTimeFormat;
import java.io.IOException;
-import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
-
-import static org.apache.hadoop.hive.druid.serde.DruidSerDeUtils.ISO_TIME_FORMAT;
/**
* Record reader for results for Druid GroupByQuery.
*/
-public class DruidGroupByQueryRecordReader
- extends DruidQueryRecordReader<GroupByQuery, Row> {
+public class DruidGroupByQueryRecordReader extends DruidQueryRecordReader<GroupByQuery, Row> {
private final static TypeReference<Row> TYPE_REFERENCE = new TypeReference<Row>() {
};
private MapBasedRow currentRow;
private Map<String, Object> currentEvent;
- private List<String> timeExtractionFields = Lists.newArrayList();
- private List<String> intFormattedTimeExtractionFields = Lists.newArrayList();
-
- @Override
- public void initialize(InputSplit split, Configuration conf) throws IOException {
+ @Override public void initialize(InputSplit split, Configuration conf) throws IOException {
super.initialize(split, conf);
- initDimensionTypes();
}
- @Override
- public void initialize(InputSplit split, Configuration conf, ObjectMapper mapper,
- ObjectMapper smileMapper, HttpClient httpClient
+ @Override public void initialize(InputSplit split, Configuration conf, ObjectMapper mapper, ObjectMapper smileMapper,
+ HttpClient httpClient
) throws IOException {
super.initialize(split, conf, mapper, smileMapper, httpClient);
- initDimensionTypes();
}
- @Override
- protected JavaType getResultTypeDef() {
+ @Override protected JavaType getResultTypeDef() {
return DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE);
}
- private void initDimensionTypes() throws IOException {
- //@TODO move this out of here to org.apache.hadoop.hive.druid.serde.DruidSerDe
- List<DimensionSpec> dimensionSpecList = ((GroupByQuery) query).getDimensions();
- List<DimensionSpec> extractionDimensionSpecList = dimensionSpecList.stream()
- .filter(dimensionSpecs -> dimensionSpecs instanceof ExtractionDimensionSpec)
- .collect(Collectors.toList());
- extractionDimensionSpecList.stream().forEach(dimensionSpec -> {
- ExtractionDimensionSpec extractionDimensionSpec = (ExtractionDimensionSpec) dimensionSpec;
- if (extractionDimensionSpec.getExtractionFn() instanceof TimeFormatExtractionFn) {
- final TimeFormatExtractionFn timeFormatExtractionFn = (TimeFormatExtractionFn) extractionDimensionSpec
- .getExtractionFn();
- if (timeFormatExtractionFn == null || timeFormatExtractionFn.getFormat().equals(ISO_TIME_FORMAT)) {
- timeExtractionFields.add(extractionDimensionSpec.getOutputName());
- } else {
- intFormattedTimeExtractionFields.add(extractionDimensionSpec.getOutputName());
- }
- }
- });
- }
-
- @Override
- public boolean nextKeyValue() {
+ @Override public boolean nextKeyValue() {
// Results
if (queryResultsIterator.hasNext()) {
final Row row = queryResultsIterator.next();
// currently druid supports only MapBasedRow as Jackson SerDe so it should safe to cast without check
currentRow = (MapBasedRow) row;
- //@TODO move this out of here to org.apache.hadoop.hive.druid.serde.DruidSerDe
- currentEvent = Maps.transformEntries(currentRow.getEvent(),
- (key, value1) -> {
- if (timeExtractionFields.contains(key)) {
- return ISODateTimeFormat.dateTimeParser().parseMillis((String) value1);
- }
- if (intFormattedTimeExtractionFields.contains(key)) {
- return Integer.valueOf((String) value1);
- }
- return value1;
- }
- );
+ currentEvent = currentRow.getEvent();
return true;
}
return false;
}
- @Override
- public NullWritable getCurrentKey() throws IOException, InterruptedException {
+ @Override public NullWritable getCurrentKey() throws IOException, InterruptedException {
return NullWritable.get();
}
- @Override
- public DruidWritable getCurrentValue() throws IOException, InterruptedException {
+ @Override public DruidWritable getCurrentValue() throws IOException, InterruptedException {
// Create new value
DruidWritable value = new DruidWritable();
// 1) The timestamp column
@@ -138,8 +85,7 @@ public class DruidGroupByQueryRecordReader
return value;
}
- @Override
- public boolean next(NullWritable key, DruidWritable value) {
+ @Override public boolean next(NullWritable key, DruidWritable value) {
if (nextKeyValue()) {
// Update value
value.getValue().clear();
@@ -154,8 +100,7 @@ public class DruidGroupByQueryRecordReader
return false;
}
- @Override
- public float getProgress() throws IOException {
+ @Override public float getProgress() throws IOException {
return queryResultsIterator.hasNext() ? 0 : 1;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/15bf4eb9/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index 5f76579..842a9fa 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeSpec;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
@@ -83,6 +84,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
+import java.sql.Date;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZonedDateTime;
@@ -97,11 +99,13 @@ import java.util.Map.Entry;
import java.util.Properties;
import java.util.stream.Collectors;
+import static org.apache.hadoop.hive.druid.serde.DruidSerDeUtils.TIMESTAMP_FORMAT;
+import static org.joda.time.format.ISODateTimeFormat.dateOptionalTimeParser;
+
/**
* DruidSerDe that is used to deserialize objects from a Druid data source.
*/
-@SerDeSpec(schemaProps = { Constants.DRUID_DATA_SOURCE })
-public class DruidSerDe extends AbstractSerDe {
+@SerDeSpec(schemaProps = { Constants.DRUID_DATA_SOURCE }) public class DruidSerDe extends AbstractSerDe {
protected static final Logger LOG = LoggerFactory.getLogger(DruidSerDe.class);
@@ -110,23 +114,19 @@ public class DruidSerDe extends AbstractSerDe {
private ObjectInspector inspector;
private TimestampLocalTZTypeInfo tsTZTypeInfo;
- @Override
- public void initialize(Configuration configuration, Properties properties) throws SerDeException {
+ @Override public void initialize(Configuration configuration, Properties properties) throws SerDeException {
- tsTZTypeInfo = new TimestampLocalTZTypeInfo(
- configuration.get(HiveConf.ConfVars.HIVE_LOCAL_TIME_ZONE.varname));
+ tsTZTypeInfo = new TimestampLocalTZTypeInfo(configuration.get(HiveConf.ConfVars.HIVE_LOCAL_TIME_ZONE.varname));
// Druid query
final String druidQuery = properties.getProperty(Constants.DRUID_QUERY_JSON, null);
- if (druidQuery != null && !druidQuery.isEmpty()) {
+ if (druidQuery != null && !druidQuery.isEmpty()) {
initFromDruidQueryPlan(properties, druidQuery);
} else {
// No query. Either it is a CTAS, or we need to create a Druid meta data Query
- if (!org.apache.commons.lang3.StringUtils
- .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMNS))
- && !org.apache.commons.lang3.StringUtils
- .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES))) {
+ if (!org.apache.commons.lang3.StringUtils.isEmpty(properties.getProperty(serdeConstants.LIST_COLUMNS))
+ && !org.apache.commons.lang3.StringUtils.isEmpty(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES))) {
// CASE CTAS statement
- initFromProperties(properties);
+ initFromProperties(properties);
} else {
// Segment Metadata query that retrieves all columns present in
// the data source (dimensions and metrics).
@@ -134,8 +134,8 @@ public class DruidSerDe extends AbstractSerDe {
}
}
if (LOG.isDebugEnabled()) {
- LOG.debug("DruidSerDe initialized with\n" + "\t columns: " + Arrays.toString(columns)
- + "\n\t types: " + Arrays.toString(types));
+ LOG.debug("DruidSerDe initialized with\n" + "\t columns: " + Arrays.toString(columns) + "\n\t types: " + Arrays
+ .toString(types));
}
}
@@ -147,8 +147,8 @@ public class DruidSerDe extends AbstractSerDe {
String dataSource = properties.getProperty(Constants.DRUID_DATA_SOURCE);
if (dataSource == null) {
- throw new SerDeException("Druid data source not specified; use " +
- Constants.DRUID_DATA_SOURCE + " in table properties");
+ throw new SerDeException(
+ "Druid data source not specified; use " + Constants.DRUID_DATA_SOURCE + " in table properties");
}
SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder();
builder.dataSource(dataSource);
@@ -157,9 +157,7 @@ public class DruidSerDe extends AbstractSerDe {
SegmentMetadataQuery query = builder.build();
// Execute query in Druid
- String address = HiveConf.getVar(configuration,
- HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS
- );
+ String address = HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
if (org.apache.commons.lang3.StringUtils.isEmpty(address)) {
throw new SerDeException("Druid broker address not specified in configuration");
}
@@ -176,33 +174,29 @@ public class DruidSerDe extends AbstractSerDe {
columnNames.add(columnInfo.getKey()); // field name
PrimitiveTypeInfo type = tsTZTypeInfo; // field type
columnTypes.add(type);
- inspectors
- .add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
+ inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
continue;
}
columnNames.add(columnInfo.getKey()); // field name
- PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType(
- columnInfo.getValue().getType()); // field type
+ PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType(columnInfo.getValue().getType()); // field type
columnTypes.add(type instanceof TimestampLocalTZTypeInfo ? tsTZTypeInfo : type);
inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
}
columns = columnNames.toArray(new String[columnNames.size()]);
types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
- inspector = ObjectInspectorFactory
- .getStandardStructObjectInspector(columnNames, inspectors);
+ inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
}
- private void initFromProperties(final Properties properties)
- throws SerDeException {
+ private void initFromProperties(final Properties properties) throws SerDeException {
final List<ObjectInspector> inspectors = new ArrayList<>();
final List<String> columnNames = new ArrayList<>();
final List<PrimitiveTypeInfo> columnTypes = new ArrayList<>();
columnNames.addAll(Utilities.getColumnNames(properties));
if (!columnNames.contains(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) {
- throw new SerDeException("Timestamp column (' " + DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN +
- "') not specified in create table; list of columns is : " +
- properties.getProperty(serdeConstants.LIST_COLUMNS));
+ throw new SerDeException("Timestamp column (' " + DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN
+ + "') not specified in create table; list of columns is : " + properties
+ .getProperty(serdeConstants.LIST_COLUMNS));
}
columnTypes.addAll(Lists.transform(
Lists.transform(Utilities.getColumnTypes(properties), type -> TypeInfoFactory.getPrimitiveTypeInfo(type)),
@@ -214,8 +208,7 @@ public class DruidSerDe extends AbstractSerDe {
));
columns = columnNames.toArray(new String[columnNames.size()]);
types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
- inspector = ObjectInspectorFactory
- .getStandardStructObjectInspector(columnNames, inspectors);
+ inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
}
private void initFromDruidQueryPlan(Properties properties, String druidQuery) {
@@ -229,16 +222,16 @@ public class DruidSerDe extends AbstractSerDe {
Preconditions.checkNotNull(properties.getProperty(Constants.DRUID_QUERY_FIELD_TYPES, null));
if (fieldNamesProperty.isEmpty()) {
// this might seem counter intuitive but some queries like query
- // SELECT YEAR(Calcs.date0) AS yr_date0_ok FROM druid_tableau.calcs Calcs WHERE (YEAR(Calcs.date0) IS NULL) LIMIT 1
- // is planed in a way where we only push a filter down and keep the project of null as hive project. Thus empty columns
+ // SELECT YEAR(Calcs.date0) AS yr_date0_ok FROM druid_tableau.calcs Calcs WHERE (YEAR(Calcs.date0) IS NULL)
+ // LIMIT 1
+ // is planed in a way where we only push a filter down and keep the project of null as hive project. Thus empty
+ // columns
columnNames = Collections.EMPTY_LIST;
columnTypes = Collections.EMPTY_LIST;
} else {
- columnNames =
- Arrays.stream(fieldNamesProperty.trim().split(",")).collect(Collectors.toList());
+ columnNames = Arrays.stream(fieldNamesProperty.trim().split(",")).collect(Collectors.toList());
columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(fieldTypesProperty).stream()
- .map(e -> TypeInfoFactory.getPrimitiveTypeInfo(e.getTypeName()))
- .map(primitiveTypeInfo -> {
+ .map(e -> TypeInfoFactory.getPrimitiveTypeInfo(e.getTypeName())).map(primitiveTypeInfo -> {
if (primitiveTypeInfo instanceof TimestampLocalTZTypeInfo) {
return tsTZTypeInfo;
}
@@ -257,11 +250,11 @@ public class DruidSerDe extends AbstractSerDe {
/* Submits the request and returns */
protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
- throws SerDeException, IOException {
+ throws SerDeException, IOException {
InputStream response;
try {
response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
- DruidStorageHandlerUtils.createSmileRequest(address, query)
+ DruidStorageHandlerUtils.createSmileRequest(address, query)
);
} catch (Exception e) {
throw new SerDeException(StringUtils.stringifyException(e));
@@ -272,10 +265,9 @@ public class DruidSerDe extends AbstractSerDe {
try {
// This will throw an exception in case of the response from druid is not an array
// this case occurs if for instance druid query execution returns an exception instead of array of results.
- resultsList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
- new TypeReference<List<SegmentAnalysis>>() {
- }
- );
+ resultsList =
+ DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response, new TypeReference<List<SegmentAnalysis>>() {
+ });
} catch (Exception e) {
response.close();
throw new SerDeException(StringUtils.stringifyException(e));
@@ -290,17 +282,14 @@ public class DruidSerDe extends AbstractSerDe {
return resultsList.get(0);
}
- @Override
- public Class<? extends Writable> getSerializedClass() {
+ @Override public Class<? extends Writable> getSerializedClass() {
return DruidWritable.class;
}
- @Override
- public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
+ @Override public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
if (objectInspector.getCategory() != ObjectInspector.Category.STRUCT) {
- throw new SerDeException(getClass().toString()
- + " can only serialize struct types, but we got: "
- + objectInspector.getTypeName());
+ throw new SerDeException(
+ getClass().toString() + " can only serialize struct types, but we got: " + objectInspector.getTypeName());
}
// Prepare the field ObjectInspectors
@@ -317,52 +306,49 @@ public class DruidSerDe extends AbstractSerDe {
}
final Object res;
switch (types[i].getPrimitiveCategory()) {
- case TIMESTAMP:
- res = ((TimestampObjectInspector) fields.get(i).getFieldObjectInspector())
- .getPrimitiveJavaObject(
- values.get(i)).getTime();
- break;
- case TIMESTAMPLOCALTZ:
- res = ((TimestampLocalTZObjectInspector) fields.get(i).getFieldObjectInspector())
- .getPrimitiveJavaObject(values.get(i)).getZonedDateTime().toInstant().toEpochMilli();
- break;
- case BYTE:
- res = ((ByteObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
- break;
- case SHORT:
- res = ((ShortObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
- break;
- case INT:
- res = ((IntObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
- break;
- case LONG:
- res = ((LongObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
- break;
- case FLOAT:
- res = ((FloatObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
- break;
- case DOUBLE:
- res = ((DoubleObjectInspector) fields.get(i).getFieldObjectInspector())
- .get(values.get(i));
- break;
- case CHAR:
- res = ((HiveCharObjectInspector) fields.get(i).getFieldObjectInspector())
- .getPrimitiveJavaObject(values.get(i)).getValue();
- break;
- case VARCHAR:
- res = ((HiveVarcharObjectInspector) fields.get(i).getFieldObjectInspector())
- .getPrimitiveJavaObject(values.get(i)).getValue();
- break;
- case STRING:
- res = ((StringObjectInspector) fields.get(i).getFieldObjectInspector())
- .getPrimitiveJavaObject(values.get(i));
- break;
- case BOOLEAN:
- res = ((BooleanObjectInspector) fields.get(i).getFieldObjectInspector())
- .get(values.get(i));
- break;
- default:
- throw new SerDeException("Unsupported type: " + types[i].getPrimitiveCategory());
+ case TIMESTAMP:
+ res = ((TimestampObjectInspector) fields.get(i).getFieldObjectInspector()).getPrimitiveJavaObject(values.get(i))
+ .getTime();
+ break;
+ case TIMESTAMPLOCALTZ:
+ res = ((TimestampLocalTZObjectInspector) fields.get(i).getFieldObjectInspector())
+ .getPrimitiveJavaObject(values.get(i)).getZonedDateTime().toInstant().toEpochMilli();
+ break;
+ case BYTE:
+ res = ((ByteObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+ break;
+ case SHORT:
+ res = ((ShortObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+ break;
+ case INT:
+ res = ((IntObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+ break;
+ case LONG:
+ res = ((LongObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+ break;
+ case FLOAT:
+ res = ((FloatObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+ break;
+ case DOUBLE:
+ res = ((DoubleObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+ break;
+ case CHAR:
+ res = ((HiveCharObjectInspector) fields.get(i).getFieldObjectInspector()).getPrimitiveJavaObject(values.get(i))
+ .getValue();
+ break;
+ case VARCHAR:
+ res =
+ ((HiveVarcharObjectInspector) fields.get(i).getFieldObjectInspector()).getPrimitiveJavaObject(values.get(i))
+ .getValue();
+ break;
+ case STRING:
+ res = ((StringObjectInspector) fields.get(i).getFieldObjectInspector()).getPrimitiveJavaObject(values.get(i));
+ break;
+ case BOOLEAN:
+ res = ((BooleanObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+ break;
+ default:
+ throw new SerDeException("Unsupported type: " + types[i].getPrimitiveCategory());
}
value.put(columns[i], res);
}
@@ -370,38 +356,34 @@ public class DruidSerDe extends AbstractSerDe {
// First Segment Granularity has to be here.
final int granularityFieldIndex = columns.length;
assert values.size() > granularityFieldIndex;
- Preconditions.checkArgument(fields.get(granularityFieldIndex).getFieldName()
- .equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME));
+ Preconditions.checkArgument(
+ fields.get(granularityFieldIndex).getFieldName().equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME));
value.put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
- ((TimestampObjectInspector) fields.get(granularityFieldIndex).getFieldObjectInspector())
- .getPrimitiveJavaObject(values.get(granularityFieldIndex)).getTime()
+ ((TimestampObjectInspector) fields.get(granularityFieldIndex).getFieldObjectInspector())
+ .getPrimitiveJavaObject(values.get(granularityFieldIndex)).getTime()
);
if (values.size() == columns.length + 2) {
// Then partition number if any.
final int partitionNumPos = granularityFieldIndex + 1;
- Preconditions.checkArgument(
- fields.get(partitionNumPos).getFieldName().equals(Constants.DRUID_SHARD_KEY_COL_NAME),
+ Preconditions.checkArgument(fields.get(partitionNumPos).getFieldName().equals(Constants.DRUID_SHARD_KEY_COL_NAME),
String.format("expecting to encounter %s but was %s", Constants.DRUID_SHARD_KEY_COL_NAME,
fields.get(partitionNumPos).getFieldName()
)
);
value.put(Constants.DRUID_SHARD_KEY_COL_NAME,
- ((LongObjectInspector) fields.get(partitionNumPos).getFieldObjectInspector())
- .get(values.get(partitionNumPos))
+ ((LongObjectInspector) fields.get(partitionNumPos).getFieldObjectInspector()).get(values.get(partitionNumPos))
);
}
return new DruidWritable(value);
}
- @Override
- public SerDeStats getSerDeStats() {
+ @Override public SerDeStats getSerDeStats() {
// no support for statistics
return null;
}
- @Override
- public Object deserialize(Writable writable) throws SerDeException {
+ @Override public Object deserialize(Writable writable) throws SerDeException {
final DruidWritable input = (DruidWritable) writable;
final List<Object> output = Lists.newArrayListWithExpectedSize(columns.length);
for (int i = 0; i < columns.length; i++) {
@@ -411,72 +393,89 @@ public class DruidSerDe extends AbstractSerDe {
continue;
}
switch (types[i].getPrimitiveCategory()) {
- case TIMESTAMP:
- output.add(new TimestampWritable(Timestamp.valueOf(ZonedDateTime
- .ofInstant(Instant.ofEpochMilli(((Number) value).longValue()),
- tsTZTypeInfo.timeZone()
- ).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).toString())));
- break;
- case TIMESTAMPLOCALTZ:
- output.add(
- new TimestampLocalTZWritable(
- new TimestampTZ(
- ZonedDateTime.ofInstant(
- Instant.ofEpochMilli(((Number) value).longValue()),
- ((TimestampLocalTZTypeInfo) types[i]).timeZone()))));
- break;
- case BYTE:
- output.add(new ByteWritable(((Number) value).byteValue()));
- break;
- case SHORT:
- output.add(new ShortWritable(((Number) value).shortValue()));
- break;
- case INT:
+ case TIMESTAMP:
+ if (value instanceof Number) {
+ output.add(new TimestampWritable(Timestamp.valueOf(
+ ZonedDateTime.ofInstant(Instant.ofEpochMilli(((Number) value).longValue()), tsTZTypeInfo.timeZone())
+ .format(DateTimeFormatter.ofPattern(TIMESTAMP_FORMAT)))));
+ } else {
+ output.add(new TimestampWritable(Timestamp.valueOf((String) value)));
+ }
+
+ break;
+ case TIMESTAMPLOCALTZ:
+ final long numberOfMillis;
+ if (value instanceof Number) {
+ numberOfMillis = ((Number) value).longValue();
+ } else {
+ // it is an extraction fn need to be parsed
+ numberOfMillis = dateOptionalTimeParser().parseDateTime((String) value).getMillis();
+ }
+ output.add(new TimestampLocalTZWritable(new TimestampTZ(ZonedDateTime
+ .ofInstant(Instant.ofEpochMilli(numberOfMillis),
+ ((TimestampLocalTZTypeInfo) types[i]).timeZone()
+ ))));
+ break;
+ case DATE:
+ final DateWritable dateWritable;
+ if (value instanceof Number) {
+ dateWritable = new DateWritable(new Date((((Number) value).longValue())));
+ } else {
+ // it is an extraction fn need to be parsed
+ dateWritable = new DateWritable(new Date(dateOptionalTimeParser().parseDateTime((String) value).getMillis()));
+ }
+ output.add(dateWritable);
+ break;
+ case BYTE:
+ output.add(new ByteWritable(((Number) value).byteValue()));
+ break;
+ case SHORT:
+ output.add(new ShortWritable(((Number) value).shortValue()));
+ break;
+ case INT:
+ if (value instanceof Number) {
output.add(new IntWritable(((Number) value).intValue()));
- break;
- case LONG:
- output.add(new LongWritable(((Number) value).longValue()));
- break;
- case FLOAT:
- output.add(new FloatWritable(((Number) value).floatValue()));
- break;
- case DOUBLE:
- output.add(new DoubleWritable(((Number) value).doubleValue()));
- break;
- case CHAR:
- output.add(
- new HiveCharWritable(
- new HiveChar(
- value.toString(),
- ((CharTypeInfo) types[i]).getLength())));
- break;
- case VARCHAR:
- output.add(
- new HiveVarcharWritable(
- new HiveVarchar(
- value.toString(),
- ((VarcharTypeInfo) types[i]).getLength())));
- break;
- case STRING:
- output.add(new Text(value.toString()));
- break;
- case BOOLEAN:
- output.add(new BooleanWritable(Boolean.valueOf(value.toString())));
- break;
- default:
- throw new SerDeException("Unknown type: " + types[i].getPrimitiveCategory());
+ } else {
+ // This is a corner case where we have an extract of time unit like day/month pushed as Extraction Fn
+ //@TODO The best way to fix this is to add explicit output Druid types to Calcite Extraction Functions impls
+ output.add(new IntWritable(Integer.valueOf((String) value)));
+ }
+
+ break;
+ case LONG:
+ output.add(new LongWritable(((Number) value).longValue()));
+ break;
+ case FLOAT:
+ output.add(new FloatWritable(((Number) value).floatValue()));
+ break;
+ case DOUBLE:
+ output.add(new DoubleWritable(((Number) value).doubleValue()));
+ break;
+ case CHAR:
+ output.add(new HiveCharWritable(new HiveChar(value.toString(), ((CharTypeInfo) types[i]).getLength())));
+ break;
+ case VARCHAR:
+ output
+ .add(new HiveVarcharWritable(new HiveVarchar(value.toString(), ((VarcharTypeInfo) types[i]).getLength())));
+ break;
+ case STRING:
+ output.add(new Text(value.toString()));
+ break;
+ case BOOLEAN:
+ output.add(new BooleanWritable(Boolean.valueOf(value.toString())));
+ break;
+ default:
+ throw new SerDeException("Unknown type: " + types[i].getPrimitiveCategory());
}
}
return output;
}
- @Override
- public ObjectInspector getObjectInspector() {
+ @Override public ObjectInspector getObjectInspector() {
return inspector;
}
- @Override
- public boolean shouldStoreFieldsInMetastore(Map<String, String> tableParams) {
+ @Override public boolean shouldStoreFieldsInMetastore(Map<String, String> tableParams) {
// If Druid table is not an external table store the schema in metadata store.
return !MetaStoreUtils.isExternal(tableParams);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/15bf4eb9/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
index 1c34e41..630e097 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
@@ -22,10 +22,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.druid.query.dimension.DimensionSpec;
-import io.druid.query.dimension.ExtractionDimensionSpec;
-import io.druid.query.extraction.TimeFormatExtractionFn;
-
/**
* Utils class for Druid SerDe.
*/
@@ -34,6 +30,7 @@ public final class DruidSerDeUtils {
private static final Logger LOG = LoggerFactory.getLogger(DruidSerDeUtils.class);
protected static final String ISO_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+ protected static final String TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss";
protected static final String FLOAT_TYPE = "FLOAT";
protected static final String DOUBLE_TYPE = "DOUBLE";
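TIMESTAMP_FORMAT is added alongside the existing ISO_TIME_FORMAT: the new pattern is what Timestamp.valueOf() expects and what is now pushed into the timeFormat extraction function for CAST(`__time` AS TIMESTAMP) (see the druid.query.json in the .q.out below), while ISO_TIME_FORMAT remains the ISO-8601 pattern used for Druid time strings. A small comparison of the two patterns on the same instant, using only java.time; this snippet is illustrative and not part of the patch:

    import java.time.Instant;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;

    public class TimeFormatSketch {
        static final String ISO_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
        static final String TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss";

        public static void main(String[] args) {
            Instant epoch = Instant.ofEpochMilli(0L);
            // ISO-8601 form of a Druid time value:
            System.out.println(epoch.atZone(ZoneOffset.UTC)
                .format(DateTimeFormatter.ofPattern(ISO_TIME_FORMAT)));  // 1970-01-01T00:00:00.000Z
            // The form Timestamp.valueOf() accepts:
            System.out.println(epoch.atZone(ZoneOffset.UTC)
                .format(DateTimeFormatter.ofPattern(TIMESTAMP_FORMAT))); // 1970-01-01 00:00:00
        }
    }
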
http://git-wip-us.apache.org/repos/asf/hive/blob/15bf4eb9/ql/src/test/queries/clientpositive/druidmini_extractTime.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/druidmini_extractTime.q b/ql/src/test/queries/clientpositive/druidmini_extractTime.q
index 2f7129e..429f796 100644
--- a/ql/src/test/queries/clientpositive/druidmini_extractTime.q
+++ b/ql/src/test/queries/clientpositive/druidmini_extractTime.q
@@ -1,3 +1,5 @@
+--! qt:dataset:alltypesorc
+
SET hive.vectorized.execution.enabled=false;
CREATE TABLE druid_table
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
@@ -160,5 +162,18 @@ AND CAST(EXTRACT(YEAR from `__time`) as STRING) = '1969' LIMIT 1;
SELECT EXTRACT(YEAR from `__time`), SUBSTRING(CAST(CAST(`__time` AS DATE) AS STRING), 1, 4) as year_str FROM druid_table WHERE EXTRACT(YEAR from `__time`) >= 1969
AND CAST(EXTRACT(YEAR from `__time`) as STRING) = '1969' LIMIT 1;
+-- Cast to Timestamp
+
+explain SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5;
+
+SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5;
+
+-- Cast to Date
+
+explain SELECT CAST(`__time` AS DATE) AS `x_date`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS DATE) ORDER BY `x_date` LIMIT 5;
+
+SELECT CAST(`__time` AS DATE) AS `x_date`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS DATE) ORDER BY `x_date` LIMIT 5;
+
+SELECT CAST(`__time` AS DATE) AS `x_date` FROM druid_table ORDER BY `x_date` LIMIT 5;
DROP TABLE druid_table;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/15bf4eb9/ql/src/test/results/clientpositive/druid/druidmini_extractTime.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid/druidmini_extractTime.q.out b/ql/src/test/results/clientpositive/druid/druidmini_extractTime.q.out
index c3679a3..a1b1d24 100644
--- a/ql/src/test/results/clientpositive/druid/druidmini_extractTime.q.out
+++ b/ql/src/test/results/clientpositive/druid/druidmini_extractTime.q.out
@@ -1009,6 +1009,86 @@ POSTHOOK: type: QUERY
POSTHOOK: Input: default@druid_table
POSTHOOK: Output: hdfs://### HDFS PATH ###
1969 1969
+PREHOOK: query: explain SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5
+PREHOOK: type: QUERY
+POSTHOOK: query: explain SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ TableScan
+ alias: druid_table
+ properties:
+ druid.fieldNames extract,$f1
+ druid.fieldTypes timestamp,double
+ druid.query.json {"queryType":"groupBy","dataSource":"default.druid_table","granularity":"all","dimensions":[{"type":"extraction","dimension":"__time","outputName":"extract","extractionFn":{"type":"timeFormat","format":"yyyy-MM-dd HH:mm:ss","timeZone":"US/Pacific","locale":"en"}}],"limitSpec":{"type":"default","limit":5,"columns":[{"dimension":"extract","direction":"ascending","dimensionOrder":"lexicographic"}]},"aggregations":[{"type":"doubleSum","name":"$f1","fieldName":"cfloat"}],"intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]}
+ druid.query.type groupBy
+ Select Operator
+ expressions: extract (type: timestamp), $f1 (type: double)
+ outputColumnNames: _col0, _col1
+ ListSink
+
+PREHOOK: query: SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_table
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_table
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1969-12-31 15:59:00 -4532.569952011108
+1969-12-31 16:00:00 -35057.67698967457
+PREHOOK: query: explain SELECT CAST(`__time` AS DATE) AS `x_date`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS DATE) ORDER BY `x_date` LIMIT 5
+PREHOOK: type: QUERY
+POSTHOOK: query: explain SELECT CAST(`__time` AS DATE) AS `x_date`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS DATE) ORDER BY `x_date` LIMIT 5
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ TableScan
+ alias: druid_table
+ properties:
+ druid.fieldNames vc,$f1
+ druid.fieldTypes date,double
+ druid.query.json {"queryType":"groupBy","dataSource":"default.druid_table","granularity":"all","dimensions":[{"type":"default","dimension":"vc","outputName":"vc","outputType":"LONG"}],"virtualColumns":[{"type":"expression","name":"vc","expression":"timestamp_floor(\"__time\",'P1D','','US/Pacific')","outputType":"LONG"}],"limitSpec":{"type":"default","limit":5,"columns":[{"dimension":"vc","direction":"ascending","dimensionOrder":"lexicographic"}]},"aggregations":[{"type":"doubleSum","name":"$f1","fieldName":"cfloat"}],"intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]}
+ druid.query.type groupBy
+ Select Operator
+ expressions: vc (type: date), $f1 (type: double)
+ outputColumnNames: _col0, _col1
+ ListSink
+
+PREHOOK: query: SELECT CAST(`__time` AS DATE) AS `x_date`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS DATE) ORDER BY `x_date` LIMIT 5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_table
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT CAST(`__time` AS DATE) AS `x_date`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS DATE) ORDER BY `x_date` LIMIT 5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_table
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1969-12-31 -39590.24694168568
+PREHOOK: query: SELECT CAST(`__time` AS DATE) AS `x_date` FROM druid_table ORDER BY `x_date` LIMIT 5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_table
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT CAST(`__time` AS DATE) AS `x_date` FROM druid_table ORDER BY `x_date` LIMIT 5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_table
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1969-12-31
+1969-12-31
+1969-12-31
+1969-12-31
+1969-12-31
PREHOOK: query: DROP TABLE druid_table
PREHOOK: type: DROPTABLE
PREHOOK: Input: default@druid_table