Posted to commits@hive.apache.org by jc...@apache.org on 2016/12/15 20:51:38 UTC

[4/4] hive git commit: HIVE-15277: Teach Hive how to create/delete Druid segments (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)

HIVE-15277: Teach Hive how to create/delete Druid segments (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/590687bc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/590687bc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/590687bc

Branch: refs/heads/master
Commit: 590687bc4cb97cdf4aa95ba94f28949986d1b3e8
Parents: 89362a1
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Thu Dec 15 20:43:45 2016 +0000
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Thu Dec 15 20:43:45 2016 +0000

----------------------------------------------------------------------
 .../hadoop/hive/common/JvmPauseMonitor.java     |   3 +-
 .../org/apache/hadoop/hive/conf/Constants.java  |   7 +
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  38 +-
 druid-handler/README.md                         |   3 +
 druid-handler/pom.xml                           | 145 +++-
 .../hadoop/hive/druid/DruidStorageHandler.java  | 311 +++++++-
 .../hive/druid/DruidStorageHandlerUtils.java    | 351 ++++++++-
 .../hive/druid/HiveDruidOutputFormat.java       |  55 --
 .../druid/HiveDruidQueryBasedInputFormat.java   | 376 ----------
 .../hadoop/hive/druid/HiveDruidSplit.java       |  83 ---
 .../hadoop/hive/druid/io/DruidOutputFormat.java | 204 ++++++
 .../druid/io/DruidQueryBasedInputFormat.java    | 397 ++++++++++
 .../hadoop/hive/druid/io/DruidRecordWriter.java | 260 +++++++
 .../hadoop/hive/druid/io/HiveDruidSplit.java    |  84 +++
 .../serde/DruidGroupByQueryRecordReader.java    |  15 +-
 .../druid/serde/DruidQueryRecordReader.java     |  42 +-
 .../serde/DruidSelectQueryRecordReader.java     |   8 +-
 .../hadoop/hive/druid/serde/DruidSerDe.java     | 286 +++++---
 .../hive/druid/serde/DruidSerDeUtils.java       |   6 +-
 .../serde/DruidTimeseriesQueryRecordReader.java |   7 +-
 .../druid/serde/DruidTopNQueryRecordReader.java |   8 +-
 .../hive/druid/DruidStorageHandlerTest.java     | 181 +++++
 .../hadoop/hive/druid/QTestDruidSerDe.java      |  60 +-
 .../hadoop/hive/druid/TestDerbyConnector.java   | 108 +++
 .../hadoop/hive/druid/TestDruidSerDe.java       | 731 ++++++++++---------
 .../TestHiveDruidQueryBasedInputFormat.java     |  91 ++-
 .../hive/ql/io/DruidRecordWriterTest.java       | 221 ++++++
 .../llap/daemon/impl/TaskRunnerCallable.java    |  45 +-
 pom.xml                                         |   2 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |  29 +-
 .../apache/hadoop/hive/ql/exec/Utilities.java   | 113 ++-
 .../hadoop/hive/ql/hooks/LineageLogger.java     |  31 +-
 .../hadoop/hive/ql/optimizer/Optimizer.java     |   2 +
 ...tedDynPartitionTimeGranularityOptimizer.java | 363 +++++++++
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   4 -
 .../apache/hadoop/hive/ql/plan/PlanUtils.java   |  75 +-
 .../apache/hadoop/hive/ql/plan/TableDesc.java   |  18 +-
 .../queries/clientnegative/druid_external.q     |   5 -
 .../results/clientnegative/druid_external.q.out |   7 -
 .../results/clientpositive/druid_basic2.q.out   |  16 +-
 40 files changed, 3534 insertions(+), 1257 deletions(-)
----------------------------------------------------------------------
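
The write path added here is exercised through the Druid storage handler when a managed (non-EXTERNAL) table is created; dropping the table disables the data source and, when data deletion is requested, removes its segments. As a rough, hypothetical client-side sketch (connection details, table name, and source columns are illustrative; the exact DDL requirements are documented on the wiki page linked from the new druid-handler/README.md):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class DruidCtasSketch {
      public static void main(String[] args) throws Exception {
        // Assumes a HiveServer2 instance at localhost:10000 and the Hive JDBC driver on the classpath.
        try (Connection con = DriverManager.getConnection(
                "jdbc:hive2://localhost:10000/default", "hive", "");
             Statement stmt = con.createStatement()) {
          // A managed table backed by the Druid storage handler: creating it builds
          // and registers Druid segments, dropping it deletes them.
          stmt.execute("CREATE TABLE druid_wiki "
              + "STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' "
              + "TBLPROPERTIES ('druid.datasource' = 'wiki', "
              + "'druid.segment.granularity' = 'DAY') "
              + "AS SELECT cast(ts AS timestamp) AS `__time`, page, added FROM src");
        }
      }
    }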


http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
index 5d475f4..cf080e3 100644
--- a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
+++ b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
@@ -37,6 +37,7 @@ import java.lang.management.ManagementFactory;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Based on the JvmPauseMonitor from Hadoop.
@@ -181,7 +182,7 @@ public class JvmPauseMonitor {
         } catch (InterruptedException ie) {
           return;
         }
-        long extraSleepTime = sw.elapsedMillis() - SLEEP_INTERVAL_MS;
+        long extraSleepTime = sw.elapsed(TimeUnit.MILLISECONDS) - SLEEP_INTERVAL_MS;
         Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
 
         if (extraSleepTime > warnThresholdMs) {
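
The change above is a Guava API migration: Stopwatch.elapsedMillis() was deprecated and later removed, while elapsed(TimeUnit) remains available. A minimal standalone sketch of the replacement call, assuming a Guava version in the 16.x range (the version the druid-handler now pins, see the pom.xml change below):

    import com.google.common.base.Stopwatch;
    import java.util.concurrent.TimeUnit;

    public class StopwatchSketch {
      public static void main(String[] args) throws InterruptedException {
        Stopwatch sw = Stopwatch.createStarted();
        Thread.sleep(50);
        // elapsed(TimeUnit) is the supported accessor; elapsedMillis() no longer exists.
        long extraSleepTime = sw.elapsed(TimeUnit.MILLISECONDS) - 50;
        System.out.println("extra sleep time: " + extraSleepTime + " ms");
      }
    }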

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/common/src/java/org/apache/hadoop/hive/conf/Constants.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 6c42163..ea7864a 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -26,10 +26,17 @@ public class Constants {
   /* Constants for Druid storage handler */
   public static final String DRUID_HIVE_STORAGE_HANDLER_ID =
           "org.apache.hadoop.hive.druid.DruidStorageHandler";
+  public static final String DRUID_HIVE_OUTPUT_FORMAT =
+          "org.apache.hadoop.hive.druid.io.DruidOutputFormat";
   public static final String DRUID_DATA_SOURCE = "druid.datasource";
+  public static final String DRUID_SEGMENT_GRANULARITY = "druid.segment.granularity";
+  public static final String DRUID_TIMESTAMP_GRANULARITY_COL_NAME = "__time_granularity";
   public static final String DRUID_QUERY_JSON = "druid.query.json";
   public static final String DRUID_QUERY_TYPE = "druid.query.type";
   public static final String DRUID_QUERY_FETCH = "druid.query.fetch";
+  public static final String DRUID_SEGMENT_DIRECTORY = "druid.storage.storageDirectory";
+  public static final String DRUID_SEGMENT_VERSION = "druid.segment.version";
+  public static final String DRUID_JOB_WORKING_DIRECTORY = "druid.job.workingDirectory";
 
   public static final String HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR = "HIVE_JOB_CREDSTORE_PASSWORD";
   public static final String HADOOP_CREDENTIAL_PASSWORD_ENVVAR = "HADOOP_CREDSTORE_PASSWORD";
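
These new constants are keys read from table and job properties by the storage handler and output format elsewhere in this patch. A small illustrative lookup (the helper name and defaulting behaviour are assumptions for the sketch, not code from the patch):

    import java.util.Properties;
    import org.apache.hadoop.hive.conf.Constants;

    public class GranularitySketch {
      /** Returns the declared segment granularity, or the supplied default when absent. */
      static String segmentGranularity(Properties tableProperties, String defaultGranularity) {
        String granularity = tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY);
        return granularity != null ? granularity : defaultGranularity;
      }
    }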

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 032ff0c..dcb383d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1936,9 +1936,21 @@ public class HiveConf extends Configuration {
       new TimeValidator(TimeUnit.MILLISECONDS), "Frequency of WriteSet reaper runs"),
 
     // For Druid storage handler
+    HIVE_DRUID_INDEXING_GRANULARITY("hive.druid.indexer.segments.granularity", "DAY",
+            new PatternSet("YEAR", "MONTH", "WEEK", "DAY", "HOUR", "MINUTE", "SECOND"),
+            "Granularity for the segments created by the Druid storage handler"
+    ),
+    HIVE_DRUID_MAX_PARTITION_SIZE("hive.druid.indexer.partition.size.max", 5000000,
+            "Maximum number of records per segment partition"
+    ),
+    HIVE_DRUID_MAX_ROW_IN_MEMORY("hive.druid.indexer.memory.rownum.max", 75000,
+            "Maximum number of records in memory while storing data in Druid"
+    ),
     HIVE_DRUID_BROKER_DEFAULT_ADDRESS("hive.druid.broker.address.default", "localhost:8082",
-        "Address of the Druid broker. If we are querying Druid from Hive, this address needs to be\n" +
-        "declared"),
+            "Address of the Druid broker. If we are querying Druid from Hive, this address needs to be\n"
+                    +
+                    "declared"
+    ),
     HIVE_DRUID_SELECT_THRESHOLD("hive.druid.select.threshold", 10000,
         "When we can split a Select query, this is the maximum number of rows that we try to retrieve\n" +
         "per query. In order to do that, we obtain the estimated size for the complete result. If the\n" +
@@ -1949,7 +1961,27 @@ public class HiveConf extends Configuration {
         "the HTTP client."),
     HIVE_DRUID_HTTP_READ_TIMEOUT("hive.druid.http.read.timeout", "PT1M", "Read timeout period for the HTTP\n" +
         "client in ISO8601 format (for example P2W, P3M, PT1H30M, PT0.750S), default is period of 1 minute."),
-
+    HIVE_DRUID_BASE_PERSIST_DIRECTORY("hive.druid.basePersistDirectory", "/tmp",
+            "Local temporary directory used to persist intermediate indexing state."
+    ),
+    DRUID_SEGMENT_DIRECTORY("hive.druid.storage.storageDirectory", "/druid/segments",
+            "Druid deep storage location."),
+    DRUID_METADATA_BASE("hive.druid.metadata.base", "druid", "Default prefix for metadata tables"),
+    DRUID_METADATA_DB_TYPE("hive.druid.metadata.db.type", "mysql",
+            new PatternSet("mysql", "postgres"), "Type of the metadata database."
+    ),
+    DRUID_METADATA_DB_USERNAME("hive.druid.metadata.username", "",
+            "Username used to connect to the metadata DB."
+    ),
+    DRUID_METADATA_DB_PASSWORD("hive.druid.metadata.password", "",
+            "Password used to connect to the metadata DB."
+    ),
+    DRUID_METADATA_DB_URI("hive.druid.metadata.uri", "",
+            "URI to connect to the database (for example jdbc:mysql://hostname:port/DBName)."
+    ),
+    DRUID_WORKING_DIR("hive.druid.working.directory", "/tmp/workingDirectory",
+            "Default HDFS working directory used to store intermediate metadata"
+    ),
     // For HBase storage handler
     HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true,
         "Whether writes to HBase should be forced to the write-ahead log. \n" +

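The new settings are read through the usual static HiveConf accessors; a short sketch (the particular variables chosen are illustrative and rely on the ConfVars added above):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class DruidConfSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // String-valued settings use getVar, numeric ones getIntVar.
        String dbType = HiveConf.getVar(conf, HiveConf.ConfVars.DRUID_METADATA_DB_TYPE);
        String workingDir = HiveConf.getVar(conf, HiveConf.ConfVars.DRUID_WORKING_DIR);
        int maxPartitionSize =
            HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_MAX_PARTITION_SIZE);
        System.out.println(dbType + " " + workingDir + " " + maxPartitionSize);
      }
    }
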
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/README.md
----------------------------------------------------------------------
diff --git a/druid-handler/README.md b/druid-handler/README.md
new file mode 100644
index 0000000..b548567
--- /dev/null
+++ b/druid-handler/README.md
@@ -0,0 +1,3 @@
+# Druid Storage Handler
+
+[Link for documentation](https://cwiki.apache.org/confluence/display/Hive/Druid+Integration)

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/pom.xml
----------------------------------------------------------------------
diff --git a/druid-handler/pom.xml b/druid-handler/pom.xml
index e4fa8fd..f691a2c 100644
--- a/druid-handler/pom.xml
+++ b/druid-handler/pom.xml
@@ -29,6 +29,8 @@
 
   <properties>
     <hive.path.to.root>..</hive.path.to.root>
+    <druid.metamx.util.version>0.27.10</druid.metamx.util.version>
+    <druid.guava.version>16.0.1</druid.guava.version>
   </properties>
 
   <dependencies>
@@ -47,6 +49,10 @@
           <groupId>io.netty</groupId>
           <artifactId>netty</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <!-- inter-project -->
@@ -56,37 +62,47 @@
       <version>${commons-lang.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <version>${hadoop.version}</version>
-      <optional>true</optional>
-        <exclusions>
-          <exclusion>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>commmons-logging</groupId>
-            <artifactId>commons-logging</artifactId>
-          </exclusion>
-      </exclusions>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${druid.guava.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-core</artifactId>
-      <version>${hadoop.version}</version>
-      <optional>true</optional>
+      <groupId>com.metamx</groupId>
+      <artifactId>java-util</artifactId>
+      <version>${druid.metamx.util.version}</version>
       <exclusions>
         <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty</artifactId>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
     <dependency>
-      <groupId>org.apache.calcite</groupId>
-      <artifactId>calcite-druid</artifactId>
-      <version>${calcite.version}</version>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-server</artifactId>
+      <version>${druid.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>io.druid</groupId>
@@ -107,7 +123,72 @@
         </exclusion>
       </exclusions>
     </dependency>
-
+    <dependency>
+      <groupId>io.druid.extensions</groupId>
+      <artifactId>druid-hdfs-storage</artifactId>
+      <version>${druid.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.druid.extensions</groupId>
+      <artifactId>mysql-metadata-storage</artifactId>
+      <version>${druid.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.druid.extensions</groupId>
+      <artifactId>postgresql-metadata-storage</artifactId>
+      <version>${druid.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <scope>provided</scope>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.inject</groupId>
+          <artifactId>guice</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.inject.extensions</groupId>
+          <artifactId>guice-servlet</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-druid</artifactId>
+      <version>${calcite.version}</version>
+    </dependency>
     <!-- test inter-project -->
     <dependency>
       <groupId>junit</groupId>
@@ -115,6 +196,12 @@
       <version>${junit.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-indexing-hadoop</artifactId>
+      <version>${druid.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
@@ -170,15 +257,25 @@
                   <pattern>com.fasterxml.jackson</pattern>
                   <shadedPattern>org.apache.hive.druid.com.fasterxml.jackson</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>com.google.common</pattern>
+                  <shadedPattern>org.apache.hive.druid.com.google.common</shadedPattern>
+                </relocation>
               </relocations>
               <artifactSet>
                 <includes>
                   <include>io.druid:*</include>
+                  <include>io.druid.extensions:*</include>
                   <include>com.metamx:*</include>
                   <include>io.netty:*</include>
                   <include>com.fasterxml.jackson.core:*</include>
                   <include>com.fasterxml.jackson.datatype:*</include>
                   <include>com.fasterxml.jackson.dataformat:*</include>
+                  <include>com.google.guava:*</include>
+                  <include>it.unimi.dsi:*</include>
+                  <include>org.jdbi:*</include>
+                  <include>net.jpountz.lz4:*</include>
+                  <include>org.apache.commons:*</include>
                 </includes>
               </artifactSet>
               <filters>

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 8242385..a08a4e3 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,36 +17,131 @@
  */
 package org.apache.hadoop.hive.druid;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
+import io.druid.metadata.MetadataStorageConnectorConfig;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.metadata.SQLMetadataConnector;
+import io.druid.metadata.storage.mysql.MySQLConnector;
+import io.druid.metadata.storage.postgresql.PostgreSQLConnector;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.io.DruidOutputFormat;
+import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
 import org.apache.hadoop.hive.druid.serde.DruidSerDe;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
 /**
  * DruidStorageHandler provides a HiveStorageHandler implementation for Druid.
  */
-@SuppressWarnings({"deprecation","rawtypes"})
+@SuppressWarnings({ "deprecation", "rawtypes" })
 public class DruidStorageHandler extends DefaultStorageHandler implements HiveMetaHook {
 
   protected static final Logger LOG = LoggerFactory.getLogger(DruidStorageHandler.class);
 
+  public static final String SEGMENTS_DESCRIPTOR_DIR_NAME = "segmentsDescriptorDir";
+
+  private final SQLMetadataConnector connector;
+
+  private final SQLMetadataStorageUpdaterJobHandler druidSqlMetadataStorageUpdaterJobHandler;
+
+  private final MetadataStorageTablesConfig druidMetadataStorageTablesConfig;
+
+  private String uniqueId = null;
+
+  private String rootWorkingDir = null;
+
+  public DruidStorageHandler() {
+    //this is the default value in druid
+    final String base = HiveConf
+            .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_BASE);
+    final String dbType = HiveConf
+            .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_TYPE);
+    final String username = HiveConf
+            .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_USERNAME);
+    final String password = HiveConf
+            .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_PASSWORD);
+    final String uri = HiveConf
+            .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_URI);
+    druidMetadataStorageTablesConfig = MetadataStorageTablesConfig.fromBase(base);
+
+    final Supplier<MetadataStorageConnectorConfig> storageConnectorConfigSupplier = Suppliers.<MetadataStorageConnectorConfig>ofInstance(
+            new MetadataStorageConnectorConfig() {
+              @Override
+              public String getConnectURI() {
+                return uri;
+              }
+
+              @Override
+              public String getUser() {
+                return username;
+              }
+
+              @Override
+              public String getPassword() {
+                return password;
+              }
+            });
+
+    if (dbType.equals("mysql")) {
+      connector = new MySQLConnector(storageConnectorConfigSupplier,
+              Suppliers.ofInstance(druidMetadataStorageTablesConfig)
+      );
+    } else if (dbType.equals("postgres")) {
+      connector = new PostgreSQLConnector(storageConnectorConfigSupplier,
+              Suppliers.ofInstance(druidMetadataStorageTablesConfig)
+      );
+    } else {
+      throw new IllegalStateException(String.format("Unknown metadata storage type [%s]", dbType));
+    }
+    druidSqlMetadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler(connector);
+  }
+
+  @VisibleForTesting
+  public DruidStorageHandler(SQLMetadataConnector connector,
+          SQLMetadataStorageUpdaterJobHandler druidSqlMetadataStorageUpdaterJobHandler,
+          MetadataStorageTablesConfig druidMetadataStorageTablesConfig
+  ) {
+    this.connector = connector;
+    this.druidSqlMetadataStorageUpdaterJobHandler = druidSqlMetadataStorageUpdaterJobHandler;
+    this.druidMetadataStorageTablesConfig = druidMetadataStorageTablesConfig;
+  }
+
   @Override
   public Class<? extends InputFormat> getInputFormatClass() {
-    return HiveDruidQueryBasedInputFormat.class;
+    return DruidQueryBasedInputFormat.class;
   }
 
   @Override
   public Class<? extends OutputFormat> getOutputFormatClass() {
-    return HiveDruidOutputFormat.class;
+    return DruidOutputFormat.class;
   }
 
   @Override
@@ -62,28 +157,141 @@ public class DruidStorageHandler extends DefaultStorageHandler implements HiveMe
   @Override
   public void preCreateTable(Table table) throws MetaException {
     // Do safety checks
-    if (!MetaStoreUtils.isExternalTable(table)) {
-      throw new MetaException("Table in Druid needs to be declared as EXTERNAL");
-    }
-    if (!StringUtils.isEmpty(table.getSd().getLocation())) {
+    if (MetaStoreUtils.isExternalTable(table) && !StringUtils
+            .isEmpty(table.getSd().getLocation())) {
       throw new MetaException("LOCATION may not be specified for Druid");
     }
+
     if (table.getPartitionKeysSize() != 0) {
       throw new MetaException("PARTITIONED BY may not be specified for Druid");
     }
     if (table.getSd().getBucketColsSize() != 0) {
       throw new MetaException("CLUSTERED BY may not be specified for Druid");
     }
+    String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
+    if (MetaStoreUtils.isExternalTable(table)) {
+      return;
+    }
+    // If it is not an external table we need to check the metadata
+    try {
+      connector.createSegmentTable();
+    } catch (Exception e) {
+      LOG.error("Exception while trying to create druid segments table", e);
+      throw new MetaException(e.getMessage());
+    }
+    Collection<String> existingDataSources = DruidStorageHandlerUtils
+            .getAllDataSourceNames(connector, druidMetadataStorageTablesConfig);
+    LOG.debug(String.format("pre-create data source with name [%s]", dataSourceName));
+    if (existingDataSources.contains(dataSourceName)) {
+      throw new MetaException(String.format("Data source [%s] already exists", dataSourceName));
+    }
   }
 
   @Override
   public void rollbackCreateTable(Table table) throws MetaException {
-    // Nothing to do
+    if (MetaStoreUtils.isExternalTable(table)) {
+      return;
+    }
+    final Path segmentDescriptorDir = getSegmentDescriptorDir();
+    try {
+      List<DataSegment> dataSegmentList = DruidStorageHandlerUtils
+              .getPublishedSegments(segmentDescriptorDir, getConf());
+      for (DataSegment dataSegment : dataSegmentList) {
+        try {
+          deleteSegment(dataSegment);
+        } catch (SegmentLoadingException e) {
+          LOG.error(String.format("Error while trying to clean the segment [%s]", dataSegment), e);
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Exception while rollback", e);
+      throw Throwables.propagate(e);
+    } finally {
+      cleanWorkingDir();
+    }
   }
 
   @Override
   public void commitCreateTable(Table table) throws MetaException {
-    // Nothing to do
+    if (MetaStoreUtils.isExternalTable(table)) {
+      return;
+    }
+    LOG.info(String.format("Committing table [%s] to the druid metastore", table.getDbName()));
+    final Path tableDir = getSegmentDescriptorDir();
+    try {
+      List<DataSegment> segmentList = DruidStorageHandlerUtils
+              .getPublishedSegments(tableDir, getConf());
+      LOG.info(String.format("Found [%d] segments under path [%s]", segmentList.size(), tableDir));
+      druidSqlMetadataStorageUpdaterJobHandler.publishSegments(
+              druidMetadataStorageTablesConfig.getSegmentsTable(),
+              segmentList,
+              DruidStorageHandlerUtils.JSON_MAPPER
+      );
+    } catch (IOException e) {
+      LOG.error("Exception while commit", e);
+      Throwables.propagate(e);
+    } finally {
+      cleanWorkingDir();
+    }
+  }
+
+  @VisibleForTesting
+  protected void deleteSegment(DataSegment segment) throws SegmentLoadingException {
+
+    final Path path = getPath(segment);
+    LOG.info(String.format("removing segment[%s], located at path[%s]", segment.getIdentifier(),
+            path
+    ));
+
+    try {
+      if (path.getName().endsWith(".zip")) {
+
+        final FileSystem fs = path.getFileSystem(getConf());
+
+        if (!fs.exists(path)) {
+          LOG.warn(String.format(
+                  "Segment Path [%s] does not exist. It appears to have been deleted already.",
+                  path
+          ));
+          return;
+        }
+
+        // path format -- > .../dataSource/interval/version/partitionNum/xxx.zip
+        Path partitionNumDir = path.getParent();
+        if (!fs.delete(partitionNumDir, true)) {
+          throw new SegmentLoadingException(
+                  "Unable to kill segment, failed to delete dir [%s]",
+                  partitionNumDir.toString()
+          );
+        }
+
+        //try to delete other directories if possible
+        Path versionDir = partitionNumDir.getParent();
+        if (safeNonRecursiveDelete(fs, versionDir)) {
+          Path intervalDir = versionDir.getParent();
+          if (safeNonRecursiveDelete(fs, intervalDir)) {
+            Path dataSourceDir = intervalDir.getParent();
+            safeNonRecursiveDelete(fs, dataSourceDir);
+          }
+        }
+      } else {
+        throw new SegmentLoadingException("Unknown file type[%s]", path);
+      }
+    } catch (IOException e) {
+      throw new SegmentLoadingException(e, "Unable to kill segment");
+    }
+  }
+
+  private static Path getPath(DataSegment dataSegment) {
+    return new Path(String.valueOf(dataSegment.getLoadSpec().get("path")));
+  }
+
+  private static boolean safeNonRecursiveDelete(FileSystem fs, Path path) {
+    try {
+      return fs.delete(path, false);
+    } catch (Exception ex) {
+      return false;
+    }
   }
 
   @Override
@@ -98,7 +306,45 @@ public class DruidStorageHandler extends DefaultStorageHandler implements HiveMe
 
   @Override
   public void commitDropTable(Table table, boolean deleteData) throws MetaException {
-    // Nothing to do
+    if (MetaStoreUtils.isExternalTable(table)) {
+      return;
+    }
+    String dataSourceName = Preconditions
+            .checkNotNull(table.getParameters().get(Constants.DRUID_DATA_SOURCE),
+                    "DataSource name is null !"
+            );
+
+    if (deleteData) {
+      LOG.info(String.format("Dropping with purge all the data for data source [%s]",
+              dataSourceName
+      ));
+      List<DataSegment> dataSegmentList = DruidStorageHandlerUtils
+              .getDataSegmentList(connector, druidMetadataStorageTablesConfig, dataSourceName);
+      if (dataSegmentList.isEmpty()) {
+        LOG.info(String.format("Nothing to delete for data source [%s]", dataSourceName));
+        return;
+      }
+      for (DataSegment dataSegment : dataSegmentList) {
+        try {
+          deleteSegment(dataSegment);
+        } catch (SegmentLoadingException e) {
+          LOG.error(String.format("Error while deleting segment [%s]", dataSegment.getIdentifier()),
+                  e
+          );
+        }
+      }
+    }
+    if (DruidStorageHandlerUtils
+            .disableDataSource(connector, druidMetadataStorageTablesConfig, dataSourceName)) {
+      LOG.info(String.format("Successfully dropped druid data source [%s]", dataSourceName));
+    }
+  }
+
+  @Override
+  public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties
+  ) {
+    jobProperties.put(Constants.DRUID_SEGMENT_VERSION, new DateTime().toString());
+    jobProperties.put(Constants.DRUID_JOB_WORKING_DIRECTORY, getStagingWorkingDir().toString());
   }
 
   @Override
@@ -106,4 +352,43 @@ public class DruidStorageHandler extends DefaultStorageHandler implements HiveMe
     return Constants.DRUID_HIVE_STORAGE_HANDLER_ID;
   }
 
+  public String getUniqueId() {
+    if (uniqueId == null) {
+      uniqueId = Preconditions.checkNotNull(
+              Strings.emptyToNull(HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVEQUERYID)),
+              "Hive query id is null"
+      );
+    }
+    return uniqueId;
+  }
+
+  private Path getStagingWorkingDir() {
+    return new Path(getRootWorkingDir(), makeStagingName());
+  }
+
+  @VisibleForTesting
+  protected String makeStagingName() {
+    return ".staging-".concat(getUniqueId().replace(":", ""));
+  }
+
+  private Path getSegmentDescriptorDir() {
+    return new Path(getStagingWorkingDir(), SEGMENTS_DESCRIPTOR_DIR_NAME);
+  }
+
+  private void cleanWorkingDir() {
+    final FileSystem fileSystem;
+    try {
+      fileSystem = getStagingWorkingDir().getFileSystem(getConf());
+      fileSystem.delete(getStagingWorkingDir(), true);
+    } catch (IOException e) {
+      LOG.error("Got Exception while cleaning working directory", e);
+    }
+  }
+
+  private String getRootWorkingDir() {
+    if (Strings.isNullOrEmpty(rootWorkingDir)) {
+      rootWorkingDir = HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_WORKING_DIR);
+    }
+    return rootWorkingDir;
+  }
 }
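
deleteSegment above relies on the deep-storage layout .../dataSource/interval/version/partitionNum/xxx.zip: the partition directory is removed recursively, and each ancestor is then removed only if it has become empty, because the non-recursive delete fails otherwise. A standalone sketch of that parent walk, using a simplified hypothetical local path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SegmentPathWalkSketch {

      // Mirrors safeNonRecursiveDelete: a non-recursive delete of a non-empty
      // directory fails (or throws), and the failure is simply swallowed.
      private static boolean safeDelete(FileSystem fs, Path path) {
        try {
          return fs.delete(path, false);
        } catch (Exception e) {
          return false;
        }
      }

      public static void main(String[] args) throws Exception {
        // Hypothetical segment location following dataSource/interval/version/partitionNum/xxx.zip
        Path zip = new Path("/tmp/druid/segments/wiki/interval/version/0/index.zip");
        FileSystem fs = zip.getFileSystem(new Configuration());

        Path partitionDir = zip.getParent();
        fs.delete(partitionDir, true);               // recursive: drop the segment itself

        // Walk up and remove ancestors only while they are empty.
        Path versionDir = partitionDir.getParent();
        if (safeDelete(fs, versionDir)) {
          Path intervalDir = versionDir.getParent();
          if (safeDelete(fs, intervalDir)) {
            safeDelete(fs, intervalDir.getParent()); // dataSource directory
          }
        }
      }
    }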

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index c6b8024..193e4aa 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,22 +17,62 @@
  */
 package org.apache.hadoop.hive.druid;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.util.concurrent.ExecutionException;
-
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+import com.google.common.collect.Lists;
+import com.metamx.common.MapUtils;
+import com.metamx.emitter.EmittingLogger;
+import com.metamx.emitter.core.NoopEmitter;
+import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.Request;
 import com.metamx.http.client.response.InputStreamResponseHandler;
-
 import io.druid.jackson.DefaultObjectMapper;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.metadata.SQLMetadataConnector;
+import io.druid.metadata.storage.mysql.MySQLConnector;
 import io.druid.query.BaseQuery;
+import io.druid.segment.IndexIO;
+import io.druid.segment.IndexMergerV9;
+import io.druid.segment.column.ColumnConfig;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.LinearShardSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.skife.jdbi.v2.FoldController;
+import org.skife.jdbi.v2.Folder3;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.TransactionCallback;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.skife.jdbi.v2.util.ByteArrayMapper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Utils class for Druid storage handler.
@@ -51,12 +91,61 @@ public final class DruidStorageHandlerUtils {
    */
   public static final ObjectMapper SMILE_MAPPER = new DefaultObjectMapper(new SmileFactory());
 
+  private static final int NUM_RETRIES = 8;
+
+  private static final int SECONDS_BETWEEN_RETRIES = 2;
+
+  private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB
+
+  private static final int DEFAULT_STREAMING_RESULT_SIZE = 100;
+
+  /**
+   * Used by druid to perform IO on indexes
+   */
+  public static final IndexIO INDEX_IO = new IndexIO(JSON_MAPPER, new ColumnConfig() {
+    @Override
+    public int columnCacheSizeBytes() {
+      return 0;
+    }
+  });
+
+  /**
+   * Used by druid to merge indexes
+   */
+  public static final IndexMergerV9 INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER,
+          DruidStorageHandlerUtils.INDEX_IO
+  );
+
+  /**
+   * Generic Interner implementation used to read segments object from metadata storage
+   */
+  public static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner();
+
+  static {
+    // Register the shard sub type to be used by the mapper
+    JSON_MAPPER.registerSubtypes(new NamedType(LinearShardSpec.class, "linear"));
+    // set the timezone of the object mapper
+    // Note: this does not always take effect; the workaround is to pass -Duser.timezone="UTC" in the Java opts
+    JSON_MAPPER.setTimeZone(TimeZone.getTimeZone("UTC"));
+    try {
+      // No operation emitter will be used by some internal druid classes.
+      EmittingLogger.registerEmitter(
+              new ServiceEmitter("druid-hive-indexer", InetAddress.getLocalHost().getHostName(),
+                      new NoopEmitter()
+              ));
+    } catch (UnknownHostException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
   /**
    * Method that creates a request for Druid JSON query (using SMILE).
-   * @param mapper
+   *
    * @param address
    * @param query
+   *
    * @return
+   *
    * @throws IOException
    */
   public static Request createRequest(String address, BaseQuery<?> query)
@@ -69,9 +158,12 @@ public final class DruidStorageHandlerUtils {
   /**
    * Method that submits a request to an Http address and retrieves the result.
    * The caller is responsible for closing the stream once it finishes consuming it.
+   *
    * @param client
    * @param request
+   *
    * @return
+   *
    * @throws IOException
    */
   public static InputStream submitRequest(HttpClient client, Request request)
@@ -87,4 +179,237 @@ public final class DruidStorageHandlerUtils {
     return response;
   }
 
+  /**
+   * @param taskDir path to the directory containing the segment descriptor info;
+   *                 the descriptor path will be .../workingPath/task_id/{@link DruidStorageHandler#SEGMENTS_DESCRIPTOR_DIR_NAME}/*.json
+   * @param conf     hadoop conf to get the file system
+   *
+   * @return List of DataSegments
+   *
+   * @throws IOException for instance when the job did not produce any data.
+   */
+  public static List<DataSegment> getPublishedSegments(Path taskDir, Configuration conf)
+          throws IOException {
+    ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
+    FileSystem fs = taskDir.getFileSystem(conf);
+    for (FileStatus fileStatus : fs.listStatus(taskDir)) {
+      final DataSegment segment = JSON_MAPPER
+              .readValue(fs.open(fileStatus.getPath()), DataSegment.class);
+      publishedSegmentsBuilder.add(segment);
+    }
+    List<DataSegment> publishedSegments = publishedSegmentsBuilder.build();
+    return publishedSegments;
+  }
+
+  /**
+   * Writes the serialized form of the segment descriptor to the filesystem;
+   * if a descriptor file already exists at the given path, it is replaced.
+   *
+   * @param outputFS       filesystem
+   * @param segment        DataSegment object
+   * @param descriptorPath path
+   *
+   * @throws IOException
+   */
+  public static void writeSegmentDescriptor(
+          final FileSystem outputFS,
+          final DataSegment segment,
+          final Path descriptorPath
+  )
+          throws IOException {
+    final DataPusher descriptorPusher = (DataPusher) RetryProxy.create(
+            DataPusher.class, new DataPusher() {
+              @Override
+              public long push() throws IOException {
+                try {
+                  if (outputFS.exists(descriptorPath)) {
+                    if (!outputFS.delete(descriptorPath, false)) {
+                      throw new IOException(
+                              String.format("Failed to delete descriptor at [%s]", descriptorPath));
+                    }
+                  }
+                  try (final OutputStream descriptorOut = outputFS.create(
+                          descriptorPath,
+                          true,
+                          DEFAULT_FS_BUFFER_SIZE
+                  )) {
+                    JSON_MAPPER.writeValue(descriptorOut, segment);
+                    descriptorOut.flush();
+                  }
+                } catch (RuntimeException | IOException ex) {
+                  throw ex;
+                }
+                return -1;
+              }
+            },
+            RetryPolicies
+                    .exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
+    );
+    descriptorPusher.push();
+  }
+
+  /**
+   * @param connector                   SQL metadata connector to the metadata storage
+   * @param metadataStorageTablesConfig Table config
+   *
+   * @return all the active data sources in the metadata storage
+   */
+  public static Collection<String> getAllDataSourceNames(SQLMetadataConnector connector,
+          final MetadataStorageTablesConfig metadataStorageTablesConfig
+  ) {
+    return connector.getDBI().withHandle(
+            new HandleCallback<List<String>>() {
+              @Override
+              public List<String> withHandle(Handle handle) throws Exception {
+                return handle.createQuery(
+                        String.format("SELECT DISTINCT(datasource) FROM %s WHERE used = true",
+                                metadataStorageTablesConfig.getSegmentsTable()
+                        ))
+                        .fold(Lists.<String>newArrayList(),
+                                new Folder3<ArrayList<String>, Map<String, Object>>() {
+                                  @Override
+                                  public ArrayList<String> fold(ArrayList<String> druidDataSources,
+                                          Map<String, Object> stringObjectMap,
+                                          FoldController foldController,
+                                          StatementContext statementContext) throws SQLException {
+                                    druidDataSources.add(
+                                            MapUtils.getString(stringObjectMap, "datasource")
+                                    );
+                                    return druidDataSources;
+                                  }
+                                }
+                        );
+
+              }
+            }
+    );
+  }
+
+  /**
+   * @param connector                   SQL connector to metadata
+   * @param metadataStorageTablesConfig Tables configuration
+   * @param dataSource                  Name of data source
+   *
+   * @return true if the data source was successfully disabled, false otherwise
+   */
+  public static boolean disableDataSource(SQLMetadataConnector connector,
+          final MetadataStorageTablesConfig metadataStorageTablesConfig, final String dataSource
+  ) {
+    try {
+      if (!getAllDataSourceNames(connector, metadataStorageTablesConfig).contains(dataSource)) {
+        DruidStorageHandler.LOG
+                .warn(String.format("Cannot delete data source [%s], does not exist", dataSource));
+        return false;
+      }
+
+      connector.getDBI().withHandle(
+              new HandleCallback<Void>() {
+                @Override
+                public Void withHandle(Handle handle) throws Exception {
+                  handle.createStatement(
+                          String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource",
+                                  metadataStorageTablesConfig.getSegmentsTable()
+                          )
+                  )
+                          .bind("dataSource", dataSource)
+                          .execute();
+
+                  return null;
+                }
+              }
+      );
+
+    } catch (Exception e) {
+      DruidStorageHandler.LOG.error(String.format("Error removing dataSource %s", dataSource), e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * @param connector                   SQL connector to metadata
+   * @param metadataStorageTablesConfig Tables configuration
+   * @param dataSource                  Name of data source
+   *
+   * @return List of all data segments part of the given data source
+   */
+  public static List<DataSegment> getDataSegmentList(final SQLMetadataConnector connector,
+          final MetadataStorageTablesConfig metadataStorageTablesConfig, final String dataSource
+  ) {
+    List<DataSegment> segmentList = connector.retryTransaction(
+            new TransactionCallback<List<DataSegment>>() {
+              @Override
+              public List<DataSegment> inTransaction(
+                      Handle handle, TransactionStatus status
+              ) throws Exception {
+                return handle
+                        .createQuery(String.format(
+                                "SELECT payload FROM %s WHERE dataSource = :dataSource",
+                                metadataStorageTablesConfig.getSegmentsTable()
+                        ))
+                        .setFetchSize(getStreamingFetchSize(connector))
+                        .bind("dataSource", dataSource)
+                        .map(ByteArrayMapper.FIRST)
+                        .fold(
+                                new ArrayList<DataSegment>(),
+                                new Folder3<List<DataSegment>, byte[]>() {
+                                  @Override
+                                  public List<DataSegment> fold(List<DataSegment> accumulator,
+                                          byte[] payload, FoldController control,
+                                          StatementContext ctx
+                                  ) throws SQLException {
+                                    try {
+                                      final DataSegment segment = DATA_SEGMENT_INTERNER.intern(
+                                              JSON_MAPPER.readValue(
+                                                      payload,
+                                                      DataSegment.class
+                                              ));
+
+                                      accumulator.add(segment);
+                                      return accumulator;
+                                    } catch (Exception e) {
+                                      throw new SQLException(e.toString());
+                                    }
+                                  }
+                                }
+                        );
+              }
+            }
+            , 3, SQLMetadataConnector.DEFAULT_MAX_TRIES);
+    return segmentList;
+  }
+
+  /**
+   * @param connector
+   *
+   * @return streaming fetch size.
+   */
+  private static int getStreamingFetchSize(SQLMetadataConnector connector) {
+    if (connector instanceof MySQLConnector) {
+      return Integer.MIN_VALUE;
+    }
+    return DEFAULT_STREAMING_RESULT_SIZE;
+  }
+
+  /**
+   * @param pushedSegment
+   * @param segmentsDescriptorDir
+   *
+   * @return a sanitized file name
+   */
+  public static Path makeSegmentDescriptorOutputPath(DataSegment pushedSegment,
+          Path segmentsDescriptorDir
+  ) {
+    return new Path(
+            segmentsDescriptorDir,
+            String.format("%s.json", pushedSegment.getIdentifier().replace(":", ""))
+    );
+  }
+
+  /**
+   * Simple interface for retry operations
+   */
+  public interface DataPusher {
+    long push() throws IOException;
+  }
 }
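
writeSegmentDescriptor above wraps the descriptor write in Hadoop's RetryProxy so a transient filesystem failure is retried with exponential backoff. A stripped-down sketch of that pattern, with a hypothetical Task interface standing in for DataPusher:

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.io.retry.RetryPolicies;
    import org.apache.hadoop.io.retry.RetryProxy;

    public class RetryProxySketch {

      /** Hypothetical single-method interface, analogous to DataPusher. */
      public interface Task {
        long run() throws IOException;
      }

      public static void main(String[] args) throws IOException {
        Task flaky = () -> {
          // Stand-in for a filesystem write that may fail transiently.
          if (Math.random() < 0.5) {
            throw new IOException("transient failure");
          }
          return 0L;
        };
        // Calls through the proxy are retried with exponential backoff,
        // here up to 8 retries starting at 2 seconds between attempts.
        Task retrying = (Task) RetryProxy.create(Task.class, flaky,
            RetryPolicies.exponentialBackoffRetry(8, 2, TimeUnit.SECONDS));
        retrying.run();
      }
    }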

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java
deleted file mode 100644
index 45e31d6..0000000
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.druid;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.util.Progressable;
-
-/**
- * Place holder for Druid output format. Currently not implemented.
- */
-@SuppressWarnings("rawtypes")
-public class HiveDruidOutputFormat implements HiveOutputFormat {
-
-  @Override
-  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
-          Progressable progress) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
-          Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress)
-                  throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
deleted file mode 100644
index 612f853..0000000
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
+++ /dev/null
@@ -1,376 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.druid;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.calcite.adapter.druid.DruidDateTimeUtils;
-import org.apache.calcite.adapter.druid.DruidTable;
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.Constants;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidWritable;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.joda.time.Interval;
-import org.joda.time.Period;
-import org.joda.time.chrono.ISOChronology;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;
-
-import io.druid.query.Druids;
-import io.druid.query.Druids.SegmentMetadataQueryBuilder;
-import io.druid.query.Druids.SelectQueryBuilder;
-import io.druid.query.Druids.TimeBoundaryQueryBuilder;
-import io.druid.query.Query;
-import io.druid.query.Result;
-import io.druid.query.metadata.metadata.SegmentAnalysis;
-import io.druid.query.metadata.metadata.SegmentMetadataQuery;
-import io.druid.query.select.PagingSpec;
-import io.druid.query.select.SelectQuery;
-import io.druid.query.spec.MultipleIntervalSegmentSpec;
-import io.druid.query.timeboundary.TimeBoundaryQuery;
-import io.druid.query.timeboundary.TimeBoundaryResultValue;
-
-/**
- * Druid query based input format.
- * 
- * Given a query and the Druid broker address, it will send it, and retrieve
- * and parse the results.
- */
-public class HiveDruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidWritable>
-        implements org.apache.hadoop.mapred.InputFormat<NullWritable, DruidWritable> {
-
-  protected static final Logger LOG = LoggerFactory.getLogger(HiveDruidQueryBasedInputFormat.class);
-
-  @Override
-  public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
-          throws IOException {
-    return getInputSplits(job);
-  }
-
-  @Override
-  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
-    return Arrays.<InputSplit> asList(getInputSplits(context.getConfiguration()));
-  }
-
-  @SuppressWarnings("deprecation")
-  private HiveDruidSplit[] getInputSplits(Configuration conf) throws IOException {
-    String address = HiveConf.getVar(conf,
-            HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
-    if (StringUtils.isEmpty(address)) {
-      throw new IOException("Druid broker address not specified in configuration");
-    }
-    String druidQuery = StringEscapeUtils.unescapeJava(conf.get(Constants.DRUID_QUERY_JSON));
-    String druidQueryType;
-    if (StringUtils.isEmpty(druidQuery)) {
-      // Empty, maybe because CBO did not run; we fall back to
-      // full Select query
-      if (LOG.isWarnEnabled()) {
-        LOG.warn("Druid query is empty; creating Select query");
-      }
-      String dataSource = conf.get(Constants.DRUID_DATA_SOURCE);
-      if (dataSource == null) {
-        throw new IOException("Druid data source cannot be empty");
-      }
-      druidQuery = createSelectStarQuery(address, dataSource);
-      druidQueryType = Query.SELECT;
-    } else {
-      druidQueryType = conf.get(Constants.DRUID_QUERY_TYPE);
-      if (druidQueryType == null) {
-        throw new IOException("Druid query type not recognized");
-      }
-    }
-
-    // Hive depends on FileSplits
-    Job job = new Job(conf);
-    JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
-    Path [] paths = FileInputFormat.getInputPaths(jobContext);
-
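-    // Timeseries, topN and groupBy results are read through a single split;
-    // only Select queries may be split further for parallel reads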
-    switch (druidQueryType) {
-      case Query.TIMESERIES:
-      case Query.TOPN:
-      case Query.GROUP_BY:
-        return new HiveDruidSplit[] { new HiveDruidSplit(address, druidQuery, paths[0]) };
-      case Query.SELECT:
-        return splitSelectQuery(conf, address, druidQuery, paths[0]);
-      default:
-        throw new IOException("Druid query type not recognized");
-    }
-  }
-
-  private static String createSelectStarQuery(String address, String dataSource) throws IOException {
-    // Create Select query
-    SelectQueryBuilder builder = new Druids.SelectQueryBuilder();
-    builder.dataSource(dataSource);
-    builder.intervals(Arrays.asList(DruidTable.DEFAULT_INTERVAL));
-    builder.pagingSpec(PagingSpec.newSpec(1));
-    Map<String, Object> context = new HashMap<>();
-    context.put(Constants.DRUID_QUERY_FETCH, false);
-    builder.context(context);
-    return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(builder.build());
-  }
-
-  /* Splits the Select query based on the configured threshold so that the
-   * read can be parallelized */
-  private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
-          String druidQuery, Path dummyPath) throws IOException {
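-    // Strategy: a "fetch" query (one with an explicit limit) is never split.
-    // Otherwise, a Segment Metadata query obtains the total row count and, if
-    // it exceeds the threshold, the time range is divided into
-    // ceil(numRows / threshold) sub-intervals, one per split.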
-    final int selectThreshold = (int) HiveConf.getIntVar(
-            conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);
-    final int numConnection = HiveConf
-            .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
-    final Period readTimeout = new Period(
-            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
-    SelectQuery query;
-    try {
-      query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class);
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-
-    final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
-    if (isFetch) {
-      // If it has a limit, we use it and we do not split the query
-      return new HiveDruidSplit[] { new HiveDruidSplit(
-              address, DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
-    }
-
-    // We do not know the number of rows, so we execute a Segment Metadata
-    // query to obtain it
-    SegmentMetadataQueryBuilder metadataBuilder = new Druids.SegmentMetadataQueryBuilder();
-    metadataBuilder.dataSource(query.getDataSource());
-    metadataBuilder.intervals(query.getIntervals());
-    metadataBuilder.merge(true);
-    metadataBuilder.analysisTypes();
-    SegmentMetadataQuery metadataQuery = metadataBuilder.build();
-
-    HttpClient client = HttpClientInit.createClient(
-            HttpClientConfig.builder().withNumConnections(numConnection)
-                    .withReadTimeout(readTimeout.toStandardDuration()).build(), new Lifecycle());
-    InputStream response;
-    try {
-      response = DruidStorageHandlerUtils.submitRequest(client,
-              DruidStorageHandlerUtils.createRequest(address, metadataQuery));
-    } catch (Exception e) {
-      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
-
-    // Retrieve results
-    List<SegmentAnalysis> metadataList;
-    try {
-      metadataList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
-            new TypeReference<List<SegmentAnalysis>>() {});
-    } catch (Exception e) {
-      response.close();
-      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
-    if (metadataList == null || metadataList.isEmpty()) {
-      throw new IOException("Connected to Druid but could not retrieve datasource information");
-    }
-    if (metadataList.size() != 1) {
-      throw new IOException("Information about segments should have been merged");
-    }
-
-    final long numRows = metadataList.get(0).getNumRows();
-
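-    // Remove the paging limit so that each (sub)query returns all of its rows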
-    query = query.withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE));
-    if (numRows <= selectThreshold) {
-      // We are not going to split it
-      return new HiveDruidSplit[] { new HiveDruidSplit(address,
-              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
-    }
-
-    // If the query does not restrict the time range (i.e., it uses the default
-    // interval), we run a Time Boundary query to obtain the actual time range of
-    // the data. Then, we use that range to split the query according to the
-    // Select threshold configuration property
-    final List<Interval> intervals = new ArrayList<>();
-    if (query.getIntervals().size() == 1 && query.getIntervals().get(0).withChronology(
-            ISOChronology.getInstanceUTC()).equals(DruidTable.DEFAULT_INTERVAL)) {
-      // Default max and min, we should execute a time boundary query to get a
-      // more precise range
-      TimeBoundaryQueryBuilder timeBuilder = new Druids.TimeBoundaryQueryBuilder();
-      timeBuilder.dataSource(query.getDataSource());
-      TimeBoundaryQuery timeQuery = timeBuilder.build();
-
-      try {
-        response = DruidStorageHandlerUtils.submitRequest(client,
-                DruidStorageHandlerUtils.createRequest(address, timeQuery));
-      } catch (Exception e) {
-        throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-      }
-
-      // Retrieve results
-      List<Result<TimeBoundaryResultValue>> timeList;
-      try {
-        timeList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
-              new TypeReference<List<Result<TimeBoundaryResultValue>>>() {});
-      } catch (Exception e) {
-        response.close();
-        throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-      }
-      if (timeList == null || timeList.isEmpty()) {
-        throw new IOException("Connected to Druid but could not retrieve time boundary information");
-      }
-      if (timeList.size() != 1) {
-        throw new IOException("We should obtain a single time boundary");
-      }
-
-      intervals.add(new Interval(timeList.get(0).getValue().getMinTime().getMillis(),
-              timeList.get(0).getValue().getMaxTime().getMillis(), ISOChronology.getInstanceUTC()));
-    } else {
-      intervals.addAll(query.getIntervals());
-    }
-
-    // Create ceil(numRows / selectThreshold) input splits
-    int numSplits = (int) Math.ceil((double) numRows / selectThreshold);
-    List<List<Interval>> newIntervals = createSplitsIntervals(intervals, numSplits);
-    HiveDruidSplit[] splits = new HiveDruidSplit[numSplits];
-    for (int i = 0; i < numSplits; i++) {
-      // Create partial Select query
-      final SelectQuery partialQuery = query.withQuerySegmentSpec(
-              new MultipleIntervalSegmentSpec(newIntervals.get(i)));
-      splits[i] = new HiveDruidSplit(address,
-              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath);
-    }
-    return splits;
-  }
-
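-  /* Partitions the given intervals into numSplits consecutive groups whose
-   * total durations are as equal as possible. For example, a single
-   * three-hour interval split three ways yields three one-hour intervals. */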
-  private static List<List<Interval>> createSplitsIntervals(List<Interval> intervals, int numSplits) {
-    final long totalTime = DruidDateTimeUtils.extractTotalTime(intervals);
-    long startTime = intervals.get(0).getStartMillis();
-    long endTime = startTime;
-    long currTime = 0;
-    List<List<Interval>> newIntervals = new ArrayList<>();
-    for (int i = 0, posIntervals = 0; i < numSplits; i++) {
-      final long rangeSize = Math.round( (double) (totalTime * (i + 1)) / numSplits) -
-              Math.round( (double) (totalTime * i) / numSplits);
-      // Create the new interval(s)
-      List<Interval> currentIntervals = new ArrayList<>();
-      while (posIntervals < intervals.size()) {
-        final Interval interval = intervals.get(posIntervals);
-        final long expectedRange = rangeSize - currTime;
-        if (interval.getEndMillis() - startTime >= expectedRange) {
-          endTime = startTime + expectedRange;
-          currentIntervals.add(new Interval(startTime, endTime, ISOChronology.getInstanceUTC()));
-          startTime = endTime;
-          currTime = 0;
-          break;
-        }
-        endTime = interval.getEndMillis();
-        currentIntervals.add(new Interval(startTime, endTime, ISOChronology.getInstanceUTC()));
-        currTime += (endTime - startTime);
-        startTime = intervals.get(++posIntervals).getStartMillis();
-      }
-      newIntervals.add(currentIntervals);
-    }
-    assert endTime == intervals.get(intervals.size()-1).getEndMillis();
-    return newIntervals;
-  }
-
-  @Override
-  public org.apache.hadoop.mapred.RecordReader<NullWritable, DruidWritable> getRecordReader(
-          org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter)
-                  throws IOException {
-    // We need to provide a different record reader for every type of Druid query.
-    // The reason is that the Druid results format differs for each query type.
-    final DruidQueryRecordReader<?,?> reader;
-    final String druidQueryType = job.get(Constants.DRUID_QUERY_TYPE);
-    if (druidQueryType == null) {
-      reader = new DruidSelectQueryRecordReader(); // By default
-      reader.initialize((HiveDruidSplit)split, job);
-      return reader;
-    }
-    switch (druidQueryType) {
-      case Query.TIMESERIES:
-        reader = new DruidTimeseriesQueryRecordReader();
-        break;
-      case Query.TOPN:
-        reader = new DruidTopNQueryRecordReader();
-        break;
-      case Query.GROUP_BY:
-        reader = new DruidGroupByQueryRecordReader();
-        break;
-      case Query.SELECT:
-        reader = new DruidSelectQueryRecordReader();
-        break;
-      default:
-        throw new IOException("Druid query type not recognized");
-    }
-    reader.initialize((HiveDruidSplit)split, job);
-    return reader;
-  }
-
-  @Override
-  public RecordReader<NullWritable, DruidWritable> createRecordReader(InputSplit split,
-          TaskAttemptContext context) throws IOException, InterruptedException {
-    // We need to provide a different record reader for every type of Druid query.
-    // The reason is that the Druid results format differs for each query type.
-    final String druidQueryType = context.getConfiguration().get(Constants.DRUID_QUERY_TYPE);
-    if (druidQueryType == null) {
-      return new DruidSelectQueryRecordReader(); // By default
-    }
-    final DruidQueryRecordReader<?,?> reader;
-    switch (druidQueryType) {
-      case Query.TIMESERIES:
-        reader = new DruidTimeseriesQueryRecordReader();
-        break;
-      case Query.TOPN:
-        reader = new DruidTopNQueryRecordReader();
-        break;
-      case Query.GROUP_BY:
-        reader = new DruidGroupByQueryRecordReader();
-        break;
-      case Query.SELECT:
-        reader = new DruidSelectQueryRecordReader();
-        break;
-      default:
-        throw new IOException("Druid query type not recognized");
-    }
-    return reader;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java
deleted file mode 100644
index 3fba5d0..0000000
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.druid;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-
-/**
- * Druid split. Its purpose is to trigger query execution in Druid.
- */
-public class HiveDruidSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit {
-
-  private String address;
-  private String druidQuery;
-
-  // required for deserialization
-  public HiveDruidSplit() {
-    super((Path) null, 0, 0, (String[]) null);
-  }
-
-  public HiveDruidSplit(String address, String druidQuery, Path dummyPath) {
-    super(dummyPath, 0, 0, (String[]) null);
-    this.address = address;
-    this.druidQuery = druidQuery;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    super.write(out);
-    out.writeUTF(address);
-    out.writeUTF(druidQuery);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
-    address = in.readUTF();
-    druidQuery = in.readUTF();
-  }
-
-  @Override
-  public long getLength() {
-    return 0L;
-  }
-
-  @Override
-  public String[] getLocations() {
-    return new String[] { "" };
-  }
-
-  public String getAddress() {
-    return address;
-  }
-
-  public String getDruidQuery() {
-    return druidQuery;
-  }
-
-  @Override
-  public String toString() {
-    return "HiveDruidSplit{" + address + ", " + druidQuery + "}";
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
new file mode 100644
index 0000000..86ddca8
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.io;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.metamx.common.Granularity;
+import io.druid.data.input.impl.DimensionSchema;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.InputRowParser;
+import io.druid.data.input.impl.MapInputRowParser;
+import io.druid.data.input.impl.StringDimensionSchema;
+import io.druid.data.input.impl.TimeAndDimsParseSpec;
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.DoubleSumAggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.realtime.plumber.CustomVersioningPolicy;
+import io.druid.storage.hdfs.HdfsDataSegmentPusher;
+import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
+import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.hadoop.hive.druid.DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME;
+
+public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritable> {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(DruidOutputFormat.class);
+
+  @Override
+  public FileSinkOperator.RecordWriter getHiveRecordWriter(
+          JobConf jc,
+          Path finalOutPath,
+          Class<? extends Writable> valueClass,
+          boolean isCompressed,
+          Properties tableProperties,
+          Progressable progress
+  ) throws IOException {
+
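+    // Table-level properties take precedence over the corresponding global Hive
+    // configuration defaults for segment granularity and segment directory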
+    final String segmentGranularity =
+            tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) != null ?
+                    tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) :
+                    HiveConf.getVar(jc, HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY);
+    final String dataSource = tableProperties.getProperty(Constants.DRUID_DATA_SOURCE);
+    final String segmentDirectory =
+            tableProperties.getProperty(Constants.DRUID_SEGMENT_DIRECTORY) != null
+                    ? tableProperties.getProperty(Constants.DRUID_SEGMENT_DIRECTORY)
+                    : HiveConf.getVar(jc, HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
+
+    final HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
+    hdfsDataSegmentPusherConfig.setStorageDirectory(segmentDirectory);
+    final DataSegmentPusher hdfsDataSegmentPusher = new HdfsDataSegmentPusher(
+            hdfsDataSegmentPusherConfig, jc, DruidStorageHandlerUtils.JSON_MAPPER);
+
+    final GranularitySpec granularitySpec = new UniformGranularitySpec(
+            Granularity.valueOf(segmentGranularity),
+            null,
+            null
+    );
+
+    final String columnNameProperty = tableProperties.getProperty(serdeConstants.LIST_COLUMNS);
+    final String columnTypeProperty = tableProperties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+
+    if (StringUtils.isEmpty(columnNameProperty) || StringUtils.isEmpty(columnTypeProperty)) {
+      throw new IllegalStateException(
+              String.format("List of columns names [%s] or columns type [%s] is/are not present",
+                      columnNameProperty, columnTypeProperty
+              ));
+    }
+    ArrayList<String> columnNames = new ArrayList<String>();
+    for (String name : columnNameProperty.split(",")) {
+      columnNames.add(name);
+    }
+    if (!columnNames.contains(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
+      throw new IllegalStateException("Timestamp column (' " + DruidTable.DEFAULT_TIMESTAMP_COLUMN +
+              "') not specified in create table; list of columns is : " +
+              tableProperties.getProperty(serdeConstants.LIST_COLUMNS));
+    }
+    ArrayList<TypeInfo> columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+
+    // By default, all columns that are not metrics or the timestamp are treated as dimensions
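+    // tinyint/smallint/int/bigint columns become longSum metrics, float/double
+    // columns become doubleSum metrics, and any other column (except the
+    // timestamp and the granularity helper column) becomes a String dimension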
+    final List<DimensionSchema> dimensions = new ArrayList<>();
+    ImmutableList.Builder<AggregatorFactory> aggregatorFactoryBuilder = ImmutableList.builder();
+    for (int i = 0; i < columnTypes.size(); i++) {
+      TypeInfo f = columnTypes.get(i);
+      assert f.getCategory() == ObjectInspector.Category.PRIMITIVE;
+      AggregatorFactory af;
+      switch (f.getTypeName()) {
+        case serdeConstants.TINYINT_TYPE_NAME:
+        case serdeConstants.SMALLINT_TYPE_NAME:
+        case serdeConstants.INT_TYPE_NAME:
+        case serdeConstants.BIGINT_TYPE_NAME:
+          af = new LongSumAggregatorFactory(columnNames.get(i), columnNames.get(i));
+          break;
+        case serdeConstants.FLOAT_TYPE_NAME:
+        case serdeConstants.DOUBLE_TYPE_NAME:
+          af = new DoubleSumAggregatorFactory(columnNames.get(i), columnNames.get(i));
+          break;
+        default:
+          // Dimension or timestamp
+          String columnName = columnNames.get(i);
+          if (!columnName.equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN) && !columnName
+                  .equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME)) {
+            dimensions.add(new StringDimensionSchema(columnName));
+          }
+          continue;
+      }
+      aggregatorFactoryBuilder.add(af);
+    }
+    List<AggregatorFactory> aggregatorFactories = aggregatorFactoryBuilder.build();
+    final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec(
+            new TimestampSpec(DruidTable.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
+            new DimensionsSpec(dimensions,
+                    Lists.newArrayList(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME), null
+            )
+    ));
+
+    Map<String, Object> inputParser = DruidStorageHandlerUtils.JSON_MAPPER
+            .convertValue(inputRowParser, Map.class);
+
+    final DataSchema dataSchema = new DataSchema(
+            Preconditions.checkNotNull(dataSource, "Data source name is null"),
+            inputParser,
+            aggregatorFactories.toArray(new AggregatorFactory[aggregatorFactories.size()]),
+            granularitySpec,
+            DruidStorageHandlerUtils.JSON_MAPPER
+    );
+
+    final String workingPath = jc.get(Constants.DRUID_JOB_WORKING_DIRECTORY);
+    final String version = jc.get(Constants.DRUID_SEGMENT_VERSION);
+    Integer maxPartitionSize = HiveConf
+            .getIntVar(jc, HiveConf.ConfVars.HIVE_DRUID_MAX_PARTITION_SIZE);
+    String basePersistDirectory = HiveConf
+            .getVar(jc, HiveConf.ConfVars.HIVE_DRUID_BASE_PERSIST_DIRECTORY);
+    final RealtimeTuningConfig realtimeTuningConfig = RealtimeTuningConfig
+            .makeDefaultTuningConfig(new File(
+                    basePersistDirectory))
+            .withVersioningPolicy(new CustomVersioningPolicy(version));
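+    // Using a fixed version from the job configuration ensures that all writers
+    // of this job produce segments tagged with the same version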
+
+    LOG.debug(String.format("Running with data schema [%s]", dataSchema));
+    return new DruidRecordWriter(dataSchema, realtimeTuningConfig, hdfsDataSegmentPusher,
+            maxPartitionSize, new Path(workingPath, SEGMENTS_DESCRIPTOR_DIR_NAME),
+            finalOutPath.getFileSystem(jc)
+    );
+  }
+
+  @Override
+  public RecordWriter<K, DruidWritable> getRecordWriter(
+          FileSystem ignored, JobConf job, String name, Progressable progress
+  ) throws IOException {
+    throw new UnsupportedOperationException("please implement me !");
+  }
+
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+    throw new UnsupportedOperationException("not implemented yet");
+  }
+}