Posted to commits@hive.apache.org by ha...@apache.org on 2018/05/30 18:40:33 UTC

hive git commit: HIVE-19675 : Cast to timestamps on Druid time column leads to an exception (Slim B via Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master 16e6b837d -> 15bf4eb98


HIVE-19675 : Cast to timestamps on Druid time column leads to an exception (Slim B via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/15bf4eb9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/15bf4eb9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/15bf4eb9

Branch: refs/heads/master
Commit: 15bf4eb989ed0c69fb4fb76f0e6190e38f22ab80
Parents: 16e6b83
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Wed May 30 11:40:02 2018 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Wed May 30 11:40:02 2018 -0700

----------------------------------------------------------------------
 .../serde/DruidGroupByQueryRecordReader.java    |  77 +----
 .../hadoop/hive/druid/serde/DruidSerDe.java     | 331 +++++++++----------
 .../hive/druid/serde/DruidSerDeUtils.java       |   5 +-
 .../clientpositive/druidmini_extractTime.q      |  15 +
 .../druid/druidmini_extractTime.q.out           |  80 +++++
 5 files changed, 272 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/15bf4eb9/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
index 00a4b72..82c6653 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
@@ -20,113 +20,60 @@ package org.apache.hadoop.hive.druid.serde;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.metamx.http.client.HttpClient;
 import io.druid.data.input.MapBasedRow;
 import io.druid.data.input.Row;
-import io.druid.query.dimension.DimensionSpec;
-import io.druid.query.dimension.ExtractionDimensionSpec;
-import io.druid.query.extraction.TimeFormatExtractionFn;
 import io.druid.query.groupby.GroupByQuery;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.joda.time.format.ISODateTimeFormat;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
-
-import static org.apache.hadoop.hive.druid.serde.DruidSerDeUtils.ISO_TIME_FORMAT;
 
 /**
  * Record reader for results for Druid GroupByQuery.
  */
-public class DruidGroupByQueryRecordReader
-        extends DruidQueryRecordReader<GroupByQuery, Row> {
+public class DruidGroupByQueryRecordReader extends DruidQueryRecordReader<GroupByQuery, Row> {
   private final static TypeReference<Row> TYPE_REFERENCE = new TypeReference<Row>() {
   };
 
   private MapBasedRow currentRow;
   private Map<String, Object> currentEvent;
 
-  private List<String> timeExtractionFields = Lists.newArrayList();
-  private List<String> intFormattedTimeExtractionFields = Lists.newArrayList();
-
-  @Override
-  public void initialize(InputSplit split, Configuration conf) throws IOException {
+  @Override public void initialize(InputSplit split, Configuration conf) throws IOException {
     super.initialize(split, conf);
-    initDimensionTypes();
   }
 
-  @Override
-  public void initialize(InputSplit split, Configuration conf, ObjectMapper mapper,
-          ObjectMapper smileMapper, HttpClient httpClient
+  @Override public void initialize(InputSplit split, Configuration conf, ObjectMapper mapper, ObjectMapper smileMapper,
+      HttpClient httpClient
   ) throws IOException {
     super.initialize(split, conf, mapper, smileMapper, httpClient);
-    initDimensionTypes();
   }
 
-  @Override
-  protected JavaType getResultTypeDef() {
+  @Override protected JavaType getResultTypeDef() {
     return DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE);
   }
 
-  private void initDimensionTypes() throws IOException {
-    //@TODO move this out of here to org.apache.hadoop.hive.druid.serde.DruidSerDe
-    List<DimensionSpec> dimensionSpecList = ((GroupByQuery) query).getDimensions();
-    List<DimensionSpec> extractionDimensionSpecList = dimensionSpecList.stream()
-            .filter(dimensionSpecs -> dimensionSpecs instanceof ExtractionDimensionSpec)
-            .collect(Collectors.toList());
-    extractionDimensionSpecList.stream().forEach(dimensionSpec -> {
-      ExtractionDimensionSpec extractionDimensionSpec = (ExtractionDimensionSpec) dimensionSpec;
-      if (extractionDimensionSpec.getExtractionFn() instanceof TimeFormatExtractionFn) {
-        final TimeFormatExtractionFn timeFormatExtractionFn = (TimeFormatExtractionFn) extractionDimensionSpec
-                .getExtractionFn();
-        if (timeFormatExtractionFn  == null || timeFormatExtractionFn.getFormat().equals(ISO_TIME_FORMAT)) {
-          timeExtractionFields.add(extractionDimensionSpec.getOutputName());
-        } else {
-          intFormattedTimeExtractionFields.add(extractionDimensionSpec.getOutputName());
-        }
-      }
-    });
-  }
-
-  @Override
-  public boolean nextKeyValue() {
+  @Override public boolean nextKeyValue() {
     // Results
 
     if (queryResultsIterator.hasNext()) {
       final Row row = queryResultsIterator.next();
      // currently Druid supports only MapBasedRow for Jackson SerDe, so it should be safe to cast without a check
       currentRow = (MapBasedRow) row;
-      //@TODO move this out of here to org.apache.hadoop.hive.druid.serde.DruidSerDe
-      currentEvent = Maps.transformEntries(currentRow.getEvent(),
-              (key, value1) -> {
-                if (timeExtractionFields.contains(key)) {
-                  return ISODateTimeFormat.dateTimeParser().parseMillis((String) value1);
-                }
-                if (intFormattedTimeExtractionFields.contains(key)) {
-                  return Integer.valueOf((String) value1);
-                }
-                return value1;
-              }
-      );
+      currentEvent = currentRow.getEvent();
       return true;
     }
     return false;
   }
 
-  @Override
-  public NullWritable getCurrentKey() throws IOException, InterruptedException {
+  @Override public NullWritable getCurrentKey() throws IOException, InterruptedException {
     return NullWritable.get();
   }
 
-  @Override
-  public DruidWritable getCurrentValue() throws IOException, InterruptedException {
+  @Override public DruidWritable getCurrentValue() throws IOException, InterruptedException {
     // Create new value
     DruidWritable value = new DruidWritable();
     // 1) The timestamp column
@@ -138,8 +85,7 @@ public class DruidGroupByQueryRecordReader
     return value;
   }
 
-  @Override
-  public boolean next(NullWritable key, DruidWritable value) {
+  @Override public boolean next(NullWritable key, DruidWritable value) {
     if (nextKeyValue()) {
       // Update value
       value.getValue().clear();
@@ -154,8 +100,7 @@ public class DruidGroupByQueryRecordReader
     return false;
   }
 
-  @Override
-  public float getProgress() throws IOException {
+  @Override public float getProgress() throws IOException {
     return queryResultsIterator.hasNext() ? 0 : 1;
   }
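
In short, DruidGroupByQueryRecordReader no longer rewrites time-extraction fields row by row; nextKeyValue() now hands back the raw Druid event map and defers all type conversion to DruidSerDe.deserialize(). The following JDK-only stand-in (a plain java.util.Map in place of Druid's MapBasedRow; class and method names are illustrative, not part of the patch) sketches that pass-through behaviour:

import java.util.LinkedHashMap;
import java.util.Map;

public class GroupByEventSketch {

  // Stand-in for the new behaviour: the event map is returned untouched, whether a
  // value is epoch millis, an extraction-fn string, or an aggregated measure.
  static Map<String, Object> currentEvent(Map<String, Object> druidEvent) {
    // Previously the reader ran Maps.transformEntries(...) here to parse ISO time
    // strings and integer-formatted extractions; that logic now lives in the SerDe.
    return druidEvent;
  }

  public static void main(String[] args) {
    Map<String, Object> event = new LinkedHashMap<>();
    event.put("extract", "1969-12-31 16:00:00"); // output of a timeFormat extraction fn
    event.put("$f1", -35057.67698967457d);       // doubleSum aggregation
    System.out.println(currentEvent(event));
  }
}

The values left in this map are exactly what DruidSerDe sees during deserialization; a sketch of that side of the change follows the DruidSerDe.java diff below.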
 

http://git-wip-us.apache.org/repos/asf/hive/blob/15bf4eb9/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index 5f76579..842a9fa 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeSpec;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
 import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
@@ -83,6 +84,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.sql.Date;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.ZonedDateTime;
@@ -97,11 +99,13 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.hive.druid.serde.DruidSerDeUtils.TIMESTAMP_FORMAT;
+import static org.joda.time.format.ISODateTimeFormat.dateOptionalTimeParser;
+
 /**
  * DruidSerDe that is used to  deserialize objects from a Druid data source.
  */
-@SerDeSpec(schemaProps = { Constants.DRUID_DATA_SOURCE })
-public class DruidSerDe extends AbstractSerDe {
+@SerDeSpec(schemaProps = { Constants.DRUID_DATA_SOURCE }) public class DruidSerDe extends AbstractSerDe {
 
   protected static final Logger LOG = LoggerFactory.getLogger(DruidSerDe.class);
 
@@ -110,23 +114,19 @@ public class DruidSerDe extends AbstractSerDe {
   private ObjectInspector inspector;
   private TimestampLocalTZTypeInfo tsTZTypeInfo;
 
-  @Override
-  public void initialize(Configuration configuration, Properties properties) throws SerDeException {
+  @Override public void initialize(Configuration configuration, Properties properties) throws SerDeException {
 
-    tsTZTypeInfo = new TimestampLocalTZTypeInfo(
-          configuration.get(HiveConf.ConfVars.HIVE_LOCAL_TIME_ZONE.varname));
+    tsTZTypeInfo = new TimestampLocalTZTypeInfo(configuration.get(HiveConf.ConfVars.HIVE_LOCAL_TIME_ZONE.varname));
     // Druid query
     final String druidQuery = properties.getProperty(Constants.DRUID_QUERY_JSON, null);
-    if (druidQuery !=  null && !druidQuery.isEmpty()) {
+    if (druidQuery != null && !druidQuery.isEmpty()) {
       initFromDruidQueryPlan(properties, druidQuery);
     } else {
       // No query. Either it is a CTAS, or we need to create a Druid meta data Query
-      if (!org.apache.commons.lang3.StringUtils
-              .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMNS))
-              && !org.apache.commons.lang3.StringUtils
-              .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES))) {
+      if (!org.apache.commons.lang3.StringUtils.isEmpty(properties.getProperty(serdeConstants.LIST_COLUMNS))
+          && !org.apache.commons.lang3.StringUtils.isEmpty(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES))) {
         // CASE CTAS statement
-       initFromProperties(properties);
+        initFromProperties(properties);
       } else {
         // Segment Metadata query that retrieves all columns present in
         // the data source (dimensions and metrics).
@@ -134,8 +134,8 @@ public class DruidSerDe extends AbstractSerDe {
       }
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("DruidSerDe initialized with\n" + "\t columns: " + Arrays.toString(columns)
-              + "\n\t types: " + Arrays.toString(types));
+      LOG.debug("DruidSerDe initialized with\n" + "\t columns: " + Arrays.toString(columns) + "\n\t types: " + Arrays
+          .toString(types));
     }
   }
 
@@ -147,8 +147,8 @@ public class DruidSerDe extends AbstractSerDe {
 
     String dataSource = properties.getProperty(Constants.DRUID_DATA_SOURCE);
     if (dataSource == null) {
-      throw new SerDeException("Druid data source not specified; use " +
-          Constants.DRUID_DATA_SOURCE + " in table properties");
+      throw new SerDeException(
+          "Druid data source not specified; use " + Constants.DRUID_DATA_SOURCE + " in table properties");
     }
     SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder();
     builder.dataSource(dataSource);
@@ -157,9 +157,7 @@ public class DruidSerDe extends AbstractSerDe {
     SegmentMetadataQuery query = builder.build();
 
     // Execute query in Druid
-    String address = HiveConf.getVar(configuration,
-        HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS
-    );
+    String address = HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
     if (org.apache.commons.lang3.StringUtils.isEmpty(address)) {
       throw new SerDeException("Druid broker address not specified in configuration");
     }
@@ -176,33 +174,29 @@ public class DruidSerDe extends AbstractSerDe {
         columnNames.add(columnInfo.getKey()); // field name
         PrimitiveTypeInfo type = tsTZTypeInfo; // field type
         columnTypes.add(type);
-        inspectors
-            .add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
+        inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
         continue;
       }
       columnNames.add(columnInfo.getKey()); // field name
-      PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType(
-          columnInfo.getValue().getType()); // field type
+      PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType(columnInfo.getValue().getType()); // field type
       columnTypes.add(type instanceof TimestampLocalTZTypeInfo ? tsTZTypeInfo : type);
       inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
     }
     columns = columnNames.toArray(new String[columnNames.size()]);
     types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
-    inspector = ObjectInspectorFactory
-        .getStandardStructObjectInspector(columnNames, inspectors);
+    inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
   }
 
-  private void initFromProperties(final Properties properties)
-      throws SerDeException {
+  private void initFromProperties(final Properties properties) throws SerDeException {
     final List<ObjectInspector> inspectors = new ArrayList<>();
     final List<String> columnNames = new ArrayList<>();
     final List<PrimitiveTypeInfo> columnTypes = new ArrayList<>();
 
     columnNames.addAll(Utilities.getColumnNames(properties));
     if (!columnNames.contains(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) {
-      throw new SerDeException("Timestamp column (' " + DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN +
-          "') not specified in create table; list of columns is : " +
-          properties.getProperty(serdeConstants.LIST_COLUMNS));
+      throw new SerDeException("Timestamp column (' " + DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN
+          + "') not specified in create table; list of columns is : " + properties
+          .getProperty(serdeConstants.LIST_COLUMNS));
     }
     columnTypes.addAll(Lists.transform(
         Lists.transform(Utilities.getColumnTypes(properties), type -> TypeInfoFactory.getPrimitiveTypeInfo(type)),
@@ -214,8 +208,7 @@ public class DruidSerDe extends AbstractSerDe {
     ));
     columns = columnNames.toArray(new String[columnNames.size()]);
     types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
-    inspector = ObjectInspectorFactory
-        .getStandardStructObjectInspector(columnNames, inspectors);
+    inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
   }
 
   private void initFromDruidQueryPlan(Properties properties, String druidQuery) {
@@ -229,16 +222,16 @@ public class DruidSerDe extends AbstractSerDe {
         Preconditions.checkNotNull(properties.getProperty(Constants.DRUID_QUERY_FIELD_TYPES, null));
     if (fieldNamesProperty.isEmpty()) {
      // this might seem counterintuitive, but a query like
-      // SELECT YEAR(Calcs.date0) AS yr_date0_ok FROM druid_tableau.calcs Calcs WHERE (YEAR(Calcs.date0) IS NULL) LIMIT 1
-      // is planed in a way where we only push a filter down and keep the project of null as hive project. Thus empty columns
+      // SELECT YEAR(Calcs.date0) AS yr_date0_ok FROM druid_tableau.calcs Calcs WHERE (YEAR(Calcs.date0) IS NULL)
+      // LIMIT 1
+      // is planned in a way where we only push a filter down and keep the projection of NULL as a Hive project,
+      // thus leaving the column list empty.
       columnNames = Collections.EMPTY_LIST;
       columnTypes = Collections.EMPTY_LIST;
     } else {
-      columnNames =
-          Arrays.stream(fieldNamesProperty.trim().split(",")).collect(Collectors.toList());
+      columnNames = Arrays.stream(fieldNamesProperty.trim().split(",")).collect(Collectors.toList());
       columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(fieldTypesProperty).stream()
-          .map(e -> TypeInfoFactory.getPrimitiveTypeInfo(e.getTypeName()))
-          .map(primitiveTypeInfo -> {
+          .map(e -> TypeInfoFactory.getPrimitiveTypeInfo(e.getTypeName())).map(primitiveTypeInfo -> {
             if (primitiveTypeInfo instanceof TimestampLocalTZTypeInfo) {
               return tsTZTypeInfo;
             }
@@ -257,11 +250,11 @@ public class DruidSerDe extends AbstractSerDe {
 
   /* Submits the request and returns */
   protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
-          throws SerDeException, IOException {
+      throws SerDeException, IOException {
     InputStream response;
     try {
       response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
-              DruidStorageHandlerUtils.createSmileRequest(address, query)
+          DruidStorageHandlerUtils.createSmileRequest(address, query)
       );
     } catch (Exception e) {
       throw new SerDeException(StringUtils.stringifyException(e));
@@ -272,10 +265,9 @@ public class DruidSerDe extends AbstractSerDe {
     try {
       // This will throw an exception in case of the response from druid is not an array
       // this case occurs if for instance druid query execution returns an exception instead of array of results.
-      resultsList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
-              new TypeReference<List<SegmentAnalysis>>() {
-              }
-      );
+      resultsList =
+          DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response, new TypeReference<List<SegmentAnalysis>>() {
+          });
     } catch (Exception e) {
       response.close();
       throw new SerDeException(StringUtils.stringifyException(e));
@@ -290,17 +282,14 @@ public class DruidSerDe extends AbstractSerDe {
     return resultsList.get(0);
   }
 
-  @Override
-  public Class<? extends Writable> getSerializedClass() {
+  @Override public Class<? extends Writable> getSerializedClass() {
     return DruidWritable.class;
   }
 
-  @Override
-  public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
+  @Override public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
     if (objectInspector.getCategory() != ObjectInspector.Category.STRUCT) {
-      throw new SerDeException(getClass().toString()
-              + " can only serialize struct types, but we got: "
-              + objectInspector.getTypeName());
+      throw new SerDeException(
+          getClass().toString() + " can only serialize struct types, but we got: " + objectInspector.getTypeName());
     }
 
     // Prepare the field ObjectInspectors
@@ -317,52 +306,49 @@ public class DruidSerDe extends AbstractSerDe {
       }
       final Object res;
       switch (types[i].getPrimitiveCategory()) {
-        case TIMESTAMP:
-        res = ((TimestampObjectInspector) fields.get(i).getFieldObjectInspector())
-            .getPrimitiveJavaObject(
-                values.get(i)).getTime();
-          break;
-        case TIMESTAMPLOCALTZ:
-          res = ((TimestampLocalTZObjectInspector) fields.get(i).getFieldObjectInspector())
-                  .getPrimitiveJavaObject(values.get(i)).getZonedDateTime().toInstant().toEpochMilli();
-          break;
-        case BYTE:
-          res = ((ByteObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
-          break;
-        case SHORT:
-          res = ((ShortObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
-          break;
-        case INT:
-          res = ((IntObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
-          break;
-        case LONG:
-          res = ((LongObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
-          break;
-        case FLOAT:
-          res = ((FloatObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
-          break;
-        case DOUBLE:
-          res = ((DoubleObjectInspector) fields.get(i).getFieldObjectInspector())
-                  .get(values.get(i));
-          break;
-        case CHAR:
-          res = ((HiveCharObjectInspector) fields.get(i).getFieldObjectInspector())
-                  .getPrimitiveJavaObject(values.get(i)).getValue();
-          break;
-        case VARCHAR:
-          res = ((HiveVarcharObjectInspector) fields.get(i).getFieldObjectInspector())
-                  .getPrimitiveJavaObject(values.get(i)).getValue();
-          break;
-        case STRING:
-          res = ((StringObjectInspector) fields.get(i).getFieldObjectInspector())
-                  .getPrimitiveJavaObject(values.get(i));
-          break;
-        case BOOLEAN:
-          res = ((BooleanObjectInspector) fields.get(i).getFieldObjectInspector())
-                  .get(values.get(i));
-          break;
-        default:
-          throw new SerDeException("Unsupported type: " + types[i].getPrimitiveCategory());
+      case TIMESTAMP:
+        res = ((TimestampObjectInspector) fields.get(i).getFieldObjectInspector()).getPrimitiveJavaObject(values.get(i))
+            .getTime();
+        break;
+      case TIMESTAMPLOCALTZ:
+        res = ((TimestampLocalTZObjectInspector) fields.get(i).getFieldObjectInspector())
+            .getPrimitiveJavaObject(values.get(i)).getZonedDateTime().toInstant().toEpochMilli();
+        break;
+      case BYTE:
+        res = ((ByteObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+        break;
+      case SHORT:
+        res = ((ShortObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+        break;
+      case INT:
+        res = ((IntObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+        break;
+      case LONG:
+        res = ((LongObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+        break;
+      case FLOAT:
+        res = ((FloatObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+        break;
+      case DOUBLE:
+        res = ((DoubleObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+        break;
+      case CHAR:
+        res = ((HiveCharObjectInspector) fields.get(i).getFieldObjectInspector()).getPrimitiveJavaObject(values.get(i))
+            .getValue();
+        break;
+      case VARCHAR:
+        res =
+            ((HiveVarcharObjectInspector) fields.get(i).getFieldObjectInspector()).getPrimitiveJavaObject(values.get(i))
+                .getValue();
+        break;
+      case STRING:
+        res = ((StringObjectInspector) fields.get(i).getFieldObjectInspector()).getPrimitiveJavaObject(values.get(i));
+        break;
+      case BOOLEAN:
+        res = ((BooleanObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+        break;
+      default:
+        throw new SerDeException("Unsupported type: " + types[i].getPrimitiveCategory());
       }
       value.put(columns[i], res);
     }
@@ -370,38 +356,34 @@ public class DruidSerDe extends AbstractSerDe {
     // First Segment Granularity has to be here.
     final int granularityFieldIndex = columns.length;
     assert values.size() > granularityFieldIndex;
-    Preconditions.checkArgument(fields.get(granularityFieldIndex).getFieldName()
-        .equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME));
+    Preconditions.checkArgument(
+        fields.get(granularityFieldIndex).getFieldName().equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME));
     value.put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
-            ((TimestampObjectInspector) fields.get(granularityFieldIndex).getFieldObjectInspector())
-                    .getPrimitiveJavaObject(values.get(granularityFieldIndex)).getTime()
+        ((TimestampObjectInspector) fields.get(granularityFieldIndex).getFieldObjectInspector())
+            .getPrimitiveJavaObject(values.get(granularityFieldIndex)).getTime()
     );
     if (values.size() == columns.length + 2) {
       // Then partition number if any.
       final int partitionNumPos = granularityFieldIndex + 1;
-      Preconditions.checkArgument(
-          fields.get(partitionNumPos).getFieldName().equals(Constants.DRUID_SHARD_KEY_COL_NAME),
+      Preconditions.checkArgument(fields.get(partitionNumPos).getFieldName().equals(Constants.DRUID_SHARD_KEY_COL_NAME),
           String.format("expecting to encounter %s but was %s", Constants.DRUID_SHARD_KEY_COL_NAME,
               fields.get(partitionNumPos).getFieldName()
           )
       );
       value.put(Constants.DRUID_SHARD_KEY_COL_NAME,
-          ((LongObjectInspector) fields.get(partitionNumPos).getFieldObjectInspector())
-              .get(values.get(partitionNumPos))
+          ((LongObjectInspector) fields.get(partitionNumPos).getFieldObjectInspector()).get(values.get(partitionNumPos))
       );
     }
 
     return new DruidWritable(value);
   }
 
-  @Override
-  public SerDeStats getSerDeStats() {
+  @Override public SerDeStats getSerDeStats() {
     // no support for statistics
     return null;
   }
 
-  @Override
-  public Object deserialize(Writable writable) throws SerDeException {
+  @Override public Object deserialize(Writable writable) throws SerDeException {
     final DruidWritable input = (DruidWritable) writable;
     final List<Object> output = Lists.newArrayListWithExpectedSize(columns.length);
     for (int i = 0; i < columns.length; i++) {
@@ -411,72 +393,89 @@ public class DruidSerDe extends AbstractSerDe {
         continue;
       }
       switch (types[i].getPrimitiveCategory()) {
-        case TIMESTAMP:
-          output.add(new TimestampWritable(Timestamp.valueOf(ZonedDateTime
-              .ofInstant(Instant.ofEpochMilli(((Number) value).longValue()),
-                  tsTZTypeInfo.timeZone()
-              ).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).toString())));
-          break;
-        case TIMESTAMPLOCALTZ:
-          output.add(
-              new TimestampLocalTZWritable(
-                  new TimestampTZ(
-                      ZonedDateTime.ofInstant(
-                          Instant.ofEpochMilli(((Number) value).longValue()),
-                          ((TimestampLocalTZTypeInfo) types[i]).timeZone()))));
-          break;
-        case BYTE:
-          output.add(new ByteWritable(((Number) value).byteValue()));
-          break;
-        case SHORT:
-          output.add(new ShortWritable(((Number) value).shortValue()));
-          break;
-        case INT:
+      case TIMESTAMP:
+        if (value instanceof Number) {
+          output.add(new TimestampWritable(Timestamp.valueOf(
+              ZonedDateTime.ofInstant(Instant.ofEpochMilli(((Number) value).longValue()), tsTZTypeInfo.timeZone())
+                  .format(DateTimeFormatter.ofPattern(TIMESTAMP_FORMAT)))));
+        } else {
+          output.add(new TimestampWritable(Timestamp.valueOf((String) value)));
+        }
+
+        break;
+      case TIMESTAMPLOCALTZ:
+        final long numberOfMillis;
+        if (value instanceof Number) {
+          numberOfMillis = ((Number) value).longValue();
+        } else {
+          // it is an extraction fn output that needs to be parsed
+          numberOfMillis = dateOptionalTimeParser().parseDateTime((String) value).getMillis();
+        }
+        output.add(new TimestampLocalTZWritable(new TimestampTZ(ZonedDateTime
+            .ofInstant(Instant.ofEpochMilli(numberOfMillis),
+                ((TimestampLocalTZTypeInfo) types[i]).timeZone()
+            ))));
+        break;
+      case DATE:
+        final DateWritable dateWritable;
+        if (value instanceof Number) {
+          dateWritable = new DateWritable(new Date((((Number) value).longValue())));
+        } else {
+          // it is an extraction fn output that needs to be parsed
+          dateWritable = new DateWritable(new Date(dateOptionalTimeParser().parseDateTime((String) value).getMillis()));
+        }
+        output.add(dateWritable);
+        break;
+      case BYTE:
+        output.add(new ByteWritable(((Number) value).byteValue()));
+        break;
+      case SHORT:
+        output.add(new ShortWritable(((Number) value).shortValue()));
+        break;
+      case INT:
+        if (value instanceof Number) {
           output.add(new IntWritable(((Number) value).intValue()));
-          break;
-        case LONG:
-          output.add(new LongWritable(((Number) value).longValue()));
-          break;
-        case FLOAT:
-          output.add(new FloatWritable(((Number) value).floatValue()));
-          break;
-        case DOUBLE:
-          output.add(new DoubleWritable(((Number) value).doubleValue()));
-          break;
-        case CHAR:
-          output.add(
-              new HiveCharWritable(
-                  new HiveChar(
-                      value.toString(),
-                      ((CharTypeInfo) types[i]).getLength())));
-          break;
-        case VARCHAR:
-          output.add(
-              new HiveVarcharWritable(
-                  new HiveVarchar(
-                      value.toString(),
-                      ((VarcharTypeInfo) types[i]).getLength())));
-          break;
-        case STRING:
-          output.add(new Text(value.toString()));
-          break;
-        case BOOLEAN:
-          output.add(new BooleanWritable(Boolean.valueOf(value.toString())));
-          break;
-        default:
-          throw new SerDeException("Unknown type: " + types[i].getPrimitiveCategory());
+        } else {
+          // This is a corner case where a time-unit extract (e.g. day/month) is pushed down as an extraction fn
+          // @TODO The best way to fix this is to add explicit Druid output types to the Calcite extraction function impls
+          output.add(new IntWritable(Integer.valueOf((String) value)));
+        }
+
+        break;
+      case LONG:
+        output.add(new LongWritable(((Number) value).longValue()));
+        break;
+      case FLOAT:
+        output.add(new FloatWritable(((Number) value).floatValue()));
+        break;
+      case DOUBLE:
+        output.add(new DoubleWritable(((Number) value).doubleValue()));
+        break;
+      case CHAR:
+        output.add(new HiveCharWritable(new HiveChar(value.toString(), ((CharTypeInfo) types[i]).getLength())));
+        break;
+      case VARCHAR:
+        output
+            .add(new HiveVarcharWritable(new HiveVarchar(value.toString(), ((VarcharTypeInfo) types[i]).getLength())));
+        break;
+      case STRING:
+        output.add(new Text(value.toString()));
+        break;
+      case BOOLEAN:
+        output.add(new BooleanWritable(Boolean.valueOf(value.toString())));
+        break;
+      default:
+        throw new SerDeException("Unknown type: " + types[i].getPrimitiveCategory());
       }
     }
     return output;
   }
 
-  @Override
-  public ObjectInspector getObjectInspector() {
+  @Override public ObjectInspector getObjectInspector() {
     return inspector;
   }
 
-  @Override
-  public boolean shouldStoreFieldsInMetastore(Map<String, String> tableParams) {
+  @Override public boolean shouldStoreFieldsInMetastore(Map<String, String> tableParams) {
     // If Druid table is not an external table store the schema in metadata store.
     return !MetaStoreUtils.isExternal(tableParams);
   }
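
The heart of the fix is that deserialize() now accepts either numeric epoch millis or the string output of a Druid timeFormat extraction function for time-related columns. The JDK-only sketch below mirrors the new TIMESTAMP and INT branches; it is an illustration only, assuming extraction-fn output in the "yyyy-MM-dd HH:mm:ss" shape of the TIMESTAMP_FORMAT constant added in DruidSerDeUtils (next file). The real code wraps results in TimestampWritable/IntWritable and uses Joda's dateOptionalTimeParser for the TIMESTAMPLOCALTZ and DATE string cases.

import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class DruidTimeValueSketch {

  // Mirrors the new TIMESTAMP branch: Druid may return epoch millis (a Number)
  // or a "yyyy-MM-dd HH:mm:ss" string produced by a timeFormat extraction fn.
  static Timestamp toTimestamp(Object value, ZoneId zone) {
    if (value instanceof Number) {
      String formatted = ZonedDateTime
          .ofInstant(Instant.ofEpochMilli(((Number) value).longValue()), zone)
          .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
      return Timestamp.valueOf(formatted);
    }
    return Timestamp.valueOf((String) value);
  }

  // Mirrors the new INT branch: a time-unit extract (e.g. YEAR) pushed down as an
  // extraction fn comes back as a string such as "1969".
  static int toInt(Object value) {
    return value instanceof Number ? ((Number) value).intValue() : Integer.parseInt((String) value);
  }

  public static void main(String[] args) {
    ZoneId zone = ZoneId.of("US/Pacific");
    System.out.println(toTimestamp(0L, zone));                    // 1969-12-31 16:00:00.0
    System.out.println(toTimestamp("1969-12-31 16:00:00", zone)); // 1969-12-31 16:00:00.0
    System.out.println(toInt("1969"));                            // 1969
  }
}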

http://git-wip-us.apache.org/repos/asf/hive/blob/15bf4eb9/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
index 1c34e41..630e097 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
@@ -22,10 +22,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.druid.query.dimension.DimensionSpec;
-import io.druid.query.dimension.ExtractionDimensionSpec;
-import io.druid.query.extraction.TimeFormatExtractionFn;
-
 /**
  * Utils class for Druid SerDe.
  */
@@ -34,6 +30,7 @@ public final class DruidSerDeUtils {
   private static final Logger LOG = LoggerFactory.getLogger(DruidSerDeUtils.class);
 
   protected static final String ISO_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+  protected static final String TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss";
 
   protected static final String FLOAT_TYPE = "FLOAT";
   protected static final String DOUBLE_TYPE = "DOUBLE";

http://git-wip-us.apache.org/repos/asf/hive/blob/15bf4eb9/ql/src/test/queries/clientpositive/druidmini_extractTime.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/druidmini_extractTime.q b/ql/src/test/queries/clientpositive/druidmini_extractTime.q
index 2f7129e..429f796 100644
--- a/ql/src/test/queries/clientpositive/druidmini_extractTime.q
+++ b/ql/src/test/queries/clientpositive/druidmini_extractTime.q
@@ -1,3 +1,5 @@
+--! qt:dataset:alltypesorc
+
 SET hive.vectorized.execution.enabled=false;
 CREATE TABLE druid_table
 STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
@@ -160,5 +162,18 @@ AND CAST(EXTRACT(YEAR from `__time`) as STRING) = '1969' LIMIT 1;
 SELECT EXTRACT(YEAR from `__time`), SUBSTRING(CAST(CAST(`__time` AS DATE) AS STRING), 1, 4) as year_str FROM druid_table WHERE EXTRACT(YEAR from `__time`) >= 1969
 AND CAST(EXTRACT(YEAR from `__time`) as STRING) = '1969' LIMIT 1;
 
+-- Cast to Timestamp
+
+explain SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat)  FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5;
+
+SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat)  FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5;
+
+-- Cast to Date
+
+explain SELECT CAST(`__time` AS DATE) AS `x_date`, SUM(cfloat)  FROM druid_table GROUP BY CAST(`__time` AS DATE) ORDER BY `x_date` LIMIT 5;
+
+SELECT CAST(`__time` AS DATE) AS `x_date`, SUM(cfloat)  FROM druid_table GROUP BY CAST(`__time` AS DATE) ORDER BY `x_date` LIMIT 5;
+
+SELECT CAST(`__time` AS DATE) AS `x_date` FROM druid_table ORDER BY `x_date` LIMIT 5;
 
 DROP TABLE druid_table;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/15bf4eb9/ql/src/test/results/clientpositive/druid/druidmini_extractTime.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid/druidmini_extractTime.q.out b/ql/src/test/results/clientpositive/druid/druidmini_extractTime.q.out
index c3679a3..a1b1d24 100644
--- a/ql/src/test/results/clientpositive/druid/druidmini_extractTime.q.out
+++ b/ql/src/test/results/clientpositive/druid/druidmini_extractTime.q.out
@@ -1009,6 +1009,86 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@druid_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
 1969	1969
+PREHOOK: query: explain SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat)  FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5
+PREHOOK: type: QUERY
+POSTHOOK: query: explain SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat)  FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: druid_table
+          properties:
+            druid.fieldNames extract,$f1
+            druid.fieldTypes timestamp,double
+            druid.query.json {"queryType":"groupBy","dataSource":"default.druid_table","granularity":"all","dimensions":[{"type":"extraction","dimension":"__time","outputName":"extract","extractionFn":{"type":"timeFormat","format":"yyyy-MM-dd HH:mm:ss","timeZone":"US/Pacific","locale":"en"}}],"limitSpec":{"type":"default","limit":5,"columns":[{"dimension":"extract","direction":"ascending","dimensionOrder":"lexicographic"}]},"aggregations":[{"type":"doubleSum","name":"$f1","fieldName":"cfloat"}],"intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]}
+            druid.query.type groupBy
+          Select Operator
+            expressions: extract (type: timestamp), $f1 (type: double)
+            outputColumnNames: _col0, _col1
+            ListSink
+
+PREHOOK: query: SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat)  FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_table
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat)  FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_table
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1969-12-31 15:59:00	-4532.569952011108
+1969-12-31 16:00:00	-35057.67698967457
+PREHOOK: query: explain SELECT CAST(`__time` AS DATE) AS `x_date`, SUM(cfloat)  FROM druid_table GROUP BY CAST(`__time` AS DATE) ORDER BY `x_date` LIMIT 5
+PREHOOK: type: QUERY
+POSTHOOK: query: explain SELECT CAST(`__time` AS DATE) AS `x_date`, SUM(cfloat)  FROM druid_table GROUP BY CAST(`__time` AS DATE) ORDER BY `x_date` LIMIT 5
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: druid_table
+          properties:
+            druid.fieldNames vc,$f1
+            druid.fieldTypes date,double
+            druid.query.json {"queryType":"groupBy","dataSource":"default.druid_table","granularity":"all","dimensions":[{"type":"default","dimension":"vc","outputName":"vc","outputType":"LONG"}],"virtualColumns":[{"type":"expression","name":"vc","expression":"timestamp_floor(\"__time\",'P1D','','US/Pacific')","outputType":"LONG"}],"limitSpec":{"type":"default","limit":5,"columns":[{"dimension":"vc","direction":"ascending","dimensionOrder":"lexicographic"}]},"aggregations":[{"type":"doubleSum","name":"$f1","fieldName":"cfloat"}],"intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]}
+            druid.query.type groupBy
+          Select Operator
+            expressions: vc (type: date), $f1 (type: double)
+            outputColumnNames: _col0, _col1
+            ListSink
+
+PREHOOK: query: SELECT CAST(`__time` AS DATE) AS `x_date`, SUM(cfloat)  FROM druid_table GROUP BY CAST(`__time` AS DATE) ORDER BY `x_date` LIMIT 5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_table
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT CAST(`__time` AS DATE) AS `x_date`, SUM(cfloat)  FROM druid_table GROUP BY CAST(`__time` AS DATE) ORDER BY `x_date` LIMIT 5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_table
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1969-12-31	-39590.24694168568
+PREHOOK: query: SELECT CAST(`__time` AS DATE) AS `x_date` FROM druid_table ORDER BY `x_date` LIMIT 5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_table
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT CAST(`__time` AS DATE) AS `x_date` FROM druid_table ORDER BY `x_date` LIMIT 5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_table
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1969-12-31
+1969-12-31
+1969-12-31
+1969-12-31
+1969-12-31
 PREHOOK: query: DROP TABLE druid_table
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@druid_table