Posted to commits@hive.apache.org by jc...@apache.org on 2017/05/31 14:55:14 UTC
hive git commit: HIVE-16776: Strange cast behavior for table backed by druid (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master 509308f64 -> 727a3dfcd
HIVE-16776: Strange cast behavior for table backed by druid (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/727a3dfc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/727a3dfc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/727a3dfc
Branch: refs/heads/master
Commit: 727a3dfcda43972b5947b8351223506e2bdfb4c0
Parents: 509308f
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Wed May 31 15:54:34 2017 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Wed May 31 15:54:34 2017 +0100
----------------------------------------------------------------------
.../hadoop/hive/druid/serde/DruidSerDe.java | 95 ++++++++---
.../hadoop/hive/druid/TestDruidSerDe.java | 169 ++++++++++++++++++-
.../test/queries/clientpositive/druid_basic2.q | 4 +
.../results/clientpositive/druid_basic2.q.out | 40 +++++
4 files changed, 278 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
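The gist of the fix: DruidSerDe.initialize() now builds a map from column name to the Hive type declared in the table properties (serdeConstants.LIST_COLUMNS / serdeConstants.LIST_COLUMN_TYPES), and every inferSchema variant consults that map before falling back to the type reported by Druid. Below is a minimal, self-contained sketch of that precedence rule; resolveType and the simplified convertDruidToHiveType stand-in are illustrative helpers, not the patch's exact code.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    public class DruidTypePrecedenceSketch {

      // A type declared in the Hive metastore wins; otherwise fall back to the
      // type Druid reports for the column.
      static PrimitiveTypeInfo resolveType(String columnName, String druidTypeName,
          Map<String, PrimitiveTypeInfo> declaredTypes) {
        PrimitiveTypeInfo declared = declaredTypes.get(columnName);
        if (declared != null) {
          return declared;                              // datasource created by Hive
        }
        return convertDruidToHiveType(druidTypeName);   // Druid-only datasource
      }

      // Simplified stand-in for DruidSerDeUtils.convertDruidToHiveType()
      static PrimitiveTypeInfo convertDruidToHiveType(String druidType) {
        switch (druidType.toUpperCase()) {
          case "LONG":  return TypeInfoFactory.longTypeInfo;
          case "FLOAT": return TypeInfoFactory.floatTypeInfo;
          default:      return TypeInfoFactory.stringTypeInfo;
        }
      }

      public static void main(String[] args) {
        Map<String, PrimitiveTypeInfo> declared = new HashMap<String, PrimitiveTypeInfo>();
        declared.put("sample_name1", TypeInfoFactory.shortTypeInfo);
        // Declared as smallint in the metastore -> smallint wins over Druid's LONG
        System.out.println(resolveType("sample_name1", "LONG", declared));  // smallint
        // Not declared -> the Druid-reported type is used
        System.out.println(resolveType("sample_name2", "LONG", declared));  // bigint
      }
    }

This is why a column declared as smallint in Hive now deserializes as a ShortWritable rather than a Writable implied by Druid's long aggregator type, as the updated tests below assert.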
http://git-wip-us.apache.org/repos/asf/hive/blob/727a3dfc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index 656c0f1..2d11e4b 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -40,7 +40,10 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeSpec;
import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -58,12 +61,9 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspe
import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.ShortWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
@@ -73,6 +73,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.http.client.HttpClient;
@@ -203,12 +204,29 @@ public class DruidSerDe extends AbstractSerDe {
try {
query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, Query.class);
+ // Extract column names and types (if present)
+ ImmutableMap.Builder<String, PrimitiveTypeInfo> mapColumnNamesTypes = ImmutableMap.builder();
+ if (!org.apache.commons.lang3.StringUtils
+ .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMNS))
+ && !org.apache.commons.lang3.StringUtils
+ .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES))) {
+ List<String> propColumnNames = Utilities.getColumnNames(properties);
+ List<String> propColumnTypes = Utilities.getColumnTypes(properties);
+ for (int i = 0; i < propColumnNames.size(); i++) {
+ mapColumnNamesTypes.put(
+ propColumnNames.get(i),
+ TypeInfoFactory.getPrimitiveTypeInfo(propColumnTypes.get(i)));
+ }
+ }
+
switch (query.getType()) {
case Query.TIMESERIES:
- inferSchema((TimeseriesQuery) query, columnNames, columnTypes);
+ inferSchema((TimeseriesQuery) query, columnNames, columnTypes,
+ mapColumnNamesTypes.build());
break;
case Query.TOPN:
- inferSchema((TopNQuery) query, columnNames, columnTypes);
+ inferSchema((TopNQuery) query, columnNames, columnTypes,
+ mapColumnNamesTypes.build());
break;
case Query.SELECT:
String address = HiveConf.getVar(configuration,
@@ -216,10 +234,12 @@ public class DruidSerDe extends AbstractSerDe {
if (org.apache.commons.lang3.StringUtils.isEmpty(address)) {
throw new SerDeException("Druid broker address not specified in configuration");
}
- inferSchema((SelectQuery) query, columnNames, columnTypes, address);
+ inferSchema((SelectQuery) query, columnNames, columnTypes, address,
+ mapColumnNamesTypes.build());
break;
case Query.GROUP_BY:
- inferSchema((GroupByQuery) query, columnNames, columnTypes);
+ inferSchema((GroupByQuery) query, columnNames, columnTypes,
+ mapColumnNamesTypes.build());
break;
default:
throw new SerDeException("Not supported Druid query");
@@ -233,8 +253,7 @@ public class DruidSerDe extends AbstractSerDe {
for (int i = 0; i < columnTypes.size(); ++i) {
columns[i] = columnNames.get(i);
types[i] = columnTypes.get(i);
- inspectors
- .add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(types[i]));
+ inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(types[i]));
}
inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
}
@@ -280,16 +299,22 @@ public class DruidSerDe extends AbstractSerDe {
}
/* Timeseries query */
- private void inferSchema(TimeseriesQuery query, List<String> columnNames,
- List<PrimitiveTypeInfo> columnTypes
- ) {
+ private void inferSchema(TimeseriesQuery query,
+ List<String> columnNames, List<PrimitiveTypeInfo> columnTypes,
+ Map<String, PrimitiveTypeInfo> mapColumnNamesTypes) {
// Timestamp column
columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
columnTypes.add(TypeInfoFactory.timestampTypeInfo);
// Aggregator columns
for (AggregatorFactory af : query.getAggregatorSpecs()) {
columnNames.add(af.getName());
- columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+ PrimitiveTypeInfo typeInfo = mapColumnNamesTypes.get(af.getName());
+ if (typeInfo != null) {
+ // If datasource was created by Hive, we consider Hive type
+ columnTypes.add(typeInfo);
+ } else {
+ columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+ }
}
// Post-aggregator columns
// TODO: Currently Calcite only infers avg for post-aggregate,
@@ -302,9 +327,9 @@ public class DruidSerDe extends AbstractSerDe {
}
/* TopN query */
- private void inferSchema(TopNQuery query, List<String> columnNames,
- List<PrimitiveTypeInfo> columnTypes
- ) {
+ private void inferSchema(TopNQuery query,
+ List<String> columnNames, List<PrimitiveTypeInfo> columnTypes,
+ Map<String, PrimitiveTypeInfo> mapColumnNamesTypes) {
// Timestamp column
columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -314,7 +339,13 @@ public class DruidSerDe extends AbstractSerDe {
// Aggregator columns
for (AggregatorFactory af : query.getAggregatorSpecs()) {
columnNames.add(af.getName());
- columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+ PrimitiveTypeInfo typeInfo = mapColumnNamesTypes.get(af.getName());
+ if (typeInfo != null) {
+ // If datasource was created by Hive, we consider Hive type
+ columnTypes.add(typeInfo);
+ } else {
+ columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+ }
}
// Post-aggregator columns
// TODO: Currently Calcite only infers avg for post-aggregate,
@@ -327,8 +358,10 @@ public class DruidSerDe extends AbstractSerDe {
}
/* Select query */
- private void inferSchema(SelectQuery query, List<String> columnNames,
- List<PrimitiveTypeInfo> columnTypes, String address) throws SerDeException {
+ private void inferSchema(SelectQuery query,
+ List<String> columnNames, List<PrimitiveTypeInfo> columnTypes,
+ String address, Map<String, PrimitiveTypeInfo> mapColumnNamesTypes)
+ throws SerDeException {
// Timestamp column
columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -356,15 +389,21 @@ public class DruidSerDe extends AbstractSerDe {
}
for (String metric : query.getMetrics()) {
columnNames.add(metric);
- columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(
- schemaInfo.getColumns().get(metric).getType()));
+ PrimitiveTypeInfo typeInfo = mapColumnNamesTypes.get(metric);
+ if (typeInfo != null) {
+ // If datasource was created by Hive, we consider Hive type
+ columnTypes.add(typeInfo);
+ } else {
+ columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(
+ schemaInfo.getColumns().get(metric).getType()));
+ }
}
}
/* GroupBy query */
- private void inferSchema(GroupByQuery query, List<String> columnNames,
- List<PrimitiveTypeInfo> columnTypes
- ) {
+ private void inferSchema(GroupByQuery query,
+ List<String> columnNames, List<PrimitiveTypeInfo> columnTypes,
+ Map<String, PrimitiveTypeInfo> mapColumnNamesTypes) {
// Timestamp column
columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -376,7 +415,13 @@ public class DruidSerDe extends AbstractSerDe {
// Aggregator columns
for (AggregatorFactory af : query.getAggregatorSpecs()) {
columnNames.add(af.getName());
- columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+ PrimitiveTypeInfo typeInfo = mapColumnNamesTypes.get(af.getName());
+ if (typeInfo != null) {
+ // If datasource was created by Hive, we consider Hive type
+ columnTypes.add(typeInfo);
+ } else {
+ columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+ }
}
// Post-aggregator columns
// TODO: Currently Calcite only infers avg for post-aggregate,
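A second part of the fix in this file is the import swap at the top: ByteWritable, DoubleWritable and ShortWritable are now taken from org.apache.hadoop.hive.serde2.io rather than org.apache.hadoop.io. The classes in the two packages are distinct, and the writable primitive object inspectors the SerDe hands out cast their argument to the serde2.io variants, so rows populated with the Hadoop classes do not round-trip cleanly through inspection. A small illustration (assumes hive-serde and hadoop-common on the classpath):

    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class WritablePackageSketch {
      public static void main(String[] args) {
        org.apache.hadoop.hive.serde2.io.ShortWritable hiveShort =
            new org.apache.hadoop.hive.serde2.io.ShortWritable((short) 12);
        org.apache.hadoop.io.ShortWritable hadoopShort =
            new org.apache.hadoop.io.ShortWritable();

        // Distinct classes that happen to share a simple name
        System.out.println(hiveShort.getClass().getName());    // ...hive.serde2.io.ShortWritable
        System.out.println(hadoopShort.getClass().getName());  // ...hadoop.io.ShortWritable

        // Hive's writable inspector accepts the serde2.io instance...
        System.out.println(
            PrimitiveObjectInspectorFactory.writableShortObjectInspector.get(hiveShort));  // 12
        // ...while passing hadoopShort to the same inspector would fail with a
        // ClassCastException at runtime.
      }
    }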
http://git-wip-us.apache.org/repos/asf/hive/blob/727a3dfc/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
index 1bd5d84..695e0dd 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
@@ -143,6 +143,16 @@ public class TestDruidSerDe {
new FloatWritable(3.32F), new FloatWritable(4F) }
};
+ // Timeseries query results as records (types defined by metastore)
+ private static final String TIMESERIES_COLUMN_NAMES = "__time,sample_name1,sample_name2,sample_divide";
+ private static final String TIMESERIES_COLUMN_TYPES = "timestamp,smallint,double,float";
+ private static final Object[][] TIMESERIES_QUERY_RESULTS_RECORDS_2 = new Object[][] {
+ new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new ShortWritable((short) 0),
+ new DoubleWritable(1.0d), new FloatWritable(2.2222F) },
+ new Object[] { new TimestampWritable(new Timestamp(1325462400000L)), new ShortWritable((short) 2),
+ new DoubleWritable(3.32d), new FloatWritable(4F) }
+ };
+
// TopN query
private static final String TOPN_QUERY =
"{ \"queryType\": \"topN\", "
@@ -259,6 +269,27 @@ public class TestDruidSerDe {
new FloatWritable(46.45F) }
};
+ // TopN query results as records (types defined by metastore)
+ private static final String TOPN_COLUMN_NAMES = "__time,sample_dim,count,some_metric,sample_divide";
+ private static final String TOPN_COLUMN_TYPES = "timestamp,string,bigint,double,float";
+ private static final Object[][] TOPN_QUERY_RESULTS_RECORDS_2 = new Object[][] {
+ new Object[] { new TimestampWritable(new Timestamp(1377907200000L)),
+ new Text("dim1_val"), new LongWritable(111), new DoubleWritable(10669d),
+ new FloatWritable(96.11711711711712F) },
+ new Object[] { new TimestampWritable(new Timestamp(1377907200000L)),
+ new Text("another_dim1_val"), new LongWritable(88), new DoubleWritable(28344d),
+ new FloatWritable(322.09090909090907F) },
+ new Object[] { new TimestampWritable(new Timestamp(1377907200000L)),
+ new Text("dim1_val3"), new LongWritable(70), new DoubleWritable(871d),
+ new FloatWritable(12.442857142857143F) },
+ new Object[] { new TimestampWritable(new Timestamp(1377907200000L)),
+ new Text("dim1_val4"), new LongWritable(62), new DoubleWritable(815d),
+ new FloatWritable(13.14516129032258F) },
+ new Object[] { new TimestampWritable(new Timestamp(1377907200000L)),
+ new Text("dim1_val5"), new LongWritable(60), new DoubleWritable(2787d),
+ new FloatWritable(46.45F) }
+ };
+
// GroupBy query
private static final String GROUP_BY_QUERY =
"{ "
@@ -339,6 +370,18 @@ public class TestDruidSerDe {
new FloatWritable(6.333333F) }
};
+ // GroupBy query results as records (types defined by metastore)
+ private static final String GROUP_BY_COLUMN_NAMES = "__time,country,device,total_usage,data_transfer,avg_usage";
+ private static final String GROUP_BY_COLUMN_TYPES = "timestamp,string,string,int,double,float";
+ private static final Object[][] GROUP_BY_QUERY_RESULTS_RECORDS_2 = new Object[][] {
+ new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new Text("India"),
+ new Text("phone"), new IntWritable(88), new DoubleWritable(29.91233453F),
+ new FloatWritable(60.32F) },
+ new Object[] { new TimestampWritable(new Timestamp(1325376012000L)), new Text("Spain"),
+ new Text("pc"), new IntWritable(16), new DoubleWritable(172.93494959F),
+ new FloatWritable(6.333333F) }
+ };
+
// Select query
private static final String SELECT_QUERY =
"{ \"queryType\": \"select\", "
@@ -483,6 +526,38 @@ public class TestDruidSerDe {
new FloatWritable(68.0F), new FloatWritable(0.0F) }
};
+ // Select query results as records (types defined by metastore)
+ private static final String SELECT_COLUMN_NAMES = "__time,robot,namespace,anonymous,unpatrolled,page,language,newpage,user,count,added,delta,variation,deleted";
+ private static final String SELECT_COLUMN_TYPES = "timestamp,string,string,string,string,string,string,string,string,double,double,float,float,float";
+ private static final Object[][] SELECT_QUERY_RESULTS_RECORDS_2 = new Object[][] {
+ new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("1"),
+ new Text("article"), new Text("0"), new Text("0"),
+ new Text("11._korpus_(NOVJ)"), new Text("sl"), new Text("0"),
+ new Text("EmausBot"),
+ new DoubleWritable(1.0d), new DoubleWritable(39.0d), new FloatWritable(39.0F),
+ new FloatWritable(39.0F), new FloatWritable(0.0F) },
+ new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"),
+ new Text("article"), new Text("0"), new Text("0"),
+ new Text("112_U.S._580"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+ new DoubleWritable(1.0d), new DoubleWritable(70.0d), new FloatWritable(70.0F),
+ new FloatWritable(70.0F), new FloatWritable(0.0F) },
+ new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"),
+ new Text("article"), new Text("0"), new Text("0"),
+ new Text("113_U.S._243"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+ new DoubleWritable(1.0d), new DoubleWritable(77.0d), new FloatWritable(77.0F),
+ new FloatWritable(77.0F), new FloatWritable(0.0F) },
+ new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"),
+ new Text("article"), new Text("0"), new Text("0"),
+ new Text("113_U.S._73"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+ new DoubleWritable(1.0d), new DoubleWritable(70.0d), new FloatWritable(70.0F),
+ new FloatWritable(70.0F), new FloatWritable(0.0F) },
+ new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"),
+ new Text("article"), new Text("0"), new Text("0"),
+ new Text("113_U.S._756"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+ new DoubleWritable(1.0d), new DoubleWritable(68.0d), new FloatWritable(68.0F),
+ new FloatWritable(68.0F), new FloatWritable(0.0F) }
+ };
+
/**
* Test the default behavior of the objects and object inspectors.
* @throws IOException
@@ -511,24 +586,52 @@ public class TestDruidSerDe {
deserializeQueryResults(serDe, Query.TIMESERIES, TIMESERIES_QUERY,
TIMESERIES_QUERY_RESULTS, TIMESERIES_QUERY_RESULTS_RECORDS
);
+ // Timeseries query (simulating column types from metastore)
+ tbl.setProperty(serdeConstants.LIST_COLUMNS, TIMESERIES_COLUMN_NAMES);
+ tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, TIMESERIES_COLUMN_TYPES);
+ SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+ deserializeQueryResults(serDe, Query.TIMESERIES, TIMESERIES_QUERY,
+ TIMESERIES_QUERY_RESULTS, TIMESERIES_QUERY_RESULTS_RECORDS_2
+ );
// TopN query
tbl = createPropertiesQuery("sample_data", Query.TOPN, TOPN_QUERY);
SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
deserializeQueryResults(serDe, Query.TOPN, TOPN_QUERY,
TOPN_QUERY_RESULTS, TOPN_QUERY_RESULTS_RECORDS
);
+ // TopN query (simulating column types from metastore)
+ tbl.setProperty(serdeConstants.LIST_COLUMNS, TOPN_COLUMN_NAMES);
+ tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, TOPN_COLUMN_TYPES);
+ SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+ deserializeQueryResults(serDe, Query.TOPN, TOPN_QUERY,
+ TOPN_QUERY_RESULTS, TOPN_QUERY_RESULTS_RECORDS_2
+ );
// GroupBy query
tbl = createPropertiesQuery("sample_datasource", Query.GROUP_BY, GROUP_BY_QUERY);
SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
deserializeQueryResults(serDe, Query.GROUP_BY, GROUP_BY_QUERY,
GROUP_BY_QUERY_RESULTS, GROUP_BY_QUERY_RESULTS_RECORDS
);
+ // GroupBy query (simulating column types from metastore)
+ tbl.setProperty(serdeConstants.LIST_COLUMNS, GROUP_BY_COLUMN_NAMES);
+ tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, GROUP_BY_COLUMN_TYPES);
+ SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+ deserializeQueryResults(serDe, Query.GROUP_BY, GROUP_BY_QUERY,
+ GROUP_BY_QUERY_RESULTS, GROUP_BY_QUERY_RESULTS_RECORDS_2
+ );
// Select query
tbl = createPropertiesQuery("wikipedia", Query.SELECT, SELECT_QUERY);
SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
deserializeQueryResults(serDe, Query.SELECT, SELECT_QUERY,
SELECT_QUERY_RESULTS, SELECT_QUERY_RESULTS_RECORDS
);
+ // Select query (simulating column types from metastore)
+ tbl.setProperty(serdeConstants.LIST_COLUMNS, SELECT_COLUMN_NAMES);
+ tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, SELECT_COLUMN_TYPES);
+ SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+ deserializeQueryResults(serDe, Query.SELECT, SELECT_QUERY,
+ SELECT_QUERY_RESULTS, SELECT_QUERY_RESULTS_RECORDS_2
+ );
}
private static Properties createPropertiesQuery(String dataSource, String queryType,
@@ -543,6 +646,7 @@ public class TestDruidSerDe {
return tbl;
}
+ @SuppressWarnings("unchecked")
private static void deserializeQueryResults(DruidSerDe serDe, String queryType, String jsonQuery,
String resultString, Object[][] records
) throws SerDeException, JsonParseException,
@@ -612,10 +716,11 @@ public class TestDruidSerDe {
DruidWritable writable = new DruidWritable();
int pos = 0;
while (reader.next(NullWritable.get(), writable)) {
- Object row = serDe.deserialize(writable);
+ List<Object> row = (List<Object>) serDe.deserialize(writable);
Object[] expectedFieldsData = records[pos];
assertEquals(expectedFieldsData.length, fieldRefs.size());
for (int i = 0; i < fieldRefs.size(); i++) {
+ assertEquals("Field " + i + " type", expectedFieldsData[i].getClass(), row.get(i).getClass());
Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i));
assertEquals("Field " + i, expectedFieldsData[i], fieldData);
}
@@ -628,10 +733,11 @@ public class TestDruidSerDe {
field2.set(reader, results);
pos = 0;
while (reader.nextKeyValue()) {
- Object row = serDe.deserialize(reader.getCurrentValue());
+ List<Object> row = (List<Object>) serDe.deserialize(reader.getCurrentValue());
Object[] expectedFieldsData = records[pos];
assertEquals(expectedFieldsData.length, fieldRefs.size());
for (int i = 0; i < fieldRefs.size(); i++) {
+ assertEquals("Field " + i + " type", expectedFieldsData[i].getClass(), row.get(i).getClass());
Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i));
assertEquals("Field " + i, expectedFieldsData[i], fieldData);
}
@@ -682,7 +788,7 @@ public class TestDruidSerDe {
* @throws NoSuchMethodException
*/
@Test
- public void testDruidSerializer()
+ public void testDruidObjectSerializer()
throws SerDeException, JsonParseException, JsonMappingException,
NoSuchFieldException, SecurityException, IllegalArgumentException,
IllegalAccessException, IOException, InterruptedException,
@@ -737,9 +843,62 @@ public class TestDruidSerDe {
// Serialize
DruidWritable writable = (DruidWritable) serDe.serialize(rowObject, inspector);
// Check result
- assertEquals(DRUID_WRITABLE.getValue().size(), writable.getValue().size());
- for (Entry<String, Object> e: DRUID_WRITABLE.getValue().entrySet()) {
+ assertEquals(druidWritable.getValue().size(), writable.getValue().size());
+ for (Entry<String, Object> e: druidWritable.getValue().entrySet()) {
assertEquals(e.getValue(), writable.getValue().get(e.getKey()));
}
}
+
+ private static final Object[] ROW_OBJECT_2 = new Object[] {
+ new TimestampWritable(new Timestamp(1377907200000L)),
+ new Text("dim1_val"),
+ new DoubleWritable(10669.3D),
+ new FloatWritable(10669.45F),
+ new HiveDecimalWritable(HiveDecimal.create(1064.34D)),
+ new LongWritable(1113939),
+ new IntWritable(1112123),
+ new ShortWritable((short) 12),
+ new ByteWritable((byte) 0)
+ };
+ private static final DruidWritable DRUID_WRITABLE_2 = new DruidWritable(
+ ImmutableMap.<String, Object>builder()
+ .put("__time", 1377907200000L)
+ .put("c0", "dim1_val")
+ .put("c1", 10669.3D)
+ .put("c2", 10669.45F)
+ .put("c3", 1064.34D)
+ .put("c4", 1113939L)
+ .put("c5", 1112123)
+ .put("c6", (short) 12)
+ .put("c7", (byte) 0)
+ .build());
+
+ @Test
+ public void testDruidObjectDeserializer()
+ throws SerDeException, JsonParseException, JsonMappingException,
+ NoSuchFieldException, SecurityException, IllegalArgumentException,
+ IllegalAccessException, IOException, InterruptedException,
+ NoSuchMethodException, InvocationTargetException {
+ // Create, initialize, and test the SerDe
+ DruidSerDe serDe = new DruidSerDe();
+ Configuration conf = new Configuration();
+ Properties tbl;
+ // Mixed source (all types)
+ tbl = createPropertiesSource(COLUMN_NAMES, COLUMN_TYPES);
+ SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+ deserializeObject(tbl, serDe, ROW_OBJECT_2, DRUID_WRITABLE_2);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void deserializeObject(Properties properties, DruidSerDe serDe,
+ Object[] rowObject, DruidWritable druidWritable) throws SerDeException {
+ // Deserialize
+ List<Object> object = (List<Object>) serDe.deserialize(druidWritable);
+ // Check result
+ assertEquals(rowObject.length, object.size());
+ for (int i = 0; i < rowObject.length; i++) {
+ assertEquals(rowObject[i].getClass(), object.get(i).getClass());
+ assertEquals(rowObject[i], object.get(i));
+ }
+ }
}
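The qfile change below adds an EXPLAIN for a CAST over a Druid metric column. For orientation, druid_table_1 in that test is a Hive table backed by the Druid storage handler, declared along these lines (illustrative DDL, not part of this patch; the qtest setup may use a test-specific handler):

    CREATE EXTERNAL TABLE druid_table_1
    STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
    TBLPROPERTIES ("druid.datasource" = "wikipedia");

With this patch, the column types recorded for such a table in the metastore are what DruidSerDe reports for the generated select query, so the UDFToInteger(deleted) cast in the plan below resolves against the Hive-declared type of deleted rather than a type inferred only from the Druid query.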
http://git-wip-us.apache.org/repos/asf/hive/blob/727a3dfc/ql/src/test/queries/clientpositive/druid_basic2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/druid_basic2.q b/ql/src/test/queries/clientpositive/druid_basic2.q
index 530e53a..3c17bc5 100644
--- a/ql/src/test/queries/clientpositive/druid_basic2.q
+++ b/ql/src/test/queries/clientpositive/druid_basic2.q
@@ -59,6 +59,10 @@ ORDER BY CAST(robot AS INTEGER) ASC, m DESC
LIMIT 10;
EXPLAIN
+SELECT substring(namespace, CAST(deleted AS INT), 4)
+FROM druid_table_1;
+
+EXPLAIN
SELECT robot, floor_day(`__time`)
FROM druid_table_1
WHERE floor_day(`__time`) BETWEEN '1999-11-01 00:00:00' AND '1999-11-10 00:00:00'
http://git-wip-us.apache.org/repos/asf/hive/blob/727a3dfc/ql/src/test/results/clientpositive/druid_basic2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid_basic2.q.out b/ql/src/test/results/clientpositive/druid_basic2.q.out
index a3f05be..0d9e683 100644
--- a/ql/src/test/results/clientpositive/druid_basic2.q.out
+++ b/ql/src/test/results/clientpositive/druid_basic2.q.out
@@ -672,6 +672,46 @@ STAGE PLANS:
ListSink
PREHOOK: query: EXPLAIN
+SELECT substring(namespace, CAST(deleted AS INT), 4)
+FROM druid_table_1
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT substring(namespace, CAST(deleted AS INT), 4)
+FROM druid_table_1
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ alias: druid_table_1
+ properties:
+ druid.query.json {"queryType":"select","dataSource":"wikipedia","descending":false,"intervals":["1900-01-01T00:00:00.000/3000-01-01T00:00:00.000"],"dimensions":["namespace"],"metrics":["deleted"],"granularity":"all","pagingSpec":{"threshold":16384,"fromNext":true},"context":{"druid.query.fetch":false}}
+ druid.query.type select
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+ Select Operator
+ expressions: substring(namespace, UDFToInteger(deleted), 4) (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: EXPLAIN
SELECT robot, floor_day(`__time`)
FROM druid_table_1
WHERE floor_day(`__time`) BETWEEN '1999-11-01 00:00:00' AND '1999-11-10 00:00:00'