You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2016/09/08 08:46:00 UTC

[5/6] hive git commit: HIVE-14217: Druid integration (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
new file mode 100644
index 0000000..0b87976
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Iterators;
+
+import io.druid.query.Result;
+import io.druid.query.topn.DimensionAndMetricValueExtractor;
+import io.druid.query.topn.TopNQuery;
+import io.druid.query.topn.TopNResultValue;
+
+/**
+ * Record reader for results for Druid TopNQuery.
+ */
+public class DruidTopNQueryRecordReader
+        extends DruidQueryRecordReader<TopNQuery, Result<TopNResultValue>> {
+
+  private Result<TopNResultValue> current;
+  private Iterator<DimensionAndMetricValueExtractor> values = Iterators.emptyIterator();
+
+  @Override
+  protected TopNQuery createQuery(String content) throws IOException {
+    return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, TopNQuery.class);
+  }
+
+  @Override
+  protected List<Result<TopNResultValue>> createResultsList(InputStream content) throws IOException {
+    return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
+            new TypeReference<List<Result<TopNResultValue>>>(){});
+  }
+
+  @Override
+  public boolean nextKeyValue() {
+    if (values.hasNext()) {
+      return true;
+    }
+    if (results.hasNext()) {
+      current = results.next();
+      values = current.getValue().getValue().iterator();
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException, InterruptedException {
+    return NullWritable.get();
+  }
+
+  @Override
+  public DruidWritable getCurrentValue() throws IOException, InterruptedException {
+    // Create new value
+    DruidWritable value = new DruidWritable();
+    value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
+    if (values.hasNext()) {
+      value.getValue().putAll(values.next().getBaseObject());
+      return value;
+    }
+    return value;
+  }
+
+  @Override
+  public boolean next(NullWritable key, DruidWritable value) {
+    if (nextKeyValue()) {
+      // Update value
+      value.getValue().clear();
+      value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
+      if (values.hasNext()) {
+        value.getValue().putAll(values.next().getBaseObject());
+      }
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public float getProgress() {
+    return results.hasNext() || values.hasNext() ? 0 : 1;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidWritable.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidWritable.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidWritable.java
new file mode 100644
index 0000000..77ffcd4
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidWritable.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Objects;
+
+/**
+ * Writable for Druid results.
+ */
+public class DruidWritable implements Writable {
+
+  private final Map<String, Object> value;
+
+  public DruidWritable() {
+    value = new HashMap<>();
+  }
+
+  public DruidWritable(Map<String, Object> value) {
+    this.value = value;
+  }
+
+  public Map<String, Object> getValue() {
+    return value;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(value);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    return Objects.equal(value, ((DruidWritable) o).value);
+  }
+
+  @Override
+  public String toString() {
+    return "DruidWritable{value=" + value + '}';
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
new file mode 100644
index 0000000..2b4df78
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.druid.serde.DruidSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.util.StringUtils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import io.druid.query.metadata.metadata.SegmentAnalysis;
+import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+
+/**
+ * Druid SerDe to be used in tests.
+ */
+public class QTestDruidSerDe extends DruidSerDe {
+
+  // Request :
+  //        "{\"queryType\":\"segmentMetadata\",\"dataSource\":{\"type\":\"table\",\"name\":\"wikipedia\"},"
+  //        + "\"intervals\":{\"type\":\"intervals\","
+  //        + "\"intervals\":[\"-146136543-09-08T00:30:34.096-07:52:58/146140482-04-24T08:36:27.903-07:00\"]},"
+  //        + "\"toInclude\":{\"type\":\"all\"},\"merge\":true,\"context\":null,\"analysisTypes\":[],"
+  //        + "\"usingDefaultInterval\":true,\"lenientAggregatorMerge\":false,\"descending\":false}";
+  private static final String RESPONSE =
+          "[ {\r\n "
+          + " \"id\" : \"merged\",\r\n "
+          + " \"intervals\" : [ \"2010-01-01T00:00:00.000Z/2015-12-31T00:00:00.000Z\" ],\r\n "
+          + " \"columns\" : {\r\n  "
+          + "  \"__time\" : { \"type\" : \"LONG\", \"hasMultipleValues\" : false, \"size\" : 407240380, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
+          + "  \"robot\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+          + "  \"namespace\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : true, \"size\" : 100000, \"cardinality\" : 1504, \"errorMessage\" : null },\r\n  "
+          + "  \"anonymous\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+          + "  \"unpatrolled\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+          + "  \"page\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+          + "  \"language\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+          + "  \"newpage\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+          + "  \"user\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+          + "  \"count\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
+          + "  \"added\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
+          + "  \"delta\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
+          + "  \"variation\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
+          + "  \"deleted\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null }\r\n "
+          + " },\r\n "
+          + " \"aggregators\" : {\r\n  "
+          + "  \"count\" : { \"type\" : \"longSum\", \"name\" : \"count\", \"fieldName\" : \"count\" },\r\n  "
+          + "  \"added\" : { \"type\" : \"doubleSum\", \"name\" : \"added\", \"fieldName\" : \"added\" },\r\n  "
+          + "  \"delta\" : { \"type\" : \"doubleSum\", \"name\" : \"delta\", \"fieldName\" : \"delta\" },\r\n  "
+          + "  \"variation\" : { \"type\" : \"doubleSum\", \"name\" : \"variation\", \"fieldName\" : \"variation\" },\r\n  "
+          + "  \"deleted\" : { \"type\" : \"doubleSum\", \"name\" : \"deleted\", \"fieldName\" : \"deleted\" }\r\n "
+          + " },\r\n "
+          + " \"queryGranularity\" : {\r\n    \"type\": \"none\"\r\n  },\r\n "
+          + " \"size\" : 300000,\r\n "
+          + " \"numRows\" : 5000000\r\n} ]";
+
+  /* Submits the request and returns */
+  @Override
+  protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
+          throws SerDeException {
+    // Retrieve results
+    List<SegmentAnalysis> resultsList;
+    try {
+      resultsList = DruidStorageHandlerUtils.JSON_MAPPER.readValue(RESPONSE,
+            new TypeReference<List<SegmentAnalysis>>() {});
+    } catch (Exception e) {
+      throw new SerDeException(StringUtils.stringifyException(e));
+    }
+    return resultsList.get(0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandler.java
new file mode 100644
index 0000000..0a44aaa
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandler.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import org.apache.hadoop.hive.serde2.SerDe;
+
+/**
+ * Storage handler for Druid to be used in tests. It cannot connect to
+ * Druid, and thus it cannot execute queries.
+ */
+@SuppressWarnings("deprecation")
+public class QTestDruidStorageHandler extends DruidStorageHandler {
+
+  @Override
+  public Class<? extends SerDe> getSerDeClass() {
+    return QTestDruidSerDe.class;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
new file mode 100644
index 0000000..9c5c65c
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.sql.Timestamp;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidSerDe;
+import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.druid.data.input.Row;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.Query;
+import io.druid.query.Result;
+import io.druid.query.groupby.GroupByQuery;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.select.SelectResultValue;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.timeseries.TimeseriesResultValue;
+import io.druid.query.topn.TopNQuery;
+import io.druid.query.topn.TopNResultValue;
+
+/**
+ * Basic tests for Druid SerDe. The examples are taken from Druid 0.9.1.1
+ * documentation.
+ */
+public class TestDruidSerDe {
+
+  // Timeseries query
+  private static final String TIMESERIES_QUERY =
+          "{  \"queryType\": \"timeseries\", "
+          + " \"dataSource\": \"sample_datasource\", "
+          + " \"granularity\": \"day\", "
+          + " \"descending\": \"true\", "
+          + " \"filter\": {  "
+          + "  \"type\": \"and\",  "
+          + "  \"fields\": [   "
+          + "   { \"type\": \"selector\", \"dimension\": \"sample_dimension1\", \"value\": \"sample_value1\" },   "
+          + "   { \"type\": \"or\",    "
+          + "    \"fields\": [     "
+          + "     { \"type\": \"selector\", \"dimension\": \"sample_dimension2\", \"value\": \"sample_value2\" },     "
+          + "     { \"type\": \"selector\", \"dimension\": \"sample_dimension3\", \"value\": \"sample_value3\" }    "
+          + "    ]   "
+          + "   }  "
+          + "  ] "
+          + " }, "
+          + " \"aggregations\": [  "
+          + "  { \"type\": \"longSum\", \"name\": \"sample_name1\", \"fieldName\": \"sample_fieldName1\" },  "
+          + "  { \"type\": \"doubleSum\", \"name\": \"sample_name2\", \"fieldName\": \"sample_fieldName2\" } "
+          + " ], "
+          + " \"postAggregations\": [  "
+          + "  { \"type\": \"arithmetic\",  "
+          + "    \"name\": \"sample_divide\",  "
+          + "    \"fn\": \"/\",  "
+          + "    \"fields\": [   "
+          + "     { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name1\", \"fieldName\": \"sample_name1\" },   "
+          + "     { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name2\", \"fieldName\": \"sample_name2\" }  "
+          + "    ]  "
+          + "  } "
+          + " ], "
+          + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ]}";
+  // Timeseries query results
+  private static final String TIMESERIES_QUERY_RESULTS =
+          "[  "
+          + "{   "
+          + " \"timestamp\": \"2012-01-01T00:00:00.000Z\",   "
+          + " \"result\": { \"sample_name1\": 0, \"sample_name2\": 1.0, \"sample_divide\": 2.2222 }   "
+          + "},  "
+          + "{   "
+          + " \"timestamp\": \"2012-01-02T00:00:00.000Z\",   "
+          + " \"result\": { \"sample_name1\": 2, \"sample_name2\": 3.32, \"sample_divide\": 4 }  "
+          + "}]";
+  // Timeseries query results as records
+  private static final Object[][] TIMESERIES_QUERY_RESULTS_RECORDS = new Object[][] {
+    new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new LongWritable(0), new FloatWritable(1.0F), new FloatWritable(2.2222F) } ,
+    new Object[] { new TimestampWritable(new Timestamp(1325462400000L)), new LongWritable(2), new FloatWritable(3.32F), new FloatWritable(4F)}
+  };
+
+  // TopN query
+  private static final String TOPN_QUERY =
+          "{  \"queryType\": \"topN\", "
+          + " \"dataSource\": \"sample_data\", "
+          + " \"dimension\": \"sample_dim\", "
+          + " \"threshold\": 5, "
+          + " \"metric\": \"count\", "
+          + " \"granularity\": \"all\", "
+          + " \"filter\": {  "
+          + "  \"type\": \"and\",  "
+          + "  \"fields\": [   "
+          + "   {    "
+          + "    \"type\": \"selector\",    "
+          + "    \"dimension\": \"dim1\",    "
+          + "    \"value\": \"some_value\"   "
+          + "   },   "
+          + "   {    "
+          + "    \"type\": \"selector\",    "
+          + "    \"dimension\": \"dim2\",    "
+          + "    \"value\": \"some_other_val\"   "
+          + "   }  "
+          + "  ] "
+          + " }, "
+          + " \"aggregations\": [  "
+          + "  {   "
+          + "   \"type\": \"longSum\",   "
+          + "   \"name\": \"count\",   "
+          + "   \"fieldName\": \"count\"  "
+          + "  },  "
+          + "  {   "
+          + "   \"type\": \"doubleSum\",   "
+          + "   \"name\": \"some_metric\",   "
+          + "   \"fieldName\": \"some_metric\"  "
+          + "  } "
+          + " ], "
+          + " \"postAggregations\": [  "
+          + "  {   "
+          + "   \"type\": \"arithmetic\",   "
+          + "   \"name\": \"sample_divide\",   "
+          + "   \"fn\": \"/\",   "
+          + "   \"fields\": [    "
+          + "    {     "
+          + "     \"type\": \"fieldAccess\",     "
+          + "     \"name\": \"some_metric\",     "
+          + "     \"fieldName\": \"some_metric\"    "
+          + "    },    "
+          + "    {     "
+          + "     \"type\": \"fieldAccess\",     "
+          + "     \"name\": \"count\",     "
+          + "     \"fieldName\": \"count\"    "
+          + "    }   "
+          + "   ]  "
+          + "  } "
+          + " ], "
+          + " \"intervals\": [  "
+          + "  \"2013-08-31T00:00:00.000/2013-09-03T00:00:00.000\" "
+          + " ]}";
+  // TopN query results
+  private static final String TOPN_QUERY_RESULTS =
+          "[ "
+          + " {  "
+          + "  \"timestamp\": \"2013-08-31T00:00:00.000Z\",  "
+          + "  \"result\": [   "
+          + "   {   "
+          + "     \"sample_dim\": \"dim1_val\",   "
+          + "     \"count\": 111,   "
+          + "     \"some_metric\": 10669,   "
+          + "     \"sample_divide\": 96.11711711711712   "
+          + "   },   "
+          + "   {   "
+          + "     \"sample_dim\": \"another_dim1_val\",   "
+          + "     \"count\": 88,   "
+          + "     \"some_metric\": 28344,   "
+          + "     \"sample_divide\": 322.09090909090907   "
+          + "   },   "
+          + "   {   "
+          + "     \"sample_dim\": \"dim1_val3\",   "
+          + "     \"count\": 70,   "
+          + "     \"some_metric\": 871,   "
+          + "     \"sample_divide\": 12.442857142857143   "
+          + "   },   "
+          + "   {   "
+          + "     \"sample_dim\": \"dim1_val4\",   "
+          + "     \"count\": 62,   "
+          + "     \"some_metric\": 815,   "
+          + "     \"sample_divide\": 13.14516129032258   "
+          + "   },   "
+          + "   {   "
+          + "     \"sample_dim\": \"dim1_val5\",   "
+          + "     \"count\": 60,   "
+          + "     \"some_metric\": 2787,   "
+          + "     \"sample_divide\": 46.45   "
+          + "   }  "
+          + "  ] "
+          + " }]";
+  // TopN query results as records
+  private static final Object[][] TOPN_QUERY_RESULTS_RECORDS = new Object[][] {
+    new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val"), new LongWritable(111), new FloatWritable(10669F), new FloatWritable(96.11711711711712F) } ,
+    new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("another_dim1_val"), new LongWritable(88), new FloatWritable(28344F), new FloatWritable(322.09090909090907F) } ,
+    new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val3"), new LongWritable(70), new FloatWritable(871F), new FloatWritable(12.442857142857143F) } ,
+    new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val4"), new LongWritable(62), new FloatWritable(815F), new FloatWritable(13.14516129032258F) } ,
+    new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val5"), new LongWritable(60), new FloatWritable(2787F), new FloatWritable(46.45F) }
+  };
+
+  // GroupBy query
+  private static final String GROUP_BY_QUERY =
+          "{ "
+          + " \"queryType\": \"groupBy\", "
+          + " \"dataSource\": \"sample_datasource\", "
+          + " \"granularity\": \"day\", "
+          + " \"dimensions\": [\"country\", \"device\"], "
+          + " \"limitSpec\": {"
+          + " \"type\": \"default\","
+          + " \"limit\": 5000,"
+          + " \"columns\": [\"country\", \"data_transfer\"] }, "
+          + " \"filter\": {  "
+          + "  \"type\": \"and\",  "
+          + "  \"fields\": [   "
+          + "   { \"type\": \"selector\", \"dimension\": \"carrier\", \"value\": \"AT&T\" },   "
+          + "   { \"type\": \"or\",     "
+          + "    \"fields\": [     "
+          + "     { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Apple\" },     "
+          + "     { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Samsung\" }    "
+          + "    ]   "
+          + "   }  "
+          + "  ] "
+          + " }, "
+          + " \"aggregations\": [  "
+          + "  { \"type\": \"longSum\", \"name\": \"total_usage\", \"fieldName\": \"user_count\" },  "
+          + "  { \"type\": \"doubleSum\", \"name\": \"data_transfer\", \"fieldName\": \"data_transfer\" } "
+          + " ], "
+          + " \"postAggregations\": [  "
+          + "  { \"type\": \"arithmetic\",  "
+          + "    \"name\": \"avg_usage\",  "
+          + "    \"fn\": \"/\",  "
+          + "    \"fields\": [   "
+          + "     { \"type\": \"fieldAccess\", \"fieldName\": \"data_transfer\" },   "
+          + "     { \"type\": \"fieldAccess\", \"fieldName\": \"total_usage\" }  "
+          + "    ]  "
+          + "  } "
+          + " ], "
+          + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ], "
+          + " \"having\": {  "
+          + "  \"type\": \"greaterThan\",  "
+          + "  \"aggregation\": \"total_usage\",  "
+          + "  \"value\": 100 "
+          + " }}";
+  // GroupBy query results
+  private static final String GROUP_BY_QUERY_RESULTS =
+          "[  "
+          + " {  "
+          + "  \"version\" : \"v1\",  "
+          + "  \"timestamp\" : \"2012-01-01T00:00:00.000Z\",  "
+          + "  \"event\" : {   "
+          + "   \"country\" : \"India\",   "
+          + "   \"device\" : \"phone\",   "
+          + "   \"total_usage\" : 88,   "
+          + "   \"data_transfer\" : 29.91233453,   "
+          + "   \"avg_usage\" : 60.32  "
+          + "  } "
+          + " },  "
+          + " {  "
+          + "  \"version\" : \"v1\",  "
+          + "  \"timestamp\" : \"2012-01-01T00:00:12.000Z\",  "
+          + "  \"event\" : {   "
+          + "   \"country\" : \"Spain\",   "
+          + "   \"device\" : \"pc\",   "
+          + "   \"total_usage\" : 16,   "
+          + "   \"data_transfer\" : 172.93494959,   "
+          + "   \"avg_usage\" : 6.333333  "
+          + "  } "
+          + " }]";
+  // GroupBy query results as records
+  private static final Object[][] GROUP_BY_QUERY_RESULTS_RECORDS = new Object[][] {
+    new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new Text("India"), new Text("phone"), new LongWritable(88), new FloatWritable(29.91233453F), new FloatWritable(60.32F) } ,
+    new Object[] { new TimestampWritable(new Timestamp(1325376012000L)), new Text("Spain"), new Text("pc"), new LongWritable(16), new FloatWritable(172.93494959F), new FloatWritable(6.333333F) }
+  };
+
+  // Select query
+  private static final String SELECT_QUERY =
+          "{   \"queryType\": \"select\",  "
+          + " \"dataSource\": \"wikipedia\",   \"descending\": \"false\",  "
+          + " \"dimensions\":[\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\",\"newpage\",\"user\"],  "
+          + " \"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"],  "
+          + " \"granularity\": \"all\",  "
+          + " \"intervals\": [     \"2013-01-01/2013-01-02\"   ],  "
+          + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5} }";
+  // Select query results
+  private static final String SELECT_QUERY_RESULTS =
+          "[{ "
+          + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", "
+          + " \"result\" : {  "
+          + "  \"pagingIdentifiers\" : {   "
+          + "   \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\" : 4    }, "
+          + "   \"events\" : [ {  "
+          + "    \"segmentId\" : \"wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",  "
+          + "    \"offset\" : 0,  "
+          + "    \"event\" : {   "
+          + "     \"timestamp\" : \"2013-01-01T00:00:00.000Z\",   "
+          + "     \"robot\" : \"1\",   "
+          + "     \"namespace\" : \"article\",   "
+          + "     \"anonymous\" : \"0\",   "
+          + "     \"unpatrolled\" : \"0\",   "
+          + "     \"page\" : \"11._korpus_(NOVJ)\",   "
+          + "     \"language\" : \"sl\",   "
+          + "     \"newpage\" : \"0\",   "
+          + "     \"user\" : \"EmausBot\",   "
+          + "     \"count\" : 1.0,   "
+          + "     \"added\" : 39.0,   "
+          + "     \"delta\" : 39.0,   "
+          + "     \"variation\" : 39.0,   "
+          + "     \"deleted\" : 0.0  "
+          + "    } "
+          + "   }, {  "
+          + "    \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",  "
+          + "    \"offset\" : 1,  "
+          + "    \"event\" : {   "
+          + "     \"timestamp\" : \"2013-01-01T00:00:00.000Z\",   "
+          + "     \"robot\" : \"0\",   "
+          + "     \"namespace\" : \"article\",   "
+          + "     \"anonymous\" : \"0\",   "
+          + "     \"unpatrolled\" : \"0\",   "
+          + "     \"page\" : \"112_U.S._580\",   "
+          + "     \"language\" : \"en\",   "
+          + "     \"newpage\" : \"1\",   "
+          + "     \"user\" : \"MZMcBride\",   "
+          + "     \"count\" : 1.0,   "
+          + "     \"added\" : 70.0,   "
+          + "     \"delta\" : 70.0,   "
+          + "     \"variation\" : 70.0,   "
+          + "     \"deleted\" : 0.0  "
+          + "    } "
+          + "   }, {  "
+          + "    \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",  "
+          + "    \"offset\" : 2,  "
+          + "    \"event\" : {   "
+          + "     \"timestamp\" : \"2013-01-01T00:00:00.000Z\",   "
+          + "     \"robot\" : \"0\",   "
+          + "     \"namespace\" : \"article\",   "
+          + "     \"anonymous\" : \"0\",   "
+          + "     \"unpatrolled\" : \"0\",   "
+          + "     \"page\" : \"113_U.S._243\",   "
+          + "     \"language\" : \"en\",   "
+          + "     \"newpage\" : \"1\",   "
+          + "     \"user\" : \"MZMcBride\",   "
+          + "     \"count\" : 1.0,   "
+          + "     \"added\" : 77.0,   "
+          + "     \"delta\" : 77.0,   "
+          + "     \"variation\" : 77.0,   "
+          + "     \"deleted\" : 0.0  "
+          + "    } "
+          + "   }, {  "
+          + "    \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",  "
+          + "    \"offset\" : 3,  "
+          + "    \"event\" : {   "
+          + "     \"timestamp\" : \"2013-01-01T00:00:00.000Z\",   "
+          + "     \"robot\" : \"0\",   "
+          + "     \"namespace\" : \"article\",   "
+          + "     \"anonymous\" : \"0\",   "
+          + "     \"unpatrolled\" : \"0\",   "
+          + "     \"page\" : \"113_U.S._73\",   "
+          + "     \"language\" : \"en\",   "
+          + "     \"newpage\" : \"1\",   "
+          + "     \"user\" : \"MZMcBride\",   "
+          + "     \"count\" : 1.0,   "
+          + "     \"added\" : 70.0,   "
+          + "     \"delta\" : 70.0,   "
+          + "     \"variation\" : 70.0,   "
+          + "     \"deleted\" : 0.0  "
+          + "    } "
+          + "   }, {  "
+          + "    \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",  "
+          + "    \"offset\" : 4,  "
+          + "    \"event\" : {   "
+          + "     \"timestamp\" : \"2013-01-01T00:00:00.000Z\",   "
+          + "     \"robot\" : \"0\",   "
+          + "     \"namespace\" : \"article\",   "
+          + "     \"anonymous\" : \"0\",   "
+          + "     \"unpatrolled\" : \"0\",   "
+          + "     \"page\" : \"113_U.S._756\",   "
+          + "     \"language\" : \"en\",   "
+          + "     \"newpage\" : \"1\",   "
+          + "     \"user\" : \"MZMcBride\",   "
+          + "     \"count\" : 1.0,   "
+          + "     \"added\" : 68.0,   "
+          + "     \"delta\" : 68.0,   "
+          + "     \"variation\" : 68.0,   "
+          + "     \"deleted\" : 0.0  "
+          + "    } "
+          + "   } ]  }} ]";
+  // Select query results as records
+  private static final Object[][] SELECT_QUERY_RESULTS_RECORDS = new Object[][] {
+    new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("1"), new Text("article"), new Text("0"), new Text("0"),
+        new Text("11._korpus_(NOVJ)"), new Text("sl"), new Text("0"), new Text("EmausBot"),
+        new FloatWritable(1.0F), new FloatWritable(39.0F), new FloatWritable(39.0F), new FloatWritable(39.0F), new FloatWritable(0.0F) } ,
+    new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"), new Text("article"), new Text("0"), new Text("0"),
+        new Text("112_U.S._580"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+        new FloatWritable(1.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(0.0F) } ,
+    new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"), new Text("article"), new Text("0"), new Text("0"),
+        new Text("113_U.S._243"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+        new FloatWritable(1.0F), new FloatWritable(77.0F), new FloatWritable(77.0F), new FloatWritable(77.0F), new FloatWritable(0.0F) } ,
+    new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"), new Text("article"), new Text("0"), new Text("0"),
+        new Text("113_U.S._73"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+        new FloatWritable(1.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(0.0F) } ,
+    new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"), new Text("article"), new Text("0"), new Text("0"),
+        new Text("113_U.S._756"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+        new FloatWritable(1.0F), new FloatWritable(68.0F), new FloatWritable(68.0F), new FloatWritable(68.0F), new FloatWritable(0.0F) }
+  };
+
+
+  /**
+   * Test the default behavior of the objects and object inspectors.
+   * @throws IOException 
+   * @throws IllegalAccessException 
+   * @throws IllegalArgumentException 
+   * @throws SecurityException 
+   * @throws NoSuchFieldException 
+   * @throws JsonMappingException 
+   * @throws JsonParseException 
+   * @throws InvocationTargetException 
+   * @throws NoSuchMethodException 
+   */
+  @Test
+  public void testDruidSerDe()
+          throws SerDeException, JsonParseException, JsonMappingException,
+          NoSuchFieldException, SecurityException, IllegalArgumentException,
+          IllegalAccessException, IOException, InterruptedException,
+          NoSuchMethodException, InvocationTargetException {
+    // Create, initialize, and test the SerDe
+    DruidSerDe serDe = new DruidSerDe();
+    Configuration conf = new Configuration();
+    Properties tbl;
+    // Timeseries query
+    tbl = createPropertiesQuery("sample_datasource", Query.TIMESERIES, TIMESERIES_QUERY);
+    SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+    deserializeQueryResults(serDe, Query.TIMESERIES, TIMESERIES_QUERY,
+            TIMESERIES_QUERY_RESULTS, TIMESERIES_QUERY_RESULTS_RECORDS);
+    // TopN query
+    tbl = createPropertiesQuery("sample_data", Query.TOPN, TOPN_QUERY);
+    SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+    deserializeQueryResults(serDe, Query.TOPN, TOPN_QUERY,
+            TOPN_QUERY_RESULTS, TOPN_QUERY_RESULTS_RECORDS);
+    // GroupBy query
+    tbl = createPropertiesQuery("sample_datasource", Query.GROUP_BY, GROUP_BY_QUERY);
+    SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+    deserializeQueryResults(serDe, Query.GROUP_BY, GROUP_BY_QUERY,
+            GROUP_BY_QUERY_RESULTS, GROUP_BY_QUERY_RESULTS_RECORDS);
+    // Select query
+    tbl = createPropertiesQuery("wikipedia", Query.SELECT, SELECT_QUERY);
+    SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+    deserializeQueryResults(serDe, Query.SELECT, SELECT_QUERY,
+            SELECT_QUERY_RESULTS, SELECT_QUERY_RESULTS_RECORDS);
+  }
+
+  private static Properties createPropertiesQuery(String dataSource, String queryType, String jsonQuery) {
+    Properties tbl = new Properties();
+
+    // Set the configuration parameters
+    tbl.setProperty(Constants.DRUID_DATA_SOURCE, dataSource);
+    tbl.setProperty(Constants.DRUID_QUERY_JSON, jsonQuery);
+    tbl.setProperty(Constants.DRUID_QUERY_TYPE, queryType);
+    return tbl;
+  }
+
+  private static void deserializeQueryResults(DruidSerDe serDe, String queryType, String jsonQuery,
+          String resultString, Object[][] records) throws SerDeException, JsonParseException,
+          JsonMappingException, IOException, NoSuchFieldException, SecurityException,
+          IllegalArgumentException, IllegalAccessException, InterruptedException,
+          NoSuchMethodException, InvocationTargetException {
+
+    // Initialize
+    Query<?> query = null;
+    DruidQueryRecordReader<?,?> reader = null;
+    List<?> resultsList = null;
+    ObjectMapper mapper = new DefaultObjectMapper();
+    switch (queryType) {
+      case Query.TIMESERIES:
+        query = mapper.readValue(jsonQuery, TimeseriesQuery.class);
+        reader = new DruidTimeseriesQueryRecordReader();
+        resultsList = mapper.readValue(resultString,
+                new TypeReference<List<Result<TimeseriesResultValue>>>() {});
+        break;
+      case Query.TOPN:
+        query = mapper.readValue(jsonQuery, TopNQuery.class);
+        reader = new DruidTopNQueryRecordReader();
+        resultsList = mapper.readValue(resultString,
+                new TypeReference<List<Result<TopNResultValue>>>() {});
+        break;
+      case Query.GROUP_BY:
+        query = mapper.readValue(jsonQuery, GroupByQuery.class);
+        reader = new DruidGroupByQueryRecordReader();
+        resultsList = mapper.readValue(resultString,
+                new TypeReference<List<Row>>() {});
+        break;
+      case Query.SELECT:
+        query = mapper.readValue(jsonQuery, SelectQuery.class);
+        reader = new DruidSelectQueryRecordReader();
+        resultsList = mapper.readValue(resultString,
+                new TypeReference<List<Result<SelectResultValue>>>() {});
+        break;
+    }
+
+    // Set query and fields access
+    Field field1 = DruidQueryRecordReader.class.getDeclaredField("query");
+    field1.setAccessible(true);
+    field1.set(reader, query);
+    if (reader instanceof DruidGroupByQueryRecordReader) {
+      Method method1 = DruidGroupByQueryRecordReader.class.getDeclaredMethod("initExtractors");
+      method1.setAccessible(true);
+      method1.invoke(reader);
+    }
+    Field field2 = DruidQueryRecordReader.class.getDeclaredField("results");
+    field2.setAccessible(true);
+    
+    // Get the row structure
+    StructObjectInspector oi = (StructObjectInspector) serDe.getObjectInspector();
+    List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
+
+    // Check mapred
+    Iterator<?> results = resultsList.iterator();
+    field2.set(reader, results);
+    DruidWritable writable = new DruidWritable();
+    int pos = 0;
+    while (reader.next(NullWritable.get(), writable)) {
+      Object row = serDe.deserialize(writable);
+      Object[] expectedFieldsData = records[pos];
+      assertEquals(expectedFieldsData.length, fieldRefs.size());
+      for (int i = 0; i < fieldRefs.size(); i++) {
+        Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i));
+        assertEquals("Field " + i, expectedFieldsData[i], fieldData);
+      }
+      pos++;
+    }
+    assertEquals(pos, records.length);
+
+    // Check mapreduce
+    results = resultsList.iterator();
+    field2.set(reader, results);
+    pos = 0;
+    while (reader.nextKeyValue()) {
+      Object row = serDe.deserialize(reader.getCurrentValue());
+      Object[] expectedFieldsData = records[pos];
+      assertEquals(expectedFieldsData.length, fieldRefs.size());
+      for (int i = 0; i < fieldRefs.size(); i++) {
+        Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i));
+        assertEquals("Field " + i, expectedFieldsData[i], fieldData);
+      }
+      pos++;
+    }
+    assertEquals(pos, records.length);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
new file mode 100644
index 0000000..b20168d
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.joda.time.Interval;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestHiveDruidQueryBasedInputFormat extends TestCase {
+  
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCreateSplitsIntervals() throws Exception {
+    HiveDruidQueryBasedInputFormat input = new HiveDruidQueryBasedInputFormat();
+
+    Method method1 = HiveDruidQueryBasedInputFormat.class.getDeclaredMethod("createSplitsIntervals",
+            List.class, int.class);
+    method1.setAccessible(true);
+
+    List<Interval> intervals;
+    List<List<Interval>> resultList;
+    List<List<Interval>> expectedResultList;
+
+    // Test 1 : single split, create 4
+    intervals = new ArrayList<>();
+    intervals.add(new Interval(1262304000000L, 1293840000000L));
+    resultList = (List<List<Interval>>) method1.invoke(input, intervals, 4);
+    expectedResultList = new ArrayList<>();
+    expectedResultList.add(Arrays.asList(new Interval(1262304000000L, 1270188000000L)));
+    expectedResultList.add(Arrays.asList(new Interval(1270188000000L, 1278072000000L)));
+    expectedResultList.add(Arrays.asList(new Interval(1278072000000L, 1285956000000L)));
+    expectedResultList.add(Arrays.asList(new Interval(1285956000000L, 1293840000000L)));
+    assertEquals(expectedResultList, resultList);
+
+    // Test 2 : two splits, create 4
+    intervals = new ArrayList<>();
+    intervals.add(new Interval(1262304000000L, 1293840000000L));
+    intervals.add(new Interval(1325376000000L, 1356998400000L));
+    resultList = (List<List<Interval>>) method1.invoke(input, intervals, 4);
+    expectedResultList = new ArrayList<>();
+    expectedResultList.add(Arrays.asList(new Interval(1262304000000L, 1278093600000L)));
+    expectedResultList.add(Arrays.asList(new Interval(1278093600000L, 1293840000000L),
+            new Interval(1325376000000L, 1325419200000L)));
+    expectedResultList.add(Arrays.asList(new Interval(1325419200000L, 1341208800000L)));
+    expectedResultList.add(Arrays.asList(new Interval(1341208800000L, 1356998400000L)));
+    assertEquals(expectedResultList, resultList);
+
+    // Test 3 : two splits, create 5
+    intervals = new ArrayList<>();
+    intervals.add(new Interval(1262304000000L, 1293840000000L));
+    intervals.add(new Interval(1325376000000L, 1356998400000L));
+    resultList = (List<List<Interval>>) method1.invoke(input, intervals, 5);
+    expectedResultList = new ArrayList<>();
+    expectedResultList.add(Arrays.asList(new Interval(1262304000000L, 1274935680000L)));
+    expectedResultList.add(Arrays.asList(new Interval(1274935680000L, 1287567360000L)));
+    expectedResultList.add(Arrays.asList(new Interval(1287567360000L, 1293840000000L),
+            new Interval(1325376000000L, 1331735040000L)));
+    expectedResultList.add(Arrays.asList(new Interval(1331735040000L, 1344366720000L)));
+    expectedResultList.add(Arrays.asList(new Interval(1344366720000L, 1356998400000L)));
+    assertEquals(expectedResultList, resultList);
+
+    // Test 4 : three splits, different ranges, create 6
+    intervals = new ArrayList<>();
+    intervals.add(new Interval(1199145600000L, 1201824000000L)); // one month
+    intervals.add(new Interval(1325376000000L, 1356998400000L)); // one year
+    intervals.add(new Interval(1407283200000L, 1407888000000L)); // 7 days
+    resultList = (List<List<Interval>>) method1.invoke(input, intervals, 6);
+    expectedResultList = new ArrayList<>();
+    expectedResultList.add(Arrays.asList(new Interval(1199145600000L, 1201824000000L),
+            new Interval(1325376000000L, 1328515200000L)));
+    expectedResultList.add(Arrays.asList(new Interval(1328515200000L, 1334332800000L)));
+    expectedResultList.add(Arrays.asList(new Interval(1334332800000L, 1340150400000L)));
+    expectedResultList.add(Arrays.asList(new Interval(1340150400000L, 1345968000000L)));
+    expectedResultList.add(Arrays.asList(new Interval(1345968000000L, 1351785600000L)));
+    expectedResultList.add(Arrays.asList(new Interval(1351785600000L, 1356998400000L),
+            new Interval(1407283200000L, 1407888000000L)));
+    assertEquals(expectedResultList, resultList);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/itests/qtest/pom.xml
----------------------------------------------------------------------
diff --git a/itests/qtest/pom.xml b/itests/qtest/pom.xml
index 7fc72b9..e762d0e 100644
--- a/itests/qtest/pom.xml
+++ b/itests/qtest/pom.xml
@@ -110,6 +110,19 @@
       <version>${hadoop.version}</version>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-druid-handler</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-druid-handler</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
 
     <!-- test inter-project -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/packaging/pom.xml
----------------------------------------------------------------------
diff --git a/packaging/pom.xml b/packaging/pom.xml
index 679dfe8..76e0cff 100644
--- a/packaging/pom.xml
+++ b/packaging/pom.xml
@@ -210,6 +210,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
+      <artifactId>hive-druid-handler</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
       <artifactId>hive-hwi</artifactId>
       <version>${project.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4c41200..2fb78cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,6 +37,7 @@
     <module>cli</module>
     <module>common</module>
     <module>contrib</module>
+    <module>druid-handler</module>
     <module>hbase-handler</module>
     <module>hcatalog</module>
     <module>hplsql</module>
@@ -130,6 +131,7 @@
     <derby.version>10.10.2.0</derby.version>
     <dropwizard.version>3.1.0</dropwizard.version>
     <dropwizard-metrics-hadoop-metrics2-reporter.version>0.1.2</dropwizard-metrics-hadoop-metrics2-reporter.version>
+    <druid.version>0.9.1.1</druid.version>
     <guava.version>14.0.1</guava.version>
     <groovy.version>2.4.4</groovy.version>
     <hadoop.version>2.7.2</hadoop.version>

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 5722305..66cbdd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.io.HdfsUtils;
@@ -4468,12 +4469,13 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
   }
 
   public static boolean doesTableNeedLocation(Table tbl) {
-    // If we are ok with breaking compatibility of existing 3rd party StorageHandlers,
+    // TODO: If we are ok with breaking compatibility of existing 3rd party StorageHandlers,
     // this method could be moved to the HiveStorageHandler interface.
     boolean retval = true;
     if (tbl.getStorageHandler() != null) {
-      retval = !tbl.getStorageHandler().toString().equals(
-          "org.apache.hadoop.hive.hbase.HBaseStorageHandler");
+      String sh = tbl.getStorageHandler().toString();
+      retval = !sh.equals("org.apache.hadoop.hive.hbase.HBaseStorageHandler")
+              && !sh.equals(Constants.DRUID_HIVE_STORAGE_HANDLER_ID);
     }
     return retval;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index 60646ba..4710b8f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -31,14 +31,20 @@ import java.util.TreeSet;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.FunctionInfo.FunctionResource;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorDay;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorHour;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorMinute;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorMonth;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorQuarter;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorSecond;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorWeek;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorYear;
 import org.apache.hadoop.hive.ql.udf.SettableUDF;
 import org.apache.hadoop.hive.ql.udf.UDAFPercentile;
 import org.apache.hadoop.hive.ql.udf.UDFAcos;
@@ -141,6 +147,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hive.common.util.AnnotationUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * FunctionRegistry.
@@ -289,6 +297,16 @@ public final class FunctionRegistry {
     system.registerGenericUDF("trunc", GenericUDFTrunc.class);
     system.registerGenericUDF("date_format", GenericUDFDateFormat.class);
 
+    // Special date formatting functions
+    system.registerUDF("floor_year", UDFDateFloorYear.class, false);
+    system.registerUDF("floor_quarter", UDFDateFloorQuarter.class, false);
+    system.registerUDF("floor_month", UDFDateFloorMonth.class, false);
+    system.registerUDF("floor_day", UDFDateFloorDay.class, false);
+    system.registerUDF("floor_week", UDFDateFloorWeek.class, false);
+    system.registerUDF("floor_hour", UDFDateFloorHour.class, false);
+    system.registerUDF("floor_minute", UDFDateFloorMinute.class, false);
+    system.registerUDF("floor_second", UDFDateFloorSecond.class, false);
+
     system.registerGenericUDF("date_add", GenericUDFDateAdd.class);
     system.registerGenericUDF("date_sub", GenericUDFDateSub.class);
     system.registerGenericUDF("datediff", GenericUDFDateDiff.class);

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java
index aeb4e7d..890aea1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java
@@ -19,21 +19,28 @@ package org.apache.hadoop.hive.ql.optimizer.calcite;
 
 import org.apache.calcite.plan.Context;
 import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveAlgorithmsConf;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.HiveDruidConf;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRulesRegistry;
 
 
 public class HivePlannerContext implements Context {
-  private HiveAlgorithmsConf config;
+  private HiveAlgorithmsConf algoConfig;
+  private HiveDruidConf druidConf;
   private HiveRulesRegistry registry;
 
-  public HivePlannerContext(HiveAlgorithmsConf config, HiveRulesRegistry registry) {
-    this.config = config;
+  public HivePlannerContext(HiveAlgorithmsConf algoConfig, HiveDruidConf druidConf,
+          HiveRulesRegistry registry) {
+    this.algoConfig = algoConfig;
+    this.druidConf = druidConf;
     this.registry = registry;
   }
 
   public <T> T unwrap(Class<T> clazz) {
-    if (clazz.isInstance(config)) {
-      return clazz.cast(config);
+    if (clazz.isInstance(algoConfig)) {
+      return clazz.cast(algoConfig);
+    }
+    if (clazz.isInstance(druidConf)) {
+      return clazz.cast(druidConf);
     }
     if (clazz.isInstance(registry)) {
       return clazz.cast(registry);

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidIntervalUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidIntervalUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidIntervalUtils.java
new file mode 100644
index 0000000..82ab4d7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidIntervalUtils.java
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
+
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.commons.lang.StringUtils;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+
+/** 
+ * Utilities for generating intervals from RexNode.
+ * 
+ * Based on Navis logic implemented on Hive data structures.
+ * See <a href="https://github.com/druid-io/druid/pull/2880">Druid PR-2880</a>
+ * 
+ */
+@SuppressWarnings({"rawtypes","unchecked"})
+public class DruidIntervalUtils {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(DruidIntervalUtils.class);
+
+
+  /**
+   * Given a list of predicates, it generates the equivalent Interval
+   * (if possible). It assumes that all the predicates in the input
+   * reference a single column : the timestamp column.
+   * 
+   * @param conjs list of conditions to use for the transformation
+   * @return interval representing the conditions in the input list
+   */
+  public static List<Interval> createInterval(RelDataType type, List<RexNode> conjs) {
+    List<Range> ranges = new ArrayList<>();
+    for (RexNode child : conjs) {
+      List<Range> extractedRanges = extractRanges(type, child, false);
+      if (extractedRanges == null || extractedRanges.isEmpty()) {
+        // We could not extract, we bail out
+        return null;
+      }
+      if (ranges.isEmpty()) {
+        ranges.addAll(extractedRanges);
+        continue;
+      }
+      List<Range> overlapped = Lists.newArrayList();
+      for (Range current : ranges) {
+        for (Range interval : extractedRanges) {
+          if (current.isConnected(interval)) {
+            overlapped.add(current.intersection(interval));
+          }
+        }
+      }
+      ranges = overlapped;
+    }
+    List<Range> compactRanges = condenseRanges(ranges);
+    LOG.debug("Inferred ranges on interval : " + compactRanges);
+    return toInterval(compactRanges);
+  }
+
+  protected static List<Interval> toInterval(List<Range> ranges) {
+    List<Interval> intervals = Lists.transform(ranges, new Function<Range, Interval>() {
+      @Override
+      public Interval apply(Range range) {
+        if (!range.hasLowerBound() && !range.hasUpperBound()) {
+          return DruidTable.DEFAULT_INTERVAL;
+        }
+        long start = range.hasLowerBound() ? toLong(range.lowerEndpoint()) :
+          DruidTable.DEFAULT_INTERVAL.getStartMillis();
+        long end = range.hasUpperBound() ? toLong(range.upperEndpoint()) :
+          DruidTable.DEFAULT_INTERVAL.getEndMillis();
+        if (range.hasLowerBound() && range.lowerBoundType() == BoundType.OPEN) {
+          start++;
+        }
+        if (range.hasUpperBound() && range.upperBoundType() == BoundType.CLOSED) {
+          end++;
+        }
+        return new Interval(start, end);
+      }
+    });
+    LOG.info("Converted time ranges " + ranges + " to interval " + intervals);
+    return intervals;
+  }
+
+  protected static List<Range> extractRanges(RelDataType type, RexNode node,
+          boolean withNot) {
+    switch (node.getKind()) {
+      case EQUALS:
+      case LESS_THAN:
+      case LESS_THAN_OR_EQUAL:
+      case GREATER_THAN:
+      case GREATER_THAN_OR_EQUAL:
+      case BETWEEN:
+      case IN:
+        return leafToRanges(type, (RexCall) node, withNot);
+
+      case NOT:
+        return extractRanges(type, ((RexCall) node).getOperands().get(0), !withNot);
+
+      case OR:
+        RexCall call = (RexCall) node;
+        List<Range> intervals = Lists.newArrayList();
+        for (RexNode child : call.getOperands()) {
+          List<Range> extracted = extractRanges(type, child, withNot);
+          if (extracted != null) {
+            intervals.addAll(extracted);
+          }
+        }
+        return intervals;
+
+      default:
+        return null;
+    }
+  }
+
+  protected static List<Range> leafToRanges(RelDataType type, RexCall call,
+          boolean withNot) {
+    switch (call.getKind()) {
+      case EQUALS:
+      case LESS_THAN:
+      case LESS_THAN_OR_EQUAL:
+      case GREATER_THAN:
+      case GREATER_THAN_OR_EQUAL:
+      {
+        RexLiteral literal = null;
+        if (call.getOperands().get(0) instanceof RexInputRef &&
+                call.getOperands().get(1) instanceof RexLiteral) {
+          literal = extractLiteral(call.getOperands().get(1));
+        } else if (call.getOperands().get(0) instanceof RexInputRef &&
+                call.getOperands().get(1).getKind() == SqlKind.CAST) {
+          literal = extractLiteral(call.getOperands().get(1));
+        } else if (call.getOperands().get(1) instanceof RexInputRef &&
+                call.getOperands().get(0) instanceof RexLiteral) {
+          literal = extractLiteral(call.getOperands().get(0));
+        } else if (call.getOperands().get(1) instanceof RexInputRef &&
+                call.getOperands().get(0).getKind() == SqlKind.CAST) {
+          literal = extractLiteral(call.getOperands().get(0));
+        }
+        if (literal == null) {
+          return null;
+        }
+        Comparable value = literalToType(literal, type);
+        if (value == null) {
+          return null;
+        }
+        if (call.getKind() == SqlKind.LESS_THAN) {
+          return Arrays.<Range> asList(withNot ? Range.atLeast(value) : Range.lessThan(value));
+        } else if (call.getKind() == SqlKind.LESS_THAN_OR_EQUAL) {
+          return Arrays.<Range> asList(withNot ? Range.greaterThan(value) : Range.atMost(value));
+        } else if (call.getKind() == SqlKind.GREATER_THAN) {
+          return Arrays.<Range> asList(withNot ? Range.atMost(value) : Range.greaterThan(value));
+        } else if (call.getKind() == SqlKind.GREATER_THAN_OR_EQUAL) {
+          return Arrays.<Range> asList(withNot ? Range.lessThan(value) : Range.atLeast(value));
+        } else { //EQUALS
+          if (!withNot) {
+            return Arrays.<Range> asList(Range.closed(value, value));
+          }
+          return Arrays.<Range> asList(Range.lessThan(value), Range.greaterThan(value));
+        }
+      }
+      case BETWEEN:
+      {
+        RexLiteral literal1 = extractLiteral(call.getOperands().get(2));
+        if (literal1 == null) {
+          return null;
+        }
+        RexLiteral literal2 = extractLiteral(call.getOperands().get(3));
+        if (literal2 == null) {
+          return null;
+        }
+        Comparable value1 = literalToType(literal1, type);
+        Comparable value2 = literalToType(literal2, type);
+        if (value1 == null || value2 == null) {
+          return null;
+        }
+        boolean inverted = value1.compareTo(value2) > 0;
+        if (!withNot) {
+          return Arrays.<Range> asList(
+                  inverted ? Range.closed(value2, value1) : Range.closed(value1, value2));
+        }
+        return Arrays.<Range> asList(Range.lessThan(inverted ? value2 : value1),
+                Range.greaterThan(inverted ? value1 : value2));
+      }
+      case IN:
+      {
+        List<Range> ranges = Lists.newArrayList();
+        for (int i = 1; i < call.getOperands().size(); i++) {
+          RexLiteral literal = extractLiteral(call.getOperands().get(i));
+          if (literal == null) {
+            return null;
+          }
+          Comparable element = literalToType(literal, type);
+          if (element == null) {
+            return null;
+          }
+          if (withNot) {
+            ranges.addAll(
+                    Arrays.<Range> asList(Range.lessThan(element), Range.greaterThan(element)));
+          } else {
+            ranges.add(Range.closed(element, element));
+          }
+        }
+        return ranges;
+      }
+      default:
+        return null;
+    }
+  }
+
+  @SuppressWarnings("incomplete-switch")
+  protected static Comparable literalToType(RexLiteral literal, RelDataType type) {
+    // Extract
+    Object value = null;
+    switch (literal.getType().getSqlTypeName()) {
+      case DATE:
+      case TIME:
+      case TIMESTAMP:
+      case INTERVAL_YEAR_MONTH:
+      case INTERVAL_DAY_TIME:
+        value = literal.getValue();
+        break;
+      case TINYINT:
+      case SMALLINT:
+      case INTEGER:
+      case BIGINT:
+      case DOUBLE:
+      case DECIMAL:
+      case FLOAT:
+      case REAL:
+      case VARCHAR:
+      case CHAR:
+      case BOOLEAN:
+        value = literal.getValue3();
+    }
+    if (value == null) {
+      return null;
+    }
+
+    // Convert
+    switch (type.getSqlTypeName()) {
+      case BIGINT:
+        return toLong(value);
+      case INTEGER:
+        return toInt(value);
+      case FLOAT:
+        return toFloat(value);
+      case DOUBLE:
+        return toDouble(value);
+      case VARCHAR:
+      case CHAR:
+        return String.valueOf(value);
+      case TIMESTAMP:
+        return toTimestamp(value);
+    }
+    return null;
+  }
+
+  private static RexLiteral extractLiteral(RexNode node) {
+    RexNode target = node;
+    if (node.getKind() == SqlKind.CAST) {
+      target = ((RexCall)node).getOperands().get(0);
+    }
+    if (!(target instanceof RexLiteral)) {
+      return null;
+    }
+    return (RexLiteral) target;
+  }
+
+  private static Comparable toTimestamp(Object literal) {
+    if (literal instanceof Timestamp) {
+      return (Timestamp) literal;
+    }
+    if (literal instanceof Date) {
+      return new Timestamp(((Date) literal).getTime());
+    }
+    if (literal instanceof Number) {
+      return new Timestamp(((Number) literal).longValue());
+    }
+    if (literal instanceof String) {
+      String string = (String) literal;
+      if (StringUtils.isNumeric(string)) {
+        return new Timestamp(Long.valueOf(string));
+      }
+      try {
+        return Timestamp.valueOf(string);
+      } catch (NumberFormatException e) {
+        // ignore
+      }
+    }
+    return null;
+  }
+
+  private static Long toLong(Object literal) {
+    if (literal instanceof Number) {
+      return ((Number) literal).longValue();
+    }
+    if (literal instanceof Date) {
+      return ((Date) literal).getTime();
+    }
+    if (literal instanceof Timestamp) {
+      return ((Timestamp) literal).getTime();
+    }
+    if (literal instanceof String) {
+      try {
+        return Long.valueOf((String) literal);
+      } catch (NumberFormatException e) {
+        // ignore
+      }
+      try {
+        return DateFormat.getDateInstance().parse((String) literal).getTime();
+      } catch (ParseException e) {
+        // best effort. ignore
+      }
+    }
+    return null;
+  }
+
+
+  private static Integer toInt(Object literal) {
+    if (literal instanceof Number) {
+      return ((Number) literal).intValue();
+    }
+    if (literal instanceof String) {
+      try {
+        return Integer.valueOf((String) literal);
+      } catch (NumberFormatException e) {
+        // ignore
+      }
+    }
+    return null;
+  }
+
+  private static Float toFloat(Object literal) {
+    if (literal instanceof Number) {
+      return ((Number) literal).floatValue();
+    }
+    if (literal instanceof String) {
+      try {
+        return Float.valueOf((String) literal);
+      } catch (NumberFormatException e) {
+        // ignore
+      }
+    }
+    return null;
+  }
+
+  private static Double toDouble(Object literal) {
+    if (literal instanceof Number) {
+      return ((Number) literal).doubleValue();
+    }
+    if (literal instanceof String) {
+      try {
+        return Double.valueOf((String) literal);
+      } catch (NumberFormatException e) {
+        // ignore
+      }
+    }
+    return null;
+  }
+
+  protected static List<Range> condenseRanges(List<Range> ranges) {
+    if (ranges.size() <= 1) {
+      return ranges;
+    }
+
+    Comparator<Range> startThenEnd = new Comparator<Range>() {
+      @Override
+      public int compare(Range lhs, Range rhs) {
+        int compare = 0;
+        if (lhs.hasLowerBound() && rhs.hasLowerBound()) {
+          compare = lhs.lowerEndpoint().compareTo(rhs.lowerEndpoint());
+        } else if (!lhs.hasLowerBound() && rhs.hasLowerBound()) {
+          compare = -1;
+        } else if (lhs.hasLowerBound() && !rhs.hasLowerBound()) {
+          compare = 1;
+        }
+        if (compare != 0) {
+          return compare;
+        }
+        if (lhs.hasUpperBound() && rhs.hasUpperBound()) {
+          compare = lhs.upperEndpoint().compareTo(rhs.upperEndpoint());
+        } else if (!lhs.hasUpperBound() && rhs.hasUpperBound()) {
+          compare = -1;
+        } else if (lhs.hasUpperBound() && !rhs.hasUpperBound()) {
+          compare = 1;
+        }
+        return compare;
+      }
+    };
+
+    TreeSet<Range> sortedIntervals = Sets.newTreeSet(startThenEnd);
+    sortedIntervals.addAll(ranges);
+
+    List<Range> retVal = Lists.newArrayList();
+
+    Iterator<Range> intervalsIter = sortedIntervals.iterator();
+    Range currInterval = intervalsIter.next();
+    while (intervalsIter.hasNext()) {
+      Range next = intervalsIter.next();
+      if (currInterval.encloses(next)) {
+        continue;
+      }
+      if (mergeable(currInterval, next)) {
+        currInterval = currInterval.span(next);
+      } else {
+        retVal.add(currInterval);
+        currInterval = next;
+      }
+    }
+    retVal.add(currInterval);
+
+    return retVal;
+  }
+
+  protected static boolean mergeable(Range range1, Range range2) {
+    Comparable x1 = range1.upperEndpoint();
+    Comparable x2 = range2.lowerEndpoint();
+    int compare = x1.compareTo(x2);
+    return compare > 0 || (compare == 0 && range1.upperBoundType() == BoundType.CLOSED
+            && range2.lowerBoundType() == BoundType.CLOSED);
+  }
+
+  public static long extractTotalTime(List<Interval> intervals) {
+    long totalTime = 0;
+    for (Interval interval : intervals) {
+      totalTime += (interval.getEndMillis() - interval.getStartMillis());
+    }
+    return totalTime;
+  }
+
+}