Posted to commits@hive.apache.org by jc...@apache.org on 2016/09/08 08:46:00 UTC
[5/6] hive git commit: HIVE-14217: Druid integration (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
new file mode 100644
index 0000000..0b87976
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Iterators;
+
+import io.druid.query.Result;
+import io.druid.query.topn.DimensionAndMetricValueExtractor;
+import io.druid.query.topn.TopNQuery;
+import io.druid.query.topn.TopNResultValue;
+
+/**
+ * Record reader for the results of a Druid TopN query.
+ */
+public class DruidTopNQueryRecordReader
+ extends DruidQueryRecordReader<TopNQuery, Result<TopNResultValue>> {
+
+ private Result<TopNResultValue> current;
+ private Iterator<DimensionAndMetricValueExtractor> values = Iterators.emptyIterator();
+
+ @Override
+ protected TopNQuery createQuery(String content) throws IOException {
+ return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, TopNQuery.class);
+ }
+
+ @Override
+ protected List<Result<TopNResultValue>> createResultsList(InputStream content) throws IOException {
+ return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
+ new TypeReference<List<Result<TopNResultValue>>>(){});
+ }
+
+ @Override
+ public boolean nextKeyValue() {
+ if (values.hasNext()) {
+ return true;
+ }
+ if (results.hasNext()) {
+ current = results.next();
+ values = current.getValue().getValue().iterator();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException, InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public DruidWritable getCurrentValue() throws IOException, InterruptedException {
+ // Create new value
+ DruidWritable value = new DruidWritable();
+ value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
+    if (values.hasNext()) {
+      // Merge in the dimension and metric values of the current entry
+      value.getValue().putAll(values.next().getBaseObject());
+    }
+    return value;
+ }
+
+ @Override
+ public boolean next(NullWritable key, DruidWritable value) {
+ if (nextKeyValue()) {
+ // Update value
+ value.getValue().clear();
+ value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
+ if (values.hasNext()) {
+ value.getValue().putAll(values.next().getBaseObject());
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public float getProgress() {
+ return results.hasNext() || values.hasNext() ? 0 : 1;
+ }
+
+}
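
For illustration, a minimal sketch of the flattening that getCurrentValue()
performs for each TopN entry. The "__time" literal stands in for
DruidTable.DEFAULT_TIMESTAMP_COLUMN, the class name is illustrative, and
Druid 0.9.1.1 APIs are assumed:

import java.util.HashMap;
import java.util.Map;

import io.druid.query.Result;
import io.druid.query.topn.DimensionAndMetricValueExtractor;
import io.druid.query.topn.TopNResultValue;

public class TopNFlattenSketch {
  // Builds the Map<String, Object> that backs one DruidWritable row.
  public static Map<String, Object> flatten(Result<TopNResultValue> result,
      DimensionAndMetricValueExtractor entry) {
    Map<String, Object> row = new HashMap<>();
    row.put("__time", result.getTimestamp().getMillis()); // event timestamp in millis
    row.putAll(entry.getBaseObject());                    // dimension and metric values
    return row;
  }
}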
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidWritable.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidWritable.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidWritable.java
new file mode 100644
index 0000000..77ffcd4
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidWritable.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Objects;
+
+/**
+ * Writable for Druid results.
+ */
+public class DruidWritable implements Writable {
+
+ private final Map<String, Object> value;
+
+ public DruidWritable() {
+ value = new HashMap<>();
+ }
+
+ public DruidWritable(Map<String, Object> value) {
+ this.value = value;
+ }
+
+ public Map<String, Object> getValue() {
+ return value;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(value);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ return Objects.equal(value, ((DruidWritable) o).value);
+ }
+
+ @Override
+ public String toString() {
+ return "DruidWritable{value=" + value + '}';
+ }
+
+}
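
A minimal usage sketch (field names and values are illustrative, and the
class assumes compilation alongside the package above): the writable is a
thin map-backed row, and write()/readFields() intentionally throw because
rows are produced and consumed locally by the record readers rather than
shipped over the wire.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.druid.serde.DruidWritable;

public class DruidWritableSketch {
  public static void main(String[] args) {
    Map<String, Object> row = new HashMap<>();
    row.put("__time", 1325376000000L); // 2012-01-01T00:00:00.000Z
    row.put("sample_name1", 0L);
    DruidWritable writable = new DruidWritable(row);
    // Record readers mutate the same backing map between records.
    System.out.println(writable); // DruidWritable{value={...}}
  }
}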
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
new file mode 100644
index 0000000..2b4df78
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.druid.serde.DruidSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.util.StringUtils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import io.druid.query.metadata.metadata.SegmentAnalysis;
+import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+
+/**
+ * Druid SerDe to be used in tests.
+ */
+public class QTestDruidSerDe extends DruidSerDe {
+
+ // Request :
+ // "{\"queryType\":\"segmentMetadata\",\"dataSource\":{\"type\":\"table\",\"name\":\"wikipedia\"},"
+ // + "\"intervals\":{\"type\":\"intervals\","
+ // + "\"intervals\":[\"-146136543-09-08T00:30:34.096-07:52:58/146140482-04-24T08:36:27.903-07:00\"]},"
+ // + "\"toInclude\":{\"type\":\"all\"},\"merge\":true,\"context\":null,\"analysisTypes\":[],"
+ // + "\"usingDefaultInterval\":true,\"lenientAggregatorMerge\":false,\"descending\":false}";
+ private static final String RESPONSE =
+ "[ {\r\n "
+ + " \"id\" : \"merged\",\r\n "
+ + " \"intervals\" : [ \"2010-01-01T00:00:00.000Z/2015-12-31T00:00:00.000Z\" ],\r\n "
+ + " \"columns\" : {\r\n "
+ + " \"__time\" : { \"type\" : \"LONG\", \"hasMultipleValues\" : false, \"size\" : 407240380, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
+ + " \"robot\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"namespace\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : true, \"size\" : 100000, \"cardinality\" : 1504, \"errorMessage\" : null },\r\n "
+ + " \"anonymous\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"unpatrolled\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"page\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"language\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"newpage\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"user\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"count\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
+ + " \"added\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
+ + " \"delta\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
+ + " \"variation\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
+ + " \"deleted\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null }\r\n "
+ + " },\r\n "
+ + " \"aggregators\" : {\r\n "
+ + " \"count\" : { \"type\" : \"longSum\", \"name\" : \"count\", \"fieldName\" : \"count\" },\r\n "
+ + " \"added\" : { \"type\" : \"doubleSum\", \"name\" : \"added\", \"fieldName\" : \"added\" },\r\n "
+ + " \"delta\" : { \"type\" : \"doubleSum\", \"name\" : \"delta\", \"fieldName\" : \"delta\" },\r\n "
+ + " \"variation\" : { \"type\" : \"doubleSum\", \"name\" : \"variation\", \"fieldName\" : \"variation\" },\r\n "
+ + " \"deleted\" : { \"type\" : \"doubleSum\", \"name\" : \"deleted\", \"fieldName\" : \"deleted\" }\r\n "
+ + " },\r\n "
+ + " \"queryGranularity\" : {\r\n \"type\": \"none\"\r\n },\r\n "
+ + " \"size\" : 300000,\r\n "
+ + " \"numRows\" : 5000000\r\n} ]";
+
+  /* Returns a static mocked response instead of submitting the metadata request */
+ @Override
+ protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
+ throws SerDeException {
+ // Retrieve results
+ List<SegmentAnalysis> resultsList;
+ try {
+ resultsList = DruidStorageHandlerUtils.JSON_MAPPER.readValue(RESPONSE,
+ new TypeReference<List<SegmentAnalysis>>() {});
+ } catch (Exception e) {
+ throw new SerDeException(StringUtils.stringifyException(e));
+ }
+ return resultsList.get(0);
+ }
+
+}
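
A sketch of what the stub hands back (Druid 0.9.1.1 APIs assumed, response
trimmed to the id field for brevity; the class name is illustrative): the
full RESPONSE constant above deserializes into a SegmentAnalysis whose
column map drives the schema the SerDe infers in tests.

import java.util.List;

import com.fasterxml.jackson.core.type.TypeReference;

import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.metadata.metadata.SegmentAnalysis;

public class SegmentAnalysisSketch {
  public static void main(String[] args) throws Exception {
    String response = "[ { \"id\" : \"merged\" } ]"; // trimmed for brevity
    List<SegmentAnalysis> results = new DefaultObjectMapper().readValue(
        response, new TypeReference<List<SegmentAnalysis>>() {});
    System.out.println(results.get(0).getId()); // merged
  }
}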
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandler.java
new file mode 100644
index 0000000..0a44aaa
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandler.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import org.apache.hadoop.hive.serde2.SerDe;
+
+/**
+ * Storage handler for Druid to be used in tests. It cannot connect to
+ * Druid, and thus it cannot execute queries.
+ */
+@SuppressWarnings("deprecation")
+public class QTestDruidStorageHandler extends DruidStorageHandler {
+
+ @Override
+ public Class<? extends SerDe> getSerDeClass() {
+ return QTestDruidSerDe.class;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
new file mode 100644
index 0000000..9c5c65c
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.sql.Timestamp;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidSerDe;
+import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.druid.data.input.Row;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.Query;
+import io.druid.query.Result;
+import io.druid.query.groupby.GroupByQuery;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.select.SelectResultValue;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.timeseries.TimeseriesResultValue;
+import io.druid.query.topn.TopNQuery;
+import io.druid.query.topn.TopNResultValue;
+
+/**
+ * Basic tests for Druid SerDe. The examples are taken from Druid 0.9.1.1
+ * documentation.
+ */
+public class TestDruidSerDe {
+
+ // Timeseries query
+ private static final String TIMESERIES_QUERY =
+ "{ \"queryType\": \"timeseries\", "
+ + " \"dataSource\": \"sample_datasource\", "
+ + " \"granularity\": \"day\", "
+ + " \"descending\": \"true\", "
+ + " \"filter\": { "
+ + " \"type\": \"and\", "
+ + " \"fields\": [ "
+ + " { \"type\": \"selector\", \"dimension\": \"sample_dimension1\", \"value\": \"sample_value1\" }, "
+ + " { \"type\": \"or\", "
+ + " \"fields\": [ "
+ + " { \"type\": \"selector\", \"dimension\": \"sample_dimension2\", \"value\": \"sample_value2\" }, "
+ + " { \"type\": \"selector\", \"dimension\": \"sample_dimension3\", \"value\": \"sample_value3\" } "
+ + " ] "
+ + " } "
+ + " ] "
+ + " }, "
+ + " \"aggregations\": [ "
+ + " { \"type\": \"longSum\", \"name\": \"sample_name1\", \"fieldName\": \"sample_fieldName1\" }, "
+ + " { \"type\": \"doubleSum\", \"name\": \"sample_name2\", \"fieldName\": \"sample_fieldName2\" } "
+ + " ], "
+ + " \"postAggregations\": [ "
+ + " { \"type\": \"arithmetic\", "
+ + " \"name\": \"sample_divide\", "
+ + " \"fn\": \"/\", "
+ + " \"fields\": [ "
+ + " { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name1\", \"fieldName\": \"sample_name1\" }, "
+ + " { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name2\", \"fieldName\": \"sample_name2\" } "
+ + " ] "
+ + " } "
+ + " ], "
+ + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ]}";
+ // Timeseries query results
+ private static final String TIMESERIES_QUERY_RESULTS =
+ "[ "
+ + "{ "
+ + " \"timestamp\": \"2012-01-01T00:00:00.000Z\", "
+ + " \"result\": { \"sample_name1\": 0, \"sample_name2\": 1.0, \"sample_divide\": 2.2222 } "
+ + "}, "
+ + "{ "
+ + " \"timestamp\": \"2012-01-02T00:00:00.000Z\", "
+ + " \"result\": { \"sample_name1\": 2, \"sample_name2\": 3.32, \"sample_divide\": 4 } "
+ + "}]";
+ // Timeseries query results as records
+ private static final Object[][] TIMESERIES_QUERY_RESULTS_RECORDS = new Object[][] {
+ new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new LongWritable(0), new FloatWritable(1.0F), new FloatWritable(2.2222F) } ,
+ new Object[] { new TimestampWritable(new Timestamp(1325462400000L)), new LongWritable(2), new FloatWritable(3.32F), new FloatWritable(4F)}
+ };
+
+ // TopN query
+ private static final String TOPN_QUERY =
+ "{ \"queryType\": \"topN\", "
+ + " \"dataSource\": \"sample_data\", "
+ + " \"dimension\": \"sample_dim\", "
+ + " \"threshold\": 5, "
+ + " \"metric\": \"count\", "
+ + " \"granularity\": \"all\", "
+ + " \"filter\": { "
+ + " \"type\": \"and\", "
+ + " \"fields\": [ "
+ + " { "
+ + " \"type\": \"selector\", "
+ + " \"dimension\": \"dim1\", "
+ + " \"value\": \"some_value\" "
+ + " }, "
+ + " { "
+ + " \"type\": \"selector\", "
+ + " \"dimension\": \"dim2\", "
+ + " \"value\": \"some_other_val\" "
+ + " } "
+ + " ] "
+ + " }, "
+ + " \"aggregations\": [ "
+ + " { "
+ + " \"type\": \"longSum\", "
+ + " \"name\": \"count\", "
+ + " \"fieldName\": \"count\" "
+ + " }, "
+ + " { "
+ + " \"type\": \"doubleSum\", "
+ + " \"name\": \"some_metric\", "
+ + " \"fieldName\": \"some_metric\" "
+ + " } "
+ + " ], "
+ + " \"postAggregations\": [ "
+ + " { "
+ + " \"type\": \"arithmetic\", "
+ + " \"name\": \"sample_divide\", "
+ + " \"fn\": \"/\", "
+ + " \"fields\": [ "
+ + " { "
+ + " \"type\": \"fieldAccess\", "
+ + " \"name\": \"some_metric\", "
+ + " \"fieldName\": \"some_metric\" "
+ + " }, "
+ + " { "
+ + " \"type\": \"fieldAccess\", "
+ + " \"name\": \"count\", "
+ + " \"fieldName\": \"count\" "
+ + " } "
+ + " ] "
+ + " } "
+ + " ], "
+ + " \"intervals\": [ "
+ + " \"2013-08-31T00:00:00.000/2013-09-03T00:00:00.000\" "
+ + " ]}";
+ // TopN query results
+ private static final String TOPN_QUERY_RESULTS =
+ "[ "
+ + " { "
+ + " \"timestamp\": \"2013-08-31T00:00:00.000Z\", "
+ + " \"result\": [ "
+ + " { "
+ + " \"sample_dim\": \"dim1_val\", "
+ + " \"count\": 111, "
+ + " \"some_metric\": 10669, "
+ + " \"sample_divide\": 96.11711711711712 "
+ + " }, "
+ + " { "
+ + " \"sample_dim\": \"another_dim1_val\", "
+ + " \"count\": 88, "
+ + " \"some_metric\": 28344, "
+ + " \"sample_divide\": 322.09090909090907 "
+ + " }, "
+ + " { "
+ + " \"sample_dim\": \"dim1_val3\", "
+ + " \"count\": 70, "
+ + " \"some_metric\": 871, "
+ + " \"sample_divide\": 12.442857142857143 "
+ + " }, "
+ + " { "
+ + " \"sample_dim\": \"dim1_val4\", "
+ + " \"count\": 62, "
+ + " \"some_metric\": 815, "
+ + " \"sample_divide\": 13.14516129032258 "
+ + " }, "
+ + " { "
+ + " \"sample_dim\": \"dim1_val5\", "
+ + " \"count\": 60, "
+ + " \"some_metric\": 2787, "
+ + " \"sample_divide\": 46.45 "
+ + " } "
+ + " ] "
+ + " }]";
+ // TopN query results as records
+ private static final Object[][] TOPN_QUERY_RESULTS_RECORDS = new Object[][] {
+ new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val"), new LongWritable(111), new FloatWritable(10669F), new FloatWritable(96.11711711711712F) } ,
+ new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("another_dim1_val"), new LongWritable(88), new FloatWritable(28344F), new FloatWritable(322.09090909090907F) } ,
+ new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val3"), new LongWritable(70), new FloatWritable(871F), new FloatWritable(12.442857142857143F) } ,
+ new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val4"), new LongWritable(62), new FloatWritable(815F), new FloatWritable(13.14516129032258F) } ,
+ new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val5"), new LongWritable(60), new FloatWritable(2787F), new FloatWritable(46.45F) }
+ };
+
+ // GroupBy query
+ private static final String GROUP_BY_QUERY =
+ "{ "
+ + " \"queryType\": \"groupBy\", "
+ + " \"dataSource\": \"sample_datasource\", "
+ + " \"granularity\": \"day\", "
+ + " \"dimensions\": [\"country\", \"device\"], "
+ + " \"limitSpec\": {"
+ + " \"type\": \"default\","
+ + " \"limit\": 5000,"
+ + " \"columns\": [\"country\", \"data_transfer\"] }, "
+ + " \"filter\": { "
+ + " \"type\": \"and\", "
+ + " \"fields\": [ "
+ + " { \"type\": \"selector\", \"dimension\": \"carrier\", \"value\": \"AT&T\" }, "
+ + " { \"type\": \"or\", "
+ + " \"fields\": [ "
+ + " { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Apple\" }, "
+ + " { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Samsung\" } "
+ + " ] "
+ + " } "
+ + " ] "
+ + " }, "
+ + " \"aggregations\": [ "
+ + " { \"type\": \"longSum\", \"name\": \"total_usage\", \"fieldName\": \"user_count\" }, "
+ + " { \"type\": \"doubleSum\", \"name\": \"data_transfer\", \"fieldName\": \"data_transfer\" } "
+ + " ], "
+ + " \"postAggregations\": [ "
+ + " { \"type\": \"arithmetic\", "
+ + " \"name\": \"avg_usage\", "
+ + " \"fn\": \"/\", "
+ + " \"fields\": [ "
+ + " { \"type\": \"fieldAccess\", \"fieldName\": \"data_transfer\" }, "
+ + " { \"type\": \"fieldAccess\", \"fieldName\": \"total_usage\" } "
+ + " ] "
+ + " } "
+ + " ], "
+ + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ], "
+ + " \"having\": { "
+ + " \"type\": \"greaterThan\", "
+ + " \"aggregation\": \"total_usage\", "
+ + " \"value\": 100 "
+ + " }}";
+ // GroupBy query results
+ private static final String GROUP_BY_QUERY_RESULTS =
+ "[ "
+ + " { "
+ + " \"version\" : \"v1\", "
+ + " \"timestamp\" : \"2012-01-01T00:00:00.000Z\", "
+ + " \"event\" : { "
+ + " \"country\" : \"India\", "
+ + " \"device\" : \"phone\", "
+ + " \"total_usage\" : 88, "
+ + " \"data_transfer\" : 29.91233453, "
+ + " \"avg_usage\" : 60.32 "
+ + " } "
+ + " }, "
+ + " { "
+ + " \"version\" : \"v1\", "
+ + " \"timestamp\" : \"2012-01-01T00:00:12.000Z\", "
+ + " \"event\" : { "
+ + " \"country\" : \"Spain\", "
+ + " \"device\" : \"pc\", "
+ + " \"total_usage\" : 16, "
+ + " \"data_transfer\" : 172.93494959, "
+ + " \"avg_usage\" : 6.333333 "
+ + " } "
+ + " }]";
+ // GroupBy query results as records
+ private static final Object[][] GROUP_BY_QUERY_RESULTS_RECORDS = new Object[][] {
+ new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new Text("India"), new Text("phone"), new LongWritable(88), new FloatWritable(29.91233453F), new FloatWritable(60.32F) } ,
+ new Object[] { new TimestampWritable(new Timestamp(1325376012000L)), new Text("Spain"), new Text("pc"), new LongWritable(16), new FloatWritable(172.93494959F), new FloatWritable(6.333333F) }
+ };
+
+ // Select query
+ private static final String SELECT_QUERY =
+ "{ \"queryType\": \"select\", "
+ + " \"dataSource\": \"wikipedia\", \"descending\": \"false\", "
+ + " \"dimensions\":[\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\",\"newpage\",\"user\"], "
+ + " \"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"], "
+ + " \"granularity\": \"all\", "
+ + " \"intervals\": [ \"2013-01-01/2013-01-02\" ], "
+ + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5} }";
+ // Select query results
+ private static final String SELECT_QUERY_RESULTS =
+ "[{ "
+ + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", "
+ + " \"result\" : { "
+ + " \"pagingIdentifiers\" : { "
+ + " \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\" : 4 }, "
+ + " \"events\" : [ { "
+ + " \"segmentId\" : \"wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", "
+ + " \"offset\" : 0, "
+ + " \"event\" : { "
+ + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", "
+ + " \"robot\" : \"1\", "
+ + " \"namespace\" : \"article\", "
+ + " \"anonymous\" : \"0\", "
+ + " \"unpatrolled\" : \"0\", "
+ + " \"page\" : \"11._korpus_(NOVJ)\", "
+ + " \"language\" : \"sl\", "
+ + " \"newpage\" : \"0\", "
+ + " \"user\" : \"EmausBot\", "
+ + " \"count\" : 1.0, "
+ + " \"added\" : 39.0, "
+ + " \"delta\" : 39.0, "
+ + " \"variation\" : 39.0, "
+ + " \"deleted\" : 0.0 "
+ + " } "
+ + " }, { "
+ + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", "
+ + " \"offset\" : 1, "
+ + " \"event\" : { "
+ + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", "
+ + " \"robot\" : \"0\", "
+ + " \"namespace\" : \"article\", "
+ + " \"anonymous\" : \"0\", "
+ + " \"unpatrolled\" : \"0\", "
+ + " \"page\" : \"112_U.S._580\", "
+ + " \"language\" : \"en\", "
+ + " \"newpage\" : \"1\", "
+ + " \"user\" : \"MZMcBride\", "
+ + " \"count\" : 1.0, "
+ + " \"added\" : 70.0, "
+ + " \"delta\" : 70.0, "
+ + " \"variation\" : 70.0, "
+ + " \"deleted\" : 0.0 "
+ + " } "
+ + " }, { "
+ + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", "
+ + " \"offset\" : 2, "
+ + " \"event\" : { "
+ + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", "
+ + " \"robot\" : \"0\", "
+ + " \"namespace\" : \"article\", "
+ + " \"anonymous\" : \"0\", "
+ + " \"unpatrolled\" : \"0\", "
+ + " \"page\" : \"113_U.S._243\", "
+ + " \"language\" : \"en\", "
+ + " \"newpage\" : \"1\", "
+ + " \"user\" : \"MZMcBride\", "
+ + " \"count\" : 1.0, "
+ + " \"added\" : 77.0, "
+ + " \"delta\" : 77.0, "
+ + " \"variation\" : 77.0, "
+ + " \"deleted\" : 0.0 "
+ + " } "
+ + " }, { "
+ + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", "
+ + " \"offset\" : 3, "
+ + " \"event\" : { "
+ + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", "
+ + " \"robot\" : \"0\", "
+ + " \"namespace\" : \"article\", "
+ + " \"anonymous\" : \"0\", "
+ + " \"unpatrolled\" : \"0\", "
+ + " \"page\" : \"113_U.S._73\", "
+ + " \"language\" : \"en\", "
+ + " \"newpage\" : \"1\", "
+ + " \"user\" : \"MZMcBride\", "
+ + " \"count\" : 1.0, "
+ + " \"added\" : 70.0, "
+ + " \"delta\" : 70.0, "
+ + " \"variation\" : 70.0, "
+ + " \"deleted\" : 0.0 "
+ + " } "
+ + " }, { "
+ + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", "
+ + " \"offset\" : 4, "
+ + " \"event\" : { "
+ + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", "
+ + " \"robot\" : \"0\", "
+ + " \"namespace\" : \"article\", "
+ + " \"anonymous\" : \"0\", "
+ + " \"unpatrolled\" : \"0\", "
+ + " \"page\" : \"113_U.S._756\", "
+ + " \"language\" : \"en\", "
+ + " \"newpage\" : \"1\", "
+ + " \"user\" : \"MZMcBride\", "
+ + " \"count\" : 1.0, "
+ + " \"added\" : 68.0, "
+ + " \"delta\" : 68.0, "
+ + " \"variation\" : 68.0, "
+ + " \"deleted\" : 0.0 "
+ + " } "
+ + " } ] }} ]";
+ // Select query results as records
+ private static final Object[][] SELECT_QUERY_RESULTS_RECORDS = new Object[][] {
+ new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("1"), new Text("article"), new Text("0"), new Text("0"),
+ new Text("11._korpus_(NOVJ)"), new Text("sl"), new Text("0"), new Text("EmausBot"),
+ new FloatWritable(1.0F), new FloatWritable(39.0F), new FloatWritable(39.0F), new FloatWritable(39.0F), new FloatWritable(0.0F) } ,
+ new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"), new Text("article"), new Text("0"), new Text("0"),
+ new Text("112_U.S._580"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+ new FloatWritable(1.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(0.0F) } ,
+ new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"), new Text("article"), new Text("0"), new Text("0"),
+ new Text("113_U.S._243"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+ new FloatWritable(1.0F), new FloatWritable(77.0F), new FloatWritable(77.0F), new FloatWritable(77.0F), new FloatWritable(0.0F) } ,
+ new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"), new Text("article"), new Text("0"), new Text("0"),
+ new Text("113_U.S._73"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+ new FloatWritable(1.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(0.0F) } ,
+ new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"), new Text("article"), new Text("0"), new Text("0"),
+ new Text("113_U.S._756"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+ new FloatWritable(1.0F), new FloatWritable(68.0F), new FloatWritable(68.0F), new FloatWritable(68.0F), new FloatWritable(0.0F) }
+ };
+
+
+ /**
+   * Tests deserialization of Druid query results through the SerDe and its object inspectors.
+ * @throws IOException
+ * @throws IllegalAccessException
+ * @throws IllegalArgumentException
+ * @throws SecurityException
+ * @throws NoSuchFieldException
+ * @throws JsonMappingException
+ * @throws JsonParseException
+ * @throws InvocationTargetException
+ * @throws NoSuchMethodException
+ */
+ @Test
+ public void testDruidSerDe()
+ throws SerDeException, JsonParseException, JsonMappingException,
+ NoSuchFieldException, SecurityException, IllegalArgumentException,
+ IllegalAccessException, IOException, InterruptedException,
+ NoSuchMethodException, InvocationTargetException {
+ // Create, initialize, and test the SerDe
+ DruidSerDe serDe = new DruidSerDe();
+ Configuration conf = new Configuration();
+ Properties tbl;
+ // Timeseries query
+ tbl = createPropertiesQuery("sample_datasource", Query.TIMESERIES, TIMESERIES_QUERY);
+ SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+ deserializeQueryResults(serDe, Query.TIMESERIES, TIMESERIES_QUERY,
+ TIMESERIES_QUERY_RESULTS, TIMESERIES_QUERY_RESULTS_RECORDS);
+ // TopN query
+ tbl = createPropertiesQuery("sample_data", Query.TOPN, TOPN_QUERY);
+ SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+ deserializeQueryResults(serDe, Query.TOPN, TOPN_QUERY,
+ TOPN_QUERY_RESULTS, TOPN_QUERY_RESULTS_RECORDS);
+ // GroupBy query
+ tbl = createPropertiesQuery("sample_datasource", Query.GROUP_BY, GROUP_BY_QUERY);
+ SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+ deserializeQueryResults(serDe, Query.GROUP_BY, GROUP_BY_QUERY,
+ GROUP_BY_QUERY_RESULTS, GROUP_BY_QUERY_RESULTS_RECORDS);
+ // Select query
+ tbl = createPropertiesQuery("wikipedia", Query.SELECT, SELECT_QUERY);
+ SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+ deserializeQueryResults(serDe, Query.SELECT, SELECT_QUERY,
+ SELECT_QUERY_RESULTS, SELECT_QUERY_RESULTS_RECORDS);
+ }
+
+ private static Properties createPropertiesQuery(String dataSource, String queryType, String jsonQuery) {
+ Properties tbl = new Properties();
+
+ // Set the configuration parameters
+ tbl.setProperty(Constants.DRUID_DATA_SOURCE, dataSource);
+ tbl.setProperty(Constants.DRUID_QUERY_JSON, jsonQuery);
+ tbl.setProperty(Constants.DRUID_QUERY_TYPE, queryType);
+ return tbl;
+ }
+
+ private static void deserializeQueryResults(DruidSerDe serDe, String queryType, String jsonQuery,
+ String resultString, Object[][] records) throws SerDeException, JsonParseException,
+ JsonMappingException, IOException, NoSuchFieldException, SecurityException,
+ IllegalArgumentException, IllegalAccessException, InterruptedException,
+ NoSuchMethodException, InvocationTargetException {
+
+ // Initialize
+ Query<?> query = null;
+ DruidQueryRecordReader<?,?> reader = null;
+ List<?> resultsList = null;
+ ObjectMapper mapper = new DefaultObjectMapper();
+ switch (queryType) {
+ case Query.TIMESERIES:
+ query = mapper.readValue(jsonQuery, TimeseriesQuery.class);
+ reader = new DruidTimeseriesQueryRecordReader();
+ resultsList = mapper.readValue(resultString,
+ new TypeReference<List<Result<TimeseriesResultValue>>>() {});
+ break;
+ case Query.TOPN:
+ query = mapper.readValue(jsonQuery, TopNQuery.class);
+ reader = new DruidTopNQueryRecordReader();
+ resultsList = mapper.readValue(resultString,
+ new TypeReference<List<Result<TopNResultValue>>>() {});
+ break;
+ case Query.GROUP_BY:
+ query = mapper.readValue(jsonQuery, GroupByQuery.class);
+ reader = new DruidGroupByQueryRecordReader();
+ resultsList = mapper.readValue(resultString,
+ new TypeReference<List<Row>>() {});
+ break;
+ case Query.SELECT:
+ query = mapper.readValue(jsonQuery, SelectQuery.class);
+ reader = new DruidSelectQueryRecordReader();
+ resultsList = mapper.readValue(resultString,
+ new TypeReference<List<Result<SelectResultValue>>>() {});
+ break;
+ }
+
+ // Set query and fields access
+ Field field1 = DruidQueryRecordReader.class.getDeclaredField("query");
+ field1.setAccessible(true);
+ field1.set(reader, query);
+ if (reader instanceof DruidGroupByQueryRecordReader) {
+ Method method1 = DruidGroupByQueryRecordReader.class.getDeclaredMethod("initExtractors");
+ method1.setAccessible(true);
+ method1.invoke(reader);
+ }
+ Field field2 = DruidQueryRecordReader.class.getDeclaredField("results");
+ field2.setAccessible(true);
+
+ // Get the row structure
+ StructObjectInspector oi = (StructObjectInspector) serDe.getObjectInspector();
+ List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
+
+ // Check mapred
+ Iterator<?> results = resultsList.iterator();
+ field2.set(reader, results);
+ DruidWritable writable = new DruidWritable();
+ int pos = 0;
+ while (reader.next(NullWritable.get(), writable)) {
+ Object row = serDe.deserialize(writable);
+ Object[] expectedFieldsData = records[pos];
+ assertEquals(expectedFieldsData.length, fieldRefs.size());
+ for (int i = 0; i < fieldRefs.size(); i++) {
+ Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i));
+ assertEquals("Field " + i, expectedFieldsData[i], fieldData);
+ }
+ pos++;
+ }
+ assertEquals(pos, records.length);
+
+ // Check mapreduce
+ results = resultsList.iterator();
+ field2.set(reader, results);
+ pos = 0;
+ while (reader.nextKeyValue()) {
+ Object row = serDe.deserialize(reader.getCurrentValue());
+ Object[] expectedFieldsData = records[pos];
+ assertEquals(expectedFieldsData.length, fieldRefs.size());
+ for (int i = 0; i < fieldRefs.size(); i++) {
+ Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i));
+ assertEquals("Field " + i, expectedFieldsData[i], fieldData);
+ }
+ pos++;
+ }
+ assertEquals(pos, records.length);
+ }
+
+}
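
A quick sanity check for the epoch values used in the expected records above
(Joda-Time assumed, which the handler already depends on; the class name is
illustrative):

import org.joda.time.DateTime;

public class EpochSketch {
  public static void main(String[] args) {
    // Timestamps taken from the expected result records:
    System.out.println(DateTime.parse("2012-01-01T00:00:00.000Z").getMillis()); // 1325376000000
    System.out.println(DateTime.parse("2013-08-31T00:00:00.000Z").getMillis()); // 1377907200000
    System.out.println(DateTime.parse("2013-01-01T00:00:00.000Z").getMillis()); // 1356998400000
  }
}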
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
new file mode 100644
index 0000000..b20168d
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.joda.time.Interval;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestHiveDruidQueryBasedInputFormat extends TestCase {
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testCreateSplitsIntervals() throws Exception {
+ HiveDruidQueryBasedInputFormat input = new HiveDruidQueryBasedInputFormat();
+
+ Method method1 = HiveDruidQueryBasedInputFormat.class.getDeclaredMethod("createSplitsIntervals",
+ List.class, int.class);
+ method1.setAccessible(true);
+
+ List<Interval> intervals;
+ List<List<Interval>> resultList;
+ List<List<Interval>> expectedResultList;
+
+ // Test 1 : single split, create 4
+ intervals = new ArrayList<>();
+ intervals.add(new Interval(1262304000000L, 1293840000000L));
+ resultList = (List<List<Interval>>) method1.invoke(input, intervals, 4);
+ expectedResultList = new ArrayList<>();
+ expectedResultList.add(Arrays.asList(new Interval(1262304000000L, 1270188000000L)));
+ expectedResultList.add(Arrays.asList(new Interval(1270188000000L, 1278072000000L)));
+ expectedResultList.add(Arrays.asList(new Interval(1278072000000L, 1285956000000L)));
+ expectedResultList.add(Arrays.asList(new Interval(1285956000000L, 1293840000000L)));
+ assertEquals(expectedResultList, resultList);
+
+ // Test 2 : two splits, create 4
+ intervals = new ArrayList<>();
+ intervals.add(new Interval(1262304000000L, 1293840000000L));
+ intervals.add(new Interval(1325376000000L, 1356998400000L));
+ resultList = (List<List<Interval>>) method1.invoke(input, intervals, 4);
+ expectedResultList = new ArrayList<>();
+ expectedResultList.add(Arrays.asList(new Interval(1262304000000L, 1278093600000L)));
+ expectedResultList.add(Arrays.asList(new Interval(1278093600000L, 1293840000000L),
+ new Interval(1325376000000L, 1325419200000L)));
+ expectedResultList.add(Arrays.asList(new Interval(1325419200000L, 1341208800000L)));
+ expectedResultList.add(Arrays.asList(new Interval(1341208800000L, 1356998400000L)));
+ assertEquals(expectedResultList, resultList);
+
+ // Test 3 : two splits, create 5
+ intervals = new ArrayList<>();
+ intervals.add(new Interval(1262304000000L, 1293840000000L));
+ intervals.add(new Interval(1325376000000L, 1356998400000L));
+ resultList = (List<List<Interval>>) method1.invoke(input, intervals, 5);
+ expectedResultList = new ArrayList<>();
+ expectedResultList.add(Arrays.asList(new Interval(1262304000000L, 1274935680000L)));
+ expectedResultList.add(Arrays.asList(new Interval(1274935680000L, 1287567360000L)));
+ expectedResultList.add(Arrays.asList(new Interval(1287567360000L, 1293840000000L),
+ new Interval(1325376000000L, 1331735040000L)));
+ expectedResultList.add(Arrays.asList(new Interval(1331735040000L, 1344366720000L)));
+ expectedResultList.add(Arrays.asList(new Interval(1344366720000L, 1356998400000L)));
+ assertEquals(expectedResultList, resultList);
+
+ // Test 4 : three splits, different ranges, create 6
+ intervals = new ArrayList<>();
+ intervals.add(new Interval(1199145600000L, 1201824000000L)); // one month
+ intervals.add(new Interval(1325376000000L, 1356998400000L)); // one year
+ intervals.add(new Interval(1407283200000L, 1407888000000L)); // 7 days
+ resultList = (List<List<Interval>>) method1.invoke(input, intervals, 6);
+ expectedResultList = new ArrayList<>();
+ expectedResultList.add(Arrays.asList(new Interval(1199145600000L, 1201824000000L),
+ new Interval(1325376000000L, 1328515200000L)));
+ expectedResultList.add(Arrays.asList(new Interval(1328515200000L, 1334332800000L)));
+ expectedResultList.add(Arrays.asList(new Interval(1334332800000L, 1340150400000L)));
+ expectedResultList.add(Arrays.asList(new Interval(1340150400000L, 1345968000000L)));
+ expectedResultList.add(Arrays.asList(new Interval(1345968000000L, 1351785600000L)));
+ expectedResultList.add(Arrays.asList(new Interval(1351785600000L, 1356998400000L),
+ new Interval(1407283200000L, 1407888000000L)));
+ assertEquals(expectedResultList, resultList);
+ }
+
+}
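
The boundaries asserted in Test 1 follow from dividing the total span evenly;
a sketch of that arithmetic (the method under test is private, hence the
reflection above; the class name is illustrative):

public class SplitSketch {
  public static void main(String[] args) {
    long start = 1262304000000L, end = 1293840000000L; // one year, in millis
    long step = (end - start) / 4;                     // 7884000000 ms per split
    for (int i = 0; i < 4; i++) {
      // Prints the four interval boundaries asserted in Test 1.
      System.out.println((start + i * step) + "/" + (start + (i + 1) * step));
    }
  }
}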
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/itests/qtest/pom.xml
----------------------------------------------------------------------
diff --git a/itests/qtest/pom.xml b/itests/qtest/pom.xml
index 7fc72b9..e762d0e 100644
--- a/itests/qtest/pom.xml
+++ b/itests/qtest/pom.xml
@@ -110,6 +110,19 @@
<version>${hadoop.version}</version>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-druid-handler</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-druid-handler</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
<!-- test inter-project -->
<dependency>
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/packaging/pom.xml
----------------------------------------------------------------------
diff --git a/packaging/pom.xml b/packaging/pom.xml
index 679dfe8..76e0cff 100644
--- a/packaging/pom.xml
+++ b/packaging/pom.xml
@@ -210,6 +210,11 @@
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
+ <artifactId>hive-druid-handler</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
<artifactId>hive-hwi</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4c41200..2fb78cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,6 +37,7 @@
<module>cli</module>
<module>common</module>
<module>contrib</module>
+ <module>druid-handler</module>
<module>hbase-handler</module>
<module>hcatalog</module>
<module>hplsql</module>
@@ -130,6 +131,7 @@
<derby.version>10.10.2.0</derby.version>
<dropwizard.version>3.1.0</dropwizard.version>
<dropwizard-metrics-hadoop-metrics2-reporter.version>0.1.2</dropwizard-metrics-hadoop-metrics2-reporter.version>
+ <druid.version>0.9.1.1</druid.version>
<guava.version>14.0.1</guava.version>
<groovy.version>2.4.4</groovy.version>
<hadoop.version>2.7.2</hadoop.version>
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 5722305..66cbdd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.io.HdfsUtils;
@@ -4468,12 +4469,13 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
}
public static boolean doesTableNeedLocation(Table tbl) {
- // If we are ok with breaking compatibility of existing 3rd party StorageHandlers,
+ // TODO: If we are ok with breaking compatibility of existing 3rd party StorageHandlers,
// this method could be moved to the HiveStorageHandler interface.
boolean retval = true;
if (tbl.getStorageHandler() != null) {
- retval = !tbl.getStorageHandler().toString().equals(
- "org.apache.hadoop.hive.hbase.HBaseStorageHandler");
+ String sh = tbl.getStorageHandler().toString();
+ retval = !sh.equals("org.apache.hadoop.hive.hbase.HBaseStorageHandler")
+ && !sh.equals(Constants.DRUID_HIVE_STORAGE_HANDLER_ID);
}
return retval;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index 60646ba..4710b8f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -31,14 +31,20 @@ import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.FunctionInfo.FunctionResource;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorDay;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorHour;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorMinute;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorMonth;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorQuarter;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorSecond;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorWeek;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorYear;
import org.apache.hadoop.hive.ql.udf.SettableUDF;
import org.apache.hadoop.hive.ql.udf.UDAFPercentile;
import org.apache.hadoop.hive.ql.udf.UDFAcos;
@@ -141,6 +147,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hive.common.util.AnnotationUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* FunctionRegistry.
@@ -289,6 +297,16 @@ public final class FunctionRegistry {
system.registerGenericUDF("trunc", GenericUDFTrunc.class);
system.registerGenericUDF("date_format", GenericUDFDateFormat.class);
+ // Special date formatting functions
+ system.registerUDF("floor_year", UDFDateFloorYear.class, false);
+ system.registerUDF("floor_quarter", UDFDateFloorQuarter.class, false);
+ system.registerUDF("floor_month", UDFDateFloorMonth.class, false);
+ system.registerUDF("floor_day", UDFDateFloorDay.class, false);
+ system.registerUDF("floor_week", UDFDateFloorWeek.class, false);
+ system.registerUDF("floor_hour", UDFDateFloorHour.class, false);
+ system.registerUDF("floor_minute", UDFDateFloorMinute.class, false);
+ system.registerUDF("floor_second", UDFDateFloorSecond.class, false);
+
system.registerGenericUDF("date_add", GenericUDFDateAdd.class);
system.registerGenericUDF("date_sub", GenericUDFDateSub.class);
system.registerGenericUDF("datediff", GenericUDFDateDiff.class);
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java
index aeb4e7d..890aea1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java
@@ -19,21 +19,28 @@ package org.apache.hadoop.hive.ql.optimizer.calcite;
import org.apache.calcite.plan.Context;
import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveAlgorithmsConf;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.HiveDruidConf;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRulesRegistry;
public class HivePlannerContext implements Context {
- private HiveAlgorithmsConf config;
+ private HiveAlgorithmsConf algoConfig;
+ private HiveDruidConf druidConf;
private HiveRulesRegistry registry;
- public HivePlannerContext(HiveAlgorithmsConf config, HiveRulesRegistry registry) {
- this.config = config;
+ public HivePlannerContext(HiveAlgorithmsConf algoConfig, HiveDruidConf druidConf,
+ HiveRulesRegistry registry) {
+ this.algoConfig = algoConfig;
+ this.druidConf = druidConf;
this.registry = registry;
}
public <T> T unwrap(Class<T> clazz) {
- if (clazz.isInstance(config)) {
- return clazz.cast(config);
+ if (clazz.isInstance(algoConfig)) {
+ return clazz.cast(algoConfig);
+ }
+ if (clazz.isInstance(druidConf)) {
+ return clazz.cast(druidConf);
}
if (clazz.isInstance(registry)) {
return clazz.cast(registry);
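
A sketch of how a planner rule would retrieve the new configuration through
Calcite's Context.unwrap pattern (the context is typically obtained from
RelOptPlanner#getContext(); the class name is illustrative):

import org.apache.calcite.plan.Context;

import org.apache.hadoop.hive.ql.optimizer.calcite.druid.HiveDruidConf;

public class UnwrapSketch {
  // Returns null when the planner context carries no HiveDruidConf.
  static HiveDruidConf druidConf(Context ctx) {
    return ctx.unwrap(HiveDruidConf.class);
  }
}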
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidIntervalUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidIntervalUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidIntervalUtils.java
new file mode 100644
index 0000000..82ab4d7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidIntervalUtils.java
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
+
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.commons.lang.StringUtils;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+
+/**
+ * Utilities for generating intervals from RexNode.
+ *
+ * Based on Navis logic implemented on Hive data structures.
+ * See <a href="https://github.com/druid-io/druid/pull/2880">Druid PR-2880</a>
+ *
+ */
+@SuppressWarnings({"rawtypes","unchecked"})
+public class DruidIntervalUtils {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DruidIntervalUtils.class);
+
+
+ /**
+   * Given a list of predicates, generates the equivalent list of
+   * Intervals (if possible). It assumes that all the predicates in
+   * the input reference a single column: the timestamp column.
+   *
+   * @param type type of the timestamp column
+   * @param conjs list of conditions to use for the transformation
+   * @return list of intervals representing the conditions in the input list
+ */
+ public static List<Interval> createInterval(RelDataType type, List<RexNode> conjs) {
+ List<Range> ranges = new ArrayList<>();
+ for (RexNode child : conjs) {
+ List<Range> extractedRanges = extractRanges(type, child, false);
+ if (extractedRanges == null || extractedRanges.isEmpty()) {
+ // We could not extract, we bail out
+ return null;
+ }
+ if (ranges.isEmpty()) {
+ ranges.addAll(extractedRanges);
+ continue;
+ }
+ List<Range> overlapped = Lists.newArrayList();
+ for (Range current : ranges) {
+ for (Range interval : extractedRanges) {
+ if (current.isConnected(interval)) {
+ overlapped.add(current.intersection(interval));
+ }
+ }
+ }
+ ranges = overlapped;
+ }
+ List<Range> compactRanges = condenseRanges(ranges);
+ LOG.debug("Inferred ranges on interval : " + compactRanges);
+ return toInterval(compactRanges);
+ }
+
+ protected static List<Interval> toInterval(List<Range> ranges) {
+ List<Interval> intervals = Lists.transform(ranges, new Function<Range, Interval>() {
+ @Override
+ public Interval apply(Range range) {
+ if (!range.hasLowerBound() && !range.hasUpperBound()) {
+ return DruidTable.DEFAULT_INTERVAL;
+ }
+ long start = range.hasLowerBound() ? toLong(range.lowerEndpoint()) :
+ DruidTable.DEFAULT_INTERVAL.getStartMillis();
+ long end = range.hasUpperBound() ? toLong(range.upperEndpoint()) :
+ DruidTable.DEFAULT_INTERVAL.getEndMillis();
+ if (range.hasLowerBound() && range.lowerBoundType() == BoundType.OPEN) {
+ start++;
+ }
+ if (range.hasUpperBound() && range.upperBoundType() == BoundType.CLOSED) {
+ end++;
+ }
+ return new Interval(start, end);
+ }
+ });
+ LOG.info("Converted time ranges " + ranges + " to interval " + intervals);
+ return intervals;
+ }
+
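+ /**
+ * Recursively extracts the ranges admitted by the given predicate over
+ * the timestamp column; returns null if the predicate cannot be
+ * converted. withNot is true when the predicate is under a negation.
+ */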
+ protected static List<Range> extractRanges(RelDataType type, RexNode node,
+ boolean withNot) {
+ switch (node.getKind()) {
+ case EQUALS:
+ case LESS_THAN:
+ case LESS_THAN_OR_EQUAL:
+ case GREATER_THAN:
+ case GREATER_THAN_OR_EQUAL:
+ case BETWEEN:
+ case IN:
+ return leafToRanges(type, (RexCall) node, withNot);
+
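+ // NOT: extract the child with the negation flag flipped; the leaves
+ // generate the complement ranges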
+ case NOT:
+ return extractRanges(type, ((RexCall) node).getOperands().get(0), !withNot);
+
+ case OR:
+ // Union of the ranges of all disjuncts; if any disjunct cannot be
+ // converted, we bail out, since the union would no longer cover it
+ RexCall call = (RexCall) node;
+ List<Range> intervals = Lists.newArrayList();
+ for (RexNode child : call.getOperands()) {
+ List<Range> extracted = extractRanges(type, child, withNot);
+ if (extracted == null) {
+ return null;
+ }
+ intervals.addAll(extracted);
+ }
+ return intervals;
+
+ default:
+ return null;
+ }
+ }
+
+ protected static List<Range> leafToRanges(RelDataType type, RexCall call,
+ boolean withNot) {
+ switch (call.getKind()) {
+ case EQUALS:
+ case LESS_THAN:
+ case LESS_THAN_OR_EQUAL:
+ case GREATER_THAN:
+ case GREATER_THAN_OR_EQUAL:
+ {
+ RexLiteral literal = null;
+ boolean inverted = false;
+ if (call.getOperands().get(0) instanceof RexInputRef) {
+ // Column on the left; literal (possibly wrapped in a CAST) on the right
+ literal = extractLiteral(call.getOperands().get(1));
+ } else if (call.getOperands().get(1) instanceof RexInputRef) {
+ // Literal on the left; the comparison needs to be flipped
+ literal = extractLiteral(call.getOperands().get(0));
+ inverted = true;
+ }
+ if (literal == null) {
+ return null;
+ }
+ Comparable value = literalToType(literal, type);
+ if (value == null) {
+ return null;
+ }
+ SqlKind kind = call.getKind();
+ if (inverted) {
+ // Flip the operator so that the column is conceptually on the left,
+ // e.g. literal > col becomes col < literal
+ switch (kind) {
+ case LESS_THAN: kind = SqlKind.GREATER_THAN; break;
+ case LESS_THAN_OR_EQUAL: kind = SqlKind.GREATER_THAN_OR_EQUAL; break;
+ case GREATER_THAN: kind = SqlKind.LESS_THAN; break;
+ case GREATER_THAN_OR_EQUAL: kind = SqlKind.LESS_THAN_OR_EQUAL; break;
+ default: break;
+ }
+ }
+ if (kind == SqlKind.LESS_THAN) {
+ return Arrays.<Range> asList(withNot ? Range.atLeast(value) : Range.lessThan(value));
+ } else if (kind == SqlKind.LESS_THAN_OR_EQUAL) {
+ return Arrays.<Range> asList(withNot ? Range.greaterThan(value) : Range.atMost(value));
+ } else if (kind == SqlKind.GREATER_THAN) {
+ return Arrays.<Range> asList(withNot ? Range.atMost(value) : Range.greaterThan(value));
+ } else if (kind == SqlKind.GREATER_THAN_OR_EQUAL) {
+ return Arrays.<Range> asList(withNot ? Range.lessThan(value) : Range.atLeast(value));
+ } else { // EQUALS
+ if (!withNot) {
+ return Arrays.<Range> asList(Range.closed(value, value));
+ }
+ // NOT EQUALS: everything except the single point
+ return Arrays.<Range> asList(Range.lessThan(value), Range.greaterThan(value));
+ }
+ }
+ case BETWEEN:
+ {
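+ // Operands 2 and 3 hold the lower and upper bound expressions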
+ RexLiteral literal1 = extractLiteral(call.getOperands().get(2));
+ if (literal1 == null) {
+ return null;
+ }
+ RexLiteral literal2 = extractLiteral(call.getOperands().get(3));
+ if (literal2 == null) {
+ return null;
+ }
+ Comparable value1 = literalToType(literal1, type);
+ Comparable value2 = literalToType(literal2, type);
+ if (value1 == null || value2 == null) {
+ return null;
+ }
+ boolean inverted = value1.compareTo(value2) > 0;
+ if (!withNot) {
+ return Arrays.<Range> asList(
+ inverted ? Range.closed(value2, value1) : Range.closed(value1, value2));
+ }
+ return Arrays.<Range> asList(Range.lessThan(inverted ? value2 : value1),
+ Range.greaterThan(inverted ? value1 : value2));
+ }
+ case IN:
+ {
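+ // Operand 0 is the column reference; the remaining operands are the
+ // values of the IN list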
+ List<Range> ranges = Lists.newArrayList();
+ for (int i = 1; i < call.getOperands().size(); i++) {
+ RexLiteral literal = extractLiteral(call.getOperands().get(i));
+ if (literal == null) {
+ return null;
+ }
+ Comparable element = literalToType(literal, type);
+ if (element == null) {
+ return null;
+ }
+ if (withNot) {
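+ // Note that for more than one value, the union of the complements
+ // covers the whole domain (a superset of the exact NOT IN ranges)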
+ ranges.addAll(
+ Arrays.<Range> asList(Range.lessThan(element), Range.greaterThan(element)));
+ } else {
+ ranges.add(Range.closed(element, element));
+ }
+ }
+ return ranges;
+ }
+ default:
+ return null;
+ }
+ }
+
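+ /**
+ * Extracts the value of a Calcite literal and converts it to the given
+ * target type; returns null when the conversion is not supported.
+ */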
+ @SuppressWarnings("incomplete-switch")
+ protected static Comparable literalToType(RexLiteral literal, RelDataType type) {
+ // Extract
+ Object value = null;
+ switch (literal.getType().getSqlTypeName()) {
+ case DATE:
+ case TIME:
+ case TIMESTAMP:
+ case INTERVAL_YEAR_MONTH:
+ case INTERVAL_DAY_TIME:
+ value = literal.getValue();
+ break;
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ case DOUBLE:
+ case DECIMAL:
+ case FLOAT:
+ case REAL:
+ case VARCHAR:
+ case CHAR:
+ case BOOLEAN:
+ value = literal.getValue3();
+ }
+ if (value == null) {
+ return null;
+ }
+
+ // Convert
+ switch (type.getSqlTypeName()) {
+ case BIGINT:
+ return toLong(value);
+ case INTEGER:
+ return toInt(value);
+ case FLOAT:
+ return toFloat(value);
+ case DOUBLE:
+ return toDouble(value);
+ case VARCHAR:
+ case CHAR:
+ return String.valueOf(value);
+ case TIMESTAMP:
+ return toTimestamp(value);
+ }
+ return null;
+ }
+
+ private static RexLiteral extractLiteral(RexNode node) {
+ RexNode target = node;
+ if (node.getKind() == SqlKind.CAST) {
+ target = ((RexCall)node).getOperands().get(0);
+ }
+ if (!(target instanceof RexLiteral)) {
+ return null;
+ }
+ return (RexLiteral) target;
+ }
+
+ private static Comparable toTimestamp(Object literal) {
+ if (literal instanceof Timestamp) {
+ return (Timestamp) literal;
+ }
+ if (literal instanceof Date) {
+ return new Timestamp(((Date) literal).getTime());
+ }
+ if (literal instanceof Number) {
+ return new Timestamp(((Number) literal).longValue());
+ }
+ if (literal instanceof String) {
+ String string = (String) literal;
+ // Note: commons-lang isNumeric returns true for the empty string
+ if (!string.isEmpty() && StringUtils.isNumeric(string)) {
+ return new Timestamp(Long.valueOf(string));
+ }
+ try {
+ return Timestamp.valueOf(string);
+ } catch (IllegalArgumentException e) {
+ // Timestamp.valueOf throws IllegalArgumentException on malformed input
+ }
+ }
+ return null;
+ }
+
+ private static Long toLong(Object literal) {
+ if (literal instanceof Number) {
+ return ((Number) literal).longValue();
+ }
+ if (literal instanceof Date) {
+ return ((Date) literal).getTime();
+ }
+ if (literal instanceof Timestamp) {
+ return ((Timestamp) literal).getTime();
+ }
+ if (literal instanceof String) {
+ try {
+ return Long.valueOf((String) literal);
+ } catch (NumberFormatException e) {
+ // ignore
+ }
+ try {
+ return DateFormat.getDateInstance().parse((String) literal).getTime();
+ } catch (ParseException e) {
+ // best effort. ignore
+ }
+ }
+ return null;
+ }
+
+ private static Integer toInt(Object literal) {
+ if (literal instanceof Number) {
+ return ((Number) literal).intValue();
+ }
+ if (literal instanceof String) {
+ try {
+ return Integer.valueOf((String) literal);
+ } catch (NumberFormatException e) {
+ // ignore
+ }
+ }
+ return null;
+ }
+
+ private static Float toFloat(Object literal) {
+ if (literal instanceof Number) {
+ return ((Number) literal).floatValue();
+ }
+ if (literal instanceof String) {
+ try {
+ return Float.valueOf((String) literal);
+ } catch (NumberFormatException e) {
+ // ignore
+ }
+ }
+ return null;
+ }
+
+ private static Double toDouble(Object literal) {
+ if (literal instanceof Number) {
+ return ((Number) literal).doubleValue();
+ }
+ if (literal instanceof String) {
+ try {
+ return Double.valueOf((String) literal);
+ } catch (NumberFormatException e) {
+ // ignore
+ }
+ }
+ return null;
+ }
+
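+ /**
+ * Sorts the ranges and merges the ones that overlap or are adjacent,
+ * returning a minimal list of disjoint ranges covering the same points.
+ * For instance (illustrative), [1,3], [2,5) and [7,8] condense into
+ * [1,5) and [7,8].
+ */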
+ protected static List<Range> condenseRanges(List<Range> ranges) {
+ if (ranges.size() <= 1) {
+ return ranges;
+ }
+
+ Comparator<Range> startThenEnd = new Comparator<Range>() {
+ @Override
+ public int compare(Range lhs, Range rhs) {
+ int compare = 0;
+ if (lhs.hasLowerBound() && rhs.hasLowerBound()) {
+ compare = lhs.lowerEndpoint().compareTo(rhs.lowerEndpoint());
+ } else if (!lhs.hasLowerBound() && rhs.hasLowerBound()) {
+ compare = -1;
+ } else if (lhs.hasLowerBound() && !rhs.hasLowerBound()) {
+ compare = 1;
+ }
+ if (compare != 0) {
+ return compare;
+ }
+ if (lhs.hasUpperBound() && rhs.hasUpperBound()) {
+ compare = lhs.upperEndpoint().compareTo(rhs.upperEndpoint());
+ } else if (!lhs.hasUpperBound() && rhs.hasUpperBound()) {
+ compare = -1;
+ } else if (lhs.hasUpperBound() && !rhs.hasUpperBound()) {
+ compare = 1;
+ }
+ return compare;
+ }
+ };
+
+ TreeSet<Range> sortedIntervals = Sets.newTreeSet(startThenEnd);
+ sortedIntervals.addAll(ranges);
+
+ List<Range> retVal = Lists.newArrayList();
+
+ Iterator<Range> intervalsIter = sortedIntervals.iterator();
+ Range currInterval = intervalsIter.next();
+ while (intervalsIter.hasNext()) {
+ Range next = intervalsIter.next();
+ if (currInterval.encloses(next)) {
+ continue;
+ }
+ if (mergeable(currInterval, next)) {
+ currInterval = currInterval.span(next);
+ } else {
+ retVal.add(currInterval);
+ currInterval = next;
+ }
+ }
+ retVal.add(currInterval);
+
+ return retVal;
+ }
+
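+ /**
+ * Whether the first range overlaps the second one or touches it on a
+ * closed endpoint, so that the two can be merged into their span.
+ * Assumes the ranges are sorted as in condenseRanges.
+ */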
+ protected static boolean mergeable(Range range1, Range range2) {
+ if (!range1.hasUpperBound() || !range2.hasLowerBound()) {
+ // A missing endpoint extends to infinity, so the ranges necessarily
+ // overlap; querying it on the Range would throw IllegalStateException
+ return true;
+ }
+ Comparable x1 = range1.upperEndpoint();
+ Comparable x2 = range2.lowerEndpoint();
+ int compare = x1.compareTo(x2);
+ return compare > 0 || (compare == 0 && range1.upperBoundType() == BoundType.CLOSED
+ && range2.lowerBoundType() == BoundType.CLOSED);
+ }
+
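+ /**
+ * Returns the total number of milliseconds covered by the given intervals.
+ */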
+ public static long extractTotalTime(List<Interval> intervals) {
+ long totalTime = 0;
+ for (Interval interval : intervals) {
+ totalTime += (interval.getEndMillis() - interval.getStartMillis());
+ }
+ return totalTime;
+ }
+
+}