You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2016/09/08 08:46:01 UTC

[6/6] hive git commit: HIVE-14217: Druid integration (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

HIVE-14217: Druid integration (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

Close apache/hive#98


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/58d1befa
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/58d1befa
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/58d1befa

Branch: refs/heads/master
Commit: 58d1befa2131254b53122b3573189ac1c5022217
Parents: 63fdb51
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Fri Aug 12 12:55:46 2016 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Thu Sep 8 09:42:26 2016 +0100

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/Constants.java  |   10 +-
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   11 +
 druid-handler/pom.xml                           |  201 ++++
 .../hadoop/hive/druid/DruidStorageHandler.java  |  109 ++
 .../hive/druid/DruidStorageHandlerUtils.java    |   90 ++
 .../hive/druid/HiveDruidOutputFormat.java       |   55 +
 .../druid/HiveDruidQueryBasedInputFormat.java   |  369 ++++++
 .../hadoop/hive/druid/HiveDruidSplit.java       |   83 ++
 .../serde/DruidGroupByQueryRecordReader.java    |  199 ++++
 .../druid/serde/DruidQueryRecordReader.java     |  142 +++
 .../serde/DruidSelectQueryRecordReader.java     |  106 ++
 .../hadoop/hive/druid/serde/DruidSerDe.java     |  343 ++++++
 .../hive/druid/serde/DruidSerDeUtils.java       |   83 ++
 .../serde/DruidTimeseriesQueryRecordReader.java |   93 ++
 .../druid/serde/DruidTopNQueryRecordReader.java |  106 ++
 .../hadoop/hive/druid/serde/DruidWritable.java  |   81 ++
 .../hadoop/hive/druid/QTestDruidSerDe.java      |   88 ++
 .../hive/druid/QTestDruidStorageHandler.java    |   34 +
 .../hadoop/hive/druid/TestDruidSerDe.java       |  576 ++++++++++
 .../TestHiveDruidQueryBasedInputFormat.java     |  101 ++
 itests/qtest/pom.xml                            |   13 +
 packaging/pom.xml                               |    5 +
 pom.xml                                         |    2 +
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |    8 +-
 .../hadoop/hive/ql/exec/FunctionRegistry.java   |   22 +-
 .../optimizer/calcite/HivePlannerContext.java   |   17 +-
 .../calcite/druid/DruidIntervalUtils.java       |  466 ++++++++
 .../ql/optimizer/calcite/druid/DruidQuery.java  | 1053 ++++++++++++++++++
 .../optimizer/calcite/druid/DruidQueryType.java |   42 +
 .../ql/optimizer/calcite/druid/DruidRules.java  |  591 ++++++++++
 .../ql/optimizer/calcite/druid/DruidSchema.java |   51 +
 .../ql/optimizer/calcite/druid/DruidTable.java  |  121 ++
 .../optimizer/calcite/druid/HiveDruidConf.java  |   33 +
 .../functions/HiveSqlCountAggFunction.java      |    2 +-
 .../functions/HiveSqlMinMaxAggFunction.java     |    2 +-
 .../functions/HiveSqlSumAggFunction.java        |    2 +-
 .../reloperators/HiveDateGranularity.java       |   54 +
 .../rules/HiveProjectSortTransposeRule.java     |    5 +
 .../rules/HiveSortProjectTransposeRule.java     |    5 +
 .../calcite/translator/ASTBuilder.java          |   38 +-
 .../calcite/translator/ASTConverter.java        |    9 +-
 .../translator/SqlFunctionConverter.java        |   23 +-
 .../hadoop/hive/ql/parse/CalcitePlanner.java    |  119 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   22 +-
 .../hadoop/hive/ql/plan/CreateTableDesc.java    |    8 +-
 .../hadoop/hive/ql/plan/TableScanDesc.java      |    7 +
 .../apache/hadoop/hive/ql/udf/UDFDateFloor.java |  506 +++++++++
 .../hadoop/hive/ql/udf/UDFDateFloorDay.java     |   39 +
 .../hadoop/hive/ql/udf/UDFDateFloorHour.java    |   39 +
 .../hadoop/hive/ql/udf/UDFDateFloorMinute.java  |   39 +
 .../hadoop/hive/ql/udf/UDFDateFloorMonth.java   |   39 +
 .../hadoop/hive/ql/udf/UDFDateFloorQuarter.java |   39 +
 .../hadoop/hive/ql/udf/UDFDateFloorSecond.java  |   39 +
 .../hadoop/hive/ql/udf/UDFDateFloorWeek.java    |   39 +
 .../hadoop/hive/ql/udf/UDFDateFloorYear.java    |   39 +
 .../calcite/TestCBORuleFiredOnlyOnce.java       |    2 +-
 .../ql/udf/TestUDFDateFormatGranularity.java    |   85 ++
 .../test/queries/clientnegative/druid_address.q |    5 +
 .../test/queries/clientnegative/druid_buckets.q |    6 +
 .../queries/clientnegative/druid_datasource.q   |    3 +
 .../queries/clientnegative/druid_external.q     |    5 +
 .../queries/clientnegative/druid_location.q     |    6 +
 .../queries/clientnegative/druid_partitions.q   |    6 +
 .../test/queries/clientpositive/druid_basic1.q  |   18 +
 .../test/queries/clientpositive/druid_basic2.q  |   52 +
 .../queries/clientpositive/druid_intervals.q    |   67 ++
 .../queries/clientpositive/druid_timeseries.q   |   94 ++
 ql/src/test/queries/clientpositive/druid_topn.q |   75 ++
 .../results/clientnegative/druid_address.q.out  |    7 +
 .../results/clientnegative/druid_buckets.q.out  |    8 +
 .../clientnegative/druid_datasource.q.out       |    7 +
 .../results/clientnegative/druid_external.q.out |    7 +
 .../results/clientnegative/druid_location.q.out |    9 +
 .../clientnegative/druid_partitions.q.out       |    8 +
 .../results/clientpositive/create_view.q.out    |    2 +
 .../results/clientpositive/druid_basic1.q.out   |  142 +++
 .../results/clientpositive/druid_basic2.q.out   |  533 +++++++++
 .../clientpositive/druid_intervals.q.out        |  398 +++++++
 .../clientpositive/druid_timeseries.q.out       |  566 ++++++++++
 .../results/clientpositive/druid_topn.q.out     |  419 +++++++
 .../results/clientpositive/explain_ddl.q.out    |    2 +
 .../clientpositive/explain_logical.q.out        |   16 +
 .../test/results/clientpositive/join_view.q.out |    4 +
 .../clientpositive/llap/explainuser_1.q.out     |    2 +-
 .../test/results/clientpositive/masking_2.q.out |   14 +
 .../test/results/clientpositive/masking_6.q.out |    8 +
 .../test/results/clientpositive/masking_7.q.out |    8 +
 .../clientpositive/serde_user_properties.q.out  |    4 +
 .../results/clientpositive/show_functions.q.out |    9 +
 .../clientpositive/spark/join_view.q.out        |    4 +
 .../results/clientpositive/subquery_notin.q.out |    6 +
 .../results/clientpositive/subquery_views.q.out |    4 +
 92 files changed, 8969 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/common/src/java/org/apache/hadoop/hive/conf/Constants.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 00ec8c0..77c6aa5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -15,8 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
 package org.apache.hadoop.hive.conf;
 
 public class Constants {
@@ -24,4 +22,12 @@ public class Constants {
   public static final String LLAP_LOGGER_NAME_QUERY_ROUTING = "query-routing";
   public static final String LLAP_LOGGER_NAME_CONSOLE = "console";
   public static final String LLAP_LOGGER_NAME_RFA = "RFA";
+
+  /* Constants for Druid storage handler */
+  public static final String DRUID_HIVE_STORAGE_HANDLER_ID =
+          "org.apache.hadoop.hive.druid.DruidStorageHandler";
+  public static final String DRUID_DATA_SOURCE = "druid.datasource";
+  public static final String DRUID_QUERY_JSON = "druid.query.json";
+  public static final String DRUID_QUERY_TYPE = "druid.query.type";
+  public static final String DRUID_QUERY_FETCH = "druid.query.fetch";
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 13cfdf1..d6944ee 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1884,6 +1884,17 @@ public class HiveConf extends Configuration {
     WRITE_SET_REAPER_INTERVAL("hive.writeset.reaper.interval", "60s",
       new TimeValidator(TimeUnit.MILLISECONDS), "Frequency of WriteSet reaper runs"),
 
+    // For Druid storage handler
+    HIVE_DRUID_BROKER_DEFAULT_ADDRESS("hive.druid.broker.address.default", "localhost:8082",
+        "Address of the Druid broker. If we are querying Druid from Hive, this address needs to be\n" +
+        "declared"),
+    HIVE_DRUID_SELECT_THRESHOLD("hive.druid.select.threshold", 10000,
+        "When we can split a Select query, this is the maximum number of rows that we try to retrieve\n" +
+        "per query. In order to do that, we obtain the estimated size for the complete result. If the\n" +
+        "number of records of the query results is larger than this threshold, we split the query in\n" +
+        "total number of rows/threshold parts across the time dimension. Note that we assume the\n" +
+        "records to be split uniformly across the time dimension"),
+
     // For HBase storage handler
     HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true,
         "Whether writes to HBase should be forced to the write-ahead log. \n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/pom.xml
----------------------------------------------------------------------
diff --git a/druid-handler/pom.xml b/druid-handler/pom.xml
new file mode 100644
index 0000000..2173cdc
--- /dev/null
+++ b/druid-handler/pom.xml
@@ -0,0 +1,201 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hive</groupId>
+    <artifactId>hive</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>hive-druid-handler</artifactId>
+  <packaging>jar</packaging>
+  <name>Hive Druid Handler</name>
+
+  <properties>
+    <hive.path.to.root>..</hive.path.to.root>
+  </properties>
+
+  <dependencies>
+    <!-- dependencies are always listed in sorted order by groupId, artifactId -->
+    <!-- intra-project -->
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <!-- inter-project -->
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <version>${commons-lang.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-processing</artifactId>
+      <version>${druid.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- test inter-project -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <sourceDirectory>${basedir}/src/java</sourceDirectory>
+    <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>${maven.shade.plugin.version}</version>
+        <executions>
+          <!-- we need to shade netty, as there is a conflict between versions
+          used by Hadoop (3.6.2.Final) and Druid (3.10.4.Final) -->
+          <!-- we need to shade jackson, as there is a conflict between versions
+          used by Hive (2.4.2) and Druid (2.4.6) -->
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <shadeTestJar>true</shadeTestJar>
+              <createDependencyReducedPom>false</createDependencyReducedPom>
+              <relocations>
+                <relocation>
+                  <pattern>io.druid</pattern>
+                  <shadedPattern>org.apache.hive.druid.io.druid</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.metamx.emitter</pattern>
+                  <shadedPattern>org.apache.hive.druid.com.metamx.emitter</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.metamx.http.client</pattern>
+                  <shadedPattern>org.apache.hive.druid.com.metamx.http.client</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>io.netty</pattern>
+                  <shadedPattern>org.apache.hive.druid.io.netty</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.jboss.netty</pattern>
+                  <shadedPattern>org.apache.hive.druid.org.jboss.netty</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.fasterxml.jackson</pattern>
+                  <shadedPattern>org.apache.hive.druid.com.fasterxml.jackson</shadedPattern>
+                </relocation>
+              </relocations>
+              <artifactSet>
+                <includes>
+                  <include>io.druid:*</include>
+                  <include>com.metamx:emitter:*</include>
+                  <include>com.metamx:http-client:*</include>
+                  <include>io.netty:*</include>
+                  <include>com.fasterxml.jackson.core:*</include>
+                  <include>com.fasterxml.jackson.datatype:*</include>
+                  <include>com.fasterxml.jackson.dataformat:*</include>
+                </includes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
new file mode 100644
index 0000000..ac03099
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.druid.serde.DruidSerDe;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HiveStorageHandler implementation for Druid; it also serves as its own HiveMetaHook.
+ */
+@SuppressWarnings({"deprecation","rawtypes"})
+public class DruidStorageHandler extends DefaultStorageHandler implements HiveMetaHook {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(DruidStorageHandler.class);
+
+  @Override
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return HiveDruidQueryBasedInputFormat.class;
+  }
+
+  @Override
+  public Class<? extends OutputFormat> getOutputFormatClass() {
+    return HiveDruidOutputFormat.class;
+  }
+
+  @Override
+  public Class<? extends SerDe> getSerDeClass() {
+    return DruidSerDe.class;
+  }
+
+  @Override
+  public HiveMetaHook getMetaHook() {
+    return this;
+  }
+
+  @Override
+  public void preCreateTable(Table table) throws MetaException {
+    // Reject definitions Druid tables cannot have: non-EXTERNAL, LOCATION, partitions, buckets
+    if (!MetaStoreUtils.isExternalTable(table)) {
+      throw new MetaException("Table in Druid needs to be declared as EXTERNAL");
+    }
+    if (!StringUtils.isEmpty(table.getSd().getLocation())) {
+      throw new MetaException("LOCATION may not be specified for Druid");
+    }
+    if (table.getPartitionKeysSize() != 0) {
+      throw new MetaException("PARTITIONED BY may not be specified for Druid");
+    }
+    if (table.getSd().getBucketColsSize() != 0) {
+      throw new MetaException("CLUSTERED BY may not be specified for Druid");
+    }
+  }
+
+  @Override
+  public void rollbackCreateTable(Table table) throws MetaException {
+    // No action is taken when a table creation is rolled back
+  }
+
+  @Override
+  public void commitCreateTable(Table table) throws MetaException {
+    // No action is taken when a table creation is committed
+  }
+
+  @Override
+  public void preDropTable(Table table) throws MetaException {
+    // No action is taken before a table is dropped
+  }
+
+  @Override
+  public void rollbackDropTable(Table table) throws MetaException {
+    // No action is taken when a table drop is rolled back
+  }
+
+  @Override
+  public void commitDropTable(Table table, boolean deleteData) throws MetaException {
+    // No action is taken when a table drop is committed; deleteData is ignored
+  }
+
+  @Override
+  public String toString() {
+    return Constants.DRUID_HIVE_STORAGE_HANDLER_ID;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
new file mode 100644
index 0000000..c6b8024
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.concurrent.ExecutionException;
+
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.Request;
+import com.metamx.http.client.response.InputStreamResponseHandler;
+
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.BaseQuery;
+
+/**
+ * Utility constants and methods shared by the Druid storage handler classes.
+ */
+public final class DruidStorageHandlerUtils {
+
+  private static final String SMILE_CONTENT_TYPE = "application/x-jackson-smile";
+
+  /**
+   * Mapper to use to serialize/deserialize Druid objects (JSON)
+   */
+  public static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
+
+  /**
+   * Mapper to use to serialize/deserialize Druid objects (SMILE)
+   */
+  public static final ObjectMapper SMILE_MAPPER = new DefaultObjectMapper(new SmileFactory());
+
+  /**
+   * Creates a POST request for the Druid endpoint at the given address. The query is
+   * serialized using SMILE and sent to the <code>/druid/v2/</code> path over http.
+   * @param address host (and port) to contact, without the scheme
+   * @param query Druid query to send as the body of the request
+   * @return the request, ready to be submitted
+   * @throws IOException if the URL is malformed or the query cannot be serialized
+   */
+  public static Request createRequest(String address, BaseQuery<?> query)
+          throws IOException {
+    return new Request(HttpMethod.POST, new URL(String.format("%s/druid/v2/", "http://" + address)))
+            .setContent(SMILE_MAPPER.writeValueAsBytes(query))
+            .setHeader(HttpHeaders.Names.CONTENT_TYPE, SMILE_CONTENT_TYPE);
+  }
+
+  /**
+   * Submits a request to an Http address and waits for the response to become available.
+   * The caller is responsible for closing the stream once it finishes consuming it.
+   * @param client HTTP client used to send the request
+   * @param request request to submit
+   * @return the response as a stream
+   * @throws IOException if the request fails or the thread is interrupted while waiting
+   */
+  public static InputStream submitRequest(HttpClient client, Request request)
+          throws IOException {
+    try {
+      return client.go(request, new InputStreamResponseHandler()).get();
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    } catch (InterruptedException e) {
+      // There is no cause to unwrap here: restore the interrupt flag and wrap the exception
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java
new file mode 100644
index 0000000..45e31d6
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Placeholder for the Druid output format: every method throws UnsupportedOperationException.
+ */
+@SuppressWarnings("rawtypes")
+public class HiveDruidOutputFormat implements HiveOutputFormat {
+
+  @Override
+  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
+          Progressable progress) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+          Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress)
+                  throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
new file mode 100644
index 0000000..3df1452
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidIntervalUtils;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+
+import io.druid.query.Druids;
+import io.druid.query.Druids.SegmentMetadataQueryBuilder;
+import io.druid.query.Druids.SelectQueryBuilder;
+import io.druid.query.Druids.TimeBoundaryQueryBuilder;
+import io.druid.query.Query;
+import io.druid.query.Result;
+import io.druid.query.metadata.metadata.SegmentAnalysis;
+import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+import io.druid.query.select.PagingSpec;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.spec.MultipleIntervalSegmentSpec;
+import io.druid.query.timeboundary.TimeBoundaryQuery;
+import io.druid.query.timeboundary.TimeBoundaryResultValue;
+
+/**
+ * Druid query based input format.
+ * 
+ * Given a query and the Druid broker address, it will send it, and retrieve
+ * and parse the results.
+ */
+public class HiveDruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidWritable>
+        implements org.apache.hadoop.mapred.InputFormat<NullWritable, DruidWritable> {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(HiveDruidQueryBasedInputFormat.class);
+
+  /**
+   * Computes the splits for the mapred API. The {@code numSplits} hint is not used.
+   */
+  @Override
+  public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
+          throws IOException {
+    final HiveDruidSplit[] druidSplits = getInputSplits(job);
+    return druidSplits;
+  }
+
+  /**
+   * Computes the splits for the mapreduce API, using the configuration of the job context.
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+    final Configuration conf = context.getConfiguration();
+    final InputSplit[] druidSplits = getInputSplits(conf);
+    return Arrays.asList(druidSplits);
+  }
+
+  /**
+   * Creates the splits for the Druid query stored in the configuration.
+   *
+   * If the configuration holds no query, a Select query over the configured data
+   * source is generated. Timeseries, TopN and GroupBy queries result in a single
+   * split; Select queries are passed to splitSelectQuery, which may produce several.
+   *
+   * @param conf configuration holding the broker address, the query, its type and
+   *        the input paths
+   * @return the splits to execute
+   * @throws IOException if the broker address, the data source, the query type or
+   *         the input path is missing, or if the query type is not recognized
+   */
+  @SuppressWarnings("deprecation")
+  private HiveDruidSplit[] getInputSplits(Configuration conf) throws IOException {
+    String address = HiveConf.getVar(conf,
+            HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
+    if (StringUtils.isEmpty(address)) {
+      throw new IOException("Druid broker address not specified in configuration");
+    }
+    String druidQuery = StringEscapeUtils.unescapeJava(conf.get(Constants.DRUID_QUERY_JSON));
+    String druidQueryType;
+    if (StringUtils.isEmpty(druidQuery)) {
+      // Empty, maybe because CBO did not run; we fall back to
+      // full Select query
+      LOG.warn("Druid query is empty; creating Select query");
+      String dataSource = conf.get(Constants.DRUID_DATA_SOURCE);
+      if (dataSource == null) {
+        throw new IOException("Druid data source cannot be empty");
+      }
+      druidQuery = createSelectStarQuery(address, dataSource);
+      druidQueryType = Query.SELECT;
+    } else {
+      druidQueryType = conf.get(Constants.DRUID_QUERY_TYPE);
+      if (druidQueryType == null) {
+        throw new IOException("Druid query type not recognized");
+      }
+    }
+
+    // hive depends on FileSplits
+    Job job = new Job(conf);
+    JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
+    Path [] paths = FileInputFormat.getInputPaths(jobContext);
+    // The first path is attached to every split; fail with a clear message
+    // instead of an ArrayIndexOutOfBoundsException when none is configured
+    if (paths == null || paths.length == 0) {
+      throw new IOException("No input path specified in configuration for Druid query");
+    }
+
+    switch (druidQueryType) {
+      case Query.TIMESERIES:
+      case Query.TOPN:
+      case Query.GROUP_BY:
+        return new HiveDruidSplit[] { new HiveDruidSplit(address, druidQuery, paths[0]) };
+      case Query.SELECT:
+        return splitSelectQuery(conf, address, druidQuery, paths[0]);
+      default:
+        throw new IOException("Druid query type not recognized: " + druidQueryType);
+    }
+  }
+
+  private static String createSelectStarQuery(String address, String dataSource) throws IOException {
+    // Create Select query
+    SelectQueryBuilder builder = new Druids.SelectQueryBuilder();
+    builder.dataSource(dataSource);
+    builder.intervals(Arrays.asList(DruidTable.DEFAULT_INTERVAL));
+    builder.pagingSpec(PagingSpec.newSpec(1));
+    Map<String, Object> context = new HashMap<>();
+    context.put(Constants.DRUID_QUERY_FETCH, false);
+    builder.context(context);
+    return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(builder.build());
+  }
+
+  /**
+   * Splits a Select query depending on the select threshold so that the read can
+   * be parallelized.
+   *
+   * Steps:
+   * 1) A query flagged as fetch in its context is returned as a single split.
+   * 2) Otherwise a Segment Metadata query is sent to the broker to obtain the
+   *    number of rows; if it does not exceed the threshold, a single split is
+   *    returned.
+   * 3) Otherwise the intervals of the query (or, when the query uses the default
+   *    interval, the range given by a Time Boundary query) are divided into
+   *    ceil(numRows / threshold) splits.
+   *
+   * NOTE(review): the time boundary range is built as Interval(minTime, maxTime),
+   * and Joda intervals exclude their end instant; confirm that rows stamped
+   * exactly at maxTime are still returned by Druid.
+   * NOTE(review): the Lifecycle created for the HTTP client is never started or
+   * stopped here; confirm whether client resources need to be released.
+   *
+   * @param conf configuration from which the select threshold is read
+   * @param address address of the Druid broker
+   * @param druidQuery Select query as JSON
+   * @param dummyPath path attached to every split that is created
+   * @return the splits for the query
+   * @throws IOException if the query cannot be parsed, the broker cannot be
+   *         queried, its answers are not as expected, or the threshold is not
+   *         positive when the query needs to be divided
+   */
+  private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
+          String druidQuery, Path dummyPath) throws IOException {
+    final int selectThreshold = HiveConf.getIntVar(
+            conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);
+
+    SelectQuery query;
+    try {
+      query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
+    final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
+    if (isFetch) {
+      // If it has a limit, we use it and we do not split the query
+      return new HiveDruidSplit[] { new HiveDruidSplit(
+              address, DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
+    }
+
+    // We do not have the number of rows, thus we need to execute a
+    // Segment Metadata query to obtain number of rows
+    SegmentMetadataQueryBuilder metadataBuilder = new Druids.SegmentMetadataQueryBuilder();
+    metadataBuilder.dataSource(query.getDataSource());
+    metadataBuilder.intervals(query.getIntervals());
+    metadataBuilder.merge(true);
+    metadataBuilder.analysisTypes();
+    SegmentMetadataQuery metadataQuery = metadataBuilder.build();
+
+    HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle());
+
+    // Submit the query and retrieve the results; the response stream is closed
+    // whether parsing succeeds or fails
+    List<SegmentAnalysis> metadataList;
+    try (InputStream response = DruidStorageHandlerUtils.submitRequest(client,
+            DruidStorageHandlerUtils.createRequest(address, metadataQuery))) {
+      metadataList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
+            new TypeReference<List<SegmentAnalysis>>() {});
+    } catch (Exception e) {
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
+    if (metadataList == null || metadataList.isEmpty()) {
+      throw new IOException("Connected to Druid but could not retrieve datasource information");
+    }
+    if (metadataList.size() != 1) {
+      throw new IOException("Information about segments should have been merged");
+    }
+
+    final long numRows = metadataList.get(0).getNumRows();
+
+    query = query.withPagingSpec(PagingSpec.newSpec(selectThreshold));
+    if (numRows <= selectThreshold) {
+      // We are not going to split it
+      return new HiveDruidSplit[] { new HiveDruidSplit(address,
+              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
+    }
+
+    // From here on the threshold is used as a divisor: zero would yield an
+    // unbounded number of splits and a negative value a negative array size
+    if (selectThreshold <= 0) {
+      throw new IOException("Value of "
+              + HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD.varname
+              + " must be positive to split a Select query; found " + selectThreshold);
+    }
+
+    // If the query does not specify a timestamp, we obtain the total time using
+    // a Time Boundary query. Then, we use the information to split the query
+    // following the Select threshold configuration property
+    final List<Interval> intervals = new ArrayList<>();
+    if (query.getIntervals().size() == 1 &&
+            query.getIntervals().get(0).equals(DruidTable.DEFAULT_INTERVAL)) {
+      // Default max and min, we should execute a time boundary query to get a
+      // more precise range
+      TimeBoundaryQueryBuilder timeBuilder = new Druids.TimeBoundaryQueryBuilder();
+      timeBuilder.dataSource(query.getDataSource());
+      TimeBoundaryQuery timeQuery = timeBuilder.build();
+
+      // Submit the query and retrieve the results
+      List<Result<TimeBoundaryResultValue>> timeList;
+      try (InputStream response = DruidStorageHandlerUtils.submitRequest(client,
+              DruidStorageHandlerUtils.createRequest(address, timeQuery))) {
+        timeList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
+              new TypeReference<List<Result<TimeBoundaryResultValue>>>() {});
+      } catch (Exception e) {
+        throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      }
+      if (timeList == null || timeList.isEmpty()) {
+        throw new IOException("Connected to Druid but could not retrieve time boundary information");
+      }
+      if (timeList.size() != 1) {
+        throw new IOException("We should obtain a single time boundary");
+      }
+
+      intervals.add(new Interval(timeList.get(0).getValue().getMinTime().getMillis(),
+              timeList.get(0).getValue().getMaxTime().getMillis()));
+    } else {
+      intervals.addAll(query.getIntervals());
+    }
+
+    // Create (numRows/default threshold) input splits
+    int numSplits = (int) Math.ceil((double) numRows / selectThreshold);
+    List<List<Interval>> newIntervals = createSplitsIntervals(intervals, numSplits);
+    HiveDruidSplit[] splits = new HiveDruidSplit[numSplits];
+    for (int i = 0; i < numSplits; i++) {
+      // Create partial Select query
+      final SelectQuery partialQuery = query.withQuerySegmentSpec(
+              new MultipleIntervalSegmentSpec(newIntervals.get(i)));
+      splits[i] = new HiveDruidSplit(address,
+              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath);
+    }
+    return splits;
+  }
+
+  /**
+   * Distributes the time covered by the given intervals over numSplits lists of
+   * intervals. The i-th list of the result holds the interval(s) assigned to the
+   * i-th split; an input interval is cut when it does not fit in what remains of
+   * the current split.
+   *
+   * The first element of intervals is read unconditionally, so the list must not
+   * be empty.
+   *
+   * NOTE(review): presumably extractTotalTime returns the sum of the lengths of
+   * the intervals in milliseconds; the loop relies on that to finish exactly at
+   * the end of the last interval (see the assert) - confirm.
+   * NOTE(review): intervals.get(++posIntervals) throws IndexOutOfBoundsException
+   * if the last interval is consumed completely without taking the break branch;
+   * this looks unreachable only while the assumption above holds - confirm.
+   *
+   * @param intervals intervals to distribute
+   * @param numSplits number of lists to create
+   * @return one list of intervals per split
+   */
+  private static List<List<Interval>> createSplitsIntervals(List<Interval> intervals, int numSplits) {
+    final long totalTime = DruidIntervalUtils.extractTotalTime(intervals);
+    // Start of the next interval to emit
+    long startTime = intervals.get(0).getStartMillis();
+    long endTime = startTime;
+    // Time already assigned to the split under construction by input intervals
+    // that were consumed up to their end
+    long currTime = 0;
+    List<List<Interval>> newIntervals = new ArrayList<>();
+    for (int i = 0, posIntervals = 0; i < numSplits; i++) {
+      // Size of this split, computed as the difference between two rounded
+      // cumulative boundaries so that the sizes of all splits add up to totalTime
+      final long rangeSize = Math.round( (double) (totalTime * (i + 1)) / numSplits) -
+              Math.round( (double) (totalTime * i) / numSplits);
+      // Create the new interval(s)
+      List<Interval> currentIntervals = new ArrayList<>();
+      while (posIntervals < intervals.size()) {
+        final Interval interval = intervals.get(posIntervals);
+        // Time still missing to complete this split
+        final long expectedRange = rangeSize - currTime;
+        if (interval.getEndMillis() - startTime >= expectedRange) {
+          // What is left of the current input interval is enough to complete the
+          // split: take the missing time from it and continue with the next split
+          endTime = startTime + expectedRange;
+          currentIntervals.add(new Interval(startTime, endTime));
+          startTime = endTime;
+          currTime = 0;
+          break;
+        }
+        // Not enough: take the input interval up to its end and move on to the
+        // next input interval
+        endTime = interval.getEndMillis();
+        currentIntervals.add(new Interval(startTime, endTime));
+        currTime += (endTime - startTime);
+        startTime = intervals.get(++posIntervals).getStartMillis();
+      }
+      newIntervals.add(currentIntervals);
+    }
+    assert endTime == intervals.get(intervals.size()-1).getEndMillis();
+    return newIntervals;
+  }
+
+  /**
+   * Creates and initializes the record reader (mapred API) matching the type of
+   * the Druid query found in the job configuration. A different reader is needed
+   * for each query type because the format of the Druid results differs.
+   *
+   * @throws IOException if the query type is not recognized or the reader cannot
+   *         be initialized
+   */
+  @Override
+  public org.apache.hadoop.mapred.RecordReader<NullWritable, DruidWritable> getRecordReader(
+          org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter)
+                  throws IOException {
+    final String configuredType = job.get(Constants.DRUID_QUERY_TYPE);
+    // When no type is configured, the query is read as a Select query
+    final String queryType = configuredType == null ? Query.SELECT : configuredType;
+
+    final DruidQueryRecordReader<?,?> recordReader;
+    if (Query.TIMESERIES.equals(queryType)) {
+      recordReader = new DruidTimeseriesQueryRecordReader();
+    } else if (Query.TOPN.equals(queryType)) {
+      recordReader = new DruidTopNQueryRecordReader();
+    } else if (Query.GROUP_BY.equals(queryType)) {
+      recordReader = new DruidGroupByQueryRecordReader();
+    } else if (Query.SELECT.equals(queryType)) {
+      recordReader = new DruidSelectQueryRecordReader();
+    } else {
+      throw new IOException("Druid query type not recognized");
+    }
+    recordReader.initialize((HiveDruidSplit)split, job);
+    return recordReader;
+  }
+
+  /**
+   * Creates the record reader (mapreduce API) matching the type of the Druid
+   * query found in the configuration of the task. The reader is returned without
+   * being initialized.
+   *
+   * @throws IOException if the query type is not recognized
+   */
+  @Override
+  public RecordReader<NullWritable, DruidWritable> createRecordReader(InputSplit split,
+          TaskAttemptContext context) throws IOException, InterruptedException {
+    // Druid results have a different format for each type of query, hence the
+    // different readers
+    final String queryType = context.getConfiguration().get(Constants.DRUID_QUERY_TYPE);
+    if (queryType == null) {
+      // By default
+      return new DruidSelectQueryRecordReader();
+    }
+    switch (queryType) {
+      case Query.TIMESERIES:
+        return new DruidTimeseriesQueryRecordReader();
+      case Query.TOPN:
+        return new DruidTopNQueryRecordReader();
+      case Query.GROUP_BY:
+        return new DruidGroupByQueryRecordReader();
+      case Query.SELECT:
+        return new DruidSelectQueryRecordReader();
+      default:
+        throw new IOException("Druid query type not recognized");
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java
new file mode 100644
index 0000000..3fba5d0
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+
+/**
+ * Druid split. Its purpose is to trigger query execution in Druid.
+ *
+ * Besides the fields of the file split it extends, it carries the address of the
+ * Druid broker and the query (as JSON) to send to it.
+ */
+public class HiveDruidSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit {
+
+  private String address;
+  private String druidQuery;
+
+  // required for deserialization
+  public HiveDruidSplit() {
+    super((Path) null, 0, 0, (String[]) null);
+  }
+
+  /**
+   * @param address address of the Druid broker
+   * @param druidQuery query to execute, as JSON
+   * @param dummyPath path handed to the underlying file split
+   */
+  public HiveDruidSplit(String address, String druidQuery, Path dummyPath) {
+    super(dummyPath, 0, 0, (String[]) null);
+    this.address = address;
+    this.druidQuery = druidQuery;
+  }
+
+  /**
+   * Writes the file split fields followed by the address and the query.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    writeString(out, address);
+    writeString(out, druidQuery);
+  }
+
+  /**
+   * Reads the fields in the same order and format used by {@link #write(DataOutput)}.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    address = readString(in);
+    druidQuery = readString(in);
+  }
+
+  @Override
+  public long getLength() {
+    return 0L;
+  }
+
+  @Override
+  public String[] getLocations() {
+    return new String[] {""} ;
+  }
+
+  public String getAddress() {
+    return address;
+  }
+
+  public String getDruidQuery() {
+    return druidQuery;
+  }
+
+  @Override
+  public String toString() {
+    return "HiveDruidSplit{" + address + ", " + druidQuery + "}";
+  }
+
+  // Writes the string as a 4-byte length followed by its UTF-8 bytes.
+  // DataOutput.writeUTF is not used because it fails for strings whose encoded
+  // form is longer than 65535 bytes, which a large query can exceed.
+  private static void writeString(DataOutput out, String value) throws IOException {
+    final byte[] bytes = value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    out.writeInt(bytes.length);
+    out.write(bytes);
+  }
+
+  // Reads a string written by writeString
+  private static String readString(DataInput in) throws IOException {
+    final int length = in.readInt();
+    if (length < 0) {
+      throw new IOException("Invalid string length in serialized split: " + length);
+    }
+    final byte[] bytes = new byte[length];
+    in.readFully(bytes);
+    return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
new file mode 100644
index 0000000..226060f
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import io.druid.data.input.Row;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.groupby.GroupByQuery;
+
+/**
+ * Record reader for results for Druid GroupByQuery.
+ */
+public class DruidGroupByQueryRecordReader
+        extends DruidQueryRecordReader<GroupByQuery, Row> {
+
+  private Row current;
+  private int[] indexes = new int[0];
+  // Row objects returned by GroupByQuery have different access paths depending on
+  // whether the result for the metric is a Float or a Long, thus we keep track
+  // using these converters
+  private Extract[] extractors;
+
+  @Override
+  public void initialize(InputSplit split, Configuration conf) throws IOException {
+    super.initialize(split, conf);
+    initExtractors();
+  }
+
+  @Override
+  protected GroupByQuery createQuery(String content) throws IOException {
+    return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, GroupByQuery.class);
+  }
+
+  @Override
+  protected List<Row> createResultsList(InputStream content) throws IOException {
+    return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
+            new TypeReference<List<Row>>(){});
+  }
+
+  private void initExtractors() throws IOException {
+    extractors = new Extract[query.getAggregatorSpecs().size() + query.getPostAggregatorSpecs().size()];
+    int counter = 0;
+    for (int i = 0; i < query.getAggregatorSpecs().size(); i++, counter++) {
+      AggregatorFactory af = query.getAggregatorSpecs().get(i);
+      switch (af.getTypeName().toUpperCase()) {
+        case DruidSerDeUtils.FLOAT_TYPE:
+          extractors[counter] = Extract.FLOAT;
+          break;
+        case DruidSerDeUtils.LONG_TYPE:
+          extractors[counter] = Extract.LONG;
+          break;
+        default:
+          throw new IOException("Type not supported");
+      }
+    }
+    for (int i = 0; i < query.getPostAggregatorSpecs().size(); i++, counter++) {
+      extractors[counter] = Extract.FLOAT;
+    }
+  }
+
+  @Override
+  public boolean nextKeyValue() {
+    // Refresh indexes
+    for (int i = indexes.length - 1; i >= 0; i--) {
+      if (indexes[i] > 0) {
+        indexes[i]--;
+        for (int j = i + 1; j < indexes.length; j++) {
+          indexes[j] = current.getDimension(
+                  query.getDimensions().get(j).getDimension()).size() - 1;
+        }
+        return true;
+      }
+    }
+    // Results
+    if (results.hasNext()) {
+      current = results.next();
+      indexes = new int[query.getDimensions().size()];
+      for (int i=0; i < query.getDimensions().size(); i++) {
+        DimensionSpec ds = query.getDimensions().get(i);
+        indexes[i] = current.getDimension(ds.getDimension()).size() - 1;
+      }
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException, InterruptedException {
+    return NullWritable.get();
+  }
+
+  @Override
+  public DruidWritable getCurrentValue() throws IOException, InterruptedException {
+    // Create new value
+    DruidWritable value = new DruidWritable();
+    // 1) The timestamp column
+    value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
+    // 2) The dimension columns
+    for (int i=0; i < query.getDimensions().size(); i++) {
+      DimensionSpec ds = query.getDimensions().get(i);
+      List<String> dims = current.getDimension(ds.getDimension());
+      int pos = dims.size() - indexes[i] - 1;
+      value.getValue().put(ds.getOutputName(), dims.get(pos));
+    }
+    int counter = 0;
+    // 3) The aggregation columns
+    for (AggregatorFactory af : query.getAggregatorSpecs()) {
+      switch (extractors[counter++]) {
+        case FLOAT:
+          value.getValue().put(af.getName(), current.getFloatMetric(af.getName()));
+          break;
+        case LONG:
+          value.getValue().put(af.getName(), current.getLongMetric(af.getName()));
+          break;
+      }
+    }
+    // 4) The post-aggregation columns
+    for (PostAggregator pa : query.getPostAggregatorSpecs()) {
+      assert extractors[counter++] == Extract.FLOAT;
+      value.getValue().put(pa.getName(), current.getFloatMetric(pa.getName()));
+    }
+    return value;
+  }
+
+  @Override
+  public boolean next(NullWritable key, DruidWritable value) {
+    if (nextKeyValue()) {
+      // Update value
+      value.getValue().clear();
+      // 1) The timestamp column
+      value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
+      // 2) The dimension columns
+      for (int i=0; i < query.getDimensions().size(); i++) {
+        DimensionSpec ds = query.getDimensions().get(i);
+        List<String> dims = current.getDimension(ds.getDimension());
+        int pos = dims.size() - indexes[i] - 1;
+        value.getValue().put(ds.getOutputName(), dims.get(pos));
+      }
+      int counter = 0;
+      // 3) The aggregation columns
+      for (AggregatorFactory af : query.getAggregatorSpecs()) {
+        switch (extractors[counter++]) {
+          case FLOAT:
+            value.getValue().put(af.getName(), current.getFloatMetric(af.getName()));
+            break;
+          case LONG:
+            value.getValue().put(af.getName(), current.getLongMetric(af.getName()));
+            break;
+        }
+      }
+      // 4) The post-aggregation columns
+      for (PostAggregator pa : query.getPostAggregatorSpecs()) {
+        assert extractors[counter++] == Extract.FLOAT;
+        value.getValue().put(pa.getName(), current.getFloatMetric(pa.getName()));
+      }
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return results.hasNext() ? 0 : 1;
+  }
+
+  private enum Extract {
+    FLOAT,
+    LONG
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
new file mode 100644
index 0000000..96bcee8
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.HiveDruidSplit;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterators;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+
+import io.druid.query.BaseQuery;
+
+/**
+ * Base record reader for a given Druid query. This class contains the logic to
+ * send the query to the broker and retrieve the results. The transformation to
+ * emit records needs to be done by the classes that extend the reader.
+ * 
+ * The key for each record will be a NullWritable, while the value will be a
+ * DruidWritable containing the timestamp as well as all values resulting from
+ * the query.
+ */
+public abstract class DruidQueryRecordReader<T extends BaseQuery<R>,R extends Comparable<R>>
+        extends RecordReader<NullWritable, DruidWritable>
+        implements org.apache.hadoop.mapred.RecordReader<NullWritable, DruidWritable> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DruidQueryRecordReader.class);
+
+  /**
+   * Query that Druid executes.
+   */
+  protected T query;
+
+  /**
+   * Query results.
+   */
+  protected Iterator<R> results = Iterators.emptyIterator();
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
+    initialize(split, context.getConfiguration());
+  }
+
+  /**
+   * Parses the query of the split, sends it to the broker address of the split
+   * and keeps an iterator over the results. If there are no results, the
+   * iterator remains empty.
+   *
+   * @param split split to read; it has to be a HiveDruidSplit
+   * @param conf configuration; not used by this implementation
+   * @throws IOException if the query cannot be parsed or executed, or its
+   *         results cannot be read
+   */
+  public void initialize(InputSplit split, Configuration conf) throws IOException {
+    HiveDruidSplit hiveDruidSplit = (HiveDruidSplit) split;
+
+    // Create query
+    query = createQuery(hiveDruidSplit.getDruidQuery());
+
+    // Execute query
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Retrieving from druid using query:\n " + query);
+    }
+
+    HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle());
+
+    // Retrieve results. The results are received as a list, so the response can
+    // be closed as soon as the list has been created, whether that succeeds or not
+    final List<R> resultsList;
+    try (InputStream response = DruidStorageHandlerUtils.submitRequest(client,
+            DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query))) {
+      resultsList = createResultsList(response);
+    }
+    if (resultsList == null || resultsList.isEmpty()) {
+      return;
+    }
+    results = resultsList.iterator();
+  }
+
+  /**
+   * Creates the query object from its JSON text.
+   */
+  protected abstract T createQuery(String content) throws IOException;
+
+  /**
+   * Creates the list of results from the response of the broker.
+   */
+  protected abstract List<R> createResultsList(InputStream content) throws IOException;
+
+  @Override
+  public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  @Override
+  public DruidWritable createValue() {
+    return new DruidWritable();
+  }
+
+  @Override
+  public abstract boolean next(NullWritable key, DruidWritable value) throws IOException;
+
+  @Override
+  public long getPos() {
+    return 0;
+  }
+
+  @Override
+  public abstract boolean nextKeyValue() throws IOException;
+
+  @Override
+  public abstract NullWritable getCurrentKey() throws IOException, InterruptedException;
+
+  @Override
+  // TODO: we could generate vector row batches so that vectorized execution may get triggered
+  public abstract DruidWritable getCurrentValue() throws IOException, InterruptedException;
+
+  @Override
+  public abstract float getProgress() throws IOException;
+
+  @Override
+  public void close() {
+    // Nothing to do
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
new file mode 100644
index 0000000..70b493c
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Iterators;
+
+import io.druid.query.Result;
+import io.druid.query.select.EventHolder;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.select.SelectResultValue;
+
+/**
+ * Record reader for results for Druid SelectQuery.
+ *
+ * <p>A Select response is a list of results; each result carries a timestamp and a
+ * list of events, and every event is exposed as one row. Results that contain no
+ * events are skipped so that they never surface as rows.
+ */
+public class DruidSelectQueryRecordReader
+        extends DruidQueryRecordReader<SelectQuery, Result<SelectResultValue>> {
+
+  /* Result whose events are currently being iterated */
+  private Result<SelectResultValue> current;
+  /* Remaining events of the current result */
+  private Iterator<EventHolder> values = Iterators.emptyIterator();
+
+  /**
+   * Parses the JSON representation of a Select query.
+   */
+  @Override
+  protected SelectQuery createQuery(String content) throws IOException {
+    return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, SelectQuery.class);
+  }
+
+  /**
+   * Reads the Smile-encoded response into the list of Select results.
+   */
+  @Override
+  protected List<Result<SelectResultValue>> createResultsList(InputStream content) throws IOException {
+    return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
+            new TypeReference<List<Result<SelectResultValue>>>(){});
+  }
+
+  /**
+   * Positions the reader on the next available event.
+   *
+   * @return {@code true} if an event is available, {@code false} once every result
+   *         has been consumed
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException {
+    // A result may carry no events at all. Keep moving to the following result
+    // instead of reporting a row that would contain nothing but the timestamp.
+    while (!values.hasNext() && results.hasNext()) {
+      current = results.next();
+      values = current.getValue().getEvents().iterator();
+    }
+    return values.hasNext();
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException, InterruptedException {
+    return NullWritable.get();
+  }
+
+  /**
+   * Builds a new row from the next pending event. The event is consumed by this
+   * call, so it is meant to be invoked once after each successful
+   * {@link #nextKeyValue()}.
+   */
+  @Override
+  public DruidWritable getCurrentValue() throws IOException, InterruptedException {
+    // Create new value
+    DruidWritable value = new DruidWritable();
+    fillRow(value);
+    return value;
+  }
+
+  /**
+   * Overwrites {@code value} with the next row.
+   *
+   * @return {@code true} if a row was written, {@code false} when no rows are left
+   */
+  @Override
+  public boolean next(NullWritable key, DruidWritable value) throws IOException {
+    if (!nextKeyValue()) {
+      return false;
+    }
+    // Update value
+    value.getValue().clear();
+    fillRow(value);
+    return true;
+  }
+
+  /**
+   * Reports 0 while results or events remain and 1 once everything was consumed.
+   */
+  @Override
+  public float getProgress() {
+    return results.hasNext() || values.hasNext() ? 0 : 1;
+  }
+
+  /* Writes the timestamp of the current result and, if one is pending, the columns
+   * of the next event into the given row; the event is consumed */
+  private void fillRow(DruidWritable value) {
+    value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
+    if (values.hasNext()) {
+      value.getValue().putAll(values.next().getEvent());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
new file mode 100644
index 0000000..8f53d4a
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Lists;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+
+import io.druid.query.Druids;
+import io.druid.query.Druids.SegmentMetadataQueryBuilder;
+import io.druid.query.Query;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.groupby.GroupByQuery;
+import io.druid.query.metadata.metadata.ColumnAnalysis;
+import io.druid.query.metadata.metadata.SegmentAnalysis;
+import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.topn.TopNQuery;
+
+/**
+ * DruidSerDe that is used to  deserialize objects from a Druid data source.
+ */
+@SerDeSpec(schemaProps = {Constants.DRUID_DATA_SOURCE})
+public class DruidSerDe extends AbstractSerDe {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(DruidSerDe.class);
+
+  private String[] columns;
+  private PrimitiveTypeInfo[] types;
+  private ObjectInspector inspector;
+
+
+  @Override
+  public void initialize(Configuration configuration, Properties properties) throws SerDeException {
+    final List<String> columnNames = new ArrayList<>();
+    final List<PrimitiveTypeInfo> columnTypes = new ArrayList<>();
+    List<ObjectInspector> inspectors = new ArrayList<>();
+
+    // Druid query
+    String druidQuery = properties.getProperty(Constants.DRUID_QUERY_JSON);
+    if (druidQuery == null) {
+      // No query. We need to create a Druid Segment Metadata query that retrieves all
+      // columns present in the data source (dimensions and metrics).
+      // Create Segment Metadata Query
+      String dataSource = properties.getProperty(Constants.DRUID_DATA_SOURCE);
+      if (dataSource == null) {
+        throw new SerDeException("Druid data source not specified; use " +
+                Constants.DRUID_DATA_SOURCE + " in table properties");
+      }
+      SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder();
+      builder.dataSource(dataSource);
+      builder.merge(true);
+      builder.analysisTypes();
+      SegmentMetadataQuery query = builder.build();
+
+      // Execute query in Druid
+      String address = HiveConf.getVar(configuration,
+              HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
+      if (org.apache.commons.lang3.StringUtils.isEmpty(address)) {
+        throw new SerDeException("Druid broker address not specified in configuration");
+      }
+
+      // Infer schema
+      SegmentAnalysis schemaInfo;
+      try {
+        schemaInfo = submitMetadataRequest(address, query);
+      } catch (IOException e) {
+        throw new SerDeException(e);
+      }
+      for (Entry<String,ColumnAnalysis> columnInfo : schemaInfo.getColumns().entrySet()) {
+        if (columnInfo.getKey().equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
+          // Special handling for timestamp column
+          columnNames.add(columnInfo.getKey()); // field name
+          PrimitiveTypeInfo type = TypeInfoFactory.timestampTypeInfo; // field type
+          columnTypes.add(type);
+          inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
+          continue;
+        }
+        columnNames.add(columnInfo.getKey()); // field name
+        PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType(
+                columnInfo.getValue().getType()); // field type
+        columnTypes.add(type);
+        inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
+      }
+      columns = columnNames.toArray(new String[columnNames.size()]);
+      types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
+      inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
+    } else {
+      // Query is specified, we can extract the results schema from the query
+      Query<?> query;
+      try {
+        query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, Query.class);
+      } catch (Exception e) {
+        throw new SerDeException(e);
+      }
+
+      switch (query.getType()) {
+        case Query.TIMESERIES:
+          inferSchema((TimeseriesQuery) query, columnNames, columnTypes);
+          break;
+        case Query.TOPN:
+          inferSchema((TopNQuery) query, columnNames, columnTypes);
+          break;
+        case Query.SELECT:
+          inferSchema((SelectQuery) query, columnNames, columnTypes);
+          break;
+        case Query.GROUP_BY:
+          inferSchema((GroupByQuery) query, columnNames, columnTypes);
+          break;
+        default:
+          throw new SerDeException("Not supported Druid query");
+      }
+    
+      columns = new String[columnNames.size()];
+      types = new PrimitiveTypeInfo[columnNames.size()];
+      for (int i = 0; i < columnTypes.size(); ++i) {
+        columns[i] = columnNames.get(i);
+        types[i] = columnTypes.get(i);
+        inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(types[i]));
+      }
+      inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("DruidSerDe initialized with\n"
+              + "\t columns: " + columnNames
+              + "\n\t types: " + columnTypes);
+    }
+  }
+
+  /* Submits the request and returns */
+  protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
+          throws SerDeException, IOException {
+    HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle());
+    InputStream response;
+    try {
+      response = DruidStorageHandlerUtils.submitRequest(client,
+              DruidStorageHandlerUtils.createRequest(address, query));
+    } catch (Exception e) {
+      throw new SerDeException(StringUtils.stringifyException(e));
+    }
+
+    // Retrieve results
+    List<SegmentAnalysis> resultsList;
+    try {
+      resultsList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
+              new TypeReference<List<SegmentAnalysis>>() {});
+    } catch (Exception e) {
+      response.close();
+      throw new SerDeException(StringUtils.stringifyException(e));
+    }
+    if (resultsList == null || resultsList.isEmpty()) {
+      throw new SerDeException("Connected to Druid but could not retrieve datasource information");
+    }
+    if (resultsList.size() != 1) {
+      throw new SerDeException("Information about segments should have been merged");
+    }
+
+    return resultsList.get(0);
+  }
+
+  /* Timeseries query */
+  private void inferSchema(TimeseriesQuery query, List<String> columnNames,
+          List<PrimitiveTypeInfo> columnTypes) {
+    // Timestamp column
+    columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
+    columnTypes.add(TypeInfoFactory.timestampTypeInfo);
+    // Aggregator columns
+    for (AggregatorFactory af : query.getAggregatorSpecs()) {
+      columnNames.add(af.getName());
+      columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+    }
+    // Post-aggregator columns
+    for (PostAggregator pa : query.getPostAggregatorSpecs()) {
+      columnNames.add(pa.getName());
+      columnTypes.add(TypeInfoFactory.floatTypeInfo);
+    }
+  }
+
+  /* TopN query */
+  private void inferSchema(TopNQuery query, List<String> columnNames, List<PrimitiveTypeInfo> columnTypes) {
+    // Timestamp column
+    columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
+    columnTypes.add(TypeInfoFactory.timestampTypeInfo);
+    // Dimension column
+    columnNames.add(query.getDimensionSpec().getOutputName());
+    columnTypes.add(TypeInfoFactory.stringTypeInfo);
+    // Aggregator columns
+    for (AggregatorFactory af : query.getAggregatorSpecs()) {
+      columnNames.add(af.getName());
+      columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+    }
+    // Post-aggregator columns
+    for (PostAggregator pa : query.getPostAggregatorSpecs()) {
+      columnNames.add(pa.getName());
+      columnTypes.add(TypeInfoFactory.floatTypeInfo);
+    }
+  }
+
+  /* Select query */
+  private void inferSchema(SelectQuery query, List<String> columnNames,
+          List<PrimitiveTypeInfo> columnTypes) {
+    // Timestamp column
+    columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
+    columnTypes.add(TypeInfoFactory.timestampTypeInfo);
+    // Dimension columns
+    for (DimensionSpec ds : query.getDimensions()) {
+      columnNames.add(ds.getOutputName());
+      columnTypes.add(TypeInfoFactory.stringTypeInfo);
+    }
+    // Metric columns
+    for (String metric : query.getMetrics()) {
+      columnNames.add(metric);
+      columnTypes.add(TypeInfoFactory.floatTypeInfo);
+    }
+  }
+
+  /* GroupBy query */
+  private void inferSchema(GroupByQuery query, List<String> columnNames, List<PrimitiveTypeInfo> columnTypes) {
+    // Timestamp column
+    columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
+    columnTypes.add(TypeInfoFactory.timestampTypeInfo);
+    // Dimension columns
+    for (DimensionSpec ds : query.getDimensions()) {
+      columnNames.add(ds.getOutputName());
+      columnTypes.add(TypeInfoFactory.stringTypeInfo);
+    }
+    // Aggregator columns
+    for (AggregatorFactory af : query.getAggregatorSpecs()) {
+      columnNames.add(af.getName());
+      columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+    }
+    // Post-aggregator columns
+    for (PostAggregator pa : query.getPostAggregatorSpecs()) {
+      columnNames.add(pa.getName());
+      columnTypes.add(TypeInfoFactory.floatTypeInfo);
+    }
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return NullWritable.class;
+  }
+
+  @Override
+  public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
+    return NullWritable.get();
+  }
+
+  @Override
+  public SerDeStats getSerDeStats() {
+    throw new UnsupportedOperationException("SerdeStats not supported.");
+  }
+
+  @Override
+  public Object deserialize(Writable writable) throws SerDeException {
+    DruidWritable input = (DruidWritable) writable;
+    List<Object> output = Lists.newArrayListWithExpectedSize(columns.length);
+    for (int i = 0; i < columns.length; i++) {
+      final Object value = input.getValue().get(columns[i]);
+      if (value == null) {
+        output.add(null);
+        continue;
+      }
+      switch (types[i].getPrimitiveCategory()) {
+        case TIMESTAMP:
+          output.add(new TimestampWritable(new Timestamp((Long)value)));
+          break;
+        case LONG:
+          output.add(new LongWritable(((Number)value).longValue()));
+          break;
+        case FLOAT:
+          output.add(new FloatWritable(((Number)value).floatValue()));
+          break;
+        case STRING:
+          output.add(new Text(value.toString()));
+          break;
+        default:
+          throw new SerDeException("Unknown type: " + types[i].getPrimitiveCategory());
+      }
+    }
+    return output;
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return inspector;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
new file mode 100644
index 0000000..29b8845
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utils class for Druid SerDe.
+ *
+ * <p>Maps the textual Druid column types to their Hive counterparts. Type names are
+ * matched case-insensitively; any name other than FLOAT, LONG or STRING is read as a
+ * Hive string.
+ */
+public final class DruidSerDeUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DruidSerDeUtils.class);
+
+  protected static final String FLOAT_TYPE = "FLOAT";
+  protected static final String LONG_TYPE = "LONG";
+  protected static final String STRING_TYPE = "STRING";
+
+  /**
+   * Converts from the String representation of a Druid type to the corresponding
+   * Hive type.
+   *
+   * @param typeName Druid type name, in any letter case
+   * @return float, long or string type info; string for any unrecognized type
+   */
+  public static PrimitiveTypeInfo convertDruidToHiveType(String typeName) {
+    // Locale-independent upper-casing: with the default locale, e.g. Turkish,
+    // "string" would not become "STRING" and would be reported as unknown
+    typeName = typeName.toUpperCase(java.util.Locale.ROOT);
+    switch(typeName) {
+      case FLOAT_TYPE:
+        return TypeInfoFactory.floatTypeInfo;
+      case LONG_TYPE:
+        return TypeInfoFactory.longTypeInfo;
+      case STRING_TYPE:
+        return TypeInfoFactory.stringTypeInfo;
+      default:
+        // This is a guard for special Druid types e.g. hyperUnique
+        // (http://druid.io/docs/0.9.1.1/querying/aggregations.html#hyperunique-aggregator).
+        // Currently, we do not support doing anything special with them in Hive.
+        // However, those columns are there, and they can be actually read as normal
+        // dimensions e.g. with a select query. Thus, we print the warning and just read them
+        // as String.
+        LOG.warn("Transformation to STRING for unknown type " + typeName);
+        return TypeInfoFactory.stringTypeInfo;
+    }
+  }
+
+  /**
+   * Converts from the String representation of a Druid type to the String
+   * representation of the corresponding Hive type.
+   *
+   * @param typeName Druid type name, in any letter case
+   * @return Hive type name for float, bigint or string; string for any unrecognized type
+   */
+  public static String convertDruidToHiveTypeString(String typeName) {
+    // Locale-independent upper-casing, see convertDruidToHiveType
+    typeName = typeName.toUpperCase(java.util.Locale.ROOT);
+    switch(typeName) {
+      case FLOAT_TYPE:
+        return serdeConstants.FLOAT_TYPE_NAME;
+      case LONG_TYPE:
+        return serdeConstants.BIGINT_TYPE_NAME;
+      case STRING_TYPE:
+        return serdeConstants.STRING_TYPE_NAME;
+      default:
+        // This is a guard for special Druid types e.g. hyperUnique
+        // (http://druid.io/docs/0.9.1.1/querying/aggregations.html#hyperunique-aggregator).
+        // Currently, we do not support doing anything special with them in Hive.
+        // However, those columns are there, and they can be actually read as normal
+        // dimensions e.g. with a select query. Thus, we print the warning and just read them
+        // as String.
+        LOG.warn("Transformation to STRING for unknown type " + typeName);
+        return serdeConstants.STRING_TYPE_NAME;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
new file mode 100644
index 0000000..812ae03
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import io.druid.query.Result;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.timeseries.TimeseriesResultValue;
+
+/**
+ * Record reader for results for Druid TimeseriesQuery.
+ *
+ * <p>Every result of the response is exposed as exactly one row, made of the result
+ * timestamp plus the entries of its base object.
+ */
+public class DruidTimeseriesQueryRecordReader
+        extends DruidQueryRecordReader<TimeseriesQuery, Result<TimeseriesResultValue>> {
+
+  /* Result the reader is currently positioned on */
+  private Result<TimeseriesResultValue> current;
+
+  /**
+   * Parses the JSON representation of a Timeseries query.
+   */
+  @Override
+  protected TimeseriesQuery createQuery(String content) throws IOException {
+    final TimeseriesQuery parsedQuery =
+            DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, TimeseriesQuery.class);
+    return parsedQuery;
+  }
+
+  /**
+   * Reads the Smile-encoded response into the list of Timeseries results.
+   */
+  @Override
+  protected List<Result<TimeseriesResultValue>> createResultsList(InputStream content) throws IOException {
+    final TypeReference<List<Result<TimeseriesResultValue>>> resultsType =
+            new TypeReference<List<Result<TimeseriesResultValue>>>(){};
+    return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content, resultsType);
+  }
+
+  /**
+   * Moves to the next result, if there is one.
+   */
+  @Override
+  public boolean nextKeyValue() {
+    if (!results.hasNext()) {
+      return false;
+    }
+    current = results.next();
+    return true;
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException, InterruptedException {
+    return NullWritable.get();
+  }
+
+  /**
+   * Builds a new row out of the result the reader is positioned on.
+   */
+  @Override
+  public DruidWritable getCurrentValue() throws IOException, InterruptedException {
+    // A fresh container is handed out on every call
+    final DruidWritable row = new DruidWritable();
+    copyCurrentInto(row);
+    return row;
+  }
+
+  /**
+   * Advances the reader and overwrites {@code value} with the new row.
+   */
+  @Override
+  public boolean next(NullWritable key, DruidWritable value) {
+    if (!nextKeyValue()) {
+      return false;
+    }
+    // Drop whatever the previous row left behind before refilling
+    value.getValue().clear();
+    copyCurrentInto(value);
+    return true;
+  }
+
+  /**
+   * Reports 0 while results remain and 1 once all of them were consumed.
+   */
+  @Override
+  public float getProgress() throws IOException {
+    if (results.hasNext()) {
+      return 0;
+    }
+    return 1;
+  }
+
+  /* Writes the timestamp first and then the entries of the current result */
+  private void copyCurrentInto(DruidWritable target) {
+    target.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
+    target.getValue().putAll(current.getValue().getBaseObject());
+  }
+
+}