Posted to commits@hive.apache.org by jc...@apache.org on 2016/12/15 20:51:38 UTC
[4/4] hive git commit: HIVE-15277: Teach Hive how to create/delete Druid segments (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)
HIVE-15277: Teach Hive how to create/delete Druid segments (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/590687bc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/590687bc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/590687bc
Branch: refs/heads/master
Commit: 590687bc4cb97cdf4aa95ba94f28949986d1b3e8
Parents: 89362a1
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Thu Dec 15 20:43:45 2016 +0000
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Thu Dec 15 20:43:45 2016 +0000
----------------------------------------------------------------------
.../hadoop/hive/common/JvmPauseMonitor.java | 3 +-
.../org/apache/hadoop/hive/conf/Constants.java | 7 +
.../org/apache/hadoop/hive/conf/HiveConf.java | 38 +-
druid-handler/README.md | 3 +
druid-handler/pom.xml | 145 +++-
.../hadoop/hive/druid/DruidStorageHandler.java | 311 +++++++-
.../hive/druid/DruidStorageHandlerUtils.java | 351 ++++++++-
.../hive/druid/HiveDruidOutputFormat.java | 55 --
.../druid/HiveDruidQueryBasedInputFormat.java | 376 ----------
.../hadoop/hive/druid/HiveDruidSplit.java | 83 ---
.../hadoop/hive/druid/io/DruidOutputFormat.java | 204 ++++++
.../druid/io/DruidQueryBasedInputFormat.java | 397 ++++++++++
.../hadoop/hive/druid/io/DruidRecordWriter.java | 260 +++++++
.../hadoop/hive/druid/io/HiveDruidSplit.java | 84 +++
.../serde/DruidGroupByQueryRecordReader.java | 15 +-
.../druid/serde/DruidQueryRecordReader.java | 42 +-
.../serde/DruidSelectQueryRecordReader.java | 8 +-
.../hadoop/hive/druid/serde/DruidSerDe.java | 286 +++++---
.../hive/druid/serde/DruidSerDeUtils.java | 6 +-
.../serde/DruidTimeseriesQueryRecordReader.java | 7 +-
.../druid/serde/DruidTopNQueryRecordReader.java | 8 +-
.../hive/druid/DruidStorageHandlerTest.java | 181 +++++
.../hadoop/hive/druid/QTestDruidSerDe.java | 60 +-
.../hadoop/hive/druid/TestDerbyConnector.java | 108 +++
.../hadoop/hive/druid/TestDruidSerDe.java | 731 ++++++++++---------
.../TestHiveDruidQueryBasedInputFormat.java | 91 ++-
.../hive/ql/io/DruidRecordWriterTest.java | 221 ++++++
.../llap/daemon/impl/TaskRunnerCallable.java | 45 +-
pom.xml | 2 +-
.../hadoop/hive/ql/exec/FileSinkOperator.java | 29 +-
.../apache/hadoop/hive/ql/exec/Utilities.java | 113 ++-
.../hadoop/hive/ql/hooks/LineageLogger.java | 31 +-
.../hadoop/hive/ql/optimizer/Optimizer.java | 2 +
...tedDynPartitionTimeGranularityOptimizer.java | 363 +++++++++
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 4 -
.../apache/hadoop/hive/ql/plan/PlanUtils.java | 75 +-
.../apache/hadoop/hive/ql/plan/TableDesc.java | 18 +-
.../queries/clientnegative/druid_external.q | 5 -
.../results/clientnegative/druid_external.q.out | 7 -
.../results/clientpositive/druid_basic2.q.out | 16 +-
40 files changed, 3534 insertions(+), 1257 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
index 5d475f4..cf080e3 100644
--- a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
+++ b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
@@ -37,6 +37,7 @@ import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
/**
* Based on the JvmPauseMonitor from Hadoop.
@@ -181,7 +182,7 @@ public class JvmPauseMonitor {
} catch (InterruptedException ie) {
return;
}
- long extraSleepTime = sw.elapsedMillis() - SLEEP_INTERVAL_MS;
+ long extraSleepTime = sw.elapsed(TimeUnit.MILLISECONDS) - SLEEP_INTERVAL_MS;
Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
if (extraSleepTime > warnThresholdMs) {
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/common/src/java/org/apache/hadoop/hive/conf/Constants.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 6c42163..ea7864a 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -26,10 +26,17 @@ public class Constants {
/* Constants for Druid storage handler */
public static final String DRUID_HIVE_STORAGE_HANDLER_ID =
"org.apache.hadoop.hive.druid.DruidStorageHandler";
+ public static final String DRUID_HIVE_OUTPUT_FORMAT =
+ "org.apache.hadoop.hive.druid.io.DruidOutputFormat";
public static final String DRUID_DATA_SOURCE = "druid.datasource";
+ public static final String DRUID_SEGMENT_GRANULARITY = "druid.segment.granularity";
+ public static final String DRUID_TIMESTAMP_GRANULARITY_COL_NAME = "__time_granularity";
public static final String DRUID_QUERY_JSON = "druid.query.json";
public static final String DRUID_QUERY_TYPE = "druid.query.type";
public static final String DRUID_QUERY_FETCH = "druid.query.fetch";
+ public static final String DRUID_SEGMENT_DIRECTORY = "druid.storage.storageDirectory";
+ public static final String DRUID_SEGMENT_VERSION = "druid.segment.version";
+ public static final String DRUID_JOB_WORKING_DIRECTORY = "druid.job.workingDirectory";
public static final String HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR = "HIVE_JOB_CREDSTORE_PASSWORD";
public static final String HADOOP_CREDENTIAL_PASSWORD_ENVVAR = "HADOOP_CREDSTORE_PASSWORD";
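For context, the new constants above are plain property keys. The sketch below (the data source name and granularity value are made-up assumptions, not taken from this patch) shows how they could be looked up from a table's properties on the Hive side:

import java.util.Properties;
import org.apache.hadoop.hive.conf.Constants;

public class DruidTablePropsSketch {
  public static void main(String[] args) {
    // Hypothetical properties of a table backed by the Druid storage handler;
    // "wikipedia" and "DAY" are illustrative values only.
    Properties tableProps = new Properties();
    tableProps.setProperty(Constants.DRUID_DATA_SOURCE, "wikipedia");
    tableProps.setProperty(Constants.DRUID_SEGMENT_GRANULARITY, "DAY");

    // The handler id and the output format class referenced by this patch.
    System.out.println(Constants.DRUID_HIVE_STORAGE_HANDLER_ID);
    System.out.println(Constants.DRUID_HIVE_OUTPUT_FORMAT);
    System.out.println(tableProps.getProperty(Constants.DRUID_DATA_SOURCE));
  }
}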
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 032ff0c..dcb383d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1936,9 +1936,21 @@ public class HiveConf extends Configuration {
new TimeValidator(TimeUnit.MILLISECONDS), "Frequency of WriteSet reaper runs"),
// For Druid storage handler
+ HIVE_DRUID_INDEXING_GRANULARITY("hive.druid.indexer.segments.granularity", "DAY",
+ new PatternSet("YEAR", "MONTH", "WEEK", "DAY", "HOUR", "MINUTE", "SECOND"),
+ "Granularity for the segments created by the Druid storage handler"
+ ),
+ HIVE_DRUID_MAX_PARTITION_SIZE("hive.druid.indexer.partition.size.max", 5000000,
+ "Maximum number of records per segment partition"
+ ),
+ HIVE_DRUID_MAX_ROW_IN_MEMORY("hive.druid.indexer.memory.rownum.max", 75000,
+ "Maximum number of records in memory while storing data in Druid"
+ ),
HIVE_DRUID_BROKER_DEFAULT_ADDRESS("hive.druid.broker.address.default", "localhost:8082",
- "Address of the Druid broker. If we are querying Druid from Hive, this address needs to be\n" +
- "declared"),
+ "Address of the Druid broker. If we are querying Druid from Hive, this address needs to be\n"
+ +
+ "declared"
+ ),
HIVE_DRUID_SELECT_THRESHOLD("hive.druid.select.threshold", 10000,
"When we can split a Select query, this is the maximum number of rows that we try to retrieve\n" +
"per query. In order to do that, we obtain the estimated size for the complete result. If the\n" +
@@ -1949,7 +1961,27 @@ public class HiveConf extends Configuration {
"the HTTP client."),
HIVE_DRUID_HTTP_READ_TIMEOUT("hive.druid.http.read.timeout", "PT1M", "Read timeout period for the HTTP\n" +
"client in ISO8601 format (for example P2W, P3M, PT1H30M, PT0.750S), default is period of 1 minute."),
-
+ HIVE_DRUID_BASE_PERSIST_DIRECTORY("hive.druid.basePersistDirectory", "/tmp",
+ "Local temporary directory used to persist intermediate indexing state."
+ ),
+ DRUID_SEGMENT_DIRECTORY("hive.druid.storage.storageDirectory", "/druid/segments",
+ "druid deep storage location."),
+ DRUID_METADATA_BASE("hive.druid.metadata.base", "druid", "Default prefix for metadata tables"),
+ DRUID_METADATA_DB_TYPE("hive.druid.metadata.db.type", "mysql",
+ new PatternSet("mysql", "postgres"), "Type of the metadata database."
+ ),
+ DRUID_METADATA_DB_USERNAME("hive.druid.metadata.username", "",
+ "Username to connect to Type of the metadata DB."
+ ),
+ DRUID_METADATA_DB_PASSWORD("hive.druid.metadata.password", "",
+ "Password to connect to Type of the metadata DB."
+ ),
+ DRUID_METADATA_DB_URI("hive.druid.metadata.uri", "",
+ "URI to connect to the database (for example jdbc:mysql://hostname:port/DBName)."
+ ),
+ DRUID_WORKING_DIR("hive.druid.working.directory", "/tmp/workingDirectory",
+ "Default hdfs working directory used to store some intermediate metadata"
+ ),
// For HBase storage handler
HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true,
"Whether writes to HBase should be forced to the write-ahead log. \n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/README.md
----------------------------------------------------------------------
diff --git a/druid-handler/README.md b/druid-handler/README.md
new file mode 100644
index 0000000..b548567
--- /dev/null
+++ b/druid-handler/README.md
@@ -0,0 +1,3 @@
+# Druid Storage Handler
+
+[Link for documentation]( https://cwiki.apache.org/confluence/display/Hive/Druid+Integration)
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/pom.xml
----------------------------------------------------------------------
diff --git a/druid-handler/pom.xml b/druid-handler/pom.xml
index e4fa8fd..f691a2c 100644
--- a/druid-handler/pom.xml
+++ b/druid-handler/pom.xml
@@ -29,6 +29,8 @@
<properties>
<hive.path.to.root>..</hive.path.to.root>
+ <druid.metamx.util.version>0.27.10</druid.metamx.util.version>
+ <druid.guava.version>16.0.1</druid.guava.version>
</properties>
<dependencies>
@@ -47,6 +49,10 @@
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<!-- inter-project -->
@@ -56,37 +62,47 @@
<version>${commons-lang.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <optional>true</optional>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commmons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${druid.guava.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <version>${hadoop.version}</version>
- <optional>true</optional>
+ <groupId>com.metamx</groupId>
+ <artifactId>java-util</artifactId>
+ <version>${druid.metamx.util.version}</version>
<exclusions>
<exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
- <groupId>org.apache.calcite</groupId>
- <artifactId>calcite-druid</artifactId>
- <version>${calcite.version}</version>
+ <groupId>io.druid</groupId>
+ <artifactId>druid-server</artifactId>
+ <version>${druid.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>io.druid</groupId>
@@ -107,7 +123,72 @@
</exclusion>
</exclusions>
</dependency>
-
+ <dependency>
+ <groupId>io.druid.extensions</groupId>
+ <artifactId>druid-hdfs-storage</artifactId>
+ <version>${druid.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.druid.extensions</groupId>
+ <artifactId>mysql-metadata-storage</artifactId>
+ <version>${druid.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.druid.extensions</groupId>
+ <artifactId>postgresql-metadata-storage</artifactId>
+ <version>${druid.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ <version>${hadoop.version}</version>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commmons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <scope>provided</scope>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.inject</groupId>
+ <artifactId>guice</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.inject.extensions</groupId>
+ <artifactId>guice-servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-druid</artifactId>
+ <version>${calcite.version}</version>
+ </dependency>
<!-- test inter-project -->
<dependency>
<groupId>junit</groupId>
@@ -115,6 +196,12 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.druid</groupId>
+ <artifactId>druid-indexing-hadoop</artifactId>
+ <version>${druid.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -170,15 +257,25 @@
<pattern>com.fasterxml.jackson</pattern>
<shadedPattern>org.apache.hive.druid.com.fasterxml.jackson</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>org.apache.hive.druid.com.google.common</shadedPattern>
+ </relocation>
</relocations>
<artifactSet>
<includes>
<include>io.druid:*</include>
+ <include>io.druid.extensions:*</include>
<include>com.metamx:*</include>
<include>io.netty:*</include>
<include>com.fasterxml.jackson.core:*</include>
<include>com.fasterxml.jackson.datatype:*</include>
<include>com.fasterxml.jackson.dataformat:*</include>
+ <include>com.google.guava:*</include>
+ <include>it.unimi.dsi:*</include>
+ <include>org.jdbi:*</include>
+ <include>net.jpountz.lz4:*</include>
+ <include>org.apache.commons:*</include>
</includes>
</artifactSet>
<filters>
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 8242385..a08a4e3 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,36 +17,131 @@
*/
package org.apache.hadoop.hive.druid;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
+import io.druid.metadata.MetadataStorageConnectorConfig;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.metadata.SQLMetadataConnector;
+import io.druid.metadata.storage.mysql.MySQLConnector;
+import io.druid.metadata.storage.postgresql.PostgreSQLConnector;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.io.DruidOutputFormat;
+import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
import org.apache.hadoop.hive.druid.serde.DruidSerDe;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.OutputFormat;
+import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
/**
* DruidStorageHandler provides a HiveStorageHandler implementation for Druid.
*/
-@SuppressWarnings({"deprecation","rawtypes"})
+@SuppressWarnings({ "deprecation", "rawtypes" })
public class DruidStorageHandler extends DefaultStorageHandler implements HiveMetaHook {
protected static final Logger LOG = LoggerFactory.getLogger(DruidStorageHandler.class);
+ public static final String SEGMENTS_DESCRIPTOR_DIR_NAME = "segmentsDescriptorDir";
+
+ private final SQLMetadataConnector connector;
+
+ private final SQLMetadataStorageUpdaterJobHandler druidSqlMetadataStorageUpdaterJobHandler;
+
+ private final MetadataStorageTablesConfig druidMetadataStorageTablesConfig;
+
+ private String uniqueId = null;
+
+ private String rootWorkingDir = null;
+
+ public DruidStorageHandler() {
+ //this is the default value in druid
+ final String base = HiveConf
+ .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_BASE);
+ final String dbType = HiveConf
+ .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_TYPE);
+ final String username = HiveConf
+ .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_USERNAME);
+ final String password = HiveConf
+ .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_PASSWORD);
+ final String uri = HiveConf
+ .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_URI);
+ druidMetadataStorageTablesConfig = MetadataStorageTablesConfig.fromBase(base);
+
+ final Supplier<MetadataStorageConnectorConfig> storageConnectorConfigSupplier = Suppliers.<MetadataStorageConnectorConfig>ofInstance(
+ new MetadataStorageConnectorConfig() {
+ @Override
+ public String getConnectURI() {
+ return uri;
+ }
+
+ @Override
+ public String getUser() {
+ return username;
+ }
+
+ @Override
+ public String getPassword() {
+ return password;
+ }
+ });
+
+ if (dbType.equals("mysql")) {
+ connector = new MySQLConnector(storageConnectorConfigSupplier,
+ Suppliers.ofInstance(druidMetadataStorageTablesConfig)
+ );
+ } else if (dbType.equals("postgres")) {
+ connector = new PostgreSQLConnector(storageConnectorConfigSupplier,
+ Suppliers.ofInstance(druidMetadataStorageTablesConfig)
+ );
+ } else {
+ throw new IllegalStateException(String.format("Unknown metadata storage type [%s]", dbType));
+ }
+ druidSqlMetadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler(connector);
+ }
+
+ @VisibleForTesting
+ public DruidStorageHandler(SQLMetadataConnector connector,
+ SQLMetadataStorageUpdaterJobHandler druidSqlMetadataStorageUpdaterJobHandler,
+ MetadataStorageTablesConfig druidMetadataStorageTablesConfig
+ ) {
+ this.connector = connector;
+ this.druidSqlMetadataStorageUpdaterJobHandler = druidSqlMetadataStorageUpdaterJobHandler;
+ this.druidMetadataStorageTablesConfig = druidMetadataStorageTablesConfig;
+ }
+
@Override
public Class<? extends InputFormat> getInputFormatClass() {
- return HiveDruidQueryBasedInputFormat.class;
+ return DruidQueryBasedInputFormat.class;
}
@Override
public Class<? extends OutputFormat> getOutputFormatClass() {
- return HiveDruidOutputFormat.class;
+ return DruidOutputFormat.class;
}
@Override
@@ -62,28 +157,141 @@ public class DruidStorageHandler extends DefaultStorageHandler implements HiveMe
@Override
public void preCreateTable(Table table) throws MetaException {
// Do safety checks
- if (!MetaStoreUtils.isExternalTable(table)) {
- throw new MetaException("Table in Druid needs to be declared as EXTERNAL");
- }
- if (!StringUtils.isEmpty(table.getSd().getLocation())) {
+ if (MetaStoreUtils.isExternalTable(table) && !StringUtils
+ .isEmpty(table.getSd().getLocation())) {
throw new MetaException("LOCATION may not be specified for Druid");
}
+
if (table.getPartitionKeysSize() != 0) {
throw new MetaException("PARTITIONED BY may not be specified for Druid");
}
if (table.getSd().getBucketColsSize() != 0) {
throw new MetaException("CLUSTERED BY may not be specified for Druid");
}
+ String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
+ if (MetaStoreUtils.isExternalTable(table)) {
+ return;
+ }
+ // If it is not an external table we need to check the metadata
+ try {
+ connector.createSegmentTable();
+ } catch (Exception e) {
+ LOG.error("Exception while trying to create druid segments table", e);
+ throw new MetaException(e.getMessage());
+ }
+ Collection<String> existingDataSources = DruidStorageHandlerUtils
+ .getAllDataSourceNames(connector, druidMetadataStorageTablesConfig);
+ LOG.debug(String.format("pre-create data source with name [%s]", dataSourceName));
+ if (existingDataSources.contains(dataSourceName)) {
+ throw new MetaException(String.format("Data source [%s] already existing", dataSourceName));
+ }
}
@Override
public void rollbackCreateTable(Table table) throws MetaException {
- // Nothing to do
+ if (MetaStoreUtils.isExternalTable(table)) {
+ return;
+ }
+ final Path segmentDescriptorDir = getSegmentDescriptorDir();
+ try {
+ List<DataSegment> dataSegmentList = DruidStorageHandlerUtils
+ .getPublishedSegments(segmentDescriptorDir, getConf());
+ for (DataSegment dataSegment : dataSegmentList) {
+ try {
+ deleteSegment(dataSegment);
+ } catch (SegmentLoadingException e) {
+ LOG.error(String.format("Error while trying to clean the segment [%s]", dataSegment), e);
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Exception while rollback", e);
+ throw Throwables.propagate(e);
+ } finally {
+ cleanWorkingDir();
+ }
}
@Override
public void commitCreateTable(Table table) throws MetaException {
- // Nothing to do
+ if (MetaStoreUtils.isExternalTable(table)) {
+ return;
+ }
+ LOG.info(String.format("Committing table [%s] to the druid metastore", table.getDbName()));
+ final Path tableDir = getSegmentDescriptorDir();
+ try {
+ List<DataSegment> segmentList = DruidStorageHandlerUtils
+ .getPublishedSegments(tableDir, getConf());
+ LOG.info(String.format("Found [%d] segments under path [%s]", segmentList.size(), tableDir));
+ druidSqlMetadataStorageUpdaterJobHandler.publishSegments(
+ druidMetadataStorageTablesConfig.getSegmentsTable(),
+ segmentList,
+ DruidStorageHandlerUtils.JSON_MAPPER
+ );
+ } catch (IOException e) {
+ LOG.error("Exception while commit", e);
+ Throwables.propagate(e);
+ } finally {
+ cleanWorkingDir();
+ }
+ }
+
+ @VisibleForTesting
+ protected void deleteSegment(DataSegment segment) throws SegmentLoadingException {
+
+ final Path path = getPath(segment);
+ LOG.info(String.format("removing segment[%s], located at path[%s]", segment.getIdentifier(),
+ path
+ ));
+
+ try {
+ if (path.getName().endsWith(".zip")) {
+
+ final FileSystem fs = path.getFileSystem(getConf());
+
+ if (!fs.exists(path)) {
+ LOG.warn(String.format(
+ "Segment Path [%s] does not exist. It appears to have been deleted already.",
+ path
+ ));
+ return;
+ }
+
+ // path format -- > .../dataSource/interval/version/partitionNum/xxx.zip
+ Path partitionNumDir = path.getParent();
+ if (!fs.delete(partitionNumDir, true)) {
+ throw new SegmentLoadingException(
+ "Unable to kill segment, failed to delete dir [%s]",
+ partitionNumDir.toString()
+ );
+ }
+
+ //try to delete other directories if possible
+ Path versionDir = partitionNumDir.getParent();
+ if (safeNonRecursiveDelete(fs, versionDir)) {
+ Path intervalDir = versionDir.getParent();
+ if (safeNonRecursiveDelete(fs, intervalDir)) {
+ Path dataSourceDir = intervalDir.getParent();
+ safeNonRecursiveDelete(fs, dataSourceDir);
+ }
+ }
+ } else {
+ throw new SegmentLoadingException("Unknown file type[%s]", path);
+ }
+ } catch (IOException e) {
+ throw new SegmentLoadingException(e, "Unable to kill segment");
+ }
+ }
+
+ private static Path getPath(DataSegment dataSegment) {
+ return new Path(String.valueOf(dataSegment.getLoadSpec().get("path")));
+ }
+
+ private static boolean safeNonRecursiveDelete(FileSystem fs, Path path) {
+ try {
+ return fs.delete(path, false);
+ } catch (Exception ex) {
+ return false;
+ }
}
@Override
@@ -98,7 +306,45 @@ public class DruidStorageHandler extends DefaultStorageHandler implements HiveMe
@Override
public void commitDropTable(Table table, boolean deleteData) throws MetaException {
- // Nothing to do
+ if (MetaStoreUtils.isExternalTable(table)) {
+ return;
+ }
+ String dataSourceName = Preconditions
+ .checkNotNull(table.getParameters().get(Constants.DRUID_DATA_SOURCE),
+ "DataSource name is null !"
+ );
+
+ if (deleteData == true) {
+ LOG.info(String.format("Dropping with purge all the data for data source [%s]",
+ dataSourceName
+ ));
+ List<DataSegment> dataSegmentList = DruidStorageHandlerUtils
+ .getDataSegmentList(connector, druidMetadataStorageTablesConfig, dataSourceName);
+ if (dataSegmentList.isEmpty()) {
+ LOG.info(String.format("Nothing to delete for data source [%s]", dataSourceName));
+ return;
+ }
+ for (DataSegment dataSegment : dataSegmentList) {
+ try {
+ deleteSegment(dataSegment);
+ } catch (SegmentLoadingException e) {
+ LOG.error(String.format("Error while deleting segment [%s]", dataSegment.getIdentifier()),
+ e
+ );
+ }
+ }
+ }
+ if (DruidStorageHandlerUtils
+ .disableDataSource(connector, druidMetadataStorageTablesConfig, dataSourceName)) {
+ LOG.info(String.format("Successfully dropped druid data source [%s]", dataSourceName));
+ }
+ }
+
+ @Override
+ public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties
+ ) {
+ jobProperties.put(Constants.DRUID_SEGMENT_VERSION, new DateTime().toString());
+ jobProperties.put(Constants.DRUID_JOB_WORKING_DIRECTORY, getStagingWorkingDir().toString());
}
@Override
@@ -106,4 +352,43 @@ public class DruidStorageHandler extends DefaultStorageHandler implements HiveMe
return Constants.DRUID_HIVE_STORAGE_HANDLER_ID;
}
+ public String getUniqueId() {
+ if (uniqueId == null) {
+ uniqueId = Preconditions.checkNotNull(
+ Strings.emptyToNull(HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVEQUERYID)),
+ "Hive query id is null"
+ );
+ }
+ return uniqueId;
+ }
+
+ private Path getStagingWorkingDir() {
+ return new Path(getRootWorkingDir(), makeStagingName());
+ }
+
+ @VisibleForTesting
+ protected String makeStagingName() {
+ return ".staging-".concat(getUniqueId().replace(":", ""));
+ }
+
+ private Path getSegmentDescriptorDir() {
+ return new Path(getStagingWorkingDir(), SEGMENTS_DESCRIPTOR_DIR_NAME);
+ }
+
+ private void cleanWorkingDir() {
+ final FileSystem fileSystem;
+ try {
+ fileSystem = getStagingWorkingDir().getFileSystem(getConf());
+ fileSystem.delete(getStagingWorkingDir(), true);
+ } catch (IOException e) {
+ LOG.error("Got Exception while cleaning working directory", e);
+ }
+ }
+
+ private String getRootWorkingDir() {
+ if (Strings.isNullOrEmpty(rootWorkingDir)) {
+ rootWorkingDir = HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_WORKING_DIR);
+ }
+ return rootWorkingDir;
+ }
}
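To make the create/commit/rollback flow above easier to follow, here is a rough sketch (the query id is a made-up placeholder) of where the handler expects segment descriptors to land, based on DRUID_WORKING_DIR, makeStagingName() and SEGMENTS_DESCRIPTOR_DIR_NAME:

import org.apache.hadoop.fs.Path;

public class DruidStagingLayoutSketch {
  public static void main(String[] args) {
    // Defaults and naming patterns taken from the patch; the query id is hypothetical.
    Path rootWorkingDir = new Path("/tmp/workingDirectory");   // hive.druid.working.directory default
    String hiveQueryId = "hive_20161215204345_abcd1234";       // stands in for HIVEQUERYID
    Path staging = new Path(rootWorkingDir, ".staging-" + hiveQueryId.replace(":", ""));
    Path descriptorDir = new Path(staging, "segmentsDescriptorDir");

    // commitCreateTable lists the descriptors under descriptorDir via
    // DruidStorageHandlerUtils.getPublishedSegments(descriptorDir, conf) and publishes
    // them to the Druid segments table; rollbackCreateTable deletes those segments
    // instead, and both paths finish by removing the staging directory.
    System.out.println("Expecting segment descriptors under " + descriptorDir);
  }
}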
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index c6b8024..193e4aa 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,22 +17,62 @@
*/
package org.apache.hadoop.hive.druid;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.util.concurrent.ExecutionException;
-
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+import com.google.common.collect.Lists;
+import com.metamx.common.MapUtils;
+import com.metamx.emitter.EmittingLogger;
+import com.metamx.emitter.core.NoopEmitter;
+import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.InputStreamResponseHandler;
-
import io.druid.jackson.DefaultObjectMapper;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.metadata.SQLMetadataConnector;
+import io.druid.metadata.storage.mysql.MySQLConnector;
import io.druid.query.BaseQuery;
+import io.druid.segment.IndexIO;
+import io.druid.segment.IndexMergerV9;
+import io.druid.segment.column.ColumnConfig;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.LinearShardSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.skife.jdbi.v2.FoldController;
+import org.skife.jdbi.v2.Folder3;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.TransactionCallback;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.skife.jdbi.v2.util.ByteArrayMapper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
/**
* Utils class for Druid storage handler.
@@ -51,12 +91,61 @@ public final class DruidStorageHandlerUtils {
*/
public static final ObjectMapper SMILE_MAPPER = new DefaultObjectMapper(new SmileFactory());
+ private static final int NUM_RETRIES = 8;
+
+ private static final int SECONDS_BETWEEN_RETRIES = 2;
+
+ private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB
+
+ private static final int DEFAULT_STREAMING_RESULT_SIZE = 100;
+
+ /**
+ * Used by druid to perform IO on indexes
+ */
+ public static final IndexIO INDEX_IO = new IndexIO(JSON_MAPPER, new ColumnConfig() {
+ @Override
+ public int columnCacheSizeBytes() {
+ return 0;
+ }
+ });
+
+ /**
+ * Used by druid to merge indexes
+ */
+ public static final IndexMergerV9 INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER,
+ DruidStorageHandlerUtils.INDEX_IO
+ );
+
+ /**
+ * Generic Interner implementation used to read segments object from metadata storage
+ */
+ public static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner();
+
+ static {
+ // Register the shard sub type to be used by the mapper
+ JSON_MAPPER.registerSubtypes(new NamedType(LinearShardSpec.class, "linear"));
+ // set the timezone of the object mapper
+ // THIS IS NOT WORKING workaround is to set it as part of java opts -Duser.timezone="UTC"
+ JSON_MAPPER.setTimeZone(TimeZone.getTimeZone("UTC"));
+ try {
+ // No operation emitter will be used by some internal druid classes.
+ EmittingLogger.registerEmitter(
+ new ServiceEmitter("druid-hive-indexer", InetAddress.getLocalHost().getHostName(),
+ new NoopEmitter()
+ ));
+ } catch (UnknownHostException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
/**
* Method that creates a request for Druid JSON query (using SMILE).
- * @param mapper
+ *
* @param address
* @param query
+ *
* @return
+ *
* @throws IOException
*/
public static Request createRequest(String address, BaseQuery<?> query)
@@ -69,9 +158,12 @@ public final class DruidStorageHandlerUtils {
/**
* Method that submits a request to an Http address and retrieves the result.
* The caller is responsible for closing the stream once it finishes consuming it.
+ *
* @param client
* @param request
+ *
* @return
+ *
* @throws IOException
*/
public static InputStream submitRequest(HttpClient client, Request request)
@@ -87,4 +179,237 @@ public final class DruidStorageHandlerUtils {
return response;
}
+ /**
+ * @param taskDir path to the directory containing the segments descriptor info
+ * the descriptor path will be .../workingPath/task_id/{@link DruidStorageHandler#SEGMENTS_DESCRIPTOR_DIR_NAME}/*.json
+ * @param conf hadoop conf to get the file system
+ *
+ * @return List of DataSegments
+ *
+ * @throws IOException may be thrown when no data was produced.
+ */
+ public static List<DataSegment> getPublishedSegments(Path taskDir, Configuration conf)
+ throws IOException {
+ ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
+ FileSystem fs = taskDir.getFileSystem(conf);
+ for (FileStatus fileStatus : fs.listStatus(taskDir)) {
+ final DataSegment segment = JSON_MAPPER
+ .readValue(fs.open(fileStatus.getPath()), DataSegment.class);
+ publishedSegmentsBuilder.add(segment);
+ }
+ List<DataSegment> publishedSegments = publishedSegmentsBuilder.build();
+ return publishedSegments;
+ }
+
+ /**
+ * This function writes the serialized form of a segment descriptor to the filesystem;
+ * if the descriptor file already exists it will be replaced.
+ *
+ * @param outputFS filesystem
+ * @param segment DataSegment object
+ * @param descriptorPath path
+ *
+ * @throws IOException
+ */
+ public static void writeSegmentDescriptor(
+ final FileSystem outputFS,
+ final DataSegment segment,
+ final Path descriptorPath
+ )
+ throws IOException {
+ final DataPusher descriptorPusher = (DataPusher) RetryProxy.create(
+ DataPusher.class, new DataPusher() {
+ @Override
+ public long push() throws IOException {
+ try {
+ if (outputFS.exists(descriptorPath)) {
+ if (!outputFS.delete(descriptorPath, false)) {
+ throw new IOException(
+ String.format("Failed to delete descriptor at [%s]", descriptorPath));
+ }
+ }
+ try (final OutputStream descriptorOut = outputFS.create(
+ descriptorPath,
+ true,
+ DEFAULT_FS_BUFFER_SIZE
+ )) {
+ JSON_MAPPER.writeValue(descriptorOut, segment);
+ descriptorOut.flush();
+ }
+ } catch (RuntimeException | IOException ex) {
+ throw ex;
+ }
+ return -1;
+ }
+ },
+ RetryPolicies
+ .exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
+ );
+ descriptorPusher.push();
+ }
+
+ /**
+ * @param connector SQL metadata connector to the metadata storage
+ * @param metadataStorageTablesConfig Table config
+ *
+ * @return all the active data sources in the metadata storage
+ */
+ public static Collection<String> getAllDataSourceNames(SQLMetadataConnector connector,
+ final MetadataStorageTablesConfig metadataStorageTablesConfig
+ ) {
+ return connector.getDBI().withHandle(
+ new HandleCallback<List<String>>() {
+ @Override
+ public List<String> withHandle(Handle handle) throws Exception {
+ return handle.createQuery(
+ String.format("SELECT DISTINCT(datasource) FROM %s WHERE used = true",
+ metadataStorageTablesConfig.getSegmentsTable()
+ ))
+ .fold(Lists.<String>newArrayList(),
+ new Folder3<ArrayList<String>, Map<String, Object>>() {
+ @Override
+ public ArrayList<String> fold(ArrayList<String> druidDataSources,
+ Map<String, Object> stringObjectMap,
+ FoldController foldController,
+ StatementContext statementContext) throws SQLException {
+ druidDataSources.add(
+ MapUtils.getString(stringObjectMap, "datasource")
+ );
+ return druidDataSources;
+ }
+ }
+ );
+
+ }
+ }
+ );
+ }
+
+ /**
+ * @param connector SQL connector to metadata
+ * @param metadataStorageTablesConfig Tables configuration
+ * @param dataSource Name of data source
+ *
+ * @return true if the data source was successfully disabled, false otherwise
+ */
+ public static boolean disableDataSource(SQLMetadataConnector connector,
+ final MetadataStorageTablesConfig metadataStorageTablesConfig, final String dataSource
+ ) {
+ try {
+ if (!getAllDataSourceNames(connector, metadataStorageTablesConfig).contains(dataSource)) {
+ DruidStorageHandler.LOG
+ .warn(String.format("Cannot delete data source [%s], does not exist", dataSource));
+ return false;
+ }
+
+ connector.getDBI().withHandle(
+ new HandleCallback<Void>() {
+ @Override
+ public Void withHandle(Handle handle) throws Exception {
+ handle.createStatement(
+ String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource",
+ metadataStorageTablesConfig.getSegmentsTable()
+ )
+ )
+ .bind("dataSource", dataSource)
+ .execute();
+
+ return null;
+ }
+ }
+ );
+
+ } catch (Exception e) {
+ DruidStorageHandler.LOG.error(String.format("Error removing dataSource %s", dataSource), e);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * @param connector SQL connector to metadata
+ * @param metadataStorageTablesConfig Tables configuration
+ * @param dataSource Name of data source
+ *
+ * @return List of all data segments part of the given data source
+ */
+ public static List<DataSegment> getDataSegmentList(final SQLMetadataConnector connector,
+ final MetadataStorageTablesConfig metadataStorageTablesConfig, final String dataSource
+ ) {
+ List<DataSegment> segmentList = connector.retryTransaction(
+ new TransactionCallback<List<DataSegment>>() {
+ @Override
+ public List<DataSegment> inTransaction(
+ Handle handle, TransactionStatus status
+ ) throws Exception {
+ return handle
+ .createQuery(String.format(
+ "SELECT payload FROM %s WHERE dataSource = :dataSource",
+ metadataStorageTablesConfig.getSegmentsTable()
+ ))
+ .setFetchSize(getStreamingFetchSize(connector))
+ .bind("dataSource", dataSource)
+ .map(ByteArrayMapper.FIRST)
+ .fold(
+ new ArrayList<DataSegment>(),
+ new Folder3<List<DataSegment>, byte[]>() {
+ @Override
+ public List<DataSegment> fold(List<DataSegment> accumulator,
+ byte[] payload, FoldController control,
+ StatementContext ctx
+ ) throws SQLException {
+ try {
+ final DataSegment segment = DATA_SEGMENT_INTERNER.intern(
+ JSON_MAPPER.readValue(
+ payload,
+ DataSegment.class
+ ));
+
+ accumulator.add(segment);
+ return accumulator;
+ } catch (Exception e) {
+ throw new SQLException(e.toString());
+ }
+ }
+ }
+ );
+ }
+ }
+ , 3, SQLMetadataConnector.DEFAULT_MAX_TRIES);
+ return segmentList;
+ }
+
+ /**
+ * @param connector
+ *
+ * @return streaming fetch size.
+ */
+ private static int getStreamingFetchSize(SQLMetadataConnector connector) {
+ if (connector instanceof MySQLConnector) {
+ return Integer.MIN_VALUE;
+ }
+ return DEFAULT_STREAMING_RESULT_SIZE;
+ }
+
+ /**
+ * @param pushedSegment
+ * @param segmentsDescriptorDir
+ *
+ * @return a sanitized file name
+ */
+ public static Path makeSegmentDescriptorOutputPath(DataSegment pushedSegment,
+ Path segmentsDescriptorDir
+ ) {
+ return new Path(
+ segmentsDescriptorDir,
+ String.format("%s.json", pushedSegment.getIdentifier().replace(":", ""))
+ );
+ }
+
+ /**
+ * Simple interface for retry operations
+ */
+ public interface DataPusher {
+ long push() throws IOException;
+ }
}
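As a usage sketch of the two descriptor helpers above (the DataSegment is assumed to come from the indexing step, which is not shown here), a record writer could publish a descriptor roughly like this:

import java.io.IOException;

import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;

public class SegmentDescriptorSketch {
  static void publishDescriptor(DataSegment pushedSegment, Path segmentsDescriptorDir,
      Configuration conf) throws IOException {
    FileSystem fs = segmentsDescriptorDir.getFileSystem(conf);
    // Descriptor file name is the segment identifier with ':' stripped, plus ".json".
    Path descriptorPath = DruidStorageHandlerUtils
        .makeSegmentDescriptorOutputPath(pushedSegment, segmentsDescriptorDir);
    // Retries with exponential backoff and overwrites any existing descriptor file.
    DruidStorageHandlerUtils.writeSegmentDescriptor(fs, pushedSegment, descriptorPath);
  }
}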
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java
deleted file mode 100644
index 45e31d6..0000000
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.druid;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.util.Progressable;
-
-/**
- * Place holder for Druid output format. Currently not implemented.
- */
-@SuppressWarnings("rawtypes")
-public class HiveDruidOutputFormat implements HiveOutputFormat {
-
- @Override
- public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
- Progressable progress) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
- Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress)
- throws IOException {
- throw new UnsupportedOperationException();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
deleted file mode 100644
index 612f853..0000000
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
+++ /dev/null
@@ -1,376 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.druid;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.calcite.adapter.druid.DruidDateTimeUtils;
-import org.apache.calcite.adapter.druid.DruidTable;
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.Constants;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidWritable;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.joda.time.Interval;
-import org.joda.time.Period;
-import org.joda.time.chrono.ISOChronology;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;
-
-import io.druid.query.Druids;
-import io.druid.query.Druids.SegmentMetadataQueryBuilder;
-import io.druid.query.Druids.SelectQueryBuilder;
-import io.druid.query.Druids.TimeBoundaryQueryBuilder;
-import io.druid.query.Query;
-import io.druid.query.Result;
-import io.druid.query.metadata.metadata.SegmentAnalysis;
-import io.druid.query.metadata.metadata.SegmentMetadataQuery;
-import io.druid.query.select.PagingSpec;
-import io.druid.query.select.SelectQuery;
-import io.druid.query.spec.MultipleIntervalSegmentSpec;
-import io.druid.query.timeboundary.TimeBoundaryQuery;
-import io.druid.query.timeboundary.TimeBoundaryResultValue;
-
-/**
- * Druid query based input format.
- *
- * Given a query and the Druid broker address, it will send it, and retrieve
- * and parse the results.
- */
-public class HiveDruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidWritable>
- implements org.apache.hadoop.mapred.InputFormat<NullWritable, DruidWritable> {
-
- protected static final Logger LOG = LoggerFactory.getLogger(HiveDruidQueryBasedInputFormat.class);
-
- @Override
- public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
- throws IOException {
- return getInputSplits(job);
- }
-
- @Override
- public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
- return Arrays.<InputSplit> asList(getInputSplits(context.getConfiguration()));
- }
-
- @SuppressWarnings("deprecation")
- private HiveDruidSplit[] getInputSplits(Configuration conf) throws IOException {
- String address = HiveConf.getVar(conf,
- HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
- if (StringUtils.isEmpty(address)) {
- throw new IOException("Druid broker address not specified in configuration");
- }
- String druidQuery = StringEscapeUtils.unescapeJava(conf.get(Constants.DRUID_QUERY_JSON));
- String druidQueryType;
- if (StringUtils.isEmpty(druidQuery)) {
- // Empty, maybe because CBO did not run; we fall back to
- // full Select query
- if (LOG.isWarnEnabled()) {
- LOG.warn("Druid query is empty; creating Select query");
- }
- String dataSource = conf.get(Constants.DRUID_DATA_SOURCE);
- if (dataSource == null) {
- throw new IOException("Druid data source cannot be empty");
- }
- druidQuery = createSelectStarQuery(address, dataSource);
- druidQueryType = Query.SELECT;
- } else {
- druidQueryType = conf.get(Constants.DRUID_QUERY_TYPE);
- if (druidQueryType == null) {
- throw new IOException("Druid query type not recognized");
- }
- }
-
- // hive depends on FileSplits
- Job job = new Job(conf);
- JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
- Path [] paths = FileInputFormat.getInputPaths(jobContext);
-
- switch (druidQueryType) {
- case Query.TIMESERIES:
- case Query.TOPN:
- case Query.GROUP_BY:
- return new HiveDruidSplit[] { new HiveDruidSplit(address, druidQuery, paths[0]) };
- case Query.SELECT:
- return splitSelectQuery(conf, address, druidQuery, paths[0]);
- default:
- throw new IOException("Druid query type not recognized");
- }
- }
-
- private static String createSelectStarQuery(String address, String dataSource) throws IOException {
- // Create Select query
- SelectQueryBuilder builder = new Druids.SelectQueryBuilder();
- builder.dataSource(dataSource);
- builder.intervals(Arrays.asList(DruidTable.DEFAULT_INTERVAL));
- builder.pagingSpec(PagingSpec.newSpec(1));
- Map<String, Object> context = new HashMap<>();
- context.put(Constants.DRUID_QUERY_FETCH, false);
- builder.context(context);
- return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(builder.build());
- }
-
- /* Method that splits Select query depending on the threshold so read can be
- * parallelized */
- private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
- String druidQuery, Path dummyPath) throws IOException {
- final int selectThreshold = (int) HiveConf.getIntVar(
- conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);
- final int numConnection = HiveConf
- .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
- final Period readTimeout = new Period(
- HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
- SelectQuery query;
- try {
- query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class);
- } catch (Exception e) {
- throw new IOException(e);
- }
-
- final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
- if (isFetch) {
- // If it has a limit, we use it and we do not split the query
- return new HiveDruidSplit[] { new HiveDruidSplit(
- address, DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
- }
-
- // We do not have the number of rows, thus we need to execute a
- // Segment Metadata query to obtain number of rows
- SegmentMetadataQueryBuilder metadataBuilder = new Druids.SegmentMetadataQueryBuilder();
- metadataBuilder.dataSource(query.getDataSource());
- metadataBuilder.intervals(query.getIntervals());
- metadataBuilder.merge(true);
- metadataBuilder.analysisTypes();
- SegmentMetadataQuery metadataQuery = metadataBuilder.build();
-
- HttpClient client = HttpClientInit.createClient(
- HttpClientConfig.builder().withNumConnections(numConnection)
- .withReadTimeout(readTimeout.toStandardDuration()).build(), new Lifecycle());
- InputStream response;
- try {
- response = DruidStorageHandlerUtils.submitRequest(client,
- DruidStorageHandlerUtils.createRequest(address, metadataQuery));
- } catch (Exception e) {
- throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
- }
-
- // Retrieve results
- List<SegmentAnalysis> metadataList;
- try {
- metadataList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
- new TypeReference<List<SegmentAnalysis>>() {});
- } catch (Exception e) {
- response.close();
- throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
- }
- if (metadataList == null || metadataList.isEmpty()) {
- throw new IOException("Connected to Druid but could not retrieve datasource information");
- }
- if (metadataList.size() != 1) {
- throw new IOException("Information about segments should have been merged");
- }
-
- final long numRows = metadataList.get(0).getNumRows();
-
- query = query.withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE));
- if (numRows <= selectThreshold) {
- // We are not going to split it
- return new HiveDruidSplit[] { new HiveDruidSplit(address,
- DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
- }
-
- // If the query does not specify a timestamp, we obtain the total time using
- // a Time Boundary query. Then, we use the information to split the query
- // following the Select threshold configuration property
- final List<Interval> intervals = new ArrayList<>();
- if (query.getIntervals().size() == 1 && query.getIntervals().get(0).withChronology(
- ISOChronology.getInstanceUTC()).equals(DruidTable.DEFAULT_INTERVAL)) {
- // Default max and min, we should execute a time boundary query to get a
- // more precise range
- TimeBoundaryQueryBuilder timeBuilder = new Druids.TimeBoundaryQueryBuilder();
- timeBuilder.dataSource(query.getDataSource());
- TimeBoundaryQuery timeQuery = timeBuilder.build();
-
- try {
- response = DruidStorageHandlerUtils.submitRequest(client,
- DruidStorageHandlerUtils.createRequest(address, timeQuery));
- } catch (Exception e) {
- throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
- }
-
- // Retrieve results
- List<Result<TimeBoundaryResultValue>> timeList;
- try {
- timeList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
- new TypeReference<List<Result<TimeBoundaryResultValue>>>() {});
- } catch (Exception e) {
- response.close();
- throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
- }
- if (timeList == null || timeList.isEmpty()) {
- throw new IOException("Connected to Druid but could not retrieve time boundary information");
- }
- if (timeList.size() != 1) {
- throw new IOException("We should obtain a single time boundary");
- }
-
- intervals.add(new Interval(timeList.get(0).getValue().getMinTime().getMillis(),
- timeList.get(0).getValue().getMaxTime().getMillis(), ISOChronology.getInstanceUTC()));
- } else {
- intervals.addAll(query.getIntervals());
- }
-
- // Create (numRows/default threshold) input splits
- int numSplits = (int) Math.ceil((double) numRows / selectThreshold);
- List<List<Interval>> newIntervals = createSplitsIntervals(intervals, numSplits);
- HiveDruidSplit[] splits = new HiveDruidSplit[numSplits];
- for (int i = 0; i < numSplits; i++) {
- // Create partial Select query
- final SelectQuery partialQuery = query.withQuerySegmentSpec(
- new MultipleIntervalSegmentSpec(newIntervals.get(i)));
- splits[i] = new HiveDruidSplit(address,
- DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath);
- }
- return splits;
- }
-
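- /**
- * Partitions the given intervals into numSplits consecutive groups whose total
- * durations are approximately equal, splitting individual intervals across
- * groups when needed.
- */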
- private static List<List<Interval>> createSplitsIntervals(List<Interval> intervals, int numSplits) {
- final long totalTime = DruidDateTimeUtils.extractTotalTime(intervals);
- long startTime = intervals.get(0).getStartMillis();
- long endTime = startTime;
- long currTime = 0;
- List<List<Interval>> newIntervals = new ArrayList<>();
- for (int i = 0, posIntervals = 0; i < numSplits; i++) {
- final long rangeSize = Math.round( (double) (totalTime * (i + 1)) / numSplits) -
- Math.round( (double) (totalTime * i) / numSplits);
- // Create the new interval(s)
- List<Interval> currentIntervals = new ArrayList<>();
- while (posIntervals < intervals.size()) {
- final Interval interval = intervals.get(posIntervals);
- final long expectedRange = rangeSize - currTime;
- if (interval.getEndMillis() - startTime >= expectedRange) {
- endTime = startTime + expectedRange;
- currentIntervals.add(new Interval(startTime, endTime, ISOChronology.getInstanceUTC()));
- startTime = endTime;
- currTime = 0;
- break;
- }
- endTime = interval.getEndMillis();
- currentIntervals.add(new Interval(startTime, endTime, ISOChronology.getInstanceUTC()));
- currTime += (endTime - startTime);
- startTime = intervals.get(++posIntervals).getStartMillis();
- }
- newIntervals.add(currentIntervals);
- }
- assert endTime == intervals.get(intervals.size()-1).getEndMillis();
- return newIntervals;
- }
-
- @Override
- public org.apache.hadoop.mapred.RecordReader<NullWritable, DruidWritable> getRecordReader(
- org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter)
- throws IOException {
- // We need to provide a different record reader for every type of Druid query,
- // since the Druid result format differs for each query type.
- final DruidQueryRecordReader<?,?> reader;
- final String druidQueryType = job.get(Constants.DRUID_QUERY_TYPE);
- if (druidQueryType == null) {
- reader = new DruidSelectQueryRecordReader(); // By default
- reader.initialize((HiveDruidSplit)split, job);
- return reader;
- }
- switch (druidQueryType) {
- case Query.TIMESERIES:
- reader = new DruidTimeseriesQueryRecordReader();
- break;
- case Query.TOPN:
- reader = new DruidTopNQueryRecordReader();
- break;
- case Query.GROUP_BY:
- reader = new DruidGroupByQueryRecordReader();
- break;
- case Query.SELECT:
- reader = new DruidSelectQueryRecordReader();
- break;
- default:
- throw new IOException("Druid query type not recognized");
- }
- reader.initialize((HiveDruidSplit)split, job);
- return reader;
- }
-
- @Override
- public RecordReader<NullWritable, DruidWritable> createRecordReader(InputSplit split,
- TaskAttemptContext context) throws IOException, InterruptedException {
- // We need to provide a different record reader for every type of Druid query,
- // since the Druid result format differs for each query type.
- final String druidQueryType = context.getConfiguration().get(Constants.DRUID_QUERY_TYPE);
- if (druidQueryType == null) {
- return new DruidSelectQueryRecordReader(); // By default
- }
- final DruidQueryRecordReader<?,?> reader;
- switch (druidQueryType) {
- case Query.TIMESERIES:
- reader = new DruidTimeseriesQueryRecordReader();
- break;
- case Query.TOPN:
- reader = new DruidTopNQueryRecordReader();
- break;
- case Query.GROUP_BY:
- reader = new DruidGroupByQueryRecordReader();
- break;
- case Query.SELECT:
- reader = new DruidSelectQueryRecordReader();
- break;
- default:
- throw new IOException("Druid query type not recognized");
- }
- return reader;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java
deleted file mode 100644
index 3fba5d0..0000000
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.druid;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-
-/**
- * Druid split. Its purpose is to trigger query execution in Druid.
- */
-public class HiveDruidSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit {
-
- private String address;
- private String druidQuery;
-
- // required for deserialization
- public HiveDruidSplit() {
- super((Path) null, 0, 0, (String[]) null);
- }
-
- public HiveDruidSplit(String address, String druidQuery, Path dummyPath) {
- super(dummyPath, 0, 0, (String[]) null);
- this.address = address;
- this.druidQuery = druidQuery;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- out.writeUTF(address);
- out.writeUTF(druidQuery);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- address = in.readUTF();
- druidQuery = in.readUTF();
- }
-
- @Override
- public long getLength() {
- return 0L;
- }
-
- @Override
- public String[] getLocations() {
- return new String[] { "" };
- }
-
- public String getAddress() {
- return address;
- }
-
- public String getDruidQuery() {
- return druidQuery;
- }
-
- @Override
- public String toString() {
- return "HiveDruidSplit{" + address + ", " + druidQuery + "}";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
new file mode 100644
index 0000000..86ddca8
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.io;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.metamx.common.Granularity;
+import io.druid.data.input.impl.DimensionSchema;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.InputRowParser;
+import io.druid.data.input.impl.MapInputRowParser;
+import io.druid.data.input.impl.StringDimensionSchema;
+import io.druid.data.input.impl.TimeAndDimsParseSpec;
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.DoubleSumAggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.realtime.plumber.CustomVersioningPolicy;
+import io.druid.storage.hdfs.HdfsDataSegmentPusher;
+import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
+import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.hadoop.hive.druid.DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME;
+
+public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritable> {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DruidOutputFormat.class);
+
+ @Override
+ public FileSinkOperator.RecordWriter getHiveRecordWriter(
+ JobConf jc,
+ Path finalOutPath,
+ Class<? extends Writable> valueClass,
+ boolean isCompressed,
+ Properties tableProperties,
+ Progressable progress
+ ) throws IOException {
+
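+ // Build a Druid DataSchema from the Hive table properties and job configuration,
+ // and return a record writer that persists rows as Druid segments and pushes them
+ // to the configured HDFS segment directory.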
+ final String segmentGranularity =
+ tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) != null ?
+ tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) :
+ HiveConf.getVar(jc, HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY);
+ final String dataSource = tableProperties.getProperty(Constants.DRUID_DATA_SOURCE);
+ final String segmentDirectory =
+ tableProperties.getProperty(Constants.DRUID_SEGMENT_DIRECTORY) != null
+ ? tableProperties.getProperty(Constants.DRUID_SEGMENT_DIRECTORY)
+ : HiveConf.getVar(jc, HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
+
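+ // Pusher that copies finished segments into the configured segment directory on HDFS.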
+ final HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
+ hdfsDataSegmentPusherConfig.setStorageDirectory(segmentDirectory);
+ final DataSegmentPusher hdfsDataSegmentPusher = new HdfsDataSegmentPusher(
+ hdfsDataSegmentPusherConfig, jc, DruidStorageHandlerUtils.JSON_MAPPER);
+
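+ // Uniform granularity spec that buckets rows into segments of the requested granularity.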
+ final GranularitySpec granularitySpec = new UniformGranularitySpec(
+ Granularity.valueOf(segmentGranularity),
+ null,
+ null
+ );
+
+ final String columnNameProperty = tableProperties.getProperty(serdeConstants.LIST_COLUMNS);
+ final String columnTypeProperty = tableProperties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+
+ if (StringUtils.isEmpty(columnNameProperty) || StringUtils.isEmpty(columnTypeProperty)) {
+ throw new IllegalStateException(
+ String.format("List of columns names [%s] or columns type [%s] is/are not present",
+ columnNameProperty, columnTypeProperty
+ ));
+ }
+ ArrayList<String> columnNames = new ArrayList<String>();
+ for (String name : columnNameProperty.split(",")) {
+ columnNames.add(name);
+ }
+ if (!columnNames.contains(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
+ throw new IllegalStateException("Timestamp column (' " + DruidTable.DEFAULT_TIMESTAMP_COLUMN +
+ "') not specified in create table; list of columns is : " +
+ tableProperties.getProperty(serdeConstants.LIST_COLUMNS));
+ }
+ ArrayList<TypeInfo> columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+
+ // By default, numeric columns become sum aggregators (metrics) and all remaining
+ // columns, except the timestamp and granularity helper columns, are treated as dimensions.
+ final List<DimensionSchema> dimensions = new ArrayList<>();
+ ImmutableList.Builder<AggregatorFactory> aggregatorFactoryBuilder = ImmutableList.builder();
+ for (int i = 0; i < columnTypes.size(); i++) {
+ TypeInfo f = columnTypes.get(i);
+ assert f.getCategory() == ObjectInspector.Category.PRIMITIVE;
+ AggregatorFactory af;
+ switch (f.getTypeName()) {
+ case serdeConstants.TINYINT_TYPE_NAME:
+ case serdeConstants.SMALLINT_TYPE_NAME:
+ case serdeConstants.INT_TYPE_NAME:
+ case serdeConstants.BIGINT_TYPE_NAME:
+ af = new LongSumAggregatorFactory(columnNames.get(i), columnNames.get(i));
+ break;
+ case serdeConstants.FLOAT_TYPE_NAME:
+ case serdeConstants.DOUBLE_TYPE_NAME:
+ af = new DoubleSumAggregatorFactory(columnNames.get(i), columnNames.get(i));
+ break;
+ default:
+ // Dimension or timestamp
+ String columnName = columnNames.get(i);
+ if (!columnName.equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN) && !columnName
+ .equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME)) {
+ dimensions.add(new StringDimensionSchema(columnName));
+ }
+ continue;
+ }
+ aggregatorFactoryBuilder.add(af);
+ }
+ List<AggregatorFactory> aggregatorFactories = aggregatorFactoryBuilder.build();
+ final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec(
+ new TimestampSpec(DruidTable.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
+ new DimensionsSpec(dimensions,
+ Lists.newArrayList(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME), null
+ )
+ ));
+
+ Map<String, Object> inputParser = DruidStorageHandlerUtils.JSON_MAPPER
+ .convertValue(inputRowParser, Map.class);
+
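+ // Assemble the Druid data schema: datasource name, row parser, metrics and granularity.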
+ final DataSchema dataSchema = new DataSchema(
+ Preconditions.checkNotNull(dataSource, "Data source name is null"),
+ inputParser,
+ aggregatorFactories.toArray(new AggregatorFactory[aggregatorFactories.size()]),
+ granularitySpec,
+ DruidStorageHandlerUtils.JSON_MAPPER
+ );
+
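+ // The working path and segment version are read from the job configuration; the
+ // custom versioning policy pins every produced segment to that version.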
+ final String workingPath = jc.get(Constants.DRUID_JOB_WORKING_DIRECTORY);
+ final String version = jc.get(Constants.DRUID_SEGMENT_VERSION);
+ Integer maxPartitionSize = HiveConf
+ .getIntVar(jc, HiveConf.ConfVars.HIVE_DRUID_MAX_PARTITION_SIZE);
+ String basePersistDirectory = HiveConf
+ .getVar(jc, HiveConf.ConfVars.HIVE_DRUID_BASE_PERSIST_DIRECTORY);
+ final RealtimeTuningConfig realtimeTuningConfig = RealtimeTuningConfig
+ .makeDefaultTuningConfig(new File(
+ basePersistDirectory))
+ .withVersioningPolicy(new CustomVersioningPolicy(version));
+
+ LOG.debug(String.format("running with Data schema [%s] ", dataSchema));
+ return new DruidRecordWriter(dataSchema, realtimeTuningConfig, hdfsDataSegmentPusher,
+ maxPartitionSize, new Path(workingPath, SEGMENTS_DESCRIPTOR_DIR_NAME),
+ finalOutPath.getFileSystem(jc)
+ );
+ }
+
+ @Override
+ public RecordWriter<K, DruidWritable> getRecordWriter(
+ FileSystem ignored, JobConf job, String name, Progressable progress
+ ) throws IOException {
+ throw new UnsupportedOperationException("please implement me !");
+ }
+
+ @Override
+ public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+ throw new UnsupportedOperationException("not implemented yet");
+ }
+}