Posted to commits@hive.apache.org by jc...@apache.org on 2016/09/08 08:46:01 UTC
[6/6] hive git commit: HIVE-14217: Druid integration (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
HIVE-14217: Druid integration (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
Close apache/hive#98
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/58d1befa
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/58d1befa
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/58d1befa
Branch: refs/heads/master
Commit: 58d1befa2131254b53122b3573189ac1c5022217
Parents: 63fdb51
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Fri Aug 12 12:55:46 2016 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Thu Sep 8 09:42:26 2016 +0100
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/Constants.java | 10 +-
.../org/apache/hadoop/hive/conf/HiveConf.java | 11 +
druid-handler/pom.xml | 201 ++++
.../hadoop/hive/druid/DruidStorageHandler.java | 109 ++
.../hive/druid/DruidStorageHandlerUtils.java | 90 ++
.../hive/druid/HiveDruidOutputFormat.java | 55 +
.../druid/HiveDruidQueryBasedInputFormat.java | 369 ++++++
.../hadoop/hive/druid/HiveDruidSplit.java | 83 ++
.../serde/DruidGroupByQueryRecordReader.java | 199 ++++
.../druid/serde/DruidQueryRecordReader.java | 142 +++
.../serde/DruidSelectQueryRecordReader.java | 106 ++
.../hadoop/hive/druid/serde/DruidSerDe.java | 343 ++++++
.../hive/druid/serde/DruidSerDeUtils.java | 83 ++
.../serde/DruidTimeseriesQueryRecordReader.java | 93 ++
.../druid/serde/DruidTopNQueryRecordReader.java | 106 ++
.../hadoop/hive/druid/serde/DruidWritable.java | 81 ++
.../hadoop/hive/druid/QTestDruidSerDe.java | 88 ++
.../hive/druid/QTestDruidStorageHandler.java | 34 +
.../hadoop/hive/druid/TestDruidSerDe.java | 576 ++++++++++
.../TestHiveDruidQueryBasedInputFormat.java | 101 ++
itests/qtest/pom.xml | 13 +
packaging/pom.xml | 5 +
pom.xml | 2 +
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 8 +-
.../hadoop/hive/ql/exec/FunctionRegistry.java | 22 +-
.../optimizer/calcite/HivePlannerContext.java | 17 +-
.../calcite/druid/DruidIntervalUtils.java | 466 ++++++++
.../ql/optimizer/calcite/druid/DruidQuery.java | 1053 ++++++++++++++++++
.../optimizer/calcite/druid/DruidQueryType.java | 42 +
.../ql/optimizer/calcite/druid/DruidRules.java | 591 ++++++++++
.../ql/optimizer/calcite/druid/DruidSchema.java | 51 +
.../ql/optimizer/calcite/druid/DruidTable.java | 121 ++
.../optimizer/calcite/druid/HiveDruidConf.java | 33 +
.../functions/HiveSqlCountAggFunction.java | 2 +-
.../functions/HiveSqlMinMaxAggFunction.java | 2 +-
.../functions/HiveSqlSumAggFunction.java | 2 +-
.../reloperators/HiveDateGranularity.java | 54 +
.../rules/HiveProjectSortTransposeRule.java | 5 +
.../rules/HiveSortProjectTransposeRule.java | 5 +
.../calcite/translator/ASTBuilder.java | 38 +-
.../calcite/translator/ASTConverter.java | 9 +-
.../translator/SqlFunctionConverter.java | 23 +-
.../hadoop/hive/ql/parse/CalcitePlanner.java | 119 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 22 +-
.../hadoop/hive/ql/plan/CreateTableDesc.java | 8 +-
.../hadoop/hive/ql/plan/TableScanDesc.java | 7 +
.../apache/hadoop/hive/ql/udf/UDFDateFloor.java | 506 +++++++++
.../hadoop/hive/ql/udf/UDFDateFloorDay.java | 39 +
.../hadoop/hive/ql/udf/UDFDateFloorHour.java | 39 +
.../hadoop/hive/ql/udf/UDFDateFloorMinute.java | 39 +
.../hadoop/hive/ql/udf/UDFDateFloorMonth.java | 39 +
.../hadoop/hive/ql/udf/UDFDateFloorQuarter.java | 39 +
.../hadoop/hive/ql/udf/UDFDateFloorSecond.java | 39 +
.../hadoop/hive/ql/udf/UDFDateFloorWeek.java | 39 +
.../hadoop/hive/ql/udf/UDFDateFloorYear.java | 39 +
.../calcite/TestCBORuleFiredOnlyOnce.java | 2 +-
.../ql/udf/TestUDFDateFormatGranularity.java | 85 ++
.../test/queries/clientnegative/druid_address.q | 5 +
.../test/queries/clientnegative/druid_buckets.q | 6 +
.../queries/clientnegative/druid_datasource.q | 3 +
.../queries/clientnegative/druid_external.q | 5 +
.../queries/clientnegative/druid_location.q | 6 +
.../queries/clientnegative/druid_partitions.q | 6 +
.../test/queries/clientpositive/druid_basic1.q | 18 +
.../test/queries/clientpositive/druid_basic2.q | 52 +
.../queries/clientpositive/druid_intervals.q | 67 ++
.../queries/clientpositive/druid_timeseries.q | 94 ++
ql/src/test/queries/clientpositive/druid_topn.q | 75 ++
.../results/clientnegative/druid_address.q.out | 7 +
.../results/clientnegative/druid_buckets.q.out | 8 +
.../clientnegative/druid_datasource.q.out | 7 +
.../results/clientnegative/druid_external.q.out | 7 +
.../results/clientnegative/druid_location.q.out | 9 +
.../clientnegative/druid_partitions.q.out | 8 +
.../results/clientpositive/create_view.q.out | 2 +
.../results/clientpositive/druid_basic1.q.out | 142 +++
.../results/clientpositive/druid_basic2.q.out | 533 +++++++++
.../clientpositive/druid_intervals.q.out | 398 +++++++
.../clientpositive/druid_timeseries.q.out | 566 ++++++++++
.../results/clientpositive/druid_topn.q.out | 419 +++++++
.../results/clientpositive/explain_ddl.q.out | 2 +
.../clientpositive/explain_logical.q.out | 16 +
.../test/results/clientpositive/join_view.q.out | 4 +
.../clientpositive/llap/explainuser_1.q.out | 2 +-
.../test/results/clientpositive/masking_2.q.out | 14 +
.../test/results/clientpositive/masking_6.q.out | 8 +
.../test/results/clientpositive/masking_7.q.out | 8 +
.../clientpositive/serde_user_properties.q.out | 4 +
.../results/clientpositive/show_functions.q.out | 9 +
.../clientpositive/spark/join_view.q.out | 4 +
.../results/clientpositive/subquery_notin.q.out | 6 +
.../results/clientpositive/subquery_views.q.out | 4 +
92 files changed, 8969 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/common/src/java/org/apache/hadoop/hive/conf/Constants.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 00ec8c0..77c6aa5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -15,8 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-
package org.apache.hadoop.hive.conf;
public class Constants {
@@ -24,4 +22,12 @@ public class Constants {
public static final String LLAP_LOGGER_NAME_QUERY_ROUTING = "query-routing";
public static final String LLAP_LOGGER_NAME_CONSOLE = "console";
public static final String LLAP_LOGGER_NAME_RFA = "RFA";
+
+ /* Constants for Druid storage handler */
+ public static final String DRUID_HIVE_STORAGE_HANDLER_ID =
+ "org.apache.hadoop.hive.druid.DruidStorageHandler";
+ public static final String DRUID_DATA_SOURCE = "druid.datasource";
+ public static final String DRUID_QUERY_JSON = "druid.query.json";
+ public static final String DRUID_QUERY_TYPE = "druid.query.type";
+ public static final String DRUID_QUERY_FETCH = "druid.query.fetch";
}
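
These table properties surface later in the patch: the query-based input format reads them from the job configuration to decide how to query Druid. A trimmed sketch of that lookup, assuming a Configuration populated from the table properties (the datasource name is made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.Constants;

public class DruidPropsLookup {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(Constants.DRUID_DATA_SOURCE, "wikiticker"); // hypothetical datasource
    // Mirrors HiveDruidQueryBasedInputFormat below: when no serialized query has
    // been pushed down by the planner, fall back to a full Select query.
    if (conf.get(Constants.DRUID_QUERY_JSON) == null) {
      System.out.println("No pushed-down query; scanning datasource "
          + conf.get(Constants.DRUID_DATA_SOURCE));
    }
  }
}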
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 13cfdf1..d6944ee 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1884,6 +1884,17 @@ public class HiveConf extends Configuration {
WRITE_SET_REAPER_INTERVAL("hive.writeset.reaper.interval", "60s",
new TimeValidator(TimeUnit.MILLISECONDS), "Frequency of WriteSet reaper runs"),
+ // For Druid storage handler
+ HIVE_DRUID_BROKER_DEFAULT_ADDRESS("hive.druid.broker.address.default", "localhost:8082",
+ "Address of the Druid broker. If we are querying Druid from Hive, this address needs to be\n" +
+ "declared"),
+ HIVE_DRUID_SELECT_THRESHOLD("hive.druid.select.threshold", 10000,
+ "When we can split a Select query, this is the maximum number of rows that we try to retrieve\n" +
+ "per query. In order to do that, we obtain the estimated size for the complete result. If the\n" +
+ "number of records of the query results is larger than this threshold, we split the query in\n" +
+ "total number of rows/threshold parts across the time dimension. Note that we assume the\n" +
+ "records to be split uniformly across the time dimension"),
+
// For HBase storage handler
HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true,
"Whether writes to HBase should be forced to the write-ahead log. \n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/pom.xml
----------------------------------------------------------------------
diff --git a/druid-handler/pom.xml b/druid-handler/pom.xml
new file mode 100644
index 0000000..2173cdc
--- /dev/null
+++ b/druid-handler/pom.xml
@@ -0,0 +1,201 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>hive-druid-handler</artifactId>
+ <packaging>jar</packaging>
+ <name>Hive Druid Handler</name>
+
+ <properties>
+ <hive.path.to.root>..</hive.path.to.root>
+ </properties>
+
+ <dependencies>
+ <!-- dependencies are always listed in sorted order by groupId, artifactId -->
+ <!-- intra-project -->
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- inter-project -->
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>${commons-lang.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>io.druid</groupId>
+ <artifactId>druid-processing</artifactId>
+ <version>${druid.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- test inter-project -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <sourceDirectory>${basedir}/src/java</sourceDirectory>
+ <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>${maven.shade.plugin.version}</version>
+ <executions>
+ <!-- we need to shade netty, as there is a conflict between versions
+ used by Hadoop (3.6.2.Final) and Druid (3.10.4.Final) -->
+ <!-- we need to shade jackson, as there is a conflict between versions
+ used by Hive (2.4.2) and Druid (2.4.6) -->
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadeTestJar>true</shadeTestJar>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <relocations>
+ <relocation>
+ <pattern>io.druid</pattern>
+ <shadedPattern>org.apache.hive.druid.io.druid</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.metamx.emitter</pattern>
+ <shadedPattern>org.apache.hive.druid.com.metamx.emitter</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.metamx.http.client</pattern>
+ <shadedPattern>org.apache.hive.druid.com.metamx.http.client</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.netty</pattern>
+ <shadedPattern>org.apache.hive.druid.io.netty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.jboss.netty</pattern>
+ <shadedPattern>org.apache.hive.druid.org.jboss.netty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.fasterxml.jackson</pattern>
+ <shadedPattern>org.apache.hive.druid.com.fasterxml.jackson</shadedPattern>
+ </relocation>
+ </relocations>
+ <artifactSet>
+ <includes>
+ <include>io.druid:*</include>
+ <include>com.metamx:emitter:*</include>
+ <include>com.metamx:http-client:*</include>
+ <include>io.netty:*</include>
+ <include>com.fasterxml.jackson.core:*</include>
+ <include>com.fasterxml.jackson.datatype:*</include>
+ <include>com.fasterxml.jackson.dataformat:*</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
new file mode 100644
index 0000000..ac03099
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.druid.serde.DruidSerDe;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DruidStorageHandler provides a HiveStorageHandler implementation for Druid.
+ */
+@SuppressWarnings({"deprecation","rawtypes"})
+public class DruidStorageHandler extends DefaultStorageHandler implements HiveMetaHook {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DruidStorageHandler.class);
+
+ @Override
+ public Class<? extends InputFormat> getInputFormatClass() {
+ return HiveDruidQueryBasedInputFormat.class;
+ }
+
+ @Override
+ public Class<? extends OutputFormat> getOutputFormatClass() {
+ return HiveDruidOutputFormat.class;
+ }
+
+ @Override
+ public Class<? extends SerDe> getSerDeClass() {
+ return DruidSerDe.class;
+ }
+
+ @Override
+ public HiveMetaHook getMetaHook() {
+ return this;
+ }
+
+ @Override
+ public void preCreateTable(Table table) throws MetaException {
+ // Do safety checks
+ if (!MetaStoreUtils.isExternalTable(table)) {
+ throw new MetaException("Table in Druid needs to be declared as EXTERNAL");
+ }
+ if (!StringUtils.isEmpty(table.getSd().getLocation())) {
+ throw new MetaException("LOCATION may not be specified for Druid");
+ }
+ if (table.getPartitionKeysSize() != 0) {
+ throw new MetaException("PARTITIONED BY may not be specified for Druid");
+ }
+ if (table.getSd().getBucketColsSize() != 0) {
+ throw new MetaException("CLUSTERED BY may not be specified for Druid");
+ }
+ }
+
+ @Override
+ public void rollbackCreateTable(Table table) throws MetaException {
+ // Nothing to do
+ }
+
+ @Override
+ public void commitCreateTable(Table table) throws MetaException {
+ // Nothing to do
+ }
+
+ @Override
+ public void preDropTable(Table table) throws MetaException {
+ // Nothing to do
+ }
+
+ @Override
+ public void rollbackDropTable(Table table) throws MetaException {
+ // Nothing to do
+ }
+
+ @Override
+ public void commitDropTable(Table table, boolean deleteData) throws MetaException {
+ // Nothing to do
+ }
+
+ @Override
+ public String toString() {
+ return Constants.DRUID_HIVE_STORAGE_HANDLER_ID;
+ }
+
+}
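
To see the safety checks in action, the following sketch drives preCreateTable directly with a minimal metastore Table; the EXTERNAL parameter is an assumption about what MetaStoreUtils.isExternalTable consults, not something this patch defines:

import java.util.Collections;
import org.apache.hadoop.hive.druid.DruidStorageHandler;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;

public class PreCreateTableCheck {
  public static void main(String[] args) throws Exception {
    Table table = new Table();
    table.setSd(new StorageDescriptor());
    table.putToParameters("EXTERNAL", "TRUE"); // assumed flag behind isExternalTable
    table.setPartitionKeys(Collections.emptyList());
    // No LOCATION, no PARTITIONED BY, no CLUSTERED BY: all four checks pass
    new DruidStorageHandler().preCreateTable(table);
  }
}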
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
new file mode 100644
index 0000000..c6b8024
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.concurrent.ExecutionException;
+
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.Request;
+import com.metamx.http.client.response.InputStreamResponseHandler;
+
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.BaseQuery;
+
+/**
+ * Utils class for Druid storage handler.
+ */
+public final class DruidStorageHandlerUtils {
+
+ private static final String SMILE_CONTENT_TYPE = "application/x-jackson-smile";
+
+ /**
+ * Mapper to use to serialize/deserialize Druid objects (JSON)
+ */
+ public static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
+
+ /**
+ * Mapper to use to serialize/deserialize Druid objects (SMILE)
+ */
+ public static final ObjectMapper SMILE_MAPPER = new DefaultObjectMapper(new SmileFactory());
+
+ /**
+ * Creates an HTTP POST request for a Druid query, serializing the body using SMILE.
+ * @param address address of the Druid broker (host:port)
+ * @param query Druid query to submit
+ * @return the request, ready to be submitted
+ * @throws IOException if the query cannot be serialized
+ */
+ public static Request createRequest(String address, BaseQuery<?> query)
+ throws IOException {
+ return new Request(HttpMethod.POST, new URL(String.format("%s/druid/v2/", "http://" + address)))
+ .setContent(SMILE_MAPPER.writeValueAsBytes(query))
+ .setHeader(HttpHeaders.Names.CONTENT_TYPE, SMILE_CONTENT_TYPE);
+ }
+
+ /**
+ * Submits a request to an HTTP address and retrieves the result.
+ * The caller is responsible for closing the stream once it finishes consuming it.
+ * @param client HTTP client to use for the request
+ * @param request request to submit
+ * @return stream with the raw response contents
+ * @throws IOException if the request fails or is interrupted
+ */
+ public static InputStream submitRequest(HttpClient client, Request request)
+ throws IOException {
+ InputStream response;
+ try {
+ response = client.go(request, new InputStreamResponseHandler()).get();
+ } catch (ExecutionException e) {
+ throw new IOException(e.getCause());
+ } catch (InterruptedException e) {
+ throw new IOException(e.getCause());
+ }
+ return response;
+ }
+
+}
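
A typical call sequence for these utilities, lifted from how the input format uses them later in this patch (the broker address is a placeholder):

import java.io.InputStream;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import io.druid.query.BaseQuery;
import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;

public class SubmitQuerySketch {
  // Posts a query to the broker and returns the raw SMILE response stream;
  // per the javadoc above, the caller must close the stream when done.
  static InputStream run(String brokerAddress, BaseQuery<?> query) throws Exception {
    HttpClient client = HttpClientInit.createClient(
        HttpClientConfig.builder().build(), new Lifecycle());
    return DruidStorageHandlerUtils.submitRequest(client,
        DruidStorageHandlerUtils.createRequest(brokerAddress, query));
  }
}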
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java
new file mode 100644
index 0000000..45e31d6
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Placeholder for the Druid output format. Currently not implemented.
+ */
+@SuppressWarnings("rawtypes")
+public class HiveDruidOutputFormat implements HiveOutputFormat {
+
+ @Override
+ public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
+ Progressable progress) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+ Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
new file mode 100644
index 0000000..3df1452
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidIntervalUtils;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+
+import io.druid.query.Druids;
+import io.druid.query.Druids.SegmentMetadataQueryBuilder;
+import io.druid.query.Druids.SelectQueryBuilder;
+import io.druid.query.Druids.TimeBoundaryQueryBuilder;
+import io.druid.query.Query;
+import io.druid.query.Result;
+import io.druid.query.metadata.metadata.SegmentAnalysis;
+import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+import io.druid.query.select.PagingSpec;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.spec.MultipleIntervalSegmentSpec;
+import io.druid.query.timeboundary.TimeBoundaryQuery;
+import io.druid.query.timeboundary.TimeBoundaryResultValue;
+
+/**
+ * Druid query based input format.
+ *
+ * Given a query and the Druid broker address, it will send it, and retrieve
+ * and parse the results.
+ */
+public class HiveDruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidWritable>
+ implements org.apache.hadoop.mapred.InputFormat<NullWritable, DruidWritable> {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(HiveDruidQueryBasedInputFormat.class);
+
+ @Override
+ public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
+ throws IOException {
+ return getInputSplits(job);
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+ return Arrays.<InputSplit> asList(getInputSplits(context.getConfiguration()));
+ }
+
+ @SuppressWarnings("deprecation")
+ private HiveDruidSplit[] getInputSplits(Configuration conf) throws IOException {
+ String address = HiveConf.getVar(conf,
+ HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
+ if (StringUtils.isEmpty(address)) {
+ throw new IOException("Druid broker address not specified in configuration");
+ }
+ String druidQuery = StringEscapeUtils.unescapeJava(conf.get(Constants.DRUID_QUERY_JSON));
+ String druidQueryType;
+ if (StringUtils.isEmpty(druidQuery)) {
+ // Empty, maybe because CBO did not run; we fall back to
+ // full Select query
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Druid query is empty; creating Select query");
+ }
+ String dataSource = conf.get(Constants.DRUID_DATA_SOURCE);
+ if (dataSource == null) {
+ throw new IOException("Druid data source cannot be empty");
+ }
+ druidQuery = createSelectStarQuery(address, dataSource);
+ druidQueryType = Query.SELECT;
+ } else {
+ druidQueryType = conf.get(Constants.DRUID_QUERY_TYPE);
+ if (druidQueryType == null) {
+ throw new IOException("Druid query type not recognized");
+ }
+ }
+
+ // hive depends on FileSplits
+ Job job = new Job(conf);
+ JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
+ Path [] paths = FileInputFormat.getInputPaths(jobContext);
+
+ switch (druidQueryType) {
+ case Query.TIMESERIES:
+ case Query.TOPN:
+ case Query.GROUP_BY:
+ return new HiveDruidSplit[] { new HiveDruidSplit(address, druidQuery, paths[0]) };
+ case Query.SELECT:
+ return splitSelectQuery(conf, address, druidQuery, paths[0]);
+ default:
+ throw new IOException("Druid query type not recognized");
+ }
+ }
+
+ private static String createSelectStarQuery(String address, String dataSource) throws IOException {
+ // Create Select query
+ SelectQueryBuilder builder = new Druids.SelectQueryBuilder();
+ builder.dataSource(dataSource);
+ builder.intervals(Arrays.asList(DruidTable.DEFAULT_INTERVAL));
+ builder.pagingSpec(PagingSpec.newSpec(1));
+ Map<String, Object> context = new HashMap<>();
+ context.put(Constants.DRUID_QUERY_FETCH, false);
+ builder.context(context);
+ return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(builder.build());
+ }
+
+ /* Splits a Select query into partial queries according to the configured threshold
+ * so that the read can be parallelized */
+ private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
+ String druidQuery, Path dummyPath) throws IOException {
+ final int selectThreshold = (int) HiveConf.getIntVar(
+ conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);
+
+ SelectQuery query;
+ try {
+ query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
+ final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
+ if (isFetch) {
+ // If it has a limit, we use it and we do not split the query
+ return new HiveDruidSplit[] { new HiveDruidSplit(
+ address, DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
+ }
+
+ // We do not have the number of rows, thus we need to execute a
+ // Segment Metadata query to obtain number of rows
+ SegmentMetadataQueryBuilder metadataBuilder = new Druids.SegmentMetadataQueryBuilder();
+ metadataBuilder.dataSource(query.getDataSource());
+ metadataBuilder.intervals(query.getIntervals());
+ metadataBuilder.merge(true);
+ metadataBuilder.analysisTypes();
+ SegmentMetadataQuery metadataQuery = metadataBuilder.build();
+
+ HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle());
+ InputStream response;
+ try {
+ response = DruidStorageHandlerUtils.submitRequest(client,
+ DruidStorageHandlerUtils.createRequest(address, metadataQuery));
+ } catch (Exception e) {
+ throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+
+ // Retrieve results
+ List<SegmentAnalysis> metadataList;
+ try {
+ metadataList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
+ new TypeReference<List<SegmentAnalysis>>() {});
+ } catch (Exception e) {
+ response.close();
+ throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+ if (metadataList == null || metadataList.isEmpty()) {
+ throw new IOException("Connected to Druid but could not retrieve datasource information");
+ }
+ if (metadataList.size() != 1) {
+ throw new IOException("Information about segments should have been merged");
+ }
+
+ final long numRows = metadataList.get(0).getNumRows();
+
+ query = query.withPagingSpec(PagingSpec.newSpec(selectThreshold));
+ if (numRows <= selectThreshold) {
+ // We are not going to split it
+ return new HiveDruidSplit[] { new HiveDruidSplit(address,
+ DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
+ }
+
+ // If the query does not specify a timestamp, we obtain the total time using
+ // a Time Boundary query. Then, we use the information to split the query
+ // following the Select threshold configuration property
+ final List<Interval> intervals = new ArrayList<>();
+ if (query.getIntervals().size() == 1 &&
+ query.getIntervals().get(0).equals(DruidTable.DEFAULT_INTERVAL)) {
+ // Default max and min, we should execute a time boundary query to get a
+ // more precise range
+ TimeBoundaryQueryBuilder timeBuilder = new Druids.TimeBoundaryQueryBuilder();
+ timeBuilder.dataSource(query.getDataSource());
+ TimeBoundaryQuery timeQuery = timeBuilder.build();
+
+ try {
+ response = DruidStorageHandlerUtils.submitRequest(client,
+ DruidStorageHandlerUtils.createRequest(address, timeQuery));
+ } catch (Exception e) {
+ throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+
+ // Retrieve results
+ List<Result<TimeBoundaryResultValue>> timeList;
+ try {
+ timeList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
+ new TypeReference<List<Result<TimeBoundaryResultValue>>>() {});
+ } catch (Exception e) {
+ response.close();
+ throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+ if (timeList == null || timeList.isEmpty()) {
+ throw new IOException("Connected to Druid but could not retrieve time boundary information");
+ }
+ if (timeList.size() != 1) {
+ throw new IOException("We should obtain a single time boundary");
+ }
+
+ intervals.add(new Interval(timeList.get(0).getValue().getMinTime().getMillis(),
+ timeList.get(0).getValue().getMaxTime().getMillis()));
+ } else {
+ intervals.addAll(query.getIntervals());
+ }
+
+ // Create (numRows/default threshold) input splits
+ int numSplits = (int) Math.ceil((double) numRows / selectThreshold);
+ List<List<Interval>> newIntervals = createSplitsIntervals(intervals, numSplits);
+ HiveDruidSplit[] splits = new HiveDruidSplit[numSplits];
+ for (int i = 0; i < numSplits; i++) {
+ // Create partial Select query
+ final SelectQuery partialQuery = query.withQuerySegmentSpec(
+ new MultipleIntervalSegmentSpec(newIntervals.get(i)));
+ splits[i] = new HiveDruidSplit(address,
+ DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath);
+ }
+ return splits;
+ }
+
+ private static List<List<Interval>> createSplitsIntervals(List<Interval> intervals, int numSplits) {
+ final long totalTime = DruidIntervalUtils.extractTotalTime(intervals);
+ long startTime = intervals.get(0).getStartMillis();
+ long endTime = startTime;
+ long currTime = 0;
+ List<List<Interval>> newIntervals = new ArrayList<>();
+ for (int i = 0, posIntervals = 0; i < numSplits; i++) {
+ final long rangeSize = Math.round( (double) (totalTime * (i + 1)) / numSplits) -
+ Math.round( (double) (totalTime * i) / numSplits);
+ // Create the new interval(s)
+ List<Interval> currentIntervals = new ArrayList<>();
+ while (posIntervals < intervals.size()) {
+ final Interval interval = intervals.get(posIntervals);
+ final long expectedRange = rangeSize - currTime;
+ if (interval.getEndMillis() - startTime >= expectedRange) {
+ endTime = startTime + expectedRange;
+ currentIntervals.add(new Interval(startTime, endTime));
+ startTime = endTime;
+ currTime = 0;
+ break;
+ }
+ endTime = interval.getEndMillis();
+ currentIntervals.add(new Interval(startTime, endTime));
+ currTime += (endTime - startTime);
+ startTime = intervals.get(++posIntervals).getStartMillis();
+ }
+ newIntervals.add(currentIntervals);
+ }
+ assert endTime == intervals.get(intervals.size()-1).getEndMillis();
+ return newIntervals;
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.RecordReader<NullWritable, DruidWritable> getRecordReader(
+ org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter)
+ throws IOException {
+ // We need to provide a different record reader for every type of Druid query.
+ // The reason is that Druid results format is different for each type.
+ final DruidQueryRecordReader<?,?> reader;
+ final String druidQueryType = job.get(Constants.DRUID_QUERY_TYPE);
+ if (druidQueryType == null) {
+ reader = new DruidSelectQueryRecordReader(); // By default
+ reader.initialize((HiveDruidSplit)split, job);
+ return reader;
+ }
+ switch (druidQueryType) {
+ case Query.TIMESERIES:
+ reader = new DruidTimeseriesQueryRecordReader();
+ break;
+ case Query.TOPN:
+ reader = new DruidTopNQueryRecordReader();
+ break;
+ case Query.GROUP_BY:
+ reader = new DruidGroupByQueryRecordReader();
+ break;
+ case Query.SELECT:
+ reader = new DruidSelectQueryRecordReader();
+ break;
+ default:
+ throw new IOException("Druid query type not recognized");
+ }
+ reader.initialize((HiveDruidSplit)split, job);
+ return reader;
+ }
+
+ @Override
+ public RecordReader<NullWritable, DruidWritable> createRecordReader(InputSplit split,
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ // We need to provide a different record reader for every type of Druid query.
+ // The reason is that Druid results format is different for each type.
+ final String druidQueryType = context.getConfiguration().get(Constants.DRUID_QUERY_TYPE);
+ if (druidQueryType == null) {
+ return new DruidSelectQueryRecordReader(); // By default
+ }
+ final DruidQueryRecordReader<?,?> reader;
+ switch (druidQueryType) {
+ case Query.TIMESERIES:
+ reader = new DruidTimeseriesQueryRecordReader();
+ break;
+ case Query.TOPN:
+ reader = new DruidTopNQueryRecordReader();
+ break;
+ case Query.GROUP_BY:
+ reader = new DruidGroupByQueryRecordReader();
+ break;
+ case Query.SELECT:
+ reader = new DruidSelectQueryRecordReader();
+ break;
+ default:
+ throw new IOException("Druid query type not recognized");
+ }
+ return reader;
+ }
+
+}
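
createSplitsIntervals above assigns each split a proportional share of the total covered time, carrying partial intervals across split boundaries. For the common single-interval case the effect is plain proportional slicing, as in this simplified, self-contained sketch (a 10-hour interval split 4 ways):

import org.joda.time.Interval;

public class IntervalSplitSketch {
  public static void main(String[] args) {
    Interval total = new Interval(0L, 36000000L); // 10 hours in millis
    int numSplits = 4;
    long totalTime = total.toDurationMillis();
    for (int i = 0; i < numSplits; i++) {
      // Same rounding as createSplitsIntervals, restricted to one interval
      long start = Math.round((double) (totalTime * i) / numSplits);
      long end = Math.round((double) (totalTime * (i + 1)) / numSplits);
      System.out.println(new Interval(total.getStartMillis() + start,
          total.getStartMillis() + end)); // four 2.5-hour slices
    }
  }
}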
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java
new file mode 100644
index 0000000..3fba5d0
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+
+/**
+ * Druid split. Its purpose is to trigger query execution in Druid.
+ */
+public class HiveDruidSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit {
+
+ private String address;
+ private String druidQuery;
+
+ // required for deserialization
+ public HiveDruidSplit() {
+ super((Path) null, 0, 0, (String[]) null);
+ }
+
+ public HiveDruidSplit(String address, String druidQuery, Path dummyPath) {
+ super(dummyPath, 0, 0, (String[]) null);
+ this.address = address;
+ this.druidQuery = druidQuery;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ out.writeUTF(address);
+ out.writeUTF(druidQuery);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ address = in.readUTF();
+ druidQuery = in.readUTF();
+ }
+
+ @Override
+ public long getLength() {
+ return 0L;
+ }
+
+ @Override
+ public String[] getLocations() {
+ return new String[] { "" };
+ }
+
+ public String getAddress() {
+ return address;
+ }
+
+ public String getDruidQuery() {
+ return druidQuery;
+ }
+
+ @Override
+ public String toString() {
+ return "HiveDruidSplit{" + address + ", " + druidQuery + "}";
+ }
+
+}
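
Since the split carries the serialized query itself, the Writable round trip is what moves the query from the client to the tasks. A small sketch of that round trip using only the methods above (query string and path are made up):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.druid.HiveDruidSplit;

public class SplitRoundTrip {
  public static void main(String[] args) throws IOException {
    HiveDruidSplit split = new HiveDruidSplit(
        "localhost:8082", "{\"queryType\":\"select\"}", new Path("/tmp/dummy"));
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    split.write(new DataOutputStream(bytes));
    HiveDruidSplit copy = new HiveDruidSplit(); // no-arg constructor for deserialization
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy); // HiveDruidSplit{localhost:8082, {"queryType":"select"}}
  }
}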
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
new file mode 100644
index 0000000..226060f
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import io.druid.data.input.Row;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.groupby.GroupByQuery;
+
+/**
+ * Record reader for results for Druid GroupByQuery.
+ */
+public class DruidGroupByQueryRecordReader
+ extends DruidQueryRecordReader<GroupByQuery, Row> {
+
+ private Row current;
+ private int[] indexes = new int[0];
+ // Row objects returned by GroupByQuery have different access paths depending on
+ // whether the result for the metric is a Float or a Long; thus, we keep track of
+ // each metric's type using these extractors
+ private Extract[] extractors;
+
+ @Override
+ public void initialize(InputSplit split, Configuration conf) throws IOException {
+ super.initialize(split, conf);
+ initExtractors();
+ }
+
+ @Override
+ protected GroupByQuery createQuery(String content) throws IOException {
+ return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, GroupByQuery.class);
+ }
+
+ @Override
+ protected List<Row> createResultsList(InputStream content) throws IOException {
+ return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
+ new TypeReference<List<Row>>(){});
+ }
+
+ private void initExtractors() throws IOException {
+ extractors = new Extract[query.getAggregatorSpecs().size() + query.getPostAggregatorSpecs().size()];
+ int counter = 0;
+ for (int i = 0; i < query.getAggregatorSpecs().size(); i++, counter++) {
+ AggregatorFactory af = query.getAggregatorSpecs().get(i);
+ switch (af.getTypeName().toUpperCase()) {
+ case DruidSerDeUtils.FLOAT_TYPE:
+ extractors[counter] = Extract.FLOAT;
+ break;
+ case DruidSerDeUtils.LONG_TYPE:
+ extractors[counter] = Extract.LONG;
+ break;
+ default:
+ throw new IOException("Type not supported");
+ }
+ }
+ for (int i = 0; i < query.getPostAggregatorSpecs().size(); i++, counter++) {
+ extractors[counter] = Extract.FLOAT;
+ }
+ }
+
+ @Override
+ public boolean nextKeyValue() {
+ // Refresh indexes: advance to the next combination of multi-valued dimension values
+ for (int i = indexes.length - 1; i >= 0; i--) {
+ if (indexes[i] > 0) {
+ indexes[i]--;
+ for (int j = i + 1; j < indexes.length; j++) {
+ indexes[j] = current.getDimension(
+ query.getDimensions().get(j).getDimension()).size() - 1;
+ }
+ return true;
+ }
+ }
+ // No combination left for the current row: move on to the next result row
+ if (results.hasNext()) {
+ current = results.next();
+ indexes = new int[query.getDimensions().size()];
+ for (int i=0; i < query.getDimensions().size(); i++) {
+ DimensionSpec ds = query.getDimensions().get(i);
+ indexes[i] = current.getDimension(ds.getDimension()).size() - 1;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException, InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public DruidWritable getCurrentValue() throws IOException, InterruptedException {
+ // Create new value
+ DruidWritable value = new DruidWritable();
+ // 1) The timestamp column
+ value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
+ // 2) The dimension columns
+ for (int i=0; i < query.getDimensions().size(); i++) {
+ DimensionSpec ds = query.getDimensions().get(i);
+ List<String> dims = current.getDimension(ds.getDimension());
+ int pos = dims.size() - indexes[i] - 1;
+ value.getValue().put(ds.getOutputName(), dims.get(pos));
+ }
+ int counter = 0;
+ // 3) The aggregation columns
+ for (AggregatorFactory af : query.getAggregatorSpecs()) {
+ switch (extractors[counter++]) {
+ case FLOAT:
+ value.getValue().put(af.getName(), current.getFloatMetric(af.getName()));
+ break;
+ case LONG:
+ value.getValue().put(af.getName(), current.getLongMetric(af.getName()));
+ break;
+ }
+ }
+ // 4) The post-aggregation columns
+ for (PostAggregator pa : query.getPostAggregatorSpecs()) {
+ assert extractors[counter++] == Extract.FLOAT;
+ value.getValue().put(pa.getName(), current.getFloatMetric(pa.getName()));
+ }
+ return value;
+ }
+
+ @Override
+ public boolean next(NullWritable key, DruidWritable value) {
+ if (nextKeyValue()) {
+ // Update value
+ value.getValue().clear();
+ // 1) The timestamp column
+ value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
+ // 2) The dimension columns
+ for (int i=0; i < query.getDimensions().size(); i++) {
+ DimensionSpec ds = query.getDimensions().get(i);
+ List<String> dims = current.getDimension(ds.getDimension());
+ int pos = dims.size() - indexes[i] - 1;
+ value.getValue().put(ds.getOutputName(), dims.get(pos));
+ }
+ int counter = 0;
+ // 3) The aggregation columns
+ for (AggregatorFactory af : query.getAggregatorSpecs()) {
+ switch (extractors[counter++]) {
+ case FLOAT:
+ value.getValue().put(af.getName(), current.getFloatMetric(af.getName()));
+ break;
+ case LONG:
+ value.getValue().put(af.getName(), current.getLongMetric(af.getName()));
+ break;
+ }
+ }
+ // 4) The post-aggregation columns
+ for (PostAggregator pa : query.getPostAggregatorSpecs()) {
+ assert extractors[counter++] == Extract.FLOAT;
+ value.getValue().put(pa.getName(), current.getFloatMetric(pa.getName()));
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return results.hasNext() ? 0 : 1;
+ }
+
+ private enum Extract {
+ FLOAT,
+ LONG
+ }
+
+}
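
The indexes bookkeeping in this reader fans a single GroupBy row out into one record per combination of values when dimensions are multi-valued. The same countdown scheme in isolation, over made-up dimension values and without any Druid types:

import java.util.Arrays;
import java.util.List;

public class MultiValueFanOut {
  public static void main(String[] args) {
    List<List<String>> dims = Arrays.asList(
        Arrays.asList("a", "b"), Arrays.asList("x", "y", "z"));
    int[] indexes = new int[dims.size()];
    for (int i = 0; i < dims.size(); i++) {
      indexes[i] = dims.get(i).size() - 1;
    }
    do {
      StringBuilder row = new StringBuilder();
      for (int i = 0; i < dims.size(); i++) {
        // Same position arithmetic as getCurrentValue() in the reader
        row.append(dims.get(i).get(dims.get(i).size() - indexes[i] - 1)).append(' ');
      }
      System.out.println(row); // prints all 2 x 3 = 6 combinations
    } while (advance(indexes, dims));
  }

  // Mirrors the "refresh indexes" loop in nextKeyValue()
  static boolean advance(int[] indexes, List<List<String>> dims) {
    for (int i = indexes.length - 1; i >= 0; i--) {
      if (indexes[i] > 0) {
        indexes[i]--;
        for (int j = i + 1; j < indexes.length; j++) {
          indexes[j] = dims.get(j).size() - 1;
        }
        return true;
      }
    }
    return false;
  }
}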
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
new file mode 100644
index 0000000..96bcee8
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.HiveDruidSplit;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterators;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+
+import io.druid.query.BaseQuery;
+
+/**
+ * Base record reader for a given Druid query. This class contains the logic to
+ * send the query to the broker and retrieve the results. The transformation to
+ * emit records needs to be done by the classes that extend the reader.
+ *
+ * The key for each record will be a NullWritable, while the value will be a
+ * DruidWritable containing the timestamp as well as all values resulting from
+ * the query.
+ */
+public abstract class DruidQueryRecordReader<T extends BaseQuery<R>,R extends Comparable<R>>
+ extends RecordReader<NullWritable, DruidWritable>
+ implements org.apache.hadoop.mapred.RecordReader<NullWritable, DruidWritable> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DruidQueryRecordReader.class);
+
+ /**
+ * Query that Druid executes.
+ */
+ protected T query;
+
+ /**
+ * Query results.
+ */
+ protected Iterator<R> results = Iterators.emptyIterator();
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
+ initialize(split, context.getConfiguration());
+ }
+
+ public void initialize(InputSplit split, Configuration conf) throws IOException {
+ HiveDruidSplit hiveDruidSplit = (HiveDruidSplit) split;
+
+ // Create query
+ query = createQuery(hiveDruidSplit.getDruidQuery());
+
+ // Execute query
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Retrieving from druid using query:\n " + query);
+ }
+
+ HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle());
+ InputStream response = DruidStorageHandlerUtils.submitRequest(client,
+ DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query));
+
+ // Retrieve results
+ List<R> resultsList;
+ try {
+ resultsList = createResultsList(response);
+ } catch (IOException e) {
+ response.close();
+ throw e;
+ }
+ if (resultsList == null || resultsList.isEmpty()) {
+ return;
+ }
+ results = resultsList.iterator();
+ }
+
+ protected abstract T createQuery(String content) throws IOException;
+
+ protected abstract List<R> createResultsList(InputStream content) throws IOException;
+
+ @Override
+ public NullWritable createKey() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public DruidWritable createValue() {
+ return new DruidWritable();
+ }
+
+ @Override
+ public abstract boolean next(NullWritable key, DruidWritable value) throws IOException;
+
+ @Override
+ public long getPos() {
+ return 0;
+ }
+
+ @Override
+ public abstract boolean nextKeyValue() throws IOException;
+
+ @Override
+ public abstract NullWritable getCurrentKey() throws IOException, InterruptedException;
+
+ @Override
+ // TODO: we could generate vector row batches so that vectorized execution may get triggered
+ public abstract DruidWritable getCurrentValue() throws IOException, InterruptedException;
+
+ @Override
+ public abstract float getProgress() throws IOException;
+
+ @Override
+ public void close() {
+ // Nothing to do
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
new file mode 100644
index 0000000..70b493c
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Iterators;
+
+import io.druid.query.Result;
+import io.druid.query.select.EventHolder;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.select.SelectResultValue;
+
+/**
+ * Record reader for the results of a Druid SelectQuery.
+ */
+public class DruidSelectQueryRecordReader
+ extends DruidQueryRecordReader<SelectQuery, Result<SelectResultValue>> {
+
+ private Result<SelectResultValue> current;
+ private Iterator<EventHolder> values = Iterators.emptyIterator();
+
+ @Override
+ protected SelectQuery createQuery(String content) throws IOException {
+ return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, SelectQuery.class);
+ }
+
+ @Override
+ protected List<Result<SelectResultValue>> createResultsList(InputStream content) throws IOException {
+ return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
+ new TypeReference<List<Result<SelectResultValue>>>(){});
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException {
+ if (values.hasNext()) {
+ return true;
+ }
+ if (results.hasNext()) {
+ current = results.next();
+ values = current.getValue().getEvents().iterator();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException, InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public DruidWritable getCurrentValue() throws IOException, InterruptedException {
+ // Create new value
+ DruidWritable value = new DruidWritable();
+ value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
+ if (values.hasNext()) {
+ value.getValue().putAll(values.next().getEvent());
+ }
+ return value;
+ }
+
+ @Override
+ public boolean next(NullWritable key, DruidWritable value) throws IOException {
+ if (nextKeyValue()) {
+ // Update value
+ value.getValue().clear();
+ value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
+ if (values.hasNext()) {
+ value.getValue().putAll(values.next().getEvent());
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public float getProgress() {
+ return results.hasNext() || values.hasNext() ? 0 : 1;
+ }
+
+}
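
The select reader above flattens a two-level structure: the broker returns a list
of Result<SelectResultValue> objects, and each of them carries a list of
EventHolder rows. The same pattern in isolation, over plain collections, looks as
follows; FlatteningCursor and its type parameters are illustrative names, not part
of the patch.

    import java.util.Iterator;
    import java.util.List;

    import com.google.common.base.Function;
    import com.google.common.collect.Iterators;

    // Sketch of the two-level iteration in nextKeyValue() above: advance within
    // the current block of rows first, then pull the next outer result.
    final class FlatteningCursor<P, R> {

      private final Iterator<P> outer;
      private final Function<P, List<R>> rowsOf;
      private Iterator<R> rows = Iterators.emptyIterator();

      FlatteningCursor(Iterator<P> outer, Function<P, List<R>> rowsOf) {
        this.outer = outer;
        this.rowsOf = rowsOf;
      }

      // Mirrors nextKeyValue(): true while a row remains in the current block
      // or a further outer result exists.
      boolean advance() {
        if (rows.hasNext()) {
          return true;
        }
        if (outer.hasNext()) {
          rows = rowsOf.apply(outer.next()).iterator();
          return true;
        }
        return false;
      }
    }
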
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
new file mode 100644
index 0000000..8f53d4a
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Lists;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+
+import io.druid.query.Druids;
+import io.druid.query.Druids.SegmentMetadataQueryBuilder;
+import io.druid.query.Query;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.groupby.GroupByQuery;
+import io.druid.query.metadata.metadata.ColumnAnalysis;
+import io.druid.query.metadata.metadata.SegmentAnalysis;
+import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.topn.TopNQuery;
+
+/**
+ * SerDe used to deserialize objects read from a Druid data source.
+ */
+@SerDeSpec(schemaProps = {Constants.DRUID_DATA_SOURCE})
+public class DruidSerDe extends AbstractSerDe {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DruidSerDe.class);
+
+ private String[] columns;
+ private PrimitiveTypeInfo[] types;
+ private ObjectInspector inspector;
+
+ @Override
+ public void initialize(Configuration configuration, Properties properties) throws SerDeException {
+ final List<String> columnNames = new ArrayList<>();
+ final List<PrimitiveTypeInfo> columnTypes = new ArrayList<>();
+ List<ObjectInspector> inspectors = new ArrayList<>();
+
+ // Druid query
+ String druidQuery = properties.getProperty(Constants.DRUID_QUERY_JSON);
+ if (druidQuery == null) {
+ // No query specified. We create a Druid Segment Metadata query that retrieves
+ // all columns present in the data source (dimensions and metrics).
+ String dataSource = properties.getProperty(Constants.DRUID_DATA_SOURCE);
+ if (dataSource == null) {
+ throw new SerDeException("Druid data source not specified; use " +
+ Constants.DRUID_DATA_SOURCE + " in table properties");
+ }
+ SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder();
+ builder.dataSource(dataSource);
+ builder.merge(true);
+ builder.analysisTypes();
+ SegmentMetadataQuery query = builder.build();
+
+ // Execute query in Druid
+ String address = HiveConf.getVar(configuration,
+ HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
+ if (org.apache.commons.lang3.StringUtils.isEmpty(address)) {
+ throw new SerDeException("Druid broker address not specified in configuration");
+ }
+
+ // Infer schema
+ SegmentAnalysis schemaInfo;
+ try {
+ schemaInfo = submitMetadataRequest(address, query);
+ } catch (IOException e) {
+ throw new SerDeException(e);
+ }
+ for (Entry<String,ColumnAnalysis> columnInfo : schemaInfo.getColumns().entrySet()) {
+ if (columnInfo.getKey().equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
+ // Special handling for timestamp column
+ columnNames.add(columnInfo.getKey()); // field name
+ PrimitiveTypeInfo type = TypeInfoFactory.timestampTypeInfo; // field type
+ columnTypes.add(type);
+ inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
+ continue;
+ }
+ columnNames.add(columnInfo.getKey()); // field name
+ PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType(
+ columnInfo.getValue().getType()); // field type
+ columnTypes.add(type);
+ inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
+ }
+ columns = columnNames.toArray(new String[columnNames.size()]);
+ types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
+ inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
+ } else {
+ // A query is specified, so we can extract the results schema from it
+ Query<?> query;
+ try {
+ query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, Query.class);
+ } catch (Exception e) {
+ throw new SerDeException(e);
+ }
+
+ switch (query.getType()) {
+ case Query.TIMESERIES:
+ inferSchema((TimeseriesQuery) query, columnNames, columnTypes);
+ break;
+ case Query.TOPN:
+ inferSchema((TopNQuery) query, columnNames, columnTypes);
+ break;
+ case Query.SELECT:
+ inferSchema((SelectQuery) query, columnNames, columnTypes);
+ break;
+ case Query.GROUP_BY:
+ inferSchema((GroupByQuery) query, columnNames, columnTypes);
+ break;
+ default:
+ throw new SerDeException("Not supported Druid query");
+ }
+
+ columns = new String[columnNames.size()];
+ types = new PrimitiveTypeInfo[columnNames.size()];
+ for (int i = 0; i < columnTypes.size(); ++i) {
+ columns[i] = columnNames.get(i);
+ types[i] = columnTypes.get(i);
+ inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(types[i]));
+ }
+ inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DruidSerDe initialized with\n"
+ + "\t columns: " + columnNames
+ + "\n\t types: " + columnTypes);
+ }
+ }
+
+ /* Submits a Segment Metadata query to the Druid broker and returns the merged schema analysis */
+ protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
+ throws SerDeException, IOException {
+ HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle());
+ InputStream response;
+ try {
+ response = DruidStorageHandlerUtils.submitRequest(client,
+ DruidStorageHandlerUtils.createRequest(address, query));
+ } catch (Exception e) {
+ throw new SerDeException(StringUtils.stringifyException(e));
+ }
+
+ // Retrieve results
+ List<SegmentAnalysis> resultsList;
+ try {
+ resultsList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
+ new TypeReference<List<SegmentAnalysis>>() {});
+ } catch (Exception e) {
+ response.close();
+ throw new SerDeException(StringUtils.stringifyException(e));
+ }
+ if (resultsList == null || resultsList.isEmpty()) {
+ throw new SerDeException("Connected to Druid but could not retrieve datasource information");
+ }
+ if (resultsList.size() != 1) {
+ throw new SerDeException("Information about segments should have been merged");
+ }
+
+ return resultsList.get(0);
+ }
+
+ /* Timeseries query */
+ private void inferSchema(TimeseriesQuery query, List<String> columnNames,
+ List<PrimitiveTypeInfo> columnTypes) {
+ // Timestamp column
+ columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
+ columnTypes.add(TypeInfoFactory.timestampTypeInfo);
+ // Aggregator columns
+ for (AggregatorFactory af : query.getAggregatorSpecs()) {
+ columnNames.add(af.getName());
+ columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+ }
+ // Post-aggregator columns
+ for (PostAggregator pa : query.getPostAggregatorSpecs()) {
+ columnNames.add(pa.getName());
+ columnTypes.add(TypeInfoFactory.floatTypeInfo);
+ }
+ }
+
+ /* TopN query */
+ private void inferSchema(TopNQuery query, List<String> columnNames, List<PrimitiveTypeInfo> columnTypes) {
+ // Timestamp column
+ columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
+ columnTypes.add(TypeInfoFactory.timestampTypeInfo);
+ // Dimension column
+ columnNames.add(query.getDimensionSpec().getOutputName());
+ columnTypes.add(TypeInfoFactory.stringTypeInfo);
+ // Aggregator columns
+ for (AggregatorFactory af : query.getAggregatorSpecs()) {
+ columnNames.add(af.getName());
+ columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+ }
+ // Post-aggregator columns
+ for (PostAggregator pa : query.getPostAggregatorSpecs()) {
+ columnNames.add(pa.getName());
+ columnTypes.add(TypeInfoFactory.floatTypeInfo);
+ }
+ }
+
+ /* Select query */
+ private void inferSchema(SelectQuery query, List<String> columnNames,
+ List<PrimitiveTypeInfo> columnTypes) {
+ // Timestamp column
+ columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
+ columnTypes.add(TypeInfoFactory.timestampTypeInfo);
+ // Dimension columns
+ for (DimensionSpec ds : query.getDimensions()) {
+ columnNames.add(ds.getOutputName());
+ columnTypes.add(TypeInfoFactory.stringTypeInfo);
+ }
+ // Metric columns
+ for (String metric : query.getMetrics()) {
+ columnNames.add(metric);
+ columnTypes.add(TypeInfoFactory.floatTypeInfo);
+ }
+ }
+
+ /* GroupBy query */
+ private void inferSchema(GroupByQuery query, List<String> columnNames, List<PrimitiveTypeInfo> columnTypes) {
+ // Timestamp column
+ columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
+ columnTypes.add(TypeInfoFactory.timestampTypeInfo);
+ // Dimension columns
+ for (DimensionSpec ds : query.getDimensions()) {
+ columnNames.add(ds.getOutputName());
+ columnTypes.add(TypeInfoFactory.stringTypeInfo);
+ }
+ // Aggregator columns
+ for (AggregatorFactory af : query.getAggregatorSpecs()) {
+ columnNames.add(af.getName());
+ columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+ }
+ // Post-aggregator columns
+ for (PostAggregator pa : query.getPostAggregatorSpecs()) {
+ columnNames.add(pa.getName());
+ columnTypes.add(TypeInfoFactory.floatTypeInfo);
+ }
+ }
+
+ @Override
+ public Class<? extends Writable> getSerializedClass() {
+ return NullWritable.class;
+ }
+
+ @Override
+ public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public SerDeStats getSerDeStats() {
+ throw new UnsupportedOperationException("SerDeStats not supported.");
+ }
+
+ @Override
+ public Object deserialize(Writable writable) throws SerDeException {
+ DruidWritable input = (DruidWritable) writable;
+ List<Object> output = Lists.newArrayListWithExpectedSize(columns.length);
+ for (int i = 0; i < columns.length; i++) {
+ final Object value = input.getValue().get(columns[i]);
+ if (value == null) {
+ output.add(null);
+ continue;
+ }
+ switch (types[i].getPrimitiveCategory()) {
+ case TIMESTAMP:
+ output.add(new TimestampWritable(new Timestamp((Long)value)));
+ break;
+ case LONG:
+ output.add(new LongWritable(((Number)value).longValue()));
+ break;
+ case FLOAT:
+ output.add(new FloatWritable(((Number)value).floatValue()));
+ break;
+ case STRING:
+ output.add(new Text(value.toString()));
+ break;
+ default:
+ throw new SerDeException("Unknown type: " + types[i].getPrimitiveCategory());
+ }
+ }
+ return output;
+ }
+
+ @Override
+ public ObjectInspector getObjectInspector() throws SerDeException {
+ return inspector;
+ }
+
+}
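
To make the deserialize() contract above concrete: given a DruidWritable whose map
carries the timestamp in milliseconds plus the column values, the SerDe emits a
List of Hive writables in column order. The sketch below assumes a DruidSerDe that
has already been initialized over a schema consisting of the timestamp column plus
one bigint metric named "added"; both the schema and the helper class are
illustrative.

    import java.util.List;

    import org.apache.hadoop.hive.druid.serde.DruidSerDe;
    import org.apache.hadoop.hive.druid.serde.DruidWritable;
    import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable;
    import org.apache.hadoop.hive.serde2.SerDeException;

    public class DruidSerDeExample {

      @SuppressWarnings("unchecked")
      public static List<Object> exampleRow(DruidSerDe serDe) throws SerDeException {
        DruidWritable row = new DruidWritable();
        row.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, 1420070400000L); // millis
        row.getValue().put("added", 57L);
        // Returns [TimestampWritable, LongWritable] in column order.
        return (List<Object>) serDe.deserialize(row);
      }
    }
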
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
new file mode 100644
index 0000000..29b8845
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utils class for Druid SerDe.
+ */
+public final class DruidSerDeUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DruidSerDeUtils.class);
+
+ protected static final String FLOAT_TYPE = "FLOAT";
+ protected static final String LONG_TYPE = "LONG";
+ protected static final String STRING_TYPE = "STRING";
+
+ /* This method converts from the String representation of a Druid type
+ * to the corresponding Hive type */
+ public static PrimitiveTypeInfo convertDruidToHiveType(String typeName) {
+ typeName = typeName.toUpperCase();
+ switch(typeName) {
+ case FLOAT_TYPE:
+ return TypeInfoFactory.floatTypeInfo;
+ case LONG_TYPE:
+ return TypeInfoFactory.longTypeInfo;
+ case STRING_TYPE:
+ return TypeInfoFactory.stringTypeInfo;
+ default:
+ // This is a guard for special Druid types e.g. hyperUnique
+ // (http://druid.io/docs/0.9.1.1/querying/aggregations.html#hyperunique-aggregator).
+ // Currently, we do not support doing anything special with them in Hive.
+ // However, those columns are there, and they can actually be read as normal
+ // dimensions, e.g. with a select query. Thus, we log a warning and just read
+ // them as String.
+ LOG.warn("Transformation to STRING for unknown type " + typeName);
+ return TypeInfoFactory.stringTypeInfo;
+ }
+ }
+
+ /* This method converts from the String representation of a Druid type
+ * to the String representation of the corresponding Hive type */
+ public static String convertDruidToHiveTypeString(String typeName) {
+ typeName = typeName.toUpperCase();
+ switch(typeName) {
+ case FLOAT_TYPE:
+ return serdeConstants.FLOAT_TYPE_NAME;
+ case LONG_TYPE:
+ return serdeConstants.BIGINT_TYPE_NAME;
+ case STRING_TYPE:
+ return serdeConstants.STRING_TYPE_NAME;
+ default:
+ // This is a guard for special Druid types e.g. hyperUnique
+ // (http://druid.io/docs/0.9.1.1/querying/aggregations.html#hyperunique-aggregator).
+ // Currently, we do not support doing anything special with them in Hive.
+ // However, those columns are there, and they can actually be read as normal
+ // dimensions, e.g. with a select query. Thus, we log a warning and just read
+ // them as String.
+ LOG.warn("Transformation to STRING for unknown type " + typeName);
+ return serdeConstants.STRING_TYPE_NAME;
+ }
+ }
+
+}
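
For illustration, the mapping above in action; the input is upper-cased before
matching, so the lookup is case-insensitive, and any unrecognized type such as
hyperUnique falls through to the STRING default with a warning.

    import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;

    PrimitiveTypeInfo f = DruidSerDeUtils.convertDruidToHiveType("float");       // floatTypeInfo
    PrimitiveTypeInfo h = DruidSerDeUtils.convertDruidToHiveType("hyperUnique"); // stringTypeInfo, warning logged
    String bigint = DruidSerDeUtils.convertDruidToHiveTypeString("long");        // "bigint"
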
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
new file mode 100644
index 0000000..812ae03
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import io.druid.query.Result;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.timeseries.TimeseriesResultValue;
+
+/**
+ * Record reader for the results of a Druid TimeseriesQuery.
+ */
+public class DruidTimeseriesQueryRecordReader
+ extends DruidQueryRecordReader<TimeseriesQuery, Result<TimeseriesResultValue>> {
+
+ private Result<TimeseriesResultValue> current;
+
+ @Override
+ protected TimeseriesQuery createQuery(String content) throws IOException {
+ return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, TimeseriesQuery.class);
+ }
+
+ @Override
+ protected List<Result<TimeseriesResultValue>> createResultsList(InputStream content) throws IOException {
+ return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
+ new TypeReference<List<Result<TimeseriesResultValue>>>(){});
+ }
+
+ @Override
+ public boolean nextKeyValue() {
+ if (results.hasNext()) {
+ current = results.next();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException, InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public DruidWritable getCurrentValue() throws IOException, InterruptedException {
+ // Create new value
+ DruidWritable value = new DruidWritable();
+ value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
+ value.getValue().putAll(current.getValue().getBaseObject());
+ return value;
+ }
+
+ @Override
+ public boolean next(NullWritable key, DruidWritable value) {
+ if (nextKeyValue()) {
+ // Update value
+ value.getValue().clear();
+ value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
+ value.getValue().putAll(current.getValue().getBaseObject());
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return results.hasNext() ? 0 : 1;
+ }
+
+}
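
To make the row shape concrete: each timeseries Result contributes one row whose
map holds the timestamp plus the aggregator and post-aggregator values from the
base object. A hedged sketch of inspecting one such row follows (assuming the
imports from the files above); the aggregator name "cnt" is purely illustrative.

    // Hypothetical inspection of a single timeseries bucket.
    static void printBucket(DruidTimeseriesQueryRecordReader reader)
        throws java.io.IOException, InterruptedException {
      if (reader.nextKeyValue()) {
        DruidWritable value = reader.getCurrentValue();
        long ts = (Long) value.getValue().get(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
        System.out.println(ts + " -> " + value.getValue().get("cnt"));
      }
    }
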