Posted to commits@hive.apache.org by jc...@apache.org on 2017/05/31 14:55:14 UTC

hive git commit: HIVE-16776: Strange cast behavior for table backed by druid (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master 509308f64 -> 727a3dfcd


HIVE-16776: Strange cast behavior for table backed by druid (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/727a3dfc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/727a3dfc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/727a3dfc

Branch: refs/heads/master
Commit: 727a3dfcda43972b5947b8351223506e2bdfb4c0
Parents: 509308f
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Wed May 31 15:54:34 2017 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Wed May 31 15:54:34 2017 +0100

----------------------------------------------------------------------
 .../hadoop/hive/druid/serde/DruidSerDe.java     |  95 ++++++++---
 .../hadoop/hive/druid/TestDruidSerDe.java       | 169 ++++++++++++++++++-
 .../test/queries/clientpositive/druid_basic2.q  |   4 +
 .../results/clientpositive/druid_basic2.q.out   |  40 +++++
 4 files changed, 278 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
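
In short, the patch makes DruidSerDe.initialize() read the column names and types that Hive passes in the table properties (serdeConstants.LIST_COLUMNS / serdeConstants.LIST_COLUMN_TYPES) and thread them into the inferSchema overloads; whenever a column is declared there, its Hive type takes precedence over the type derived from the Druid aggregator, which appears to be the source of the strange cast behavior named in the subject. A minimal sketch of that precedence rule follows; resolveType and druidToHiveType are hypothetical names used only for illustration, and druidToHiveType merely stands in for the real DruidSerDeUtils.convertDruidToHiveType fallback (its mapping here is assumed, not copied from the patch):

    import java.util.Map;
    import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    // Sketch only: illustrates the type-resolution order introduced by the patch.
    // The metastore-declared type wins; the Druid-derived type is the fallback.
    final class TypePreferenceSketch {

      static PrimitiveTypeInfo resolveType(String columnName, String druidTypeName,
          Map<String, PrimitiveTypeInfo> metastoreTypes) {
        PrimitiveTypeInfo declared = metastoreTypes.get(columnName);
        if (declared != null) {
          // Datasource was created by Hive: keep the declared Hive type (e.g. SMALLINT)
          return declared;
        }
        // No metastore information: fall back to a Druid-to-Hive mapping
        return druidToHiveType(druidTypeName);
      }

      // Stand-in for DruidSerDeUtils.convertDruidToHiveType; mapping below is assumed
      static PrimitiveTypeInfo druidToHiveType(String druidTypeName) {
        switch (druidTypeName.toUpperCase()) {
          case "LONG":
            return TypeInfoFactory.longTypeInfo;
          case "FLOAT":
            return TypeInfoFactory.floatTypeInfo;
          default:
            return TypeInfoFactory.stringTypeInfo;
        }
      }
    }
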


http://git-wip-us.apache.org/repos/asf/hive/blob/727a3dfc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index 656c0f1..2d11e4b 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -40,7 +40,10 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeSpec;
 import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -58,12 +61,9 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspe
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.ShortWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
@@ -73,6 +73,7 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.http.client.HttpClient;
@@ -203,12 +204,29 @@ public class DruidSerDe extends AbstractSerDe {
       try {
         query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, Query.class);
 
+        // Extract column names and types (if present)
+        ImmutableMap.Builder<String, PrimitiveTypeInfo> mapColumnNamesTypes = ImmutableMap.builder();
+        if (!org.apache.commons.lang3.StringUtils
+                .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMNS))
+                && !org.apache.commons.lang3.StringUtils
+                .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES))) {
+          List<String> propColumnNames = Utilities.getColumnNames(properties);
+          List<String> propColumnTypes = Utilities.getColumnTypes(properties);
+          for (int i = 0; i < propColumnNames.size(); i++) {
+            mapColumnNamesTypes.put(
+                    propColumnNames.get(i),
+                    TypeInfoFactory.getPrimitiveTypeInfo(propColumnTypes.get(i)));
+          }
+        }
+
         switch (query.getType()) {
           case Query.TIMESERIES:
-            inferSchema((TimeseriesQuery) query, columnNames, columnTypes);
+            inferSchema((TimeseriesQuery) query, columnNames, columnTypes,
+                    mapColumnNamesTypes.build());
             break;
           case Query.TOPN:
-            inferSchema((TopNQuery) query, columnNames, columnTypes);
+            inferSchema((TopNQuery) query, columnNames, columnTypes,
+                    mapColumnNamesTypes.build());
             break;
           case Query.SELECT:
             String address = HiveConf.getVar(configuration,
@@ -216,10 +234,12 @@ public class DruidSerDe extends AbstractSerDe {
             if (org.apache.commons.lang3.StringUtils.isEmpty(address)) {
               throw new SerDeException("Druid broker address not specified in configuration");
             }
-            inferSchema((SelectQuery) query, columnNames, columnTypes, address);
+            inferSchema((SelectQuery) query, columnNames, columnTypes, address,
+                    mapColumnNamesTypes.build());
             break;
           case Query.GROUP_BY:
-            inferSchema((GroupByQuery) query, columnNames, columnTypes);
+            inferSchema((GroupByQuery) query, columnNames, columnTypes,
+                    mapColumnNamesTypes.build());
             break;
           default:
             throw new SerDeException("Not supported Druid query");
@@ -233,8 +253,7 @@ public class DruidSerDe extends AbstractSerDe {
       for (int i = 0; i < columnTypes.size(); ++i) {
         columns[i] = columnNames.get(i);
         types[i] = columnTypes.get(i);
-        inspectors
-                .add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(types[i]));
+        inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(types[i]));
       }
       inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
     }
@@ -280,16 +299,22 @@ public class DruidSerDe extends AbstractSerDe {
   }
 
   /* Timeseries query */
-  private void inferSchema(TimeseriesQuery query, List<String> columnNames,
-          List<PrimitiveTypeInfo> columnTypes
-  ) {
+  private void inferSchema(TimeseriesQuery query,
+          List<String> columnNames, List<PrimitiveTypeInfo> columnTypes,
+          Map<String, PrimitiveTypeInfo> mapColumnNamesTypes) {
     // Timestamp column
     columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
     columnTypes.add(TypeInfoFactory.timestampTypeInfo);
     // Aggregator columns
     for (AggregatorFactory af : query.getAggregatorSpecs()) {
       columnNames.add(af.getName());
-      columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+      PrimitiveTypeInfo typeInfo = mapColumnNamesTypes.get(af.getName());
+      if (typeInfo != null) {
+        // If datasource was created by Hive, we consider Hive type
+        columnTypes.add(typeInfo);
+      } else {
+        columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+      }
     }
     // Post-aggregator columns
     // TODO: Currently Calcite only infers avg for post-aggregate,
@@ -302,9 +327,9 @@ public class DruidSerDe extends AbstractSerDe {
   }
 
   /* TopN query */
-  private void inferSchema(TopNQuery query, List<String> columnNames,
-          List<PrimitiveTypeInfo> columnTypes
-  ) {
+  private void inferSchema(TopNQuery query,
+          List<String> columnNames, List<PrimitiveTypeInfo> columnTypes,
+          Map<String, PrimitiveTypeInfo> mapColumnNamesTypes) {
     // Timestamp column
     columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
     columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -314,7 +339,13 @@ public class DruidSerDe extends AbstractSerDe {
     // Aggregator columns
     for (AggregatorFactory af : query.getAggregatorSpecs()) {
       columnNames.add(af.getName());
-      columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+      PrimitiveTypeInfo typeInfo = mapColumnNamesTypes.get(af.getName());
+      if (typeInfo != null) {
+        // If datasource was created by Hive, we consider Hive type
+        columnTypes.add(typeInfo);
+      } else {
+        columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+      }
     }
     // Post-aggregator columns
     // TODO: Currently Calcite only infers avg for post-aggregate,
@@ -327,8 +358,10 @@ public class DruidSerDe extends AbstractSerDe {
   }
 
   /* Select query */
-  private void inferSchema(SelectQuery query, List<String> columnNames,
-          List<PrimitiveTypeInfo> columnTypes, String address) throws SerDeException {
+  private void inferSchema(SelectQuery query,
+          List<String> columnNames, List<PrimitiveTypeInfo> columnTypes,
+          String address, Map<String, PrimitiveTypeInfo> mapColumnNamesTypes)
+                  throws SerDeException {
     // Timestamp column
     columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
     columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -356,15 +389,21 @@ public class DruidSerDe extends AbstractSerDe {
     }
     for (String metric : query.getMetrics()) {
       columnNames.add(metric);
-      columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(
-              schemaInfo.getColumns().get(metric).getType()));
+      PrimitiveTypeInfo typeInfo = mapColumnNamesTypes.get(metric);
+      if (typeInfo != null) {
+        // If datasource was created by Hive, we consider Hive type
+        columnTypes.add(typeInfo);
+      } else {
+        columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(
+                schemaInfo.getColumns().get(metric).getType()));
+      }
     }
   }
 
   /* GroupBy query */
-  private void inferSchema(GroupByQuery query, List<String> columnNames,
-          List<PrimitiveTypeInfo> columnTypes
-  ) {
+  private void inferSchema(GroupByQuery query,
+          List<String> columnNames, List<PrimitiveTypeInfo> columnTypes,
+          Map<String, PrimitiveTypeInfo> mapColumnNamesTypes) {
     // Timestamp column
     columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
     columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -376,7 +415,13 @@ public class DruidSerDe extends AbstractSerDe {
     // Aggregator columns
     for (AggregatorFactory af : query.getAggregatorSpecs()) {
       columnNames.add(af.getName());
-      columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+      PrimitiveTypeInfo typeInfo = mapColumnNamesTypes.get(af.getName());
+      if (typeInfo != null) {
+        // If datasource was created by Hive, we consider Hive type
+        columnTypes.add(typeInfo);
+      } else {
+        columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+      }
     }
     // Post-aggregator columns
     // TODO: Currently Calcite only infers avg for post-aggregate,
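
For reference, the map that the new code threads through the inferSchema(...) overloads is built from the serde properties. A simplified, self-contained sketch is below; the actual patch uses Utilities.getColumnNames/getColumnTypes rather than the plain split shown here, and buildColumnTypeMap is a hypothetical name:

    import java.util.Map;
    import com.google.common.collect.ImmutableMap;
    import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    final class ColumnTypeMapSketch {
      // Turns property values such as "__time,sample_name1,sample_name2" and
      // "timestamp,smallint,double" into the name -> PrimitiveTypeInfo map
      // consumed by the inferSchema(...) overloads.
      static Map<String, PrimitiveTypeInfo> buildColumnTypeMap(String names, String types) {
        ImmutableMap.Builder<String, PrimitiveTypeInfo> builder = ImmutableMap.builder();
        if (names != null && !names.isEmpty() && types != null && !types.isEmpty()) {
          String[] n = names.split(",");
          String[] t = types.split(",");
          for (int i = 0; i < n.length; i++) {
            builder.put(n[i], TypeInfoFactory.getPrimitiveTypeInfo(t[i]));
          }
        }
        return builder.build();
      }
    }
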

http://git-wip-us.apache.org/repos/asf/hive/blob/727a3dfc/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
index 1bd5d84..695e0dd 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
@@ -143,6 +143,16 @@ public class TestDruidSerDe {
                   new FloatWritable(3.32F), new FloatWritable(4F) }
   };
 
+  // Timeseries query results as records (types defined by metastore)
+  private static final String TIMESERIES_COLUMN_NAMES = "__time,sample_name1,sample_name2,sample_divide";
+  private static final String TIMESERIES_COLUMN_TYPES = "timestamp,smallint,double,float";
+  private static final Object[][] TIMESERIES_QUERY_RESULTS_RECORDS_2 = new Object[][] {
+    new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new ShortWritable((short) 0),
+        new DoubleWritable(1.0d), new FloatWritable(2.2222F) },
+    new Object[] { new TimestampWritable(new Timestamp(1325462400000L)), new ShortWritable((short) 2),
+        new DoubleWritable(3.32d), new FloatWritable(4F) }
+  };
+
   // TopN query
   private static final String TOPN_QUERY =
           "{  \"queryType\": \"topN\", "
@@ -259,6 +269,27 @@ public class TestDruidSerDe {
                   new FloatWritable(46.45F) }
   };
 
+  // TopN query results as records (types defined by metastore)
+  private static final String TOPN_COLUMN_NAMES = "__time,sample_dim,count,some_metric,sample_divide";
+  private static final String TOPN_COLUMN_TYPES = "timestamp,string,bigint,double,float";
+  private static final Object[][] TOPN_QUERY_RESULTS_RECORDS_2 = new Object[][] {
+          new Object[] { new TimestampWritable(new Timestamp(1377907200000L)),
+                  new Text("dim1_val"), new LongWritable(111), new DoubleWritable(10669d),
+                  new FloatWritable(96.11711711711712F) },
+          new Object[] { new TimestampWritable(new Timestamp(1377907200000L)),
+                  new Text("another_dim1_val"), new LongWritable(88), new DoubleWritable(28344d),
+                  new FloatWritable(322.09090909090907F) },
+          new Object[] { new TimestampWritable(new Timestamp(1377907200000L)),
+                  new Text("dim1_val3"), new LongWritable(70), new DoubleWritable(871d),
+                  new FloatWritable(12.442857142857143F) },
+          new Object[] { new TimestampWritable(new Timestamp(1377907200000L)),
+                  new Text("dim1_val4"), new LongWritable(62), new DoubleWritable(815d),
+                  new FloatWritable(13.14516129032258F) },
+          new Object[] { new TimestampWritable(new Timestamp(1377907200000L)),
+                  new Text("dim1_val5"), new LongWritable(60), new DoubleWritable(2787d),
+                  new FloatWritable(46.45F) }
+  };
+
   // GroupBy query
   private static final String GROUP_BY_QUERY =
           "{ "
@@ -339,6 +370,18 @@ public class TestDruidSerDe {
                   new FloatWritable(6.333333F) }
   };
 
+  // GroupBy query results as records (types defined by metastore)
+  private static final String GROUP_BY_COLUMN_NAMES = "__time,country,device,total_usage,data_transfer,avg_usage";
+  private static final String GROUP_BY_COLUMN_TYPES = "timestamp,string,string,int,double,float";
+  private static final Object[][] GROUP_BY_QUERY_RESULTS_RECORDS_2 = new Object[][] {
+    new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new Text("India"),
+            new Text("phone"), new IntWritable(88), new DoubleWritable(29.91233453F),
+            new FloatWritable(60.32F) },
+    new Object[] { new TimestampWritable(new Timestamp(1325376012000L)), new Text("Spain"),
+            new Text("pc"), new IntWritable(16), new DoubleWritable(172.93494959F),
+            new FloatWritable(6.333333F) }
+  };
+
   // Select query
   private static final String SELECT_QUERY =
           "{   \"queryType\": \"select\",  "
@@ -483,6 +526,38 @@ public class TestDruidSerDe {
                   new FloatWritable(68.0F), new FloatWritable(0.0F) }
   };
 
+  // Select query results as records (types defined by metastore)
+  private static final String SELECT_COLUMN_NAMES = "__time,robot,namespace,anonymous,unpatrolled,page,language,newpage,user,count,added,delta,variation,deleted";
+  private static final String SELECT_COLUMN_TYPES = "timestamp,string,string,string,string,string,string,string,string,double,double,float,float,float";
+  private static final Object[][] SELECT_QUERY_RESULTS_RECORDS_2 = new Object[][] {
+          new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("1"),
+                  new Text("article"), new Text("0"), new Text("0"),
+                  new Text("11._korpus_(NOVJ)"), new Text("sl"), new Text("0"),
+                  new Text("EmausBot"),
+                  new DoubleWritable(1.0d), new DoubleWritable(39.0d), new FloatWritable(39.0F),
+                  new FloatWritable(39.0F), new FloatWritable(0.0F) },
+          new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"),
+                  new Text("article"), new Text("0"), new Text("0"),
+                  new Text("112_U.S._580"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+                  new DoubleWritable(1.0d), new DoubleWritable(70.0d), new FloatWritable(70.0F),
+                  new FloatWritable(70.0F), new FloatWritable(0.0F) },
+          new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"),
+                  new Text("article"), new Text("0"), new Text("0"),
+                  new Text("113_U.S._243"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+                  new DoubleWritable(1.0d), new DoubleWritable(77.0d), new FloatWritable(77.0F),
+                  new FloatWritable(77.0F), new FloatWritable(0.0F) },
+          new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"),
+                  new Text("article"), new Text("0"), new Text("0"),
+                  new Text("113_U.S._73"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+                  new DoubleWritable(1.0d), new DoubleWritable(70.0d), new FloatWritable(70.0F),
+                  new FloatWritable(70.0F), new FloatWritable(0.0F) },
+          new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"),
+                  new Text("article"), new Text("0"), new Text("0"),
+                  new Text("113_U.S._756"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+                  new DoubleWritable(1.0d), new DoubleWritable(68.0d), new FloatWritable(68.0F),
+                  new FloatWritable(68.0F), new FloatWritable(0.0F) }
+  };
+
   /**
    * Test the default behavior of the objects and object inspectors.
    * @throws IOException
@@ -511,24 +586,52 @@ public class TestDruidSerDe {
     deserializeQueryResults(serDe, Query.TIMESERIES, TIMESERIES_QUERY,
             TIMESERIES_QUERY_RESULTS, TIMESERIES_QUERY_RESULTS_RECORDS
     );
+    // Timeseries query (simulating column types from metastore)
+    tbl.setProperty(serdeConstants.LIST_COLUMNS, TIMESERIES_COLUMN_NAMES);
+    tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, TIMESERIES_COLUMN_TYPES);
+    SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+    deserializeQueryResults(serDe, Query.TIMESERIES, TIMESERIES_QUERY,
+            TIMESERIES_QUERY_RESULTS, TIMESERIES_QUERY_RESULTS_RECORDS_2
+    );
     // TopN query
     tbl = createPropertiesQuery("sample_data", Query.TOPN, TOPN_QUERY);
     SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
     deserializeQueryResults(serDe, Query.TOPN, TOPN_QUERY,
             TOPN_QUERY_RESULTS, TOPN_QUERY_RESULTS_RECORDS
     );
+    // TopN query (simulating column types from metastore)
+    tbl.setProperty(serdeConstants.LIST_COLUMNS, TOPN_COLUMN_NAMES);
+    tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, TOPN_COLUMN_TYPES);
+    SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+    deserializeQueryResults(serDe, Query.TOPN, TOPN_QUERY,
+            TOPN_QUERY_RESULTS, TOPN_QUERY_RESULTS_RECORDS_2
+    );
     // GroupBy query
     tbl = createPropertiesQuery("sample_datasource", Query.GROUP_BY, GROUP_BY_QUERY);
     SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
     deserializeQueryResults(serDe, Query.GROUP_BY, GROUP_BY_QUERY,
             GROUP_BY_QUERY_RESULTS, GROUP_BY_QUERY_RESULTS_RECORDS
     );
+    // GroupBy query (simulating column types from metastore)
+    tbl.setProperty(serdeConstants.LIST_COLUMNS, GROUP_BY_COLUMN_NAMES);
+    tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, GROUP_BY_COLUMN_TYPES);
+    SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+    deserializeQueryResults(serDe, Query.GROUP_BY, GROUP_BY_QUERY,
+            GROUP_BY_QUERY_RESULTS, GROUP_BY_QUERY_RESULTS_RECORDS_2
+    );
     // Select query
     tbl = createPropertiesQuery("wikipedia", Query.SELECT, SELECT_QUERY);
     SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
     deserializeQueryResults(serDe, Query.SELECT, SELECT_QUERY,
             SELECT_QUERY_RESULTS, SELECT_QUERY_RESULTS_RECORDS
     );
+    // Select query (simulating column types from metastore)
+    tbl.setProperty(serdeConstants.LIST_COLUMNS, SELECT_COLUMN_NAMES);
+    tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, SELECT_COLUMN_TYPES);
+    SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+    deserializeQueryResults(serDe, Query.SELECT, SELECT_QUERY,
+            SELECT_QUERY_RESULTS, SELECT_QUERY_RESULTS_RECORDS_2
+    );
   }
 
   private static Properties createPropertiesQuery(String dataSource, String queryType,
@@ -543,6 +646,7 @@ public class TestDruidSerDe {
     return tbl;
   }
 
+  @SuppressWarnings("unchecked")
   private static void deserializeQueryResults(DruidSerDe serDe, String queryType, String jsonQuery,
           String resultString, Object[][] records
   ) throws SerDeException, JsonParseException,
@@ -612,10 +716,11 @@ public class TestDruidSerDe {
     DruidWritable writable = new DruidWritable();
     int pos = 0;
     while (reader.next(NullWritable.get(), writable)) {
-      Object row = serDe.deserialize(writable);
+      List<Object> row = (List<Object>) serDe.deserialize(writable);
       Object[] expectedFieldsData = records[pos];
       assertEquals(expectedFieldsData.length, fieldRefs.size());
       for (int i = 0; i < fieldRefs.size(); i++) {
+        assertEquals("Field " + i + " type", expectedFieldsData[i].getClass(), row.get(i).getClass());
         Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i));
         assertEquals("Field " + i, expectedFieldsData[i], fieldData);
       }
@@ -628,10 +733,11 @@ public class TestDruidSerDe {
     field2.set(reader, results);
     pos = 0;
     while (reader.nextKeyValue()) {
-      Object row = serDe.deserialize(reader.getCurrentValue());
+      List<Object> row = (List<Object>) serDe.deserialize(reader.getCurrentValue());
       Object[] expectedFieldsData = records[pos];
       assertEquals(expectedFieldsData.length, fieldRefs.size());
       for (int i = 0; i < fieldRefs.size(); i++) {
+        assertEquals("Field " + i + " type", expectedFieldsData[i].getClass(), row.get(i).getClass());
         Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i));
         assertEquals("Field " + i, expectedFieldsData[i], fieldData);
       }
@@ -682,7 +788,7 @@ public class TestDruidSerDe {
    * @throws NoSuchMethodException
    */
   @Test
-  public void testDruidSerializer()
+  public void testDruidObjectSerializer()
           throws SerDeException, JsonParseException, JsonMappingException,
           NoSuchFieldException, SecurityException, IllegalArgumentException,
           IllegalAccessException, IOException, InterruptedException,
@@ -737,9 +843,62 @@ public class TestDruidSerDe {
     // Serialize
     DruidWritable writable = (DruidWritable) serDe.serialize(rowObject, inspector);
     // Check result
-    assertEquals(DRUID_WRITABLE.getValue().size(), writable.getValue().size());
-    for (Entry<String, Object> e: DRUID_WRITABLE.getValue().entrySet()) {
+    assertEquals(druidWritable.getValue().size(), writable.getValue().size());
+    for (Entry<String, Object> e: druidWritable.getValue().entrySet()) {
       assertEquals(e.getValue(), writable.getValue().get(e.getKey()));
     }
   }
+
+  private static final Object[] ROW_OBJECT_2 = new Object[] {
+      new TimestampWritable(new Timestamp(1377907200000L)),
+      new Text("dim1_val"),
+      new DoubleWritable(10669.3D),
+      new FloatWritable(10669.45F),
+      new HiveDecimalWritable(HiveDecimal.create(1064.34D)),
+      new LongWritable(1113939),
+      new IntWritable(1112123),
+      new ShortWritable((short) 12),
+      new ByteWritable((byte) 0)
+  };
+  private static final DruidWritable DRUID_WRITABLE_2 = new DruidWritable(
+      ImmutableMap.<String, Object>builder()
+          .put("__time", 1377907200000L)
+          .put("c0", "dim1_val")
+          .put("c1", 10669.3D)
+          .put("c2", 10669.45F)
+          .put("c3", 1064.34D)
+          .put("c4", 1113939L)
+          .put("c5", 1112123)
+          .put("c6", (short) 12)
+          .put("c7", (byte) 0)
+          .build());
+
+  @Test
+  public void testDruidObjectDeserializer()
+          throws SerDeException, JsonParseException, JsonMappingException,
+          NoSuchFieldException, SecurityException, IllegalArgumentException,
+          IllegalAccessException, IOException, InterruptedException,
+          NoSuchMethodException, InvocationTargetException {
+    // Create, initialize, and test the SerDe
+    DruidSerDe serDe = new DruidSerDe();
+    Configuration conf = new Configuration();
+    Properties tbl;
+    // Mixed source (all types)
+    tbl = createPropertiesSource(COLUMN_NAMES, COLUMN_TYPES);
+    SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+    deserializeObject(tbl, serDe, ROW_OBJECT_2, DRUID_WRITABLE_2);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static void deserializeObject(Properties properties, DruidSerDe serDe,
+      Object[] rowObject, DruidWritable druidWritable) throws SerDeException {
+    // Deserialize
+    List<Object> object = (List<Object>) serDe.deserialize(druidWritable);
+    // Check result
+    assertEquals(rowObject.length, object.size());
+    for (int i = 0; i < rowObject.length; i++) {
+      assertEquals(rowObject[i].getClass(), object.get(i).getClass());
+      assertEquals(rowObject[i], object.get(i));
+    }
+  }
 }
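
The shape of the new test coverage, condensed: each query type is first deserialized with the schema inferred from the Druid query alone, then the serde is re-initialized with the column names/types a metastore would supply, and the expected records switch to the *_RECORDS_2 variants whose Writable classes match the declared Hive types. A condensed view of the GroupBy case (not standalone; it reuses serDe, conf, tbl and the constants defined in the test file above):

    // Condensed from the deserialization test above; the other query types follow the same pattern.
    tbl = createPropertiesQuery("sample_datasource", Query.GROUP_BY, GROUP_BY_QUERY);
    SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
    deserializeQueryResults(serDe, Query.GROUP_BY, GROUP_BY_QUERY,
            GROUP_BY_QUERY_RESULTS, GROUP_BY_QUERY_RESULTS_RECORDS);     // types inferred from the Druid query
    // Simulate the metastore-declared schema and re-initialize
    tbl.setProperty(serdeConstants.LIST_COLUMNS, GROUP_BY_COLUMN_NAMES);
    tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, GROUP_BY_COLUMN_TYPES);
    SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
    deserializeQueryResults(serDe, Query.GROUP_BY, GROUP_BY_QUERY,
            GROUP_BY_QUERY_RESULTS, GROUP_BY_QUERY_RESULTS_RECORDS_2);   // Writables now match the declared types
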

http://git-wip-us.apache.org/repos/asf/hive/blob/727a3dfc/ql/src/test/queries/clientpositive/druid_basic2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/druid_basic2.q b/ql/src/test/queries/clientpositive/druid_basic2.q
index 530e53a..3c17bc5 100644
--- a/ql/src/test/queries/clientpositive/druid_basic2.q
+++ b/ql/src/test/queries/clientpositive/druid_basic2.q
@@ -59,6 +59,10 @@ ORDER BY CAST(robot AS INTEGER) ASC, m DESC
 LIMIT 10;
 
 EXPLAIN
+SELECT substring(namespace, CAST(deleted AS INT), 4)
+FROM druid_table_1;
+
+EXPLAIN
 SELECT robot, floor_day(`__time`)
 FROM druid_table_1
 WHERE floor_day(`__time`) BETWEEN '1999-11-01 00:00:00' AND '1999-11-10 00:00:00'

http://git-wip-us.apache.org/repos/asf/hive/blob/727a3dfc/ql/src/test/results/clientpositive/druid_basic2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid_basic2.q.out b/ql/src/test/results/clientpositive/druid_basic2.q.out
index a3f05be..0d9e683 100644
--- a/ql/src/test/results/clientpositive/druid_basic2.q.out
+++ b/ql/src/test/results/clientpositive/druid_basic2.q.out
@@ -672,6 +672,46 @@ STAGE PLANS:
         ListSink
 
 PREHOOK: query: EXPLAIN
+SELECT substring(namespace, CAST(deleted AS INT), 4)
+FROM druid_table_1
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT substring(namespace, CAST(deleted AS INT), 4)
+FROM druid_table_1
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: druid_table_1
+            properties:
+              druid.query.json {"queryType":"select","dataSource":"wikipedia","descending":false,"intervals":["1900-01-01T00:00:00.000/3000-01-01T00:00:00.000"],"dimensions":["namespace"],"metrics":["deleted"],"granularity":"all","pagingSpec":{"threshold":16384,"fromNext":true},"context":{"druid.query.fetch":false}}
+              druid.query.type select
+            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+            Select Operator
+              expressions: substring(namespace, UDFToInteger(deleted), 4) (type: string)
+              outputColumnNames: _col0
+              Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+              File Output Operator
+                compressed: false
+                Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+                table:
+                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: EXPLAIN
 SELECT robot, floor_day(`__time`)
 FROM druid_table_1
 WHERE floor_day(`__time`) BETWEEN '1999-11-01 00:00:00' AND '1999-11-10 00:00:00'