Posted to commits@hive.apache.org by jc...@apache.org on 2016/12/15 20:51:35 UTC

[1/4] hive git commit: HIVE-15277: Teach Hive how to create/delete Druid segments (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)

Repository: hive
Updated Branches:
  refs/heads/master 89362a14d -> 590687bc4


http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
new file mode 100644
index 0000000..7ea4754
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.Utilities.ReduceField;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorDay;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorHour;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorMinute;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorMonth;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorSecond;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorWeek;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorYear;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.parquet.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+/**
+ * Introduces a Reduce Sink operator before the File Sink operator in order to
+ * partition the data by the time granularity specified in the table properties
+ * (or, if absent, in the Hive configuration).
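+ * <p>
+ * The plan below the File Sink's parent is rewritten as:
+ * parent -> SEL (adds the granularity column) -> RS (partitions and sorts on it)
+ * -> SEL (back-tracks key/value columns) -> FS.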
+ */
+public class SortedDynPartitionTimeGranularityOptimizer extends Transform {
+
+  @Override
+  public ParseContext transform(ParseContext pCtx) throws SemanticException {
+    // create a walker which walks the tree in a DFS manner while maintaining the
+    // operator stack. The dispatcher generates the plan from the operator tree
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+
+    String FS = FileSinkOperator.getOperatorName() + "%";
+
+    opRules.put(new RuleRegExp("Sorted Dynamic Partition Time Granularity", FS), getSortDynPartProc(pCtx));
+
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pCtx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+
+    return pCtx;
+  }
+
+  private NodeProcessor getSortDynPartProc(ParseContext pCtx) {
+    return new SortedDynamicPartitionProc(pCtx);
+  }
+
+  class SortedDynamicPartitionProc implements NodeProcessor {
+
+    private final Logger LOG = LoggerFactory.getLogger(SortedDynPartitionTimeGranularityOptimizer.class);
+    protected ParseContext parseCtx;
+
+    public SortedDynamicPartitionProc(ParseContext pCtx) {
+      this.parseCtx = pCtx;
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+
+      // introduce RS and EX before FS
+      FileSinkOperator fsOp = (FileSinkOperator) nd;
+      final String sh = fsOp.getConf().getTableInfo().getOutputFileFormatClassName();
+      if (parseCtx.getQueryProperties().isQuery() || sh == null || !sh
+              .equals(Constants.DRUID_HIVE_OUTPUT_FORMAT)) {
+        // Bail out, nothing to do
+        return null;
+      }
+      String segmentGranularity = parseCtx.getCreateTable().getTblProps()
+              .get(Constants.DRUID_SEGMENT_GRANULARITY);
+      segmentGranularity = !Strings.isNullOrEmpty(segmentGranularity)
+              ? segmentGranularity
+              : HiveConf.getVar(parseCtx.getConf(),
+                      HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY
+              );
+      LOG.info("Sorted dynamic partitioning on time granularity optimization kicked in...");
+
+      // unlink connection between FS and its parent
+      Operator<? extends OperatorDesc> fsParent = fsOp.getParentOperators().get(0);
+      fsParent.getChildOperators().clear();
+
+      // Create SelectOp with granularity column
+      Operator<? extends OperatorDesc> granularitySelOp = getGranularitySelOp(fsParent,
+              segmentGranularity
+      );
+
+      // Create ReduceSinkOp operator
+      ArrayList<ColumnInfo> parentCols = Lists.newArrayList(granularitySelOp.getSchema().getSignature());
+      ArrayList<ExprNodeDesc> allRSCols = Lists.newArrayList();
+      for (ColumnInfo ci : parentCols) {
+        allRSCols.add(new ExprNodeColumnDesc(ci));
+      }
+      // Get the key positions
+      List<Integer> keyPositions = new ArrayList<>();
+      keyPositions.add(allRSCols.size() - 1);
+      List<Integer> sortOrder = new ArrayList<Integer>(1);
+      sortOrder.add(1); // asc
+      List<Integer> sortNullOrder = new ArrayList<Integer>(1);
+      sortNullOrder.add(0); // nulls first
+      ReduceSinkOperator rsOp = getReduceSinkOp(keyPositions, sortOrder,
+          sortNullOrder, allRSCols, granularitySelOp, fsOp.getConf().getWriteType());
+
+      // Create backtrack SelectOp
+      List<ExprNodeDesc> descs = new ArrayList<ExprNodeDesc>(allRSCols.size());
+      List<String> colNames = new ArrayList<String>();
+      String colName;
+      for (int i = 0; i < allRSCols.size(); i++) {
+        ExprNodeDesc col = allRSCols.get(i);
+        colName = col.getExprString();
+        colNames.add(colName);
+        if (keyPositions.contains(i)) {
+          descs.add(new ExprNodeColumnDesc(col.getTypeInfo(), ReduceField.KEY.toString()+"."+colName, null, false));
+        } else {
+          descs.add(new ExprNodeColumnDesc(col.getTypeInfo(), ReduceField.VALUE.toString()+"."+colName, null, false));
+        }
+      }
+      RowSchema selRS = new RowSchema(granularitySelOp.getSchema());
+      SelectDesc selConf = new SelectDesc(descs, colNames);
+      SelectOperator backtrackSelOp = (SelectOperator) OperatorFactory.getAndMakeChild(
+          selConf, selRS, rsOp);
+
+      // Link backtrack SelectOp to FileSinkOp
+      fsOp.getParentOperators().clear();
+      fsOp.getParentOperators().add(backtrackSelOp);
+      backtrackSelOp.getChildOperators().add(fsOp);
+
+      // Update file sink descriptor
+      fsOp.getConf().setDpSortState(FileSinkDesc.DPSortState.PARTITION_SORTED);
+      fsOp.getConf().setPartitionCols(rsOp.getConf().getPartitionCols());
+      ColumnInfo ci = new ColumnInfo(granularitySelOp.getSchema().getSignature().get(
+              granularitySelOp.getSchema().getSignature().size() - 1)); // granularity column
+      fsOp.getSchema().getSignature().add(ci);
+
+      LOG.info("Inserted " + granularitySelOp.getOperatorId() + ", " + rsOp.getOperatorId() + " and "
+          + backtrackSelOp.getOperatorId() + " as parent of " + fsOp.getOperatorId()
+          + " and child of " + fsParent.getOperatorId());
+
+      parseCtx.setReduceSinkAddedBySortedDynPartition(true);
+      return null;
+    }
+
+    private Operator<? extends OperatorDesc> getGranularitySelOp(
+            Operator<? extends OperatorDesc> fsParent, String segmentGranularity
+    ) throws SemanticException {
+      ArrayList<ColumnInfo> parentCols = Lists.newArrayList(fsParent.getSchema().getSignature());
+      ArrayList<ExprNodeDesc> descs = Lists.newArrayList();
+      List<String> colNames = Lists.newArrayList();
+      int timestampPos = -1;
+      for (int i = 0; i < parentCols.size(); i++) {
+        ColumnInfo ci = parentCols.get(i);
+        ExprNodeColumnDesc columnDesc = new ExprNodeColumnDesc(ci);
+        descs.add(columnDesc);
+        colNames.add(columnDesc.getExprString());
+        if (columnDesc.getTypeInfo().getCategory() == ObjectInspector.Category.PRIMITIVE
+                && ((PrimitiveTypeInfo) columnDesc.getTypeInfo()).getPrimitiveCategory() == PrimitiveCategory.TIMESTAMP) {
+          if (timestampPos != -1) {
+            throw new SemanticException("Multiple columns with timestamp type on query result; "
+                    + "could not resolve which one is the timestamp column");
+          }
+          timestampPos = i;
+        }
+      }
+      if (timestampPos == -1) {
+        throw new SemanticException("No column with timestamp type on query result; "
+                + "one column should be of timestamp type");
+      }
+      RowSchema selRS = new RowSchema(fsParent.getSchema());
+      // Granularity (partition) column
+      String udfName;
+
+      Class<? extends UDF> udfClass;
+      switch (segmentGranularity) {
+        case "YEAR":
+          udfName = "floor_year";
+          udfClass = UDFDateFloorYear.class;
+          break;
+        case "MONTH":
+          udfName = "floor_month";
+          udfClass = UDFDateFloorMonth.class;
+          break;
+        case "WEEK":
+          udfName = "floor_week";
+          udfClass = UDFDateFloorWeek.class;
+          break;
+        case "DAY":
+          udfName = "floor_day";
+          udfClass = UDFDateFloorDay.class;
+          break;
+        case "HOUR":
+          udfName = "floor_hour";
+          udfClass = UDFDateFloorHour.class;
+          break;
+        case "MINUTE":
+          udfName = "floor_minute";
+          udfClass = UDFDateFloorMinute.class;
+          break;
+        case "SECOND":
+          udfName = "floor_second";
+          udfClass = UDFDateFloorSecond.class;
+          break;
+        default:
+          throw new SemanticException("Granularity for Druid segment not recognized");
+      }
+      ExprNodeDesc expr = new ExprNodeColumnDesc(parentCols.get(timestampPos));
+      descs.add(new ExprNodeGenericFuncDesc(
+              TypeInfoFactory.timestampTypeInfo,
+              new GenericUDFBridge(udfName, false, udfClass.getName()),
+              Lists.newArrayList(expr)));
+      colNames.add(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME);
+      // Add granularity to the row schema
+      ColumnInfo ci = new ColumnInfo(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME, TypeInfoFactory.timestampTypeInfo,
+              selRS.getSignature().get(0).getTabAlias(), false, false);
+      selRS.getSignature().add(ci);
+
+      // Create SelectDesc
+      SelectDesc selConf = new SelectDesc(descs, colNames);
+
+      // Create Select Operator
+      SelectOperator selOp = (SelectOperator) OperatorFactory.getAndMakeChild(
+              selConf, selRS, fsParent);
+
+      return selOp;
+    }
+
+    private ReduceSinkOperator getReduceSinkOp(List<Integer> keyPositions, List<Integer> sortOrder,
+        List<Integer> sortNullOrder, ArrayList<ExprNodeDesc> allCols, Operator<? extends OperatorDesc> parent,
+        AcidUtils.Operation writeType) throws SemanticException {
+
+      ArrayList<ExprNodeDesc> keyCols = Lists.newArrayList();
+      // we clone here as the RS will update the bucket column key with its
+      // corresponding bucket number and hence change their OIs
+      for (Integer idx : keyPositions) {
+        keyCols.add(allCols.get(idx).clone());
+      }
+
+      ArrayList<ExprNodeDesc> valCols = Lists.newArrayList();
+      for (int i = 0; i < allCols.size(); i++) {
+        if (!keyPositions.contains(i)) {
+          valCols.add(allCols.get(i).clone());
+        }
+      }
+
+      ArrayList<ExprNodeDesc> partCols = Lists.newArrayList();
+      for (Integer idx : keyPositions) {
+        partCols.add(allCols.get(idx).clone());
+      }
+
+      // map _col0 to KEY._col0, etc
+      Map<String, ExprNodeDesc> colExprMap = Maps.newHashMap();
+      Map<String, String> nameMapping = new HashMap<>();
+      ArrayList<String> keyColNames = Lists.newArrayList();
+      for (ExprNodeDesc keyCol : keyCols) {
+        String keyColName = keyCol.getExprString();
+        keyColNames.add(keyColName);
+        colExprMap.put(Utilities.ReduceField.KEY + "." +keyColName, keyCol);
+        nameMapping.put(keyColName, Utilities.ReduceField.KEY + "." + keyColName);
+      }
+      ArrayList<String> valColNames = Lists.newArrayList();
+      for (ExprNodeDesc valCol : valCols) {
+        String colName = valCol.getExprString();
+        valColNames.add(colName);
+        colExprMap.put(Utilities.ReduceField.VALUE + "." + colName, valCol);
+        nameMapping.put(colName, Utilities.ReduceField.VALUE + "." + colName);
+      }
+
+      // order and null order
+      String orderStr = StringUtils.repeat("+", sortOrder.size());
+      String nullOrderStr = StringUtils.repeat("a", sortNullOrder.size());
+
+      // Create the Key/Value TableDescs. When the operator plan is split into MR tasks,
+      // the reduce operator will initialize the Extract operator with information
+      // from the Key and Value TableDescs
+      List<FieldSchema> fields = PlanUtils.getFieldSchemasFromColumnList(keyCols,
+          keyColNames, 0, "");
+      TableDesc keyTable = PlanUtils.getReduceKeyTableDesc(fields, orderStr, nullOrderStr);
+      List<FieldSchema> valFields = PlanUtils.getFieldSchemasFromColumnList(valCols,
+          valColNames, 0, "");
+      TableDesc valueTable = PlanUtils.getReduceValueTableDesc(valFields);
+      List<List<Integer>> distinctColumnIndices = Lists.newArrayList();
+
+      // Number of reducers is set to default (-1)
+      ReduceSinkDesc rsConf = new ReduceSinkDesc(keyCols, keyCols.size(), valCols,
+          keyColNames, distinctColumnIndices, valColNames, -1, partCols, -1, keyTable,
+          valueTable, writeType);
+
+      ArrayList<ColumnInfo> signature = new ArrayList<>();
+      for (int index = 0; index < parent.getSchema().getSignature().size(); index++) {
+        ColumnInfo colInfo = new ColumnInfo(parent.getSchema().getSignature().get(index));
+        colInfo.setInternalName(nameMapping.get(colInfo.getInternalName()));
+        signature.add(colInfo);
+      }
+      ReduceSinkOperator op = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
+          rsConf, new RowSchema(signature), parent);
+      op.setColumnExprMap(colExprMap);
+      return op;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index def1a7d..7e6dcf3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -11803,10 +11803,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
     storageFormat.fillDefaultStorageFormat(isExt, false);
 
-    if ((command_type == CTAS) && (storageFormat.getStorageHandler() != null)) {
-      throw new SemanticException(ErrorMsg.CREATE_NON_NATIVE_AS.getMsg());
-    }
-
     // check for existence of table
     if (ifNotExists) {
       try {

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index 1c0ac1e..d3a1528 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -18,21 +18,11 @@
 
 package org.apache.hadoop.hive.ql.plan;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapOutputFormat;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -46,7 +36,6 @@ import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
-import org.apache.hadoop.hive.llap.LlapOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
@@ -71,12 +60,24 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
 /**
  * PlanUtils.
  *
@@ -306,17 +307,26 @@ public final class PlanUtils {
   public static TableDesc getTableDesc(CreateTableDesc crtTblDesc, String cols,
       String colTypes) {
 
-    Class<? extends Deserializer> serdeClass = LazySimpleSerDe.class;
-    String separatorCode = Integer.toString(Utilities.ctrlaCode);
-    String columns = cols;
-    String columnTypes = colTypes;
-    boolean lastColumnTakesRestOfTheLine = false;
     TableDesc ret;
 
+    // Resolve storage handler (if any)
     try {
-      if (crtTblDesc.getSerName() != null) {
-        Class c = JavaUtils.loadClass(crtTblDesc.getSerName());
-        serdeClass = c;
+      HiveStorageHandler storageHandler = null;
+      if (crtTblDesc.getStorageHandler() != null) {
+        storageHandler = HiveUtils.getStorageHandler(
+                SessionState.getSessionConf(), crtTblDesc.getStorageHandler());
+      }
+
+      Class<? extends Deserializer> serdeClass = LazySimpleSerDe.class;
+      String separatorCode = Integer.toString(Utilities.ctrlaCode);
+      String columns = cols;
+      String columnTypes = colTypes;
+      boolean lastColumnTakesRestOfTheLine = false;
+
+      if (storageHandler != null) {
+        serdeClass = storageHandler.getSerDeClass();
+      } else if (crtTblDesc.getSerName() != null) {
+        serdeClass = JavaUtils.loadClass(crtTblDesc.getSerName());
       }
 
       if (crtTblDesc.getFieldDelim() != null) {
@@ -329,6 +339,12 @@ public final class PlanUtils {
       // set other table properties
       Properties properties = ret.getProperties();
 
+      if (crtTblDesc.getStorageHandler() != null) {
+        properties.setProperty(
+                org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE,
+                crtTblDesc.getStorageHandler());
+      }
+
       if (crtTblDesc.getCollItemDelim() != null) {
         properties.setProperty(serdeConstants.COLLECTION_DELIM, crtTblDesc
             .getCollItemDelim());
@@ -367,15 +383,24 @@ public final class PlanUtils {
 
       // replace the default input & output file format with those found in
       // crtTblDesc
-      Class c1 = JavaUtils.loadClass(crtTblDesc.getInputFormat());
-      Class c2 = JavaUtils.loadClass(crtTblDesc.getOutputFormat());
-      Class<? extends InputFormat> in_class = c1;
-      Class<? extends HiveOutputFormat> out_class = c2;
-
+      Class<? extends InputFormat> in_class;
+      if (storageHandler != null) {
+        in_class = storageHandler.getInputFormatClass();
+      } else {
+        in_class = JavaUtils.loadClass(crtTblDesc.getInputFormat());
+      }
+      Class<? extends OutputFormat> out_class;
+      if (storageHandler != null) {
+        out_class = storageHandler.getOutputFormatClass();
+      } else {
+        out_class = JavaUtils.loadClass(crtTblDesc.getOutputFormat());
+      }
       ret.setInputFileFormatClass(in_class);
       ret.setOutputFileFormatClass(out_class);
     } catch (ClassNotFoundException e) {
       throw new RuntimeException("Unable to find class in getTableDesc: " + e.getMessage(), e);
+    } catch (HiveException e) {
+      throw new RuntimeException("Error loading storage handler in getTableDesc: " + e.getMessage(), e);
     }
     return ret;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
index 4f053d8..63dc94c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
@@ -18,12 +18,6 @@
 
 package org.apache.hadoop.hive.ql.plan;
 
-import java.io.Serializable;
-import java.util.Enumeration;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Properties;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -34,15 +28,25 @@ import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.OutputFormat;
-
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.ReflectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Enumeration;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Properties;
 
 /**
  * TableDesc.
  *
  */
 public class TableDesc implements Serializable, Cloneable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TableDesc.class);
+
   private static final long serialVersionUID = 1L;
   private Class<? extends InputFormat> inputFileFormatClass;
   private Class<? extends OutputFormat> outputFileFormatClass;

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/test/queries/clientnegative/druid_external.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/druid_external.q b/ql/src/test/queries/clientnegative/druid_external.q
deleted file mode 100644
index 2de04db..0000000
--- a/ql/src/test/queries/clientnegative/druid_external.q
+++ /dev/null
@@ -1,5 +0,0 @@
-set hive.druid.broker.address.default=localhost.test;
-
-CREATE TABLE druid_table_1
-STORED BY 'org.apache.hadoop.hive.druid.QTestDruidStorageHandler'
-TBLPROPERTIES ("druid.datasource" = "wikipedia");

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/test/results/clientnegative/druid_external.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/druid_external.q.out b/ql/src/test/results/clientnegative/druid_external.q.out
deleted file mode 100644
index e5fac51..0000000
--- a/ql/src/test/results/clientnegative/druid_external.q.out
+++ /dev/null
@@ -1,7 +0,0 @@
-PREHOOK: query: CREATE TABLE druid_table_1
-STORED BY 'org.apache.hadoop.hive.druid.QTestDruidStorageHandler'
-TBLPROPERTIES ("druid.datasource" = "wikipedia")
-PREHOOK: type: CREATETABLE
-PREHOOK: Output: database:default
-PREHOOK: Output: default@druid_table_1
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:Table in Druid needs to be declared as EXTERNAL)

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/test/results/clientpositive/druid_basic2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid_basic2.q.out b/ql/src/test/results/clientpositive/druid_basic2.q.out
index 5c4359b..48de99a 100644
--- a/ql/src/test/results/clientpositive/druid_basic2.q.out
+++ b/ql/src/test/results/clientpositive/druid_basic2.q.out
@@ -269,8 +269,8 @@ STAGE PLANS:
 #### A masked pattern was here ####
           Partition
             base file name: druid_table_1
-            input format: org.apache.hadoop.hive.druid.HiveDruidQueryBasedInputFormat
-            output format: org.apache.hadoop.hive.druid.HiveDruidOutputFormat
+            input format: org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat
+            output format: org.apache.hadoop.hive.druid.io.DruidOutputFormat
             properties:
               COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
               EXTERNAL TRUE
@@ -294,8 +294,8 @@ STAGE PLANS:
 #### A masked pattern was here ####
             serde: org.apache.hadoop.hive.druid.QTestDruidSerDe
           
-              input format: org.apache.hadoop.hive.druid.HiveDruidQueryBasedInputFormat
-              output format: org.apache.hadoop.hive.druid.HiveDruidOutputFormat
+              input format: org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat
+              output format: org.apache.hadoop.hive.druid.io.DruidOutputFormat
               properties:
                 COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
                 EXTERNAL TRUE
@@ -435,8 +435,8 @@ STAGE PLANS:
 #### A masked pattern was here ####
           Partition
             base file name: druid_table_1
-            input format: org.apache.hadoop.hive.druid.HiveDruidQueryBasedInputFormat
-            output format: org.apache.hadoop.hive.druid.HiveDruidOutputFormat
+            input format: org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat
+            output format: org.apache.hadoop.hive.druid.io.DruidOutputFormat
             properties:
               COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
               EXTERNAL TRUE
@@ -460,8 +460,8 @@ STAGE PLANS:
 #### A masked pattern was here ####
             serde: org.apache.hadoop.hive.druid.QTestDruidSerDe
           
-              input format: org.apache.hadoop.hive.druid.HiveDruidQueryBasedInputFormat
-              output format: org.apache.hadoop.hive.druid.HiveDruidOutputFormat
+              input format: org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat
+              output format: org.apache.hadoop.hive.druid.io.DruidOutputFormat
               properties:
                 COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
                 EXTERNAL TRUE


[4/4] hive git commit: HIVE-15277: Teach Hive how to create/delete Druid segments (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)

Posted by jc...@apache.org.
HIVE-15277: Teach Hive how to create/delete Druid segments (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/590687bc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/590687bc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/590687bc

Branch: refs/heads/master
Commit: 590687bc4cb97cdf4aa95ba94f28949986d1b3e8
Parents: 89362a1
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Thu Dec 15 20:43:45 2016 +0000
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Thu Dec 15 20:43:45 2016 +0000

----------------------------------------------------------------------
 .../hadoop/hive/common/JvmPauseMonitor.java     |   3 +-
 .../org/apache/hadoop/hive/conf/Constants.java  |   7 +
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  38 +-
 druid-handler/README.md                         |   3 +
 druid-handler/pom.xml                           | 145 +++-
 .../hadoop/hive/druid/DruidStorageHandler.java  | 311 +++++++-
 .../hive/druid/DruidStorageHandlerUtils.java    | 351 ++++++++-
 .../hive/druid/HiveDruidOutputFormat.java       |  55 --
 .../druid/HiveDruidQueryBasedInputFormat.java   | 376 ----------
 .../hadoop/hive/druid/HiveDruidSplit.java       |  83 ---
 .../hadoop/hive/druid/io/DruidOutputFormat.java | 204 ++++++
 .../druid/io/DruidQueryBasedInputFormat.java    | 397 ++++++++++
 .../hadoop/hive/druid/io/DruidRecordWriter.java | 260 +++++++
 .../hadoop/hive/druid/io/HiveDruidSplit.java    |  84 +++
 .../serde/DruidGroupByQueryRecordReader.java    |  15 +-
 .../druid/serde/DruidQueryRecordReader.java     |  42 +-
 .../serde/DruidSelectQueryRecordReader.java     |   8 +-
 .../hadoop/hive/druid/serde/DruidSerDe.java     | 286 +++++---
 .../hive/druid/serde/DruidSerDeUtils.java       |   6 +-
 .../serde/DruidTimeseriesQueryRecordReader.java |   7 +-
 .../druid/serde/DruidTopNQueryRecordReader.java |   8 +-
 .../hive/druid/DruidStorageHandlerTest.java     | 181 +++++
 .../hadoop/hive/druid/QTestDruidSerDe.java      |  60 +-
 .../hadoop/hive/druid/TestDerbyConnector.java   | 108 +++
 .../hadoop/hive/druid/TestDruidSerDe.java       | 731 ++++++++++---------
 .../TestHiveDruidQueryBasedInputFormat.java     |  91 ++-
 .../hive/ql/io/DruidRecordWriterTest.java       | 221 ++++++
 .../llap/daemon/impl/TaskRunnerCallable.java    |  45 +-
 pom.xml                                         |   2 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |  29 +-
 .../apache/hadoop/hive/ql/exec/Utilities.java   | 113 ++-
 .../hadoop/hive/ql/hooks/LineageLogger.java     |  31 +-
 .../hadoop/hive/ql/optimizer/Optimizer.java     |   2 +
 ...tedDynPartitionTimeGranularityOptimizer.java | 363 +++++++++
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   4 -
 .../apache/hadoop/hive/ql/plan/PlanUtils.java   |  75 +-
 .../apache/hadoop/hive/ql/plan/TableDesc.java   |  18 +-
 .../queries/clientnegative/druid_external.q     |   5 -
 .../results/clientnegative/druid_external.q.out |   7 -
 .../results/clientpositive/druid_basic2.q.out   |  16 +-
 40 files changed, 3534 insertions(+), 1257 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
index 5d475f4..cf080e3 100644
--- a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
+++ b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
@@ -37,6 +37,7 @@ import java.lang.management.ManagementFactory;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Based on the JvmPauseMonitor from Hadoop.
@@ -181,7 +182,7 @@ public class JvmPauseMonitor {
         } catch (InterruptedException ie) {
           return;
         }
-        long extraSleepTime = sw.elapsedMillis() - SLEEP_INTERVAL_MS;
+        long extraSleepTime = sw.elapsed(TimeUnit.MILLISECONDS) - SLEEP_INTERVAL_MS;
         Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
 
         if (extraSleepTime > warnThresholdMs) {

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/common/src/java/org/apache/hadoop/hive/conf/Constants.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 6c42163..ea7864a 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -26,10 +26,17 @@ public class Constants {
   /* Constants for Druid storage handler */
   public static final String DRUID_HIVE_STORAGE_HANDLER_ID =
           "org.apache.hadoop.hive.druid.DruidStorageHandler";
+  public static final String DRUID_HIVE_OUTPUT_FORMAT =
+          "org.apache.hadoop.hive.druid.io.DruidOutputFormat";
   public static final String DRUID_DATA_SOURCE = "druid.datasource";
+  public static final String DRUID_SEGMENT_GRANULARITY = "druid.segment.granularity";
+  public static final String DRUID_TIMESTAMP_GRANULARITY_COL_NAME = "__time_granularity";
   public static final String DRUID_QUERY_JSON = "druid.query.json";
   public static final String DRUID_QUERY_TYPE = "druid.query.type";
   public static final String DRUID_QUERY_FETCH = "druid.query.fetch";
+  public static final String DRUID_SEGMENT_DIRECTORY = "druid.storage.storageDirectory";
+  public static final String DRUID_SEGMENT_VERSION = "druid.segment.version";
+  public static final String DRUID_JOB_WORKING_DIRECTORY = "druid.job.workingDirectory";
 
   public static final String HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR = "HIVE_JOB_CREDSTORE_PASSWORD";
   public static final String HADOOP_CREDENTIAL_PASSWORD_ENVVAR = "HADOOP_CREDSTORE_PASSWORD";

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 032ff0c..dcb383d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1936,9 +1936,21 @@ public class HiveConf extends Configuration {
       new TimeValidator(TimeUnit.MILLISECONDS), "Frequency of WriteSet reaper runs"),
 
     // For Druid storage handler
+    HIVE_DRUID_INDEXING_GRANULARITY("hive.druid.indexer.segments.granularity", "DAY",
+            new PatternSet("YEAR", "MONTH", "WEEK", "DAY", "HOUR", "MINUTE", "SECOND"),
+            "Granularity for the segments created by the Druid storage handler"
+    ),
+    HIVE_DRUID_MAX_PARTITION_SIZE("hive.druid.indexer.partition.size.max", 5000000,
+            "Maximum number of records per segment partition"
+    ),
+    HIVE_DRUID_MAX_ROW_IN_MEMORY("hive.druid.indexer.memory.rownum.max", 75000,
+            "Maximum number of records in memory while storing data in Druid"
+    ),
     HIVE_DRUID_BROKER_DEFAULT_ADDRESS("hive.druid.broker.address.default", "localhost:8082",
-        "Address of the Druid broker. If we are querying Druid from Hive, this address needs to be\n" +
-        "declared"),
+            "Address of the Druid broker. If we are querying Druid from Hive, this address needs to be\n"
+                    +
+                    "declared"
+    ),
     HIVE_DRUID_SELECT_THRESHOLD("hive.druid.select.threshold", 10000,
         "When we can split a Select query, this is the maximum number of rows that we try to retrieve\n" +
         "per query. In order to do that, we obtain the estimated size for the complete result. If the\n" +
@@ -1949,7 +1961,27 @@ public class HiveConf extends Configuration {
         "the HTTP client."),
     HIVE_DRUID_HTTP_READ_TIMEOUT("hive.druid.http.read.timeout", "PT1M", "Read timeout period for the HTTP\n" +
         "client in ISO8601 format (for example P2W, P3M, PT1H30M, PT0.750S), default is period of 1 minute."),
-
+    HIVE_DRUID_BASE_PERSIST_DIRECTORY("hive.druid.basePersistDirectory", "/tmp",
+            "Local temporary directory used to persist intermediate indexing state."
+    ),
+    DRUID_SEGMENT_DIRECTORY("hive.druid.storage.storageDirectory", "/druid/segments",
+            "Druid deep storage location."),
+    DRUID_METADATA_BASE("hive.druid.metadata.base", "druid", "Default prefix for metadata tables"),
+    DRUID_METADATA_DB_TYPE("hive.druid.metadata.db.type", "mysql",
+            new PatternSet("mysql", "postgres"), "Type of the metadata database."
+    ),
+    DRUID_METADATA_DB_USERNAME("hive.druid.metadata.username", "",
+            "Username to connect to Type of the metadata DB."
+    ),
+    DRUID_METADATA_DB_PASSWORD("hive.druid.metadata.password", "",
+            "Password to connect to Type of the metadata DB."
+    ),
+    DRUID_METADATA_DB_URI("hive.druid.metadata.uri", "",
+            "URI to connect to the database (for example jdbc:mysql://hostname:port/DBName)."
+    ),
+    DRUID_WORKING_DIR("hive.druid.working.directory", "/tmp/workingDirectory",
+            "Default hdfs working directory used to store some intermediate metadata"
+    ),
     // For HBase storage handler
     HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true,
         "Whether writes to HBase should be forced to the write-ahead log. \n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/README.md
----------------------------------------------------------------------
diff --git a/druid-handler/README.md b/druid-handler/README.md
new file mode 100644
index 0000000..b548567
--- /dev/null
+++ b/druid-handler/README.md
@@ -0,0 +1,3 @@
+# Druid Storage Handler
+
+[Documentation](https://cwiki.apache.org/confluence/display/Hive/Druid+Integration)
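+
+As a minimal sketch of the intended usage (table, datasource and column names below are
+hypothetical), data can be indexed into Druid from Hive with a CTAS statement. The query must
+project exactly one column of timestamp type (aliased here as `__time`, following the Druid
+convention for the time column), and `druid.segment.granularity` accepts YEAR, MONTH, WEEK,
+DAY, HOUR, MINUTE or SECOND:
+
+```sql
+CREATE TABLE druid_page_views
+STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+TBLPROPERTIES ("druid.datasource" = "page_views", "druid.segment.granularity" = "DAY")
+AS
+SELECT CAST(event_time AS timestamp) AS `__time`, page, user_id, added
+FROM source_page_views;
+```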

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/pom.xml
----------------------------------------------------------------------
diff --git a/druid-handler/pom.xml b/druid-handler/pom.xml
index e4fa8fd..f691a2c 100644
--- a/druid-handler/pom.xml
+++ b/druid-handler/pom.xml
@@ -29,6 +29,8 @@
 
   <properties>
     <hive.path.to.root>..</hive.path.to.root>
+    <druid.metamx.util.version>0.27.10</druid.metamx.util.version>
+    <druid.guava.version>16.0.1</druid.guava.version>
   </properties>
 
   <dependencies>
@@ -47,6 +49,10 @@
           <groupId>io.netty</groupId>
           <artifactId>netty</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <!-- inter-project -->
@@ -56,37 +62,47 @@
       <version>${commons-lang.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <version>${hadoop.version}</version>
-      <optional>true</optional>
-        <exclusions>
-          <exclusion>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>commmons-logging</groupId>
-            <artifactId>commons-logging</artifactId>
-          </exclusion>
-      </exclusions>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${druid.guava.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-core</artifactId>
-      <version>${hadoop.version}</version>
-      <optional>true</optional>
+      <groupId>com.metamx</groupId>
+      <artifactId>java-util</artifactId>
+      <version>${druid.metamx.util.version}</version>
       <exclusions>
         <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty</artifactId>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
     <dependency>
-      <groupId>org.apache.calcite</groupId>
-      <artifactId>calcite-druid</artifactId>
-      <version>${calcite.version}</version>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-server</artifactId>
+      <version>${druid.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>io.druid</groupId>
@@ -107,7 +123,72 @@
         </exclusion>
       </exclusions>
     </dependency>
-
+    <dependency>
+      <groupId>io.druid.extensions</groupId>
+      <artifactId>druid-hdfs-storage</artifactId>
+      <version>${druid.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.druid.extensions</groupId>
+      <artifactId>mysql-metadata-storage</artifactId>
+      <version>${druid.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.druid.extensions</groupId>
+      <artifactId>postgresql-metadata-storage</artifactId>
+      <version>${druid.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <scope>provided</scope>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.inject</groupId>
+          <artifactId>guice</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.inject.extensions</groupId>
+          <artifactId>guice-servlet</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-druid</artifactId>
+      <version>${calcite.version}</version>
+    </dependency>
     <!-- test inter-project -->
     <dependency>
       <groupId>junit</groupId>
@@ -115,6 +196,12 @@
       <version>${junit.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-indexing-hadoop</artifactId>
+      <version>${druid.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
@@ -170,15 +257,25 @@
                   <pattern>com.fasterxml.jackson</pattern>
                   <shadedPattern>org.apache.hive.druid.com.fasterxml.jackson</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>com.google.common</pattern>
+                  <shadedPattern>org.apache.hive.druid.com.google.common</shadedPattern>
+                </relocation>
               </relocations>
               <artifactSet>
                 <includes>
                   <include>io.druid:*</include>
+                  <include>io.druid.extensions:*</include>
                   <include>com.metamx:*</include>
                   <include>io.netty:*</include>
                   <include>com.fasterxml.jackson.core:*</include>
                   <include>com.fasterxml.jackson.datatype:*</include>
                   <include>com.fasterxml.jackson.dataformat:*</include>
+                  <include>com.google.guava:*</include>
+                  <include>it.unimi.dsi:*</include>
+                  <include>org.jdbi:*</include>
+                  <include>net.jpountz.lz4:*</include>
+                  <include>org.apache.commons:*</include>
                 </includes>
               </artifactSet>
               <filters>

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 8242385..a08a4e3 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,36 +17,131 @@
  */
 package org.apache.hadoop.hive.druid;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
+import io.druid.metadata.MetadataStorageConnectorConfig;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.metadata.SQLMetadataConnector;
+import io.druid.metadata.storage.mysql.MySQLConnector;
+import io.druid.metadata.storage.postgresql.PostgreSQLConnector;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.io.DruidOutputFormat;
+import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
 import org.apache.hadoop.hive.druid.serde.DruidSerDe;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
 /**
  * DruidStorageHandler provides a HiveStorageHandler implementation for Druid.
  */
-@SuppressWarnings({"deprecation","rawtypes"})
+@SuppressWarnings({ "deprecation", "rawtypes" })
 public class DruidStorageHandler extends DefaultStorageHandler implements HiveMetaHook {
 
   protected static final Logger LOG = LoggerFactory.getLogger(DruidStorageHandler.class);
 
+  public static final String SEGMENTS_DESCRIPTOR_DIR_NAME = "segmentsDescriptorDir";
+
+  private final SQLMetadataConnector connector;
+
+  private final SQLMetadataStorageUpdaterJobHandler druidSqlMetadataStorageUpdaterJobHandler;
+
+  private final MetadataStorageTablesConfig druidMetadataStorageTablesConfig;
+
+  private String uniqueId = null;
+
+  private String rootWorkingDir = null;
+
+  public DruidStorageHandler() {
+    // the default base ("druid") matches the metadata table prefix that Druid uses by default
+    final String base = HiveConf
+            .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_BASE);
+    final String dbType = HiveConf
+            .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_TYPE);
+    final String username = HiveConf
+            .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_USERNAME);
+    final String password = HiveConf
+            .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_PASSWORD);
+    final String uri = HiveConf
+            .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_URI);
+    druidMetadataStorageTablesConfig = MetadataStorageTablesConfig.fromBase(base);
+
+    final Supplier<MetadataStorageConnectorConfig> storageConnectorConfigSupplier = Suppliers.<MetadataStorageConnectorConfig>ofInstance(
+            new MetadataStorageConnectorConfig() {
+              @Override
+              public String getConnectURI() {
+                return uri;
+              }
+
+              @Override
+              public String getUser() {
+                return username;
+              }
+
+              @Override
+              public String getPassword() {
+                return password;
+              }
+            });
+
+    if (dbType.equals("mysql")) {
+      connector = new MySQLConnector(storageConnectorConfigSupplier,
+              Suppliers.ofInstance(druidMetadataStorageTablesConfig)
+      );
+    } else if (dbType.equals("postgres")) {
+      connector = new PostgreSQLConnector(storageConnectorConfigSupplier,
+              Suppliers.ofInstance(druidMetadataStorageTablesConfig)
+      );
+    } else {
+      throw new IllegalStateException(String.format("Unknown metadata storage type [%s]", dbType));
+    }
+    druidSqlMetadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler(connector);
+  }
+
+  @VisibleForTesting
+  public DruidStorageHandler(SQLMetadataConnector connector,
+          SQLMetadataStorageUpdaterJobHandler druidSqlMetadataStorageUpdaterJobHandler,
+          MetadataStorageTablesConfig druidMetadataStorageTablesConfig
+  ) {
+    this.connector = connector;
+    this.druidSqlMetadataStorageUpdaterJobHandler = druidSqlMetadataStorageUpdaterJobHandler;
+    this.druidMetadataStorageTablesConfig = druidMetadataStorageTablesConfig;
+  }
+
   @Override
   public Class<? extends InputFormat> getInputFormatClass() {
-    return HiveDruidQueryBasedInputFormat.class;
+    return DruidQueryBasedInputFormat.class;
   }
 
   @Override
   public Class<? extends OutputFormat> getOutputFormatClass() {
-    return HiveDruidOutputFormat.class;
+    return DruidOutputFormat.class;
   }
 
   @Override
@@ -62,28 +157,141 @@ public class DruidStorageHandler extends DefaultStorageHandler implements HiveMe
   @Override
   public void preCreateTable(Table table) throws MetaException {
     // Do safety checks
-    if (!MetaStoreUtils.isExternalTable(table)) {
-      throw new MetaException("Table in Druid needs to be declared as EXTERNAL");
-    }
-    if (!StringUtils.isEmpty(table.getSd().getLocation())) {
+    if (MetaStoreUtils.isExternalTable(table) && !StringUtils
+            .isEmpty(table.getSd().getLocation())) {
       throw new MetaException("LOCATION may not be specified for Druid");
     }
+
     if (table.getPartitionKeysSize() != 0) {
       throw new MetaException("PARTITIONED BY may not be specified for Druid");
     }
     if (table.getSd().getBucketColsSize() != 0) {
       throw new MetaException("CLUSTERED BY may not be specified for Druid");
     }
+    String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
+    if (MetaStoreUtils.isExternalTable(table)) {
+      return;
+    }
+    // If it is not an external table we need to check the metadata
+    try {
+      connector.createSegmentTable();
+    } catch (Exception e) {
+      LOG.error("Exception while trying to create druid segments table", e);
+      throw new MetaException(e.getMessage());
+    }
+    Collection<String> existingDataSources = DruidStorageHandlerUtils
+            .getAllDataSourceNames(connector, druidMetadataStorageTablesConfig);
+    LOG.debug(String.format("pre-create data source with name [%s]", dataSourceName));
+    if (existingDataSources.contains(dataSourceName)) {
+      throw new MetaException(String.format("Data source [%s] already existing", dataSourceName));
+    }
   }
 
   @Override
   public void rollbackCreateTable(Table table) throws MetaException {
-    // Nothing to do
+    if (MetaStoreUtils.isExternalTable(table)) {
+      return;
+    }
+    final Path segmentDescriptorDir = getSegmentDescriptorDir();
+    try {
+      List<DataSegment> dataSegmentList = DruidStorageHandlerUtils
+              .getPublishedSegments(segmentDescriptorDir, getConf());
+      for (DataSegment dataSegment : dataSegmentList) {
+        try {
+          deleteSegment(dataSegment);
+        } catch (SegmentLoadingException e) {
+          LOG.error(String.format("Error while trying to clean the segment [%s]", dataSegment), e);
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Exception while rollback", e);
+      throw Throwables.propagate(e);
+    } finally {
+      cleanWorkingDir();
+    }
   }
 
   @Override
   public void commitCreateTable(Table table) throws MetaException {
-    // Nothing to do
+    if (MetaStoreUtils.isExternalTable(table)) {
+      return;
+    }
+    LOG.info(String.format("Committing table [%s] to the druid metastore", table.getDbName()));
+    final Path tableDir = getSegmentDescriptorDir();
+    try {
+      List<DataSegment> segmentList = DruidStorageHandlerUtils
+              .getPublishedSegments(tableDir, getConf());
+      LOG.info(String.format("Found [%d] segments under path [%s]", segmentList.size(), tableDir));
+      druidSqlMetadataStorageUpdaterJobHandler.publishSegments(
+              druidMetadataStorageTablesConfig.getSegmentsTable(),
+              segmentList,
+              DruidStorageHandlerUtils.JSON_MAPPER
+      );
+    } catch (IOException e) {
+      LOG.error("Exception while commit", e);
+      Throwables.propagate(e);
+    } finally {
+      cleanWorkingDir();
+    }
+  }
+
+  @VisibleForTesting
+  protected void deleteSegment(DataSegment segment) throws SegmentLoadingException {
+
+    final Path path = getPath(segment);
+    LOG.info(String.format("removing segment[%s], located at path[%s]", segment.getIdentifier(),
+            path
+    ));
+
+    try {
+      if (path.getName().endsWith(".zip")) {
+
+        final FileSystem fs = path.getFileSystem(getConf());
+
+        if (!fs.exists(path)) {
+          LOG.warn(String.format(
+                  "Segment Path [%s] does not exist. It appears to have been deleted already.",
+                  path
+          ));
+          return;
+        }
+
+        // path format: .../dataSource/interval/version/partitionNum/xxx.zip
+        Path partitionNumDir = path.getParent();
+        if (!fs.delete(partitionNumDir, true)) {
+          throw new SegmentLoadingException(
+                  "Unable to kill segment, failed to delete dir [%s]",
+                  partitionNumDir.toString()
+          );
+        }
+
+        //try to delete other directories if possible
+        Path versionDir = partitionNumDir.getParent();
+        if (safeNonRecursiveDelete(fs, versionDir)) {
+          Path intervalDir = versionDir.getParent();
+          if (safeNonRecursiveDelete(fs, intervalDir)) {
+            Path dataSourceDir = intervalDir.getParent();
+            safeNonRecursiveDelete(fs, dataSourceDir);
+          }
+        }
+      } else {
+        throw new SegmentLoadingException("Unknown file type[%s]", path);
+      }
+    } catch (IOException e) {
+      throw new SegmentLoadingException(e, "Unable to kill segment");
+    }
+  }
+
+  private static Path getPath(DataSegment dataSegment) {
+    return new Path(String.valueOf(dataSegment.getLoadSpec().get("path")));
+  }
+
+  private static boolean safeNonRecursiveDelete(FileSystem fs, Path path) {
+    try {
+      return fs.delete(path, false);
+    } catch (Exception ex) {
+      return false;
+    }
   }
 
   @Override
@@ -98,7 +306,45 @@ public class DruidStorageHandler extends DefaultStorageHandler implements HiveMe
 
   @Override
   public void commitDropTable(Table table, boolean deleteData) throws MetaException {
-    // Nothing to do
+    if (MetaStoreUtils.isExternalTable(table)) {
+      return;
+    }
+    String dataSourceName = Preconditions
+            .checkNotNull(table.getParameters().get(Constants.DRUID_DATA_SOURCE),
+                    "DataSource name is null !"
+            );
+
+    if (deleteData) {
+      LOG.info(String.format("Dropping with purge all the data for data source [%s]",
+              dataSourceName
+      ));
+      List<DataSegment> dataSegmentList = DruidStorageHandlerUtils
+              .getDataSegmentList(connector, druidMetadataStorageTablesConfig, dataSourceName);
+      if (dataSegmentList.isEmpty()) {
+        LOG.info(String.format("Nothing to delete for data source [%s]", dataSourceName));
+        return;
+      }
+      for (DataSegment dataSegment : dataSegmentList) {
+        try {
+          deleteSegment(dataSegment);
+        } catch (SegmentLoadingException e) {
+          LOG.error(String.format("Error while deleting segment [%s]", dataSegment.getIdentifier()),
+                  e
+          );
+        }
+      }
+    }
+    if (DruidStorageHandlerUtils
+            .disableDataSource(connector, druidMetadataStorageTablesConfig, dataSourceName)) {
+      LOG.info(String.format("Successfully dropped druid data source [%s]", dataSourceName));
+    }
+  }
+
+  @Override
+  public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties
+  ) {
+    jobProperties.put(Constants.DRUID_SEGMENT_VERSION, new DateTime().toString());
+    jobProperties.put(Constants.DRUID_JOB_WORKING_DIRECTORY, getStagingWorkingDir().toString());
   }
 
   @Override
@@ -106,4 +352,43 @@ public class DruidStorageHandler extends DefaultStorageHandler implements HiveMe
     return Constants.DRUID_HIVE_STORAGE_HANDLER_ID;
   }
 
+  public String getUniqueId() {
+    if (uniqueId == null) {
+      uniqueId = Preconditions.checkNotNull(
+              Strings.emptyToNull(HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVEQUERYID)),
+              "Hive query id is null"
+      );
+    }
+    return uniqueId;
+  }
+
+  private Path getStagingWorkingDir() {
+    return new Path(getRootWorkingDir(), makeStagingName());
+  }
+
+  @VisibleForTesting
+  protected String makeStagingName() {
+    return ".staging-".concat(getUniqueId().replace(":", ""));
+  }
+
+  private Path getSegmentDescriptorDir() {
+    return new Path(getStagingWorkingDir(), SEGMENTS_DESCRIPTOR_DIR_NAME);
+  }
+
+  private void cleanWorkingDir() {
+    final FileSystem fileSystem;
+    try {
+      fileSystem = getStagingWorkingDir().getFileSystem(getConf());
+      fileSystem.delete(getStagingWorkingDir(), true);
+    } catch (IOException e) {
+      LOG.error("Got Exception while cleaning working directory", e);
+    }
+  }
+
+  private String getRootWorkingDir() {
+    if (Strings.isNullOrEmpty(rootWorkingDir)) {
+      rootWorkingDir = HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_WORKING_DIR);
+    }
+    return rootWorkingDir;
+  }
 }
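
For reference, the deleteSegment() method above relies on the segment archive living at
.../dataSource/interval/version/partitionNum/xxx.zip; after removing the partition directory
recursively, it walks up and removes each ancestor only if that directory is already empty.
A minimal, self-contained sketch of that cleanup walk (class, method and variable names are
illustrative, not part of the patch):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SegmentCleanupSketch {

  // Removes the partitionNum directory recursively, then tries to remove the
  // version, interval and dataSource ancestors, stopping at the first one that
  // is not empty (a non-recursive delete fails on non-empty directories).
  public static void cleanup(Configuration conf, Path segmentZip) throws IOException {
    FileSystem fs = segmentZip.getFileSystem(conf);
    Path partitionNumDir = segmentZip.getParent();
    if (!fs.delete(partitionNumDir, true)) {
      throw new IOException("Failed to delete dir " + partitionNumDir);
    }
    Path current = partitionNumDir.getParent();      // version dir
    for (int i = 0; i < 3 && current != null; i++) { // version, interval, dataSource
      if (!safeNonRecursiveDelete(fs, current)) {
        break;
      }
      current = current.getParent();
    }
  }

  private static boolean safeNonRecursiveDelete(FileSystem fs, Path path) {
    try {
      return fs.delete(path, false);
    } catch (Exception ex) {
      return false;
    }
  }
}

Deleting the ancestors non-recursively keeps shared dataSource/interval directories intact
while other segments still live under them.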

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index c6b8024..193e4aa 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,22 +17,62 @@
  */
 package org.apache.hadoop.hive.druid;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.util.concurrent.ExecutionException;
-
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+import com.google.common.collect.Lists;
+import com.metamx.common.MapUtils;
+import com.metamx.emitter.EmittingLogger;
+import com.metamx.emitter.core.NoopEmitter;
+import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.Request;
 import com.metamx.http.client.response.InputStreamResponseHandler;
-
 import io.druid.jackson.DefaultObjectMapper;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.metadata.SQLMetadataConnector;
+import io.druid.metadata.storage.mysql.MySQLConnector;
 import io.druid.query.BaseQuery;
+import io.druid.segment.IndexIO;
+import io.druid.segment.IndexMergerV9;
+import io.druid.segment.column.ColumnConfig;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.LinearShardSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.skife.jdbi.v2.FoldController;
+import org.skife.jdbi.v2.Folder3;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.TransactionCallback;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.skife.jdbi.v2.util.ByteArrayMapper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Utils class for Druid storage handler.
@@ -51,12 +91,61 @@ public final class DruidStorageHandlerUtils {
    */
   public static final ObjectMapper SMILE_MAPPER = new DefaultObjectMapper(new SmileFactory());
 
+  private static final int NUM_RETRIES = 8;
+
+  private static final int SECONDS_BETWEEN_RETRIES = 2;
+
+  private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB
+
+  private static final int DEFAULT_STREAMING_RESULT_SIZE = 100;
+
+  /**
+   * Used by druid to perform IO on indexes
+   */
+  public static final IndexIO INDEX_IO = new IndexIO(JSON_MAPPER, new ColumnConfig() {
+    @Override
+    public int columnCacheSizeBytes() {
+      return 0;
+    }
+  });
+
+  /**
+   * Used by druid to merge indexes
+   */
+  public static final IndexMergerV9 INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER,
+          DruidStorageHandlerUtils.INDEX_IO
+  );
+
+  /**
+   * Generic Interner implementation used to read segment objects from the metadata storage
+   */
+  public static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner();
+
+  static {
+    // Register the shard sub type to be used by the mapper
+    JSON_MAPPER.registerSubtypes(new NamedType(LinearShardSpec.class, "linear"));
+    // Set the timezone of the object mapper.
+    // NOTE: this is currently not effective; the workaround is to set it via java opts -Duser.timezone="UTC"
+    JSON_MAPPER.setTimeZone(TimeZone.getTimeZone("UTC"));
+    try {
+      // A no-op emitter is used by some internal Druid classes.
+      EmittingLogger.registerEmitter(
+              new ServiceEmitter("druid-hive-indexer", InetAddress.getLocalHost().getHostName(),
+                      new NoopEmitter()
+              ));
+    } catch (UnknownHostException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
   /**
    * Method that creates a request for Druid JSON query (using SMILE).
-   * @param mapper
+   *
    * @param address
    * @param query
+   *
    * @return
+   *
    * @throws IOException
    */
   public static Request createRequest(String address, BaseQuery<?> query)
@@ -69,9 +158,12 @@ public final class DruidStorageHandlerUtils {
   /**
    * Method that submits a request to an Http address and retrieves the result.
    * The caller is responsible for closing the stream once it finishes consuming it.
+   *
    * @param client
    * @param request
+   *
    * @return
+   *
    * @throws IOException
    */
   public static InputStream submitRequest(HttpClient client, Request request)
@@ -87,4 +179,237 @@ public final class DruidStorageHandlerUtils {
     return response;
   }
 
+  /**
+   * @param taskDir path to the directory containing the segment descriptor files;
+   *                the descriptor path will be .../workingPath/task_id/{@link DruidStorageHandler#SEGMENTS_DESCRIPTOR_DIR_NAME}/*.json
+   * @param conf     hadoop conf to get the file system
+   *
+   * @return List of DataSegments
+   *
+   * @throws IOException can be thrown, for instance, when no data has been produced.
+   */
+  public static List<DataSegment> getPublishedSegments(Path taskDir, Configuration conf)
+          throws IOException {
+    ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
+    FileSystem fs = taskDir.getFileSystem(conf);
+    for (FileStatus fileStatus : fs.listStatus(taskDir)) {
+      final DataSegment segment = JSON_MAPPER
+              .readValue(fs.open(fileStatus.getPath()), DataSegment.class);
+      publishedSegmentsBuilder.add(segment);
+    }
+    List<DataSegment> publishedSegments = publishedSegmentsBuilder.build();
+    return publishedSegments;
+  }
+
+  /**
+   * This function writes the serialized form of the segment descriptor to the filesystem;
+   * if the descriptor file already exists, it tries to replace it.
+   *
+   * @param outputFS       filesystem
+   * @param segment        DataSegment object
+   * @param descriptorPath path
+   *
+   * @throws IOException
+   */
+  public static void writeSegmentDescriptor(
+          final FileSystem outputFS,
+          final DataSegment segment,
+          final Path descriptorPath
+  )
+          throws IOException {
+    final DataPusher descriptorPusher = (DataPusher) RetryProxy.create(
+            DataPusher.class, new DataPusher() {
+              @Override
+              public long push() throws IOException {
+                try {
+                  if (outputFS.exists(descriptorPath)) {
+                    if (!outputFS.delete(descriptorPath, false)) {
+                      throw new IOException(
+                              String.format("Failed to delete descriptor at [%s]", descriptorPath));
+                    }
+                  }
+                  try (final OutputStream descriptorOut = outputFS.create(
+                          descriptorPath,
+                          true,
+                          DEFAULT_FS_BUFFER_SIZE
+                  )) {
+                    JSON_MAPPER.writeValue(descriptorOut, segment);
+                    descriptorOut.flush();
+                  }
+                } catch (RuntimeException | IOException ex) {
+                  throw ex;
+                }
+                return -1;
+              }
+            },
+            RetryPolicies
+                    .exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
+    );
+    descriptorPusher.push();
+  }
+
+  /**
+   * @param connector                   SQL metadata connector to the metadata storage
+   * @param metadataStorageTablesConfig Table config
+   *
+   * @return all the active data sources in the metadata storage
+   */
+  public static Collection<String> getAllDataSourceNames(SQLMetadataConnector connector,
+          final MetadataStorageTablesConfig metadataStorageTablesConfig
+  ) {
+    return connector.getDBI().withHandle(
+            new HandleCallback<List<String>>() {
+              @Override
+              public List<String> withHandle(Handle handle) throws Exception {
+                return handle.createQuery(
+                        String.format("SELECT DISTINCT(datasource) FROM %s WHERE used = true",
+                                metadataStorageTablesConfig.getSegmentsTable()
+                        ))
+                        .fold(Lists.<String>newArrayList(),
+                                new Folder3<ArrayList<String>, Map<String, Object>>() {
+                                  @Override
+                                  public ArrayList<String> fold(ArrayList<String> druidDataSources,
+                                          Map<String, Object> stringObjectMap,
+                                          FoldController foldController,
+                                          StatementContext statementContext) throws SQLException {
+                                    druidDataSources.add(
+                                            MapUtils.getString(stringObjectMap, "datasource")
+                                    );
+                                    return druidDataSources;
+                                  }
+                                }
+                        );
+
+              }
+            }
+    );
+  }
+
+  /**
+   * @param connector                   SQL connector to metadata
+   * @param metadataStorageTablesConfig Tables configuration
+   * @param dataSource                  Name of data source
+   *
+   * @return true if the data source was successfully disabled, false otherwise
+   */
+  public static boolean disableDataSource(SQLMetadataConnector connector,
+          final MetadataStorageTablesConfig metadataStorageTablesConfig, final String dataSource
+  ) {
+    try {
+      if (!getAllDataSourceNames(connector, metadataStorageTablesConfig).contains(dataSource)) {
+        DruidStorageHandler.LOG
+                .warn(String.format("Cannot delete data source [%s], does not exist", dataSource));
+        return false;
+      }
+
+      connector.getDBI().withHandle(
+              new HandleCallback<Void>() {
+                @Override
+                public Void withHandle(Handle handle) throws Exception {
+                  handle.createStatement(
+                          String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource",
+                                  metadataStorageTablesConfig.getSegmentsTable()
+                          )
+                  )
+                          .bind("dataSource", dataSource)
+                          .execute();
+
+                  return null;
+                }
+              }
+      );
+
+    } catch (Exception e) {
+      DruidStorageHandler.LOG.error(String.format("Error removing dataSource %s", dataSource), e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * @param connector                   SQL connector to metadata
+   * @param metadataStorageTablesConfig Tables configuration
+   * @param dataSource                  Name of data source
+   *
+   * @return list of all data segments that are part of the given data source
+   */
+  public static List<DataSegment> getDataSegmentList(final SQLMetadataConnector connector,
+          final MetadataStorageTablesConfig metadataStorageTablesConfig, final String dataSource
+  ) {
+    List<DataSegment> segmentList = connector.retryTransaction(
+            new TransactionCallback<List<DataSegment>>() {
+              @Override
+              public List<DataSegment> inTransaction(
+                      Handle handle, TransactionStatus status
+              ) throws Exception {
+                return handle
+                        .createQuery(String.format(
+                                "SELECT payload FROM %s WHERE dataSource = :dataSource",
+                                metadataStorageTablesConfig.getSegmentsTable()
+                        ))
+                        .setFetchSize(getStreamingFetchSize(connector))
+                        .bind("dataSource", dataSource)
+                        .map(ByteArrayMapper.FIRST)
+                        .fold(
+                                new ArrayList<DataSegment>(),
+                                new Folder3<List<DataSegment>, byte[]>() {
+                                  @Override
+                                  public List<DataSegment> fold(List<DataSegment> accumulator,
+                                          byte[] payload, FoldController control,
+                                          StatementContext ctx
+                                  ) throws SQLException {
+                                    try {
+                                      final DataSegment segment = DATA_SEGMENT_INTERNER.intern(
+                                              JSON_MAPPER.readValue(
+                                                      payload,
+                                                      DataSegment.class
+                                              ));
+
+                                      accumulator.add(segment);
+                                      return accumulator;
+                                    } catch (Exception e) {
+                                      throw new SQLException(e.toString());
+                                    }
+                                  }
+                                }
+                        );
+              }
+            }
+            , 3, SQLMetadataConnector.DEFAULT_MAX_TRIES);
+    return segmentList;
+  }
+
+  /**
+   * @param connector
+   *
+   * @return streaming fetch size.
+   */
+  private static int getStreamingFetchSize(SQLMetadataConnector connector) {
+    if (connector instanceof MySQLConnector) {
+      return Integer.MIN_VALUE;
+    }
+    return DEFAULT_STREAMING_RESULT_SIZE;
+  }
+
+  /**
+   * @param pushedSegment
+   * @param segmentsDescriptorDir
+   *
+   * @return the descriptor output path, built from a sanitized segment identifier
+   */
+  public static Path makeSegmentDescriptorOutputPath(DataSegment pushedSegment,
+          Path segmentsDescriptorDir
+  ) {
+    return new Path(
+            segmentsDescriptorDir,
+            String.format("%s.json", pushedSegment.getIdentifier().replace(":", ""))
+    );
+  }
+
+  /**
+   * Simple interface for retry operations
+   */
+  public interface DataPusher {
+    long push() throws IOException;
+  }
 }
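
For reference, writeSegmentDescriptor() above does not loop by hand; it wraps the push in
Hadoop's RetryProxy with an exponential backoff policy, so transient filesystem failures are
retried transparently. A stripped-down sketch of that pattern around the same DataPusher
shape (the constants and the push body below are illustrative, not part of the patch):

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;

public class RetryPusherSketch {

  // Same shape as DruidStorageHandlerUtils.DataPusher.
  public interface DataPusher {
    long push() throws IOException;
  }

  public static void main(String[] args) throws IOException {
    final int numRetries = 8;            // illustrative, mirrors NUM_RETRIES
    final int secondsBetweenRetries = 2; // illustrative, mirrors SECONDS_BETWEEN_RETRIES

    DataPusher pusher = (DataPusher) RetryProxy.create(
            DataPusher.class,
            new DataPusher() {
              @Override
              public long push() throws IOException {
                // The real implementation writes the descriptor JSON here; an exception
                // thrown from push() is retried under the policy below.
                System.out.println("pushing descriptor ...");
                return -1;
              }
            },
            RetryPolicies.exponentialBackoffRetry(numRetries, secondsBetweenRetries,
                    TimeUnit.SECONDS));

    pusher.push(); // transparently retried on failure
  }
}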

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java
deleted file mode 100644
index 45e31d6..0000000
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.druid;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.util.Progressable;
-
-/**
- * Place holder for Druid output format. Currently not implemented.
- */
-@SuppressWarnings("rawtypes")
-public class HiveDruidOutputFormat implements HiveOutputFormat {
-
-  @Override
-  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
-          Progressable progress) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
-          Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress)
-                  throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
deleted file mode 100644
index 612f853..0000000
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
+++ /dev/null
@@ -1,376 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.druid;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.calcite.adapter.druid.DruidDateTimeUtils;
-import org.apache.calcite.adapter.druid.DruidTable;
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.Constants;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidWritable;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.joda.time.Interval;
-import org.joda.time.Period;
-import org.joda.time.chrono.ISOChronology;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;
-
-import io.druid.query.Druids;
-import io.druid.query.Druids.SegmentMetadataQueryBuilder;
-import io.druid.query.Druids.SelectQueryBuilder;
-import io.druid.query.Druids.TimeBoundaryQueryBuilder;
-import io.druid.query.Query;
-import io.druid.query.Result;
-import io.druid.query.metadata.metadata.SegmentAnalysis;
-import io.druid.query.metadata.metadata.SegmentMetadataQuery;
-import io.druid.query.select.PagingSpec;
-import io.druid.query.select.SelectQuery;
-import io.druid.query.spec.MultipleIntervalSegmentSpec;
-import io.druid.query.timeboundary.TimeBoundaryQuery;
-import io.druid.query.timeboundary.TimeBoundaryResultValue;
-
-/**
- * Druid query based input format.
- * 
- * Given a query and the Druid broker address, it will send it, and retrieve
- * and parse the results.
- */
-public class HiveDruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidWritable>
-        implements org.apache.hadoop.mapred.InputFormat<NullWritable, DruidWritable> {
-
-  protected static final Logger LOG = LoggerFactory.getLogger(HiveDruidQueryBasedInputFormat.class);
-
-  @Override
-  public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
-          throws IOException {
-    return getInputSplits(job);
-  }
-
-  @Override
-  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
-    return Arrays.<InputSplit> asList(getInputSplits(context.getConfiguration()));
-  }
-
-  @SuppressWarnings("deprecation")
-  private HiveDruidSplit[] getInputSplits(Configuration conf) throws IOException {
-    String address = HiveConf.getVar(conf,
-            HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
-    if (StringUtils.isEmpty(address)) {
-      throw new IOException("Druid broker address not specified in configuration");
-    }
-    String druidQuery = StringEscapeUtils.unescapeJava(conf.get(Constants.DRUID_QUERY_JSON));
-    String druidQueryType;
-    if (StringUtils.isEmpty(druidQuery)) {
-      // Empty, maybe because CBO did not run; we fall back to
-      // full Select query
-      if (LOG.isWarnEnabled()) {
-        LOG.warn("Druid query is empty; creating Select query");
-      }
-      String dataSource = conf.get(Constants.DRUID_DATA_SOURCE);
-      if (dataSource == null) {
-        throw new IOException("Druid data source cannot be empty");
-      }
-      druidQuery = createSelectStarQuery(address, dataSource);
-      druidQueryType = Query.SELECT;
-    } else {
-      druidQueryType = conf.get(Constants.DRUID_QUERY_TYPE);
-      if (druidQueryType == null) {
-        throw new IOException("Druid query type not recognized");
-      }
-    }
-
-    // hive depends on FileSplits
-    Job job = new Job(conf);
-    JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
-    Path [] paths = FileInputFormat.getInputPaths(jobContext);
-
-    switch (druidQueryType) {
-      case Query.TIMESERIES:
-      case Query.TOPN:
-      case Query.GROUP_BY:
-        return new HiveDruidSplit[] { new HiveDruidSplit(address, druidQuery, paths[0]) };
-      case Query.SELECT:
-        return splitSelectQuery(conf, address, druidQuery, paths[0]);
-      default:
-        throw new IOException("Druid query type not recognized");
-    }
-  }
-
-  private static String createSelectStarQuery(String address, String dataSource) throws IOException {
-    // Create Select query
-    SelectQueryBuilder builder = new Druids.SelectQueryBuilder();
-    builder.dataSource(dataSource);
-    builder.intervals(Arrays.asList(DruidTable.DEFAULT_INTERVAL));
-    builder.pagingSpec(PagingSpec.newSpec(1));
-    Map<String, Object> context = new HashMap<>();
-    context.put(Constants.DRUID_QUERY_FETCH, false);
-    builder.context(context);
-    return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(builder.build());
-  }
-
-  /* Method that splits Select query depending on the threshold so read can be
-   * parallelized */
-  private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
-          String druidQuery, Path dummyPath) throws IOException {
-    final int selectThreshold = (int) HiveConf.getIntVar(
-            conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);
-    final int numConnection = HiveConf
-            .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
-    final Period readTimeout = new Period(
-            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
-    SelectQuery query;
-    try {
-      query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class);
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-
-    final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
-    if (isFetch) {
-      // If it has a limit, we use it and we do not split the query
-      return new HiveDruidSplit[] { new HiveDruidSplit(
-              address, DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
-    }
-
-    // We do not have the number of rows, thus we need to execute a
-    // Segment Metadata query to obtain number of rows
-    SegmentMetadataQueryBuilder metadataBuilder = new Druids.SegmentMetadataQueryBuilder();
-    metadataBuilder.dataSource(query.getDataSource());
-    metadataBuilder.intervals(query.getIntervals());
-    metadataBuilder.merge(true);
-    metadataBuilder.analysisTypes();
-    SegmentMetadataQuery metadataQuery = metadataBuilder.build();
-
-    HttpClient client = HttpClientInit.createClient(
-            HttpClientConfig.builder().withNumConnections(numConnection)
-                    .withReadTimeout(readTimeout.toStandardDuration()).build(), new Lifecycle());
-    InputStream response;
-    try {
-      response = DruidStorageHandlerUtils.submitRequest(client,
-              DruidStorageHandlerUtils.createRequest(address, metadataQuery));
-    } catch (Exception e) {
-      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
-
-    // Retrieve results
-    List<SegmentAnalysis> metadataList;
-    try {
-      metadataList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
-            new TypeReference<List<SegmentAnalysis>>() {});
-    } catch (Exception e) {
-      response.close();
-      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
-    if (metadataList == null || metadataList.isEmpty()) {
-      throw new IOException("Connected to Druid but could not retrieve datasource information");
-    }
-    if (metadataList.size() != 1) {
-      throw new IOException("Information about segments should have been merged");
-    }
-
-    final long numRows = metadataList.get(0).getNumRows();
-
-    query = query.withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE));
-    if (numRows <= selectThreshold) {
-      // We are not going to split it
-      return new HiveDruidSplit[] { new HiveDruidSplit(address,
-              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
-    }
-
-    // If the query does not specify a timestamp, we obtain the total time using
-    // a Time Boundary query. Then, we use the information to split the query
-    // following the Select threshold configuration property
-    final List<Interval> intervals = new ArrayList<>();
-    if (query.getIntervals().size() == 1 && query.getIntervals().get(0).withChronology(
-            ISOChronology.getInstanceUTC()).equals(DruidTable.DEFAULT_INTERVAL)) {
-      // Default max and min, we should execute a time boundary query to get a
-      // more precise range
-      TimeBoundaryQueryBuilder timeBuilder = new Druids.TimeBoundaryQueryBuilder();
-      timeBuilder.dataSource(query.getDataSource());
-      TimeBoundaryQuery timeQuery = timeBuilder.build();
-
-      try {
-        response = DruidStorageHandlerUtils.submitRequest(client,
-                DruidStorageHandlerUtils.createRequest(address, timeQuery));
-      } catch (Exception e) {
-        throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-      }
-
-      // Retrieve results
-      List<Result<TimeBoundaryResultValue>> timeList;
-      try {
-        timeList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
-              new TypeReference<List<Result<TimeBoundaryResultValue>>>() {});
-      } catch (Exception e) {
-        response.close();
-        throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-      }
-      if (timeList == null || timeList.isEmpty()) {
-        throw new IOException("Connected to Druid but could not retrieve time boundary information");
-      }
-      if (timeList.size() != 1) {
-        throw new IOException("We should obtain a single time boundary");
-      }
-
-      intervals.add(new Interval(timeList.get(0).getValue().getMinTime().getMillis(),
-              timeList.get(0).getValue().getMaxTime().getMillis(), ISOChronology.getInstanceUTC()));
-    } else {
-      intervals.addAll(query.getIntervals());
-    }
-
-    // Create (numRows/default threshold) input splits
-    int numSplits = (int) Math.ceil((double) numRows / selectThreshold);
-    List<List<Interval>> newIntervals = createSplitsIntervals(intervals, numSplits);
-    HiveDruidSplit[] splits = new HiveDruidSplit[numSplits];
-    for (int i = 0; i < numSplits; i++) {
-      // Create partial Select query
-      final SelectQuery partialQuery = query.withQuerySegmentSpec(
-              new MultipleIntervalSegmentSpec(newIntervals.get(i)));
-      splits[i] = new HiveDruidSplit(address,
-              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath);
-    }
-    return splits;
-  }
-
-  private static List<List<Interval>> createSplitsIntervals(List<Interval> intervals, int numSplits) {
-    final long totalTime = DruidDateTimeUtils.extractTotalTime(intervals);
-    long startTime = intervals.get(0).getStartMillis();
-    long endTime = startTime;
-    long currTime = 0;
-    List<List<Interval>> newIntervals = new ArrayList<>();
-    for (int i = 0, posIntervals = 0; i < numSplits; i++) {
-      final long rangeSize = Math.round( (double) (totalTime * (i + 1)) / numSplits) -
-              Math.round( (double) (totalTime * i) / numSplits);
-      // Create the new interval(s)
-      List<Interval> currentIntervals = new ArrayList<>();
-      while (posIntervals < intervals.size()) {
-        final Interval interval = intervals.get(posIntervals);
-        final long expectedRange = rangeSize - currTime;
-        if (interval.getEndMillis() - startTime >= expectedRange) {
-          endTime = startTime + expectedRange;
-          currentIntervals.add(new Interval(startTime, endTime, ISOChronology.getInstanceUTC()));
-          startTime = endTime;
-          currTime = 0;
-          break;
-        }
-        endTime = interval.getEndMillis();
-        currentIntervals.add(new Interval(startTime, endTime, ISOChronology.getInstanceUTC()));
-        currTime += (endTime - startTime);
-        startTime = intervals.get(++posIntervals).getStartMillis();
-      }
-      newIntervals.add(currentIntervals);
-    }
-    assert endTime == intervals.get(intervals.size()-1).getEndMillis();
-    return newIntervals;
-  }
-
-  @Override
-  public org.apache.hadoop.mapred.RecordReader<NullWritable, DruidWritable> getRecordReader(
-          org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter)
-                  throws IOException {
-    // We need to provide a different record reader for every type of Druid query.
-    // The reason is that Druid results format is different for each type.
-    final DruidQueryRecordReader<?,?> reader;
-    final String druidQueryType = job.get(Constants.DRUID_QUERY_TYPE);
-    if (druidQueryType == null) {
-      reader = new DruidSelectQueryRecordReader(); // By default
-      reader.initialize((HiveDruidSplit)split, job);
-      return reader;
-    }
-    switch (druidQueryType) {
-      case Query.TIMESERIES:
-        reader = new DruidTimeseriesQueryRecordReader();
-        break;
-      case Query.TOPN:
-        reader = new DruidTopNQueryRecordReader();
-        break;
-      case Query.GROUP_BY:
-        reader = new DruidGroupByQueryRecordReader();
-        break;
-      case Query.SELECT:
-        reader = new DruidSelectQueryRecordReader();
-        break;
-      default:
-        throw new IOException("Druid query type not recognized");
-    }
-    reader.initialize((HiveDruidSplit)split, job);
-    return reader;
-  }
-
-  @Override
-  public RecordReader<NullWritable, DruidWritable> createRecordReader(InputSplit split,
-          TaskAttemptContext context) throws IOException, InterruptedException {
-    // We need to provide a different record reader for every type of Druid query.
-    // The reason is that Druid results format is different for each type.
-    final String druidQueryType = context.getConfiguration().get(Constants.DRUID_QUERY_TYPE);
-    if (druidQueryType == null) {
-      return new DruidSelectQueryRecordReader(); // By default
-    }
-    final DruidQueryRecordReader<?,?> reader;
-    switch (druidQueryType) {
-      case Query.TIMESERIES:
-        reader = new DruidTimeseriesQueryRecordReader();
-        break;
-      case Query.TOPN:
-        reader = new DruidTopNQueryRecordReader();
-        break;
-      case Query.GROUP_BY:
-        reader = new DruidGroupByQueryRecordReader();
-        break;
-      case Query.SELECT:
-        reader = new DruidSelectQueryRecordReader();
-        break;
-      default:
-        throw new IOException("Druid query type not recognized");
-    }
-    return reader;
-  }
-
-}
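
The select-query splitting removed above (and carried over into the new
DruidQueryBasedInputFormat later in this patch) divides the total queried time range into
numSplits contiguous slices of nearly equal duration. A small sketch of that interval
arithmetic on plain millisecond ranges (no Joda/Druid types; names are illustrative):

import java.util.ArrayList;
import java.util.List;

public class SplitIntervalsSketch {

  // Splits [start, end) into numSplits contiguous ranges whose lengths differ by at
  // most one millisecond, using the same rounding as createSplitsIntervals().
  public static List<long[]> split(long start, long end, int numSplits) {
    final long totalTime = end - start;
    final List<long[]> result = new ArrayList<>();
    long current = start;
    for (int i = 0; i < numSplits; i++) {
      final long rangeSize = Math.round((double) (totalTime * (i + 1)) / numSplits)
              - Math.round((double) (totalTime * i) / numSplits);
      result.add(new long[] { current, current + rangeSize });
      current += rangeSize;
    }
    return result;
  }

  public static void main(String[] args) {
    // Example: one hour split into 4 slices of 15 minutes each.
    for (long[] range : split(0L, 3600000L, 4)) {
      System.out.println(range[0] + " -> " + range[1]);
    }
  }
}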

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java
deleted file mode 100644
index 3fba5d0..0000000
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.druid;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-
-/**
- * Druid split. Its purpose is to trigger query execution in Druid.
- */
-public class HiveDruidSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit {
-
-  private String address;
-  private String druidQuery;
-
-  // required for deserialization
-  public HiveDruidSplit() {
-    super((Path) null, 0, 0, (String[]) null);
-  }
-
-  public HiveDruidSplit(String address, String druidQuery, Path dummyPath) {
-    super(dummyPath, 0, 0, (String[]) null);
-    this.address = address;
-    this.druidQuery = druidQuery;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    super.write(out);
-    out.writeUTF(address);
-    out.writeUTF(druidQuery);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
-    address = in.readUTF();
-    druidQuery = in.readUTF();
-  }
-
-  @Override
-  public long getLength() {
-    return 0L;
-  }
-
-  @Override
-  public String[] getLocations() {
-    return new String[] {""} ;
-  }
-
-  public String getAddress() {
-    return address;
-  }
-
-  public String getDruidQuery() {
-    return druidQuery;
-  }
-
-  @Override
-  public String toString() {
-    return "HiveDruidSplit{" + address + ", " + druidQuery + "}";
-  }
-
-}
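
HiveDruidSplit, deleted here and recreated under the new io package elsewhere in this patch,
piggybacks on FileSplit serialization and simply appends its two strings with writeUTF,
reading them back in the same order. A tiny round-trip sketch of that pattern (the values
below are illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SplitSerializationSketch {

  public static void main(String[] args) throws IOException {
    // write(): the FileSplit fields go first in the real split, then the two strings.
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buffer);
    out.writeUTF("broker-host:8082");            // address (illustrative)
    out.writeUTF("{\"queryType\":\"select\"}");  // druidQuery (illustrative)
    out.flush();

    // readFields(): read back in exactly the same order.
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
    String address = in.readUTF();
    String druidQuery = in.readUTF();
    System.out.println(address + " / " + druidQuery);
  }
}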

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
new file mode 100644
index 0000000..86ddca8
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.io;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.metamx.common.Granularity;
+import io.druid.data.input.impl.DimensionSchema;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.InputRowParser;
+import io.druid.data.input.impl.MapInputRowParser;
+import io.druid.data.input.impl.StringDimensionSchema;
+import io.druid.data.input.impl.TimeAndDimsParseSpec;
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.DoubleSumAggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.realtime.plumber.CustomVersioningPolicy;
+import io.druid.storage.hdfs.HdfsDataSegmentPusher;
+import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
+import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.hadoop.hive.druid.DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME;
+
+public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritable> {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(DruidOutputFormat.class);
+
+  @Override
+  public FileSinkOperator.RecordWriter getHiveRecordWriter(
+          JobConf jc,
+          Path finalOutPath,
+          Class<? extends Writable> valueClass,
+          boolean isCompressed,
+          Properties tableProperties,
+          Progressable progress
+  ) throws IOException {
+
+    final String segmentGranularity =
+            tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) != null ?
+                    tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) :
+                    HiveConf.getVar(jc, HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY);
+    final String dataSource = tableProperties.getProperty(Constants.DRUID_DATA_SOURCE);
+    final String segmentDirectory =
+            tableProperties.getProperty(Constants.DRUID_SEGMENT_DIRECTORY) != null
+                    ? tableProperties.getProperty(Constants.DRUID_SEGMENT_DIRECTORY)
+                    : HiveConf.getVar(jc, HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
+
+    final HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
+    hdfsDataSegmentPusherConfig.setStorageDirectory(segmentDirectory);
+    final DataSegmentPusher hdfsDataSegmentPusher = new HdfsDataSegmentPusher(
+            hdfsDataSegmentPusherConfig, jc, DruidStorageHandlerUtils.JSON_MAPPER);
+
+    final GranularitySpec granularitySpec = new UniformGranularitySpec(
+            Granularity.valueOf(segmentGranularity),
+            null,
+            null
+    );
+
+    final String columnNameProperty = tableProperties.getProperty(serdeConstants.LIST_COLUMNS);
+    final String columnTypeProperty = tableProperties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+
+    if (StringUtils.isEmpty(columnNameProperty) || StringUtils.isEmpty(columnTypeProperty)) {
+      throw new IllegalStateException(
+              String.format("List of columns names [%s] or columns type [%s] is/are not present",
+                      columnNameProperty, columnTypeProperty
+              ));
+    }
+    ArrayList<String> columnNames = new ArrayList<String>();
+    for (String name : columnNameProperty.split(",")) {
+      columnNames.add(name);
+    }
+    if (!columnNames.contains(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
+      throw new IllegalStateException("Timestamp column (' " + DruidTable.DEFAULT_TIMESTAMP_COLUMN +
+              "') not specified in create table; list of columns is : " +
+              tableProperties.getProperty(serdeConstants.LIST_COLUMNS));
+    }
+    ArrayList<TypeInfo> columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+
+    // Default, all columns that are not metrics or timestamp, are treated as dimensions
+    final List<DimensionSchema> dimensions = new ArrayList<>();
+    ImmutableList.Builder<AggregatorFactory> aggregatorFactoryBuilder = ImmutableList.builder();
+    for (int i = 0; i < columnTypes.size(); i++) {
+      TypeInfo f = columnTypes.get(i);
+      assert f.getCategory() == ObjectInspector.Category.PRIMITIVE;
+      AggregatorFactory af;
+      switch (f.getTypeName()) {
+        case serdeConstants.TINYINT_TYPE_NAME:
+        case serdeConstants.SMALLINT_TYPE_NAME:
+        case serdeConstants.INT_TYPE_NAME:
+        case serdeConstants.BIGINT_TYPE_NAME:
+          af = new LongSumAggregatorFactory(columnNames.get(i), columnNames.get(i));
+          break;
+        case serdeConstants.FLOAT_TYPE_NAME:
+        case serdeConstants.DOUBLE_TYPE_NAME:
+          af = new DoubleSumAggregatorFactory(columnNames.get(i), columnNames.get(i));
+          break;
+        default:
+          // Dimension or timestamp
+          String columnName = columnNames.get(i);
+          if (!columnName.equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN) && !columnName
+                  .equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME)) {
+            dimensions.add(new StringDimensionSchema(columnName));
+          }
+          continue;
+      }
+      aggregatorFactoryBuilder.add(af);
+    }
+    List<AggregatorFactory> aggregatorFactories = aggregatorFactoryBuilder.build();
+    final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec(
+            new TimestampSpec(DruidTable.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
+            new DimensionsSpec(dimensions,
+                    Lists.newArrayList(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME), null
+            )
+    ));
+
+    Map<String, Object> inputParser = DruidStorageHandlerUtils.JSON_MAPPER
+            .convertValue(inputRowParser, Map.class);
+
+    final DataSchema dataSchema = new DataSchema(
+            Preconditions.checkNotNull(dataSource, "Data source name is null"),
+            inputParser,
+            aggregatorFactories.toArray(new AggregatorFactory[aggregatorFactories.size()]),
+            granularitySpec,
+            DruidStorageHandlerUtils.JSON_MAPPER
+    );
+
+    final String workingPath = jc.get(Constants.DRUID_JOB_WORKING_DIRECTORY);
+    final String version = jc.get(Constants.DRUID_SEGMENT_VERSION);
+    Integer maxPartitionSize = HiveConf
+            .getIntVar(jc, HiveConf.ConfVars.HIVE_DRUID_MAX_PARTITION_SIZE);
+    String basePersistDirectory = HiveConf
+            .getVar(jc, HiveConf.ConfVars.HIVE_DRUID_BASE_PERSIST_DIRECTORY);
+    final RealtimeTuningConfig realtimeTuningConfig = RealtimeTuningConfig
+            .makeDefaultTuningConfig(new File(
+                    basePersistDirectory))
+            .withVersioningPolicy(new CustomVersioningPolicy(version));
+
+    LOG.debug(String.format("running with Data schema [%s] ", dataSchema));
+    return new DruidRecordWriter(dataSchema, realtimeTuningConfig, hdfsDataSegmentPusher,
+            maxPartitionSize, new Path(workingPath, SEGMENTS_DESCRIPTOR_DIR_NAME),
+            finalOutPath.getFileSystem(jc)
+    );
+  }
+
+  @Override
+  public RecordWriter<K, DruidWritable> getRecordWriter(
+          FileSystem ignored, JobConf job, String name, Progressable progress
+  ) throws IOException {
+    throw new UnsupportedOperationException("please implement me !");
+  }
+
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+    throw new UnsupportedOperationException("not implemented yet");
+  }
+}
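
For reference, getHiveRecordWriter() above derives the Druid rollup schema purely from the
Hive column types: integral columns become longSum metrics, floating-point columns become
doubleSum metrics, and everything else except the timestamp and granularity columns becomes
a string dimension. A simplified sketch of that classification using plain type-name strings
instead of TypeInfo (column names and the literal "__time"/"__time_granularity" strings are
illustrative stand-ins for the constants used in the patch):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DruidSchemaSketch {

  // Mirrors the switch in DruidOutputFormat#getHiveRecordWriter: numeric columns become
  // sum metrics, the remaining columns (minus the timestamp/granularity ones) dimensions.
  public static void classify(List<String> names, List<String> types,
          List<String> metrics, List<String> dimensions) {
    for (int i = 0; i < types.size(); i++) {
      final String name = names.get(i);
      switch (types.get(i)) {
        case "tinyint":
        case "smallint":
        case "int":
        case "bigint":
          metrics.add(name + " -> longSum");
          break;
        case "float":
        case "double":
          metrics.add(name + " -> doubleSum");
          break;
        default:
          // "__time" and "__time_granularity" stand in for DruidTable.DEFAULT_TIMESTAMP_COLUMN
          // and Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME.
          if (!name.equals("__time") && !name.equals("__time_granularity")) {
            dimensions.add(name);
          }
      }
    }
  }

  public static void main(String[] args) {
    List<String> metrics = new ArrayList<>();
    List<String> dimensions = new ArrayList<>();
    classify(Arrays.asList("__time", "page", "user", "added", "delta"),
            Arrays.asList("timestamp", "string", "string", "bigint", "double"),
            metrics, dimensions);
    System.out.println("metrics = " + metrics + ", dimensions = " + dimensions);
  }
}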


[3/4] hive git commit: HIVE-15277: Teach Hive how to create/delete Druid segments (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)

Posted by jc...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
new file mode 100644
index 0000000..7ac52c6
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.io;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+import io.druid.query.Druids;
+import io.druid.query.Druids.SegmentMetadataQueryBuilder;
+import io.druid.query.Druids.SelectQueryBuilder;
+import io.druid.query.Druids.TimeBoundaryQueryBuilder;
+import io.druid.query.Query;
+import io.druid.query.Result;
+import io.druid.query.metadata.metadata.SegmentAnalysis;
+import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+import io.druid.query.select.PagingSpec;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.spec.MultipleIntervalSegmentSpec;
+import io.druid.query.timeboundary.TimeBoundaryQuery;
+import io.druid.query.timeboundary.TimeBoundaryResultValue;
+import org.apache.calcite.adapter.druid.DruidDateTimeUtils;
+import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.joda.time.chrono.ISOChronology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Druid query based input format.
+ *
+ * Given a query and the Druid broker address, it will send it, and retrieve
+ * and parse the results.
+ */
+public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidWritable>
+        implements org.apache.hadoop.mapred.InputFormat<NullWritable, DruidWritable> {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(DruidQueryBasedInputFormat.class);
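+
+  // How this format is driven (see the code below): the serialized Druid query is read from
+  // Constants.DRUID_QUERY_JSON and its type from Constants.DRUID_QUERY_TYPE; when no query is
+  // present (e.g. CBO did not run), a full Select query is built from the data source name in
+  // Constants.DRUID_DATA_SOURCE and sent to the configured broker address.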
+
+  @Override
+  public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
+          throws IOException {
+    return getInputSplits(job);
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+    return Arrays.<InputSplit>asList(getInputSplits(context.getConfiguration()));
+  }
+
+  @SuppressWarnings("deprecation")
+  private HiveDruidSplit[] getInputSplits(Configuration conf) throws IOException {
+    String address = HiveConf.getVar(conf,
+            HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS
+    );
+    if (StringUtils.isEmpty(address)) {
+      throw new IOException("Druid broker address not specified in configuration");
+    }
+    String druidQuery = StringEscapeUtils.unescapeJava(conf.get(Constants.DRUID_QUERY_JSON));
+    String druidQueryType;
+    if (StringUtils.isEmpty(druidQuery)) {
+      // Empty, maybe because CBO did not run; we fall back to
+      // full Select query
+      if (LOG.isWarnEnabled()) {
+        LOG.warn("Druid query is empty; creating Select query");
+      }
+      String dataSource = conf.get(Constants.DRUID_DATA_SOURCE);
+      if (dataSource == null) {
+        throw new IOException("Druid data source cannot be empty");
+      }
+      druidQuery = createSelectStarQuery(dataSource);
+      druidQueryType = Query.SELECT;
+    } else {
+      druidQueryType = conf.get(Constants.DRUID_QUERY_TYPE);
+      if (druidQueryType == null) {
+        throw new IOException("Druid query type not recognized");
+      }
+    }
+
+    // Hive depends on FileSplits
+    Job job = new Job(conf);
+    JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
+    Path[] paths = FileInputFormat.getInputPaths(jobContext);
+
+    switch (druidQueryType) {
+      case Query.TIMESERIES:
+      case Query.TOPN:
+      case Query.GROUP_BY:
+        return new HiveDruidSplit[] { new HiveDruidSplit(address, druidQuery, paths[0]) };
+      case Query.SELECT:
+        return splitSelectQuery(conf, address, druidQuery, paths[0]);
+      default:
+        throw new IOException("Druid query type not recognized");
+    }
+  }
+
+  private static String createSelectStarQuery(String dataSource) throws IOException {
+    // Create Select query
+    SelectQueryBuilder builder = new Druids.SelectQueryBuilder();
+    builder.dataSource(dataSource);
+    builder.intervals(Arrays.asList(DruidTable.DEFAULT_INTERVAL));
+    builder.pagingSpec(PagingSpec.newSpec(1));
+    Map<String, Object> context = new HashMap<>();
+    context.put(Constants.DRUID_QUERY_FETCH, false);
+    builder.context(context);
+    return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(builder.build());
+  }
+
+  /* Splits the Select query depending on the threshold so that reads can be
+   * parallelized */
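+  /* For illustration (numbers are made up): with HIVE_DRUID_SELECT_THRESHOLD = 100,000 and a
+   * Segment Metadata query reporting 250,000 rows, ceil(250,000 / 100,000) = 3 splits are
+   * created, each covering roughly a third of the queried time range. */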
+  private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
+          String druidQuery, Path dummyPath
+  ) throws IOException {
+    final int selectThreshold = (int) HiveConf.getIntVar(
+            conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);
+    final int numConnection = HiveConf
+            .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
+    final Period readTimeout = new Period(
+            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
+    SelectQuery query;
+    try {
+      query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
+    final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
+    if (isFetch) {
+      // If it has a limit, we use it and we do not split the query
+      return new HiveDruidSplit[] { new HiveDruidSplit(
+              address, DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
+    }
+
+    // We do not know the number of rows, so we execute a
+    // Segment Metadata query to obtain it
+    SegmentMetadataQueryBuilder metadataBuilder = new Druids.SegmentMetadataQueryBuilder();
+    metadataBuilder.dataSource(query.getDataSource());
+    metadataBuilder.intervals(query.getIntervals());
+    metadataBuilder.merge(true);
+    metadataBuilder.analysisTypes();
+    SegmentMetadataQuery metadataQuery = metadataBuilder.build();
+    final Lifecycle lifecycle = new Lifecycle();
+    HttpClient client = HttpClientInit.createClient(
+            HttpClientConfig.builder().withNumConnections(numConnection)
+                    .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
+    try {
+      lifecycle.start();
+    } catch (Exception e) {
+      LOG.error("Lifecycle start issue", e);
+    }
+    InputStream response;
+    try {
+      response = DruidStorageHandlerUtils.submitRequest(client,
+              DruidStorageHandlerUtils.createRequest(address, metadataQuery)
+      );
+    } catch (Exception e) {
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    } finally {
+      lifecycle.stop();
+    }
+
+    // Retrieve results
+    List<SegmentAnalysis> metadataList;
+    try {
+      metadataList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
+              new TypeReference<List<SegmentAnalysis>>() {
+              }
+      );
+    } catch (Exception e) {
+      response.close();
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
+    if (metadataList == null || metadataList.isEmpty()) {
+      throw new IOException("Connected to Druid but could not retrieve datasource information");
+    }
+    if (metadataList.size() != 1) {
+      throw new IOException("Information about segments should have been merged");
+    }
+
+    final long numRows = metadataList.get(0).getNumRows();
+
+    query = query.withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE));
+    if (numRows <= selectThreshold) {
+      // We are not going to split it
+      return new HiveDruidSplit[] { new HiveDruidSplit(address,
+              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath
+      ) };
+    }
+
+    // If the query does not specify a timestamp, we obtain the total time using
+    // a Time Boundary query. Then, we use the information to split the query
+    // following the Select threshold configuration property
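+    // For illustration (dates are made up): if the effective interval ends up being
+    // 2016-01-01T00:00/2016-01-31T00:00 and three splits are needed, createSplitsIntervals
+    // divides the total time evenly and yields three sub-intervals of ten days each.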
+    final List<Interval> intervals = new ArrayList<>();
+    if (query.getIntervals().size() == 1 && query.getIntervals().get(0).withChronology(
+            ISOChronology.getInstanceUTC()).equals(DruidTable.DEFAULT_INTERVAL)) {
+      // Default max and min, we should execute a time boundary query to get a
+      // more precise range
+      TimeBoundaryQueryBuilder timeBuilder = new Druids.TimeBoundaryQueryBuilder();
+      timeBuilder.dataSource(query.getDataSource());
+      TimeBoundaryQuery timeQuery = timeBuilder.build();
+
+      try {
+        response = DruidStorageHandlerUtils.submitRequest(client,
+                DruidStorageHandlerUtils.createRequest(address, timeQuery)
+        );
+      } catch (Exception e) {
+        throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      }
+
+      // Retrieve results
+      List<Result<TimeBoundaryResultValue>> timeList;
+      try {
+        timeList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
+                new TypeReference<List<Result<TimeBoundaryResultValue>>>() {
+                }
+        );
+      } catch (Exception e) {
+        response.close();
+        throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      }
+      if (timeList == null || timeList.isEmpty()) {
+        throw new IOException(
+                "Connected to Druid but could not retrieve time boundary information");
+      }
+      if (timeList.size() != 1) {
+        throw new IOException("We should obtain a single time boundary");
+      }
+
+      intervals.add(new Interval(timeList.get(0).getValue().getMinTime().getMillis(),
+              timeList.get(0).getValue().getMaxTime().getMillis(), ISOChronology.getInstanceUTC()
+      ));
+    } else {
+      intervals.addAll(query.getIntervals());
+    }
+
+    // Create (numRows/default threshold) input splits
+    int numSplits = (int) Math.ceil((double) numRows / selectThreshold);
+    List<List<Interval>> newIntervals = createSplitsIntervals(intervals, numSplits);
+    HiveDruidSplit[] splits = new HiveDruidSplit[numSplits];
+    for (int i = 0; i < numSplits; i++) {
+      // Create partial Select query
+      final SelectQuery partialQuery = query.withQuerySegmentSpec(
+              new MultipleIntervalSegmentSpec(newIntervals.get(i)));
+      splits[i] = new HiveDruidSplit(address,
+              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath
+      );
+    }
+    return splits;
+  }
+
+  private static List<List<Interval>> createSplitsIntervals(List<Interval> intervals, int numSplits
+  ) {
+    final long totalTime = DruidDateTimeUtils.extractTotalTime(intervals);
+    long startTime = intervals.get(0).getStartMillis();
+    long endTime = startTime;
+    long currTime = 0;
+    List<List<Interval>> newIntervals = new ArrayList<>();
+    for (int i = 0, posIntervals = 0; i < numSplits; i++) {
+      final long rangeSize = Math.round((double) (totalTime * (i + 1)) / numSplits) -
+              Math.round((double) (totalTime * i) / numSplits);
+      // Create the new interval(s)
+      List<Interval> currentIntervals = new ArrayList<>();
+      while (posIntervals < intervals.size()) {
+        final Interval interval = intervals.get(posIntervals);
+        final long expectedRange = rangeSize - currTime;
+        if (interval.getEndMillis() - startTime >= expectedRange) {
+          endTime = startTime + expectedRange;
+          currentIntervals.add(new Interval(startTime, endTime, ISOChronology.getInstanceUTC()));
+          startTime = endTime;
+          currTime = 0;
+          break;
+        }
+        endTime = interval.getEndMillis();
+        currentIntervals.add(new Interval(startTime, endTime, ISOChronology.getInstanceUTC()));
+        currTime += (endTime - startTime);
+        startTime = intervals.get(++posIntervals).getStartMillis();
+      }
+      newIntervals.add(currentIntervals);
+    }
+    assert endTime == intervals.get(intervals.size() - 1).getEndMillis();
+    return newIntervals;
+  }
+
+  @Override
+  public org.apache.hadoop.mapred.RecordReader<NullWritable, DruidWritable> getRecordReader(
+          org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter
+  )
+          throws IOException {
+    // We need to provide a different record reader for every type of Druid query,
+    // because the result format returned by Druid differs for each query type.
+    final DruidQueryRecordReader<?, ?> reader;
+    final String druidQueryType = job.get(Constants.DRUID_QUERY_TYPE);
+    if (druidQueryType == null) {
+      reader = new DruidSelectQueryRecordReader(); // By default
+      reader.initialize((HiveDruidSplit) split, job);
+      return reader;
+    }
+    switch (druidQueryType) {
+      case Query.TIMESERIES:
+        reader = new DruidTimeseriesQueryRecordReader();
+        break;
+      case Query.TOPN:
+        reader = new DruidTopNQueryRecordReader();
+        break;
+      case Query.GROUP_BY:
+        reader = new DruidGroupByQueryRecordReader();
+        break;
+      case Query.SELECT:
+        reader = new DruidSelectQueryRecordReader();
+        break;
+      default:
+        throw new IOException("Druid query type not recognized");
+    }
+    reader.initialize((HiveDruidSplit) split, job);
+    return reader;
+  }
+
+  @Override
+  public RecordReader<NullWritable, DruidWritable> createRecordReader(InputSplit split,
+          TaskAttemptContext context
+  ) throws IOException, InterruptedException {
+    // We need to provide a different record reader for every type of Druid query,
+    // because the result format returned by Druid differs for each query type.
+    final String druidQueryType = context.getConfiguration().get(Constants.DRUID_QUERY_TYPE);
+    if (druidQueryType == null) {
+      return new DruidSelectQueryRecordReader(); // By default
+    }
+    final DruidQueryRecordReader<?, ?> reader;
+    switch (druidQueryType) {
+      case Query.TIMESERIES:
+        reader = new DruidTimeseriesQueryRecordReader();
+        break;
+      case Query.TOPN:
+        reader = new DruidTopNQueryRecordReader();
+        break;
+      case Query.GROUP_BY:
+        reader = new DruidGroupByQueryRecordReader();
+        break;
+      case Query.SELECT:
+        reader = new DruidSelectQueryRecordReader();
+        break;
+      default:
+        throw new IOException("Druid query type not recognized");
+    }
+    return reader;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
new file mode 100644
index 0000000..1601a9a
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -0,0 +1,260 @@
+package org.apache.hadoop.hive.druid.io;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.metamx.common.Granularity;
+import io.druid.data.input.Committer;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.appenderator.Appenderator;
+import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.appenderator.SegmentNotWritableException;
+import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.segment.realtime.plumber.Committers;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.LinearShardSpec;
+import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritable>,
+        org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter {
+  protected static final Logger LOG = LoggerFactory.getLogger(DruidRecordWriter.class);
+
+  private final DataSchema dataSchema;
+
+  private final Appenderator appenderator;
+
+  private final RealtimeTuningConfig tuningConfig;
+
+  private final Path segmentsDescriptorDir;
+
+  private SegmentIdentifier currentOpenSegment = null;
+
+  private final Integer maxPartitionSize;
+
+  private final FileSystem fileSystem;
+
+  private final Supplier<Committer> committerSupplier;
+
+  public DruidRecordWriter(
+          DataSchema dataSchema,
+          RealtimeTuningConfig realtimeTuningConfig,
+          DataSegmentPusher dataSegmentPusher,
+          int maxPartitionSize,
+          final Path segmentsDescriptorsDir,
+          final FileSystem fileSystem
+  ) {
+    this.tuningConfig = Preconditions
+            .checkNotNull(realtimeTuningConfig, "realtimeTuningConfig is null");
+    this.dataSchema = Preconditions.checkNotNull(dataSchema, "data schema is null");
+    appenderator = Appenderators
+            .createOffline(this.dataSchema,
+                    tuningConfig,
+                    new FireDepartmentMetrics(), dataSegmentPusher,
+                    DruidStorageHandlerUtils.JSON_MAPPER,
+                    DruidStorageHandlerUtils.INDEX_IO,
+                    DruidStorageHandlerUtils.INDEX_MERGER_V9
+            );
+    Preconditions.checkArgument(maxPartitionSize > 0, "maxPartitionSize needs to be greater than 0");
+    this.maxPartitionSize = maxPartitionSize;
+    appenderator.startJob(); // maybe we need to move this out of the constructor
+    this.segmentsDescriptorDir = Preconditions
+            .checkNotNull(segmentsDescriptorsDir, "segmentsDescriptorsDir is null");
+    this.fileSystem = Preconditions.checkNotNull(fileSystem, "file system is null");
+    committerSupplier = Suppliers.ofInstance(Committers.nil());
+  }
+
+  /**
+   * Computes the segment identifier for the given truncated timestamp and, if needed, pushes
+   * the current open segment. A push happens when the maximum partition size is reached or
+   * when the event belongs to the next interval. Note that this method assumes timestamps are
+   * pseudo-sorted (the sorting is done by the previous stage), so it closes the open segment
+   * and moves to the next segment granularity interval as soon as an event from the next
+   * interval appears.
+   *
+   * @return the segment identifier matching the truncated timestamp; pushing the current open
+   *         segment is a possible side effect.
+   */
+  private SegmentIdentifier getSegmentIdentifierAndMaybePush(long truncatedTime) {
+
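+    // For illustration (granularity and size are made up): with HOUR segment granularity and
+    // maxPartitionSize = 1,000,000, rows whose truncated timestamp falls in 10:00-11:00 share
+    // one SegmentIdentifier; once that segment reaches 1,000,000 rows a new LinearShardSpec
+    // partition is opened, and the first row of 11:00-12:00 pushes the 10:00-11:00 segment.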
+    final Granularity segmentGranularity = dataSchema.getGranularitySpec()
+            .getSegmentGranularity();
+
+    final Interval interval = new Interval(
+            new DateTime(truncatedTime),
+            segmentGranularity.increment(new DateTime(truncatedTime))
+    );
+
+    SegmentIdentifier retVal;
+    if (currentOpenSegment == null) {
+      retVal = new SegmentIdentifier(
+              dataSchema.getDataSource(),
+              interval,
+              tuningConfig.getVersioningPolicy().getVersion(interval),
+              new LinearShardSpec(0)
+      );
+      currentOpenSegment = retVal;
+      return retVal;
+    } else if (currentOpenSegment.getInterval().equals(interval)) {
+      retVal = currentOpenSegment;
+      int rowCount = appenderator.getRowCount(retVal);
+      if (rowCount < maxPartitionSize) {
+        return retVal;
+      } else {
+        retVal = new SegmentIdentifier(
+                dataSchema.getDataSource(),
+                interval,
+                tuningConfig.getVersioningPolicy().getVersion(interval),
+                new LinearShardSpec(currentOpenSegment.getShardSpec().getPartitionNum() + 1)
+        );
+        pushSegments(Lists.newArrayList(currentOpenSegment));
+        currentOpenSegment = retVal;
+        return retVal;
+      }
+    } else {
+      retVal = new SegmentIdentifier(
+              dataSchema.getDataSource(),
+              interval,
+              tuningConfig.getVersioningPolicy().getVersion(interval),
+              new LinearShardSpec(0)
+      );
+      pushSegments(Lists.newArrayList(currentOpenSegment));
+      currentOpenSegment = retVal;
+      return retVal;
+    }
+  }
+
+  private void pushSegments(List<SegmentIdentifier> segmentsToPush) {
+    try {
+      SegmentsAndMetadata segmentsAndMetadata = appenderator
+              .push(segmentsToPush, committerSupplier.get()).get();
+      final HashSet<String> pushedSegmentIdentifierHashSet = new HashSet<>();
+
+      for (DataSegment pushedSegment : segmentsAndMetadata.getSegments()) {
+        pushedSegmentIdentifierHashSet
+                .add(SegmentIdentifier.fromDataSegment(pushedSegment).getIdentifierAsString());
+        final Path segmentDescriptorOutputPath = DruidStorageHandlerUtils
+                .makeSegmentDescriptorOutputPath(pushedSegment, segmentsDescriptorDir);
+        DruidStorageHandlerUtils
+                .writeSegmentDescriptor(fileSystem, pushedSegment, segmentDescriptorOutputPath);
+
+        LOG.info(
+                String.format(
+                        "Pushed the segment [%s] and persisted the descriptor located at [%s]",
+                        pushedSegment,
+                        segmentDescriptorOutputPath
+                )
+        );
+      }
+
+      final HashSet<String> toPushSegmentsHashSet = new HashSet<>(
+              FluentIterable.from(segmentsToPush)
+                      .transform(new Function<SegmentIdentifier, String>() {
+                        @Nullable
+                        @Override
+                        public String apply(
+                                @Nullable SegmentIdentifier input
+                        ) {
+                          return input.getIdentifierAsString();
+                        }
+                      })
+                      .toList());
+
+      if (!pushedSegmentIdentifierHashSet.equals(toPushSegmentsHashSet)) {
+        throw new IllegalStateException(String.format(
+                "was asked to publish [%s] but was able to publish only [%s]",
+                Joiner.on(", ").join(toPushSegmentsHashSet),
+                Joiner.on(", ").join(pushedSegmentIdentifierHashSet)
+        ));
+      }
+
+      LOG.info(String.format("Published [%,d] segments.", segmentsToPush.size()));
+    } catch (InterruptedException e) {
+      LOG.error(String.format("got interrupted, failed to push  [%,d] segments.",
+              segmentsToPush.size()
+      ), e);
+      Thread.currentThread().interrupt();
+    } catch (IOException | ExecutionException e) {
+      LOG.error(String.format("Failed to push  [%,d] segments.", segmentsToPush.size()), e);
+      Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public void write(Writable w) throws IOException {
+    DruidWritable record = (DruidWritable) w;
+    final long timestamp = (long) record.getValue().get(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
+    final long truncatedTime = (long) record.getValue()
+            .get(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME);
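+
+    // Two timestamps travel with each record: the original event timestamp, which is stored in
+    // the Druid row, and the granularity-truncated timestamp computed by the previous stage,
+    // which is only used to pick the target segment interval.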
+
+    InputRow inputRow = new MapBasedInputRow(
+            timestamp,
+            dataSchema.getParser()
+                    .getParseSpec()
+                    .getDimensionsSpec()
+                    .getDimensionNames(),
+            record.getValue()
+    );
+
+    try {
+      appenderator
+              .add(getSegmentIdentifierAndMaybePush(truncatedTime), inputRow, committerSupplier);
+    } catch (SegmentNotWritableException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void close(boolean abort) throws IOException {
+    try {
+      if (!abort) {
+        final List<SegmentIdentifier> segmentsToPush = Lists.newArrayList();
+        segmentsToPush.addAll(appenderator.getSegments());
+        pushSegments(segmentsToPush);
+      }
+      appenderator.clear();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      Throwables.propagate(e);
+    } finally {
+      appenderator.close();
+    }
+  }
+
+  @Override
+  public void write(NullWritable key, DruidWritable value) throws IOException {
+    this.write(value);
+  }
+
+  @Override
+  public void close(Reporter reporter) throws IOException {
+    this.close(true);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
new file mode 100644
index 0000000..861075d
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+
+/**
+ * Druid split. Its purpose is to trigger query execution in Druid.
+ */
+public class HiveDruidSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit {
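+
+  // The split only carries the broker address and the serialized Druid query; it extends
+  // FileSplit with a dummy path because Hive's planner expects file-based splits, so the
+  // length is always 0 and the location is a single empty hostname.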
+
+  private String address;
+
+  private String druidQuery;
+
+  // required for deserialization
+  public HiveDruidSplit() {
+    super((Path) null, 0, 0, (String[]) null);
+  }
+
+  public HiveDruidSplit(String address, String druidQuery, Path dummyPath) {
+    super(dummyPath, 0, 0, (String[]) null);
+    this.address = address;
+    this.druidQuery = druidQuery;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeUTF(address);
+    out.writeUTF(druidQuery);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    address = in.readUTF();
+    druidQuery = in.readUTF();
+  }
+
+  @Override
+  public long getLength() {
+    return 0L;
+  }
+
+  @Override
+  public String[] getLocations() {
+    return new String[] { "" };
+  }
+
+  public String getAddress() {
+    return address;
+  }
+
+  public String getDruidQuery() {
+    return druidQuery;
+  }
+
+  @Override
+  public String toString() {
+    return "HiveDruidSplit{" + address + ", " + druidQuery + "}";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
index f97f820..9e8b439 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
@@ -42,7 +42,9 @@ public class DruidGroupByQueryRecordReader
         extends DruidQueryRecordReader<GroupByQuery, Row> {
 
   private Row current;
+
   private int[] indexes = new int[0];
+
   // Row objects returned by GroupByQuery have different access paths depending on
   // whether the result for the metric is a Float or a Long, thus we keep track
   // using these converters
@@ -62,11 +64,14 @@ public class DruidGroupByQueryRecordReader
   @Override
   protected List<Row> createResultsList(InputStream content) throws IOException {
     return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
-            new TypeReference<List<Row>>(){});
+            new TypeReference<List<Row>>() {
+            }
+    );
   }
 
   private void initExtractors() throws IOException {
-    extractors = new Extract[query.getAggregatorSpecs().size() + query.getPostAggregatorSpecs().size()];
+    extractors = new Extract[query.getAggregatorSpecs().size() + query.getPostAggregatorSpecs()
+            .size()];
     int counter = 0;
     for (int i = 0; i < query.getAggregatorSpecs().size(); i++, counter++) {
       AggregatorFactory af = query.getAggregatorSpecs().get(i);
@@ -103,7 +108,7 @@ public class DruidGroupByQueryRecordReader
     if (results.hasNext()) {
       current = results.next();
       indexes = new int[query.getDimensions().size()];
-      for (int i=0; i < query.getDimensions().size(); i++) {
+      for (int i = 0; i < query.getDimensions().size(); i++) {
         DimensionSpec ds = query.getDimensions().get(i);
         indexes[i] = current.getDimension(ds.getDimension()).size() - 1;
       }
@@ -124,7 +129,7 @@ public class DruidGroupByQueryRecordReader
     // 1) The timestamp column
     value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
     // 2) The dimension columns
-    for (int i=0; i < query.getDimensions().size(); i++) {
+    for (int i = 0; i < query.getDimensions().size(); i++) {
       DimensionSpec ds = query.getDimensions().get(i);
       List<String> dims = current.getDimension(ds.getDimension());
       if (dims.size() == 0) {
@@ -163,7 +168,7 @@ public class DruidGroupByQueryRecordReader
       // 1) The timestamp column
       value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
       // 2) The dimension columns
-      for (int i=0; i < query.getDimensions().size(); i++) {
+      for (int i = 0; i < query.getDimensions().size(); i++) {
         DimensionSpec ds = query.getDimensions().get(i);
         List<String> dims = current.getDimension(ds.getDimension());
         if (dims.size() == 0) {

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index fe6213b..dc9d6a0 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -17,15 +17,16 @@
  */
 package org.apache.hadoop.hive.druid.serde;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Iterator;
-import java.util.List;
-
+import com.google.common.collect.Iterators;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+import io.druid.query.BaseQuery;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
-import org.apache.hadoop.hive.druid.HiveDruidSplit;
+import org.apache.hadoop.hive.druid.io.HiveDruidSplit;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -34,24 +35,21 @@ import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Iterators;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;
-
-import io.druid.query.BaseQuery;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
 
 /**
  * Base record reader for given a Druid query. This class contains the logic to
  * send the query to the broker and retrieve the results. The transformation to
  * emit records needs to be done by the classes that extend the reader.
- * 
+ *
  * The key for each record will be a NullWritable, while the value will be a
  * DruidWritable containing the timestamp as well as all values resulting from
  * the query.
  */
-public abstract class DruidQueryRecordReader<T extends BaseQuery<R>,R extends Comparable<R>>
+public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends Comparable<R>>
         extends RecordReader<NullWritable, DruidWritable>
         implements org.apache.hadoop.mapred.RecordReader<NullWritable, DruidWritable> {
 
@@ -83,6 +81,7 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>,R extends Co
       LOG.info("Retrieving from druid using query:\n " + query);
     }
 
+    final Lifecycle lifecycle = new Lifecycle();
     final int numConnection = HiveConf
             .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
     final Period readTimeout = new Period(
@@ -90,10 +89,17 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>,R extends Co
 
     HttpClient client = HttpClientInit.createClient(
             HttpClientConfig.builder().withReadTimeout(readTimeout.toStandardDuration())
-                    .withNumConnections(numConnection).build(), new Lifecycle());
-    InputStream response = DruidStorageHandlerUtils.submitRequest(client,
-            DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query));
+                    .withNumConnections(numConnection).build(), lifecycle);
 
+    try {
+      lifecycle.start();
+    } catch (Exception e) {
+      LOG.error("Issues with lifecycle start", e);
+    }
+    InputStream response = DruidStorageHandlerUtils.submitRequest(client,
+            DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query)
+    );
+    lifecycle.stop();
     // Retrieve results
     List<R> resultsList;
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
index c30ac56..8a41e91 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
@@ -41,6 +41,7 @@ public class DruidSelectQueryRecordReader
         extends DruidQueryRecordReader<SelectQuery, Result<SelectResultValue>> {
 
   private Result<SelectResultValue> current;
+
   private Iterator<EventHolder> values = Iterators.emptyIterator();
 
   @Override
@@ -49,9 +50,12 @@ public class DruidSelectQueryRecordReader
   }
 
   @Override
-  protected List<Result<SelectResultValue>> createResultsList(InputStream content) throws IOException {
+  protected List<Result<SelectResultValue>> createResultsList(InputStream content)
+          throws IOException {
     return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
-            new TypeReference<List<Result<SelectResultValue>>>(){});
+            new TypeReference<List<Result<SelectResultValue>>>() {
+            }
+    );
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index eb78a70..2e90df1 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -17,19 +17,33 @@
  */
 package org.apache.hadoop.hive.druid.serde;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Properties;
-
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+import io.druid.query.Druids;
+import io.druid.query.Druids.SegmentMetadataQueryBuilder;
+import io.druid.query.Query;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.groupby.GroupByQuery;
+import io.druid.query.metadata.metadata.ColumnAnalysis;
+import io.druid.query.metadata.metadata.SegmentAnalysis;
+import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.topn.TopNQuery;
 import org.apache.calcite.adapter.druid.DruidTable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeSpec;
@@ -37,12 +51,19 @@ import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
@@ -50,43 +71,34 @@ import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.collect.Lists;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;
-
-import io.druid.query.Druids;
-import io.druid.query.Druids.SegmentMetadataQueryBuilder;
-import io.druid.query.Query;
-import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.PostAggregator;
-import io.druid.query.dimension.DimensionSpec;
-import io.druid.query.groupby.GroupByQuery;
-import io.druid.query.metadata.metadata.ColumnAnalysis;
-import io.druid.query.metadata.metadata.SegmentAnalysis;
-import io.druid.query.metadata.metadata.SegmentMetadataQuery;
-import io.druid.query.select.SelectQuery;
-import io.druid.query.timeseries.TimeseriesQuery;
-import io.druid.query.topn.TopNQuery;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
 
 /**
  * DruidSerDe that is used to  deserialize objects from a Druid data source.
  */
-@SerDeSpec(schemaProps = {Constants.DRUID_DATA_SOURCE})
+@SerDeSpec(schemaProps = { Constants.DRUID_DATA_SOURCE })
 public class DruidSerDe extends AbstractSerDe {
 
   protected static final Logger LOG = LoggerFactory.getLogger(DruidSerDe.class);
 
   private String[] columns;
+
   private PrimitiveTypeInfo[] types;
-  private ObjectInspector inspector;
 
   private int numConnection;
 
   private Period readTimeout;
 
+  private ObjectInspector inspector;
+
   @Override
   public void initialize(Configuration configuration, Properties properties) throws SerDeException {
     final List<String> columnNames = new ArrayList<>();
@@ -96,56 +108,93 @@ public class DruidSerDe extends AbstractSerDe {
     // Druid query
     String druidQuery = properties.getProperty(Constants.DRUID_QUERY_JSON);
     if (druidQuery == null) {
-      // No query. We need to create a Druid Segment Metadata query that retrieves all
-      // columns present in the data source (dimensions and metrics).
-      // Create Segment Metadata Query
-      String dataSource = properties.getProperty(Constants.DRUID_DATA_SOURCE);
-      if (dataSource == null) {
-        throw new SerDeException("Druid data source not specified; use " +
-                Constants.DRUID_DATA_SOURCE + " in table properties");
-      }
-      SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder();
-      builder.dataSource(dataSource);
-      builder.merge(true);
-      builder.analysisTypes();
-      SegmentMetadataQuery query = builder.build();
+      // No query. Either it is a CTAS, or we need to create a Druid
+      // Segment Metadata query that retrieves all columns present in
+      // the data source (dimensions and metrics).
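+      // In the CTAS case the column names and types come from the table definition
+      // (serdeConstants.LIST_COLUMNS / LIST_COLUMN_TYPES), and the list must include
+      // DruidTable.DEFAULT_TIMESTAMP_COLUMN; otherwise the broker is asked for the schema.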
+      if (!org.apache.commons.lang3.StringUtils
+              .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMNS))
+              && !org.apache.commons.lang3.StringUtils
+              .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES))) {
+        columnNames.addAll(Utilities.getColumnNames(properties));
+        if (!columnNames.contains(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
+          throw new SerDeException("Timestamp column (' " + DruidTable.DEFAULT_TIMESTAMP_COLUMN +
+                  "') not specified in create table; list of columns is : " +
+                  properties.getProperty(serdeConstants.LIST_COLUMNS));
+        }
+        columnTypes.addAll(Lists.transform(Utilities.getColumnTypes(properties),
+                new Function<String, PrimitiveTypeInfo>() {
+                  @Override
+                  public PrimitiveTypeInfo apply(String type) {
+                    return TypeInfoFactory.getPrimitiveTypeInfo(type);
+                  }
+                }
+        ));
+        inspectors.addAll(Lists.transform(columnTypes,
+                new Function<PrimitiveTypeInfo, ObjectInspector>() {
+                  @Override
+                  public ObjectInspector apply(PrimitiveTypeInfo type) {
+                    return PrimitiveObjectInspectorFactory
+                            .getPrimitiveWritableObjectInspector(type);
+                  }
+                }
+        ));
+        columns = columnNames.toArray(new String[columnNames.size()]);
+        types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
+        inspector = ObjectInspectorFactory
+                .getStandardStructObjectInspector(columnNames, inspectors);
+      } else {
+        String dataSource = properties.getProperty(Constants.DRUID_DATA_SOURCE);
+        if (dataSource == null) {
+          throw new SerDeException("Druid data source not specified; use " +
+                  Constants.DRUID_DATA_SOURCE + " in table properties");
+        }
+        SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder();
+        builder.dataSource(dataSource);
+        builder.merge(true);
+        builder.analysisTypes();
+        SegmentMetadataQuery query = builder.build();
 
-      // Execute query in Druid
-      String address = HiveConf.getVar(configuration,
-              HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
-      if (org.apache.commons.lang3.StringUtils.isEmpty(address)) {
-        throw new SerDeException("Druid broker address not specified in configuration");
-      }
+        // Execute query in Druid
+        String address = HiveConf.getVar(configuration,
+                HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS
+        );
+        if (org.apache.commons.lang3.StringUtils.isEmpty(address)) {
+          throw new SerDeException("Druid broker address not specified in configuration");
+        }
 
       numConnection = HiveConf
               .getIntVar(configuration, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
       readTimeout = new Period(
               HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
-      // Infer schema
-      SegmentAnalysis schemaInfo;
-      try {
-        schemaInfo = submitMetadataRequest(address, query);
-      } catch (IOException e) {
-        throw new SerDeException(e);
-      }
-      for (Entry<String,ColumnAnalysis> columnInfo : schemaInfo.getColumns().entrySet()) {
-        if (columnInfo.getKey().equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
-          // Special handling for timestamp column
+
+        // Infer schema
+        SegmentAnalysis schemaInfo;
+        try {
+          schemaInfo = submitMetadataRequest(address, query);
+        } catch (IOException e) {
+          throw new SerDeException(e);
+        }
+        for (Entry<String, ColumnAnalysis> columnInfo : schemaInfo.getColumns().entrySet()) {
+          if (columnInfo.getKey().equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
+            // Special handling for timestamp column
+            columnNames.add(columnInfo.getKey()); // field name
+            PrimitiveTypeInfo type = TypeInfoFactory.timestampTypeInfo; // field type
+            columnTypes.add(type);
+            inspectors
+                    .add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
+            continue;
+          }
           columnNames.add(columnInfo.getKey()); // field name
-          PrimitiveTypeInfo type = TypeInfoFactory.timestampTypeInfo; // field type
+          PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType(
+                  columnInfo.getValue().getType()); // field type
           columnTypes.add(type);
           inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
-          continue;
         }
-        columnNames.add(columnInfo.getKey()); // field name
-        PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType(
-                columnInfo.getValue().getType()); // field type
-        columnTypes.add(type);
-        inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
+        columns = columnNames.toArray(new String[columnNames.size()]);
+        types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
+        inspector = ObjectInspectorFactory
+                .getStandardStructObjectInspector(columnNames, inspectors);
       }
-      columns = columnNames.toArray(new String[columnNames.size()]);
-      types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
-      inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
     } else {
       // Query is specified, we can extract the results schema from the query
       Query<?> query;
@@ -171,13 +220,14 @@ public class DruidSerDe extends AbstractSerDe {
         default:
           throw new SerDeException("Not supported Druid query");
       }
-    
+
       columns = new String[columnNames.size()];
       types = new PrimitiveTypeInfo[columnNames.size()];
       for (int i = 0; i < columnTypes.size(); ++i) {
         columns[i] = columnNames.get(i);
         types[i] = columnTypes.get(i);
-        inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(types[i]));
+        inspectors
+                .add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(types[i]));
       }
       inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
     }
@@ -192,22 +242,29 @@ public class DruidSerDe extends AbstractSerDe {
   /* Submits the request and returns */
   protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
           throws SerDeException, IOException {
+    final Lifecycle lifecycle = new Lifecycle();
     HttpClient client = HttpClientInit.createClient(
             HttpClientConfig.builder().withNumConnections(numConnection)
-                    .withReadTimeout(readTimeout.toStandardDuration()).build(), new Lifecycle());
+                    .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
     InputStream response;
     try {
+      lifecycle.start();
       response = DruidStorageHandlerUtils.submitRequest(client,
-              DruidStorageHandlerUtils.createRequest(address, query));
+              DruidStorageHandlerUtils.createRequest(address, query)
+      );
     } catch (Exception e) {
       throw new SerDeException(StringUtils.stringifyException(e));
+    } finally {
+      lifecycle.stop();
     }
 
     // Retrieve results
     List<SegmentAnalysis> resultsList;
     try {
       resultsList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
-              new TypeReference<List<SegmentAnalysis>>() {});
+              new TypeReference<List<SegmentAnalysis>>() {
+              }
+      );
     } catch (Exception e) {
       response.close();
       throw new SerDeException(StringUtils.stringifyException(e));
@@ -224,7 +281,8 @@ public class DruidSerDe extends AbstractSerDe {
 
   /* Timeseries query */
   private void inferSchema(TimeseriesQuery query, List<String> columnNames,
-          List<PrimitiveTypeInfo> columnTypes) {
+          List<PrimitiveTypeInfo> columnTypes
+  ) {
     // Timestamp column
     columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
     columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -241,7 +299,9 @@ public class DruidSerDe extends AbstractSerDe {
   }
 
   /* TopN query */
-  private void inferSchema(TopNQuery query, List<String> columnNames, List<PrimitiveTypeInfo> columnTypes) {
+  private void inferSchema(TopNQuery query, List<String> columnNames,
+          List<PrimitiveTypeInfo> columnTypes
+  ) {
     // Timestamp column
     columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
     columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -262,7 +322,8 @@ public class DruidSerDe extends AbstractSerDe {
 
   /* Select query */
   private void inferSchema(SelectQuery query, List<String> columnNames,
-          List<PrimitiveTypeInfo> columnTypes) {
+          List<PrimitiveTypeInfo> columnTypes
+  ) {
     // Timestamp column
     columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
     columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -279,7 +340,9 @@ public class DruidSerDe extends AbstractSerDe {
   }
 
   /* GroupBy query */
-  private void inferSchema(GroupByQuery query, List<String> columnNames, List<PrimitiveTypeInfo> columnTypes) {
+  private void inferSchema(GroupByQuery query, List<String> columnNames,
+          List<PrimitiveTypeInfo> columnTypes
+  ) {
     // Timestamp column
     columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
     columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -302,17 +365,67 @@ public class DruidSerDe extends AbstractSerDe {
 
   @Override
   public Class<? extends Writable> getSerializedClass() {
-    return NullWritable.class;
+    return DruidWritable.class;
   }
 
   @Override
   public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
-    return NullWritable.get();
+    if (objectInspector.getCategory() != ObjectInspector.Category.STRUCT) {
+      throw new SerDeException(getClass().toString()
+              + " can only serialize struct types, but we got: "
+              + objectInspector.getTypeName());
+    }
+
+    // Prepare the field ObjectInspectors
+    StructObjectInspector soi = (StructObjectInspector) objectInspector;
+    List<? extends StructField> fields = soi.getAllStructFieldRefs();
+    List<Object> values = soi.getStructFieldsDataAsList(o);
+    // Build a map of column name -> value from the Hive row
+    Map<String, Object> value = new HashMap<>();
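+    // DruidRecordWriter later reads the event timestamp and the granularity column
+    // (Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME) back from this same map.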
+    for (int i = 0; i < columns.length; i++) {
+      if (values.get(i) == null) {
+        // null, we just add it
+        value.put(columns[i], null);
+        continue;
+      }
+      final Object res;
+      switch (types[i].getPrimitiveCategory()) {
+        case TIMESTAMP:
+          res = ((TimestampObjectInspector) fields.get(i).getFieldObjectInspector())
+                  .getPrimitiveJavaObject(
+                          values.get(i)).getTime();
+          break;
+        case LONG:
+          res = ((LongObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+          break;
+        case FLOAT:
+          res = ((FloatObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+          break;
+        case DOUBLE:
+          res = ((DoubleObjectInspector) fields.get(i).getFieldObjectInspector())
+                  .get(values.get(i));
+          break;
+        case STRING:
+          res = ((StringObjectInspector) fields.get(i).getFieldObjectInspector())
+                  .getPrimitiveJavaObject(
+                          values.get(i));
+          break;
+        default:
+          throw new SerDeException("Unknown type: " + types[i].getPrimitiveCategory());
+      }
+      value.put(columns[i], res);
+    }
+    value.put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
+            ((TimestampObjectInspector) fields.get(columns.length).getFieldObjectInspector())
+                    .getPrimitiveJavaObject(values.get(columns.length)).getTime()
+    );
+    return new DruidWritable(value);
   }
 
   @Override
   public SerDeStats getSerDeStats() {
-    throw new UnsupportedOperationException("SerdeStats not supported.");
+    // no support for statistics
+    return null;
   }
 
   @Override
@@ -327,13 +440,16 @@ public class DruidSerDe extends AbstractSerDe {
       }
       switch (types[i].getPrimitiveCategory()) {
         case TIMESTAMP:
-          output.add(new TimestampWritable(new Timestamp((Long)value)));
+          output.add(new TimestampWritable(new Timestamp((Long) value)));
           break;
         case LONG:
-          output.add(new LongWritable(((Number)value).longValue()));
+          output.add(new LongWritable(((Number) value).longValue()));
           break;
         case FLOAT:
-          output.add(new FloatWritable(((Number)value).floatValue()));
+          output.add(new FloatWritable(((Number) value).floatValue()));
+          break;
+        case DOUBLE:
+          output.add(new DoubleWritable(((Number) value).doubleValue()));
           break;
         case STRING:
           output.add(new Text(value.toString()));

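For illustration only (not part of the patch), a minimal sketch of the kind of map serialize() builds before wrapping it in a DruidWritable. The column names and values below are invented, and the DruidWritable package is assumed from the file layout of this handler; the granularity key is the Constants constant referenced in the hunk above.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.conf.Constants;
    import org.apache.hadoop.hive.druid.serde.DruidWritable; // package assumed from the diff paths

    public class DruidSerializeSketch {
      public static void main(String[] args) {
        // Hypothetical row: one STRING dimension, one LONG metric, plus the synthetic
        // granularity timestamp that arrives as the last struct field (epoch millis).
        Map<String, Object> value = new HashMap<>();
        value.put("page", "11._korpus_(NOVJ)");   // STRING column -> Java String
        value.put("added", 39L);                  // LONG column -> Java Long
        value.put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME, 1356998400000L);
        DruidWritable writable = new DruidWritable(value); // what serialize() returns per row
        System.out.println(writable);
      }
    }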
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
index 29b8845..64a19f6 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
@@ -31,14 +31,16 @@ public final class DruidSerDeUtils {
   private static final Logger LOG = LoggerFactory.getLogger(DruidSerDeUtils.class);
 
   protected static final String FLOAT_TYPE = "FLOAT";
+
   protected static final String LONG_TYPE = "LONG";
+
   protected static final String STRING_TYPE = "STRING";
 
   /* This method converts from the String representation of Druid type
    * to the corresponding Hive type */
   public static PrimitiveTypeInfo convertDruidToHiveType(String typeName) {
     typeName = typeName.toUpperCase();
-    switch(typeName) {
+    switch (typeName) {
       case FLOAT_TYPE:
         return TypeInfoFactory.floatTypeInfo;
       case LONG_TYPE:
@@ -61,7 +63,7 @@ public final class DruidSerDeUtils {
    * to the String representation of the corresponding Hive type */
   public static String convertDruidToHiveTypeString(String typeName) {
     typeName = typeName.toUpperCase();
-    switch(typeName) {
+    switch (typeName) {
       case FLOAT_TYPE:
         return serdeConstants.FLOAT_TYPE_NAME;
       case LONG_TYPE:

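A short usage sketch of the type mapping above (illustrative only; the class and method are the ones in the file, while the surrounding main() and printed output are made up):

    import org.apache.hadoop.hive.druid.serde.DruidSerDeUtils;
    import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;

    public class DruidTypeMappingSketch {
      public static void main(String[] args) {
        // The switch above upper-cases its input, so "float" and "FLOAT" behave the same;
        // anything outside FLOAT/LONG/STRING falls through to the default branch.
        PrimitiveTypeInfo floatType = DruidSerDeUtils.convertDruidToHiveType("float");
        PrimitiveTypeInfo longType = DruidSerDeUtils.convertDruidToHiveType("LONG");
        System.out.println(floatType.getTypeName() + " / " + longType.getTypeName());
      }
    }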
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
index b91178c..8c2fb10 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
@@ -45,9 +45,12 @@ public class DruidTimeseriesQueryRecordReader
   }
 
   @Override
-  protected List<Result<TimeseriesResultValue>> createResultsList(InputStream content) throws IOException {
+  protected List<Result<TimeseriesResultValue>> createResultsList(InputStream content)
+          throws IOException {
     return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
-            new TypeReference<List<Result<TimeseriesResultValue>>>(){});
+            new TypeReference<List<Result<TimeseriesResultValue>>>() {
+            }
+    );
   }
 
   @Override

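The reformatted call above uses the standard Jackson idiom for generic targets: an anonymous TypeReference subclass carries the full List<Result<TimeseriesResultValue>> type to readValue(). A self-contained sketch of the same idiom with plain JSON and generic maps (the patch itself reads SMILE via SMILE_MAPPER, and the sample JSON here is invented):

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.List;
    import java.util.Map;

    public class TypeReferenceSketch {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        String json = "[{\"timestamp\":\"2012-01-01T00:00:00.000Z\",\"result\":{\"sample_name1\":0}}]";
        // Without the TypeReference, type erasure would leave Jackson guessing the element type.
        List<Map<String, Object>> rows =
                mapper.readValue(json, new TypeReference<List<Map<String, Object>>>() {
                });
        System.out.println(rows.get(0).get("timestamp"));
      }
    }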
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
index 22599c3..d431925 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
@@ -41,6 +41,7 @@ public class DruidTopNQueryRecordReader
         extends DruidQueryRecordReader<TopNQuery, Result<TopNResultValue>> {
 
   private Result<TopNResultValue> current;
+
   private Iterator<DimensionAndMetricValueExtractor> values = Iterators.emptyIterator();
 
   @Override
@@ -49,9 +50,12 @@ public class DruidTopNQueryRecordReader
   }
 
   @Override
-  protected List<Result<TopNResultValue>> createResultsList(InputStream content) throws IOException {
+  protected List<Result<TopNResultValue>> createResultsList(InputStream content)
+          throws IOException {
     return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
-            new TypeReference<List<Result<TopNResultValue>>>(){});
+            new TypeReference<List<Result<TopNResultValue>>>() {
+            }
+    );
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
new file mode 100644
index 0000000..8770749
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
@@ -0,0 +1,181 @@
+package org.apache.hadoop.hive.druid;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.druid.indexer.JobHelper;
+import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.skife.jdbi.v2.Handle;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
+
+public class DruidStorageHandlerTest {
+
+  @Rule
+  public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
+
+  @Rule
+  public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private static final String DATA_SOURCE_NAME = "testName";
+
+  private String segmentsTable;
+
+  private String tablePath;
+
+  private DataSegment dataSegment = DataSegment.builder().dataSource(DATA_SOURCE_NAME).version("v1")
+          .interval(new Interval(100, 170)).shardSpec(NoneShardSpec.instance()).build();
+
+  @Before
+  public void before() throws Throwable {
+    tablePath = temporaryFolder.newFolder().getAbsolutePath();
+    segmentsTable = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
+    Map<String, String> mockMap = ImmutableMap.of(Constants.DRUID_DATA_SOURCE, DATA_SOURCE_NAME);
+    Mockito.when(tableMock.getParameters()).thenReturn(mockMap);
+    Mockito.when(tableMock.getPartitionKeysSize()).thenReturn(0);
+    StorageDescriptor storageDes = Mockito.mock(StorageDescriptor.class);
+    Mockito.when(storageDes.getBucketColsSize()).thenReturn(0);
+    Mockito.when(tableMock.getSd()).thenReturn(storageDes);
+    Mockito.when(tableMock.getDbName()).thenReturn(DATA_SOURCE_NAME);
+  }
+
+  Table tableMock = Mockito.mock(Table.class);
+
+  @Test
+  public void testPreCreateTableWillCreateSegmentsTable() throws MetaException {
+    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+            derbyConnectorRule.getConnector(),
+            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
+    );
+
+    try (Handle handle = derbyConnectorRule.getConnector().getDBI().open()) {
+      Assert.assertFalse(derbyConnectorRule.getConnector()
+              .tableExists(handle,
+                      segmentsTable
+              ));
+      druidStorageHandler.preCreateTable(tableMock);
+      Assert.assertTrue(derbyConnectorRule.getConnector()
+              .tableExists(handle,
+                      segmentsTable
+              ));
+    }
+
+  }
+
+  @Test(expected = MetaException.class)
+  public void testPreCreateTableWhenDataSourceExists() throws MetaException {
+    derbyConnectorRule.getConnector().createSegmentTable();
+    SQLMetadataStorageUpdaterJobHandler sqlMetadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler(
+            derbyConnectorRule.getConnector());
+    sqlMetadataStorageUpdaterJobHandler.publishSegments(segmentsTable, Arrays.asList(dataSegment),
+            DruidStorageHandlerUtils.JSON_MAPPER
+    );
+    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+            derbyConnectorRule.getConnector(),
+            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
+    );
+    druidStorageHandler.preCreateTable(tableMock);
+  }
+
+  @Test
+  public void testCommitCreateTablePlusCommitDropTableWithoutPurge()
+          throws MetaException, IOException {
+    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+            derbyConnectorRule.getConnector(),
+            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
+    );
+    druidStorageHandler.preCreateTable(tableMock);
+    Configuration config = new Configuration();
+    config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
+    config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tablePath);
+    druidStorageHandler.setConf(config);
+    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+    /*
+    The final segment descriptor path has the form tablePath/taskId_attemptId/segmentDescriptorDir/segmentIdentifier.json;
+    the random UUID set as HIVEQUERYID above stands in for the taskId_attemptId part.
+    */
+    Path taskDirPath = new Path(tablePath, druidStorageHandler.makeStagingName());
+    Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
+            new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+    );
+    DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath);
+    druidStorageHandler.commitCreateTable(tableMock);
+    Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+            DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+                    derbyConnectorRule.metadataTablesConfigSupplier().get()
+            )).toArray());
+    druidStorageHandler.commitDropTable(tableMock, false);
+    Assert.assertArrayEquals(Lists.newArrayList().toArray(), Lists.newArrayList(
+            DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+                    derbyConnectorRule.metadataTablesConfigSupplier().get()
+            )).toArray());
+
+  }
+
+  @Test
+  public void testDeleteSegment() throws IOException, SegmentLoadingException {
+    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+            derbyConnectorRule.getConnector(),
+            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
+    );
+
+    String segmentRootPath = temporaryFolder.newFolder().getAbsolutePath();
+    Configuration config = new Configuration();
+    druidStorageHandler.setConf(config);
+    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+
+    Path segmentOutputPath = JobHelper
+            .makeSegmentOutputPath(new Path(segmentRootPath), localFileSystem, dataSegment);
+    Path indexPath = new Path(segmentOutputPath, "index.zip");
+    DataSegment dataSegmentWithLoadspect = DataSegment.builder(dataSegment).loadSpec(
+            ImmutableMap.<String, Object>of("path", indexPath)).build();
+    OutputStream outputStream = localFileSystem.create(indexPath, true);
+    outputStream.close();
+    Assert.assertTrue("index file is not created ??", localFileSystem.exists(indexPath));
+    Assert.assertTrue(localFileSystem.exists(segmentOutputPath));
+
+    druidStorageHandler.deleteSegment(dataSegmentWithLoadspect);
+    // path format -> .../dataSource/interval/version/partitionNum/xxx.zip
+    Assert.assertFalse("Index file still there ??", localFileSystem.exists(indexPath));
+    // path format of segmentOutputPath -- > .../dataSource/interval/version/partitionNum/
+    Assert.assertFalse("PartitionNum directory still there ??",
+            localFileSystem.exists(segmentOutputPath)
+    );
+    Assert.assertFalse("Version directory still there ??",
+            localFileSystem.exists(segmentOutputPath.getParent())
+    );
+    Assert.assertFalse("Interval directory still there ??",
+            localFileSystem.exists(segmentOutputPath.getParent().getParent())
+    );
+    Assert.assertFalse("Data source directory still there ??",
+            localFileSystem.exists(segmentOutputPath.getParent().getParent().getParent())
+    );
+  }
+
+}

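For readers unfamiliar with the layout, the chain of assertions in testDeleteSegment above walks up the segment output path one parent at a time. A tiny sketch of that hierarchy with invented values (the real path comes from JobHelper.makeSegmentOutputPath):

    import org.apache.hadoop.fs.Path;

    public class SegmentPathLayoutSketch {
      public static void main(String[] args) {
        // Hypothetical layout: <root>/<dataSource>/<interval>/<version>/<partitionNum>/index.zip
        Path partitionDir = new Path("/tmp/root/testName/100_170/v1/0"); // values invented
        System.out.println(new Path(partitionDir, "index.zip"));              // removed first by deleteSegment()
        System.out.println(partitionDir.getParent());                         // version directory
        System.out.println(partitionDir.getParent().getParent());             // interval directory
        System.out.println(partitionDir.getParent().getParent().getParent()); // data source directory
      }
    }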
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
index 2b4df78..8dc8091 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
@@ -41,34 +41,34 @@ public class QTestDruidSerDe extends DruidSerDe {
   //        + "\"usingDefaultInterval\":true,\"lenientAggregatorMerge\":false,\"descending\":false}";
   private static final String RESPONSE =
           "[ {\r\n "
-          + " \"id\" : \"merged\",\r\n "
-          + " \"intervals\" : [ \"2010-01-01T00:00:00.000Z/2015-12-31T00:00:00.000Z\" ],\r\n "
-          + " \"columns\" : {\r\n  "
-          + "  \"__time\" : { \"type\" : \"LONG\", \"hasMultipleValues\" : false, \"size\" : 407240380, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
-          + "  \"robot\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
-          + "  \"namespace\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : true, \"size\" : 100000, \"cardinality\" : 1504, \"errorMessage\" : null },\r\n  "
-          + "  \"anonymous\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
-          + "  \"unpatrolled\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
-          + "  \"page\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
-          + "  \"language\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
-          + "  \"newpage\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
-          + "  \"user\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
-          + "  \"count\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
-          + "  \"added\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
-          + "  \"delta\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
-          + "  \"variation\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
-          + "  \"deleted\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null }\r\n "
-          + " },\r\n "
-          + " \"aggregators\" : {\r\n  "
-          + "  \"count\" : { \"type\" : \"longSum\", \"name\" : \"count\", \"fieldName\" : \"count\" },\r\n  "
-          + "  \"added\" : { \"type\" : \"doubleSum\", \"name\" : \"added\", \"fieldName\" : \"added\" },\r\n  "
-          + "  \"delta\" : { \"type\" : \"doubleSum\", \"name\" : \"delta\", \"fieldName\" : \"delta\" },\r\n  "
-          + "  \"variation\" : { \"type\" : \"doubleSum\", \"name\" : \"variation\", \"fieldName\" : \"variation\" },\r\n  "
-          + "  \"deleted\" : { \"type\" : \"doubleSum\", \"name\" : \"deleted\", \"fieldName\" : \"deleted\" }\r\n "
-          + " },\r\n "
-          + " \"queryGranularity\" : {\r\n    \"type\": \"none\"\r\n  },\r\n "
-          + " \"size\" : 300000,\r\n "
-          + " \"numRows\" : 5000000\r\n} ]";
+                  + " \"id\" : \"merged\",\r\n "
+                  + " \"intervals\" : [ \"2010-01-01T00:00:00.000Z/2015-12-31T00:00:00.000Z\" ],\r\n "
+                  + " \"columns\" : {\r\n  "
+                  + "  \"__time\" : { \"type\" : \"LONG\", \"hasMultipleValues\" : false, \"size\" : 407240380, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
+                  + "  \"robot\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+                  + "  \"namespace\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : true, \"size\" : 100000, \"cardinality\" : 1504, \"errorMessage\" : null },\r\n  "
+                  + "  \"anonymous\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+                  + "  \"unpatrolled\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+                  + "  \"page\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+                  + "  \"language\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+                  + "  \"newpage\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+                  + "  \"user\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+                  + "  \"count\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
+                  + "  \"added\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
+                  + "  \"delta\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
+                  + "  \"variation\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
+                  + "  \"deleted\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null }\r\n "
+                  + " },\r\n "
+                  + " \"aggregators\" : {\r\n  "
+                  + "  \"count\" : { \"type\" : \"longSum\", \"name\" : \"count\", \"fieldName\" : \"count\" },\r\n  "
+                  + "  \"added\" : { \"type\" : \"doubleSum\", \"name\" : \"added\", \"fieldName\" : \"added\" },\r\n  "
+                  + "  \"delta\" : { \"type\" : \"doubleSum\", \"name\" : \"delta\", \"fieldName\" : \"delta\" },\r\n  "
+                  + "  \"variation\" : { \"type\" : \"doubleSum\", \"name\" : \"variation\", \"fieldName\" : \"variation\" },\r\n  "
+                  + "  \"deleted\" : { \"type\" : \"doubleSum\", \"name\" : \"deleted\", \"fieldName\" : \"deleted\" }\r\n "
+                  + " },\r\n "
+                  + " \"queryGranularity\" : {\r\n    \"type\": \"none\"\r\n  },\r\n "
+                  + " \"size\" : 300000,\r\n "
+                  + " \"numRows\" : 5000000\r\n} ]";
 
   /* Submits the request and returns */
   @Override
@@ -78,7 +78,9 @@ public class QTestDruidSerDe extends DruidSerDe {
     List<SegmentAnalysis> resultsList;
     try {
       resultsList = DruidStorageHandlerUtils.JSON_MAPPER.readValue(RESPONSE,
-            new TypeReference<List<SegmentAnalysis>>() {});
+              new TypeReference<List<SegmentAnalysis>>() {
+              }
+      );
     } catch (Exception e) {
       throw new SerDeException(StringUtils.stringifyException(e));
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
new file mode 100644
index 0000000..75c0129
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
@@ -0,0 +1,108 @@
+package org.apache.hadoop.hive.druid;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import io.druid.metadata.MetadataStorageConnectorConfig;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.metadata.storage.derby.DerbyConnector;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
+
+import java.sql.SQLException;
+import java.util.UUID;
+
+public class TestDerbyConnector extends DerbyConnector {
+  private final String jdbcUri;
+
+  public TestDerbyConnector(
+          Supplier<MetadataStorageConnectorConfig> config,
+          Supplier<MetadataStorageTablesConfig> dbTables
+  ) {
+    this(config, dbTables, "jdbc:derby:memory:druidTest" + dbSafeUUID());
+  }
+
+  protected TestDerbyConnector(
+          Supplier<MetadataStorageConnectorConfig> config,
+          Supplier<MetadataStorageTablesConfig> dbTables,
+          String jdbcUri
+  ) {
+    super(config, dbTables, new DBI(jdbcUri + ";create=true"));
+    this.jdbcUri = jdbcUri;
+  }
+
+  public void tearDown() {
+    try {
+      new DBI(jdbcUri + ";drop=true").open().close();
+    } catch (UnableToObtainConnectionException e) {
+      SQLException cause = (SQLException) e.getCause();
+      // error code "08006" indicates proper shutdown
+      Assert.assertEquals(String.format("Derby not shutdown: [%s]", cause.toString()), "08006",
+              cause.getSQLState()
+      );
+    }
+  }
+
+  public static String dbSafeUUID() {
+    return UUID.randomUUID().toString().replace("-", "");
+  }
+
+  public String getJdbcUri() {
+    return jdbcUri;
+  }
+
+  public static class DerbyConnectorRule extends ExternalResource {
+    private TestDerbyConnector connector;
+
+    private final Supplier<MetadataStorageTablesConfig> dbTables;
+
+    private final MetadataStorageConnectorConfig connectorConfig;
+
+    public DerbyConnectorRule() {
+      this("druidTest" + dbSafeUUID());
+    }
+
+    private DerbyConnectorRule(
+            final String defaultBase
+    ) {
+      this(Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase(defaultBase)));
+    }
+
+    public DerbyConnectorRule(
+            Supplier<MetadataStorageTablesConfig> dbTables
+    ) {
+      this.dbTables = dbTables;
+      this.connectorConfig = new MetadataStorageConnectorConfig() {
+        @Override
+        public String getConnectURI() {
+          return connector.getJdbcUri();
+        }
+      };
+    }
+
+    @Override
+    protected void before() throws Throwable {
+      connector = new TestDerbyConnector(Suppliers.ofInstance(connectorConfig), dbTables);
+      connector.getDBI().open().close(); // create db
+    }
+
+    @Override
+    protected void after() {
+      connector.tearDown();
+    }
+
+    public TestDerbyConnector getConnector() {
+      return connector;
+    }
+
+    public MetadataStorageConnectorConfig getMetadataConnectorConfig() {
+      return connectorConfig;
+    }
+
+    public Supplier<MetadataStorageTablesConfig> metadataTablesConfigSupplier() {
+      return dbTables;
+    }
+  }
+
+}

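A minimal usage sketch for the rule (essentially what DruidStorageHandlerTest above does), assuming the sketch class lives in the same org.apache.hadoop.hive.druid test package and using only methods that appear in this patch:

    import org.junit.Assert;
    import org.junit.Rule;
    import org.junit.Test;
    import org.skife.jdbi.v2.Handle;

    public class DerbyConnectorRuleSketch {

      @Rule
      public final TestDerbyConnector.DerbyConnectorRule derby =
              new TestDerbyConnector.DerbyConnectorRule();

      @Test
      public void segmentsTableIsCreatedOnDemand() {
        String segmentsTable = derby.metadataTablesConfigSupplier().get().getSegmentsTable();
        derby.getConnector().createSegmentTable();
        try (Handle handle = derby.getConnector().getDBI().open()) {
          Assert.assertTrue(derby.getConnector().tableExists(handle, segmentsTable));
        }
      }
    }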

[2/4] hive git commit: HIVE-15277: Teach Hive how to create/delete Druid segments (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)

Posted by jc...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
index 1343939..a495165 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
@@ -74,374 +74,408 @@ public class TestDruidSerDe {
   // Timeseries query
   private static final String TIMESERIES_QUERY =
           "{  \"queryType\": \"timeseries\", "
-          + " \"dataSource\": \"sample_datasource\", "
-          + " \"granularity\": \"day\", "
-          + " \"descending\": \"true\", "
-          + " \"filter\": {  "
-          + "  \"type\": \"and\",  "
-          + "  \"fields\": [   "
-          + "   { \"type\": \"selector\", \"dimension\": \"sample_dimension1\", \"value\": \"sample_value1\" },   "
-          + "   { \"type\": \"or\",    "
-          + "    \"fields\": [     "
-          + "     { \"type\": \"selector\", \"dimension\": \"sample_dimension2\", \"value\": \"sample_value2\" },     "
-          + "     { \"type\": \"selector\", \"dimension\": \"sample_dimension3\", \"value\": \"sample_value3\" }    "
-          + "    ]   "
-          + "   }  "
-          + "  ] "
-          + " }, "
-          + " \"aggregations\": [  "
-          + "  { \"type\": \"longSum\", \"name\": \"sample_name1\", \"fieldName\": \"sample_fieldName1\" },  "
-          + "  { \"type\": \"doubleSum\", \"name\": \"sample_name2\", \"fieldName\": \"sample_fieldName2\" } "
-          + " ], "
-          + " \"postAggregations\": [  "
-          + "  { \"type\": \"arithmetic\",  "
-          + "    \"name\": \"sample_divide\",  "
-          + "    \"fn\": \"/\",  "
-          + "    \"fields\": [   "
-          + "     { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name1\", \"fieldName\": \"sample_name1\" },   "
-          + "     { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name2\", \"fieldName\": \"sample_name2\" }  "
-          + "    ]  "
-          + "  } "
-          + " ], "
-          + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ]}";
+                  + " \"dataSource\": \"sample_datasource\", "
+                  + " \"granularity\": \"day\", "
+                  + " \"descending\": \"true\", "
+                  + " \"filter\": {  "
+                  + "  \"type\": \"and\",  "
+                  + "  \"fields\": [   "
+                  + "   { \"type\": \"selector\", \"dimension\": \"sample_dimension1\", \"value\": \"sample_value1\" },   "
+                  + "   { \"type\": \"or\",    "
+                  + "    \"fields\": [     "
+                  + "     { \"type\": \"selector\", \"dimension\": \"sample_dimension2\", \"value\": \"sample_value2\" },     "
+                  + "     { \"type\": \"selector\", \"dimension\": \"sample_dimension3\", \"value\": \"sample_value3\" }    "
+                  + "    ]   "
+                  + "   }  "
+                  + "  ] "
+                  + " }, "
+                  + " \"aggregations\": [  "
+                  + "  { \"type\": \"longSum\", \"name\": \"sample_name1\", \"fieldName\": \"sample_fieldName1\" },  "
+                  + "  { \"type\": \"doubleSum\", \"name\": \"sample_name2\", \"fieldName\": \"sample_fieldName2\" } "
+                  + " ], "
+                  + " \"postAggregations\": [  "
+                  + "  { \"type\": \"arithmetic\",  "
+                  + "    \"name\": \"sample_divide\",  "
+                  + "    \"fn\": \"/\",  "
+                  + "    \"fields\": [   "
+                  + "     { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name1\", \"fieldName\": \"sample_name1\" },   "
+                  + "     { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name2\", \"fieldName\": \"sample_name2\" }  "
+                  + "    ]  "
+                  + "  } "
+                  + " ], "
+                  + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ]}";
+
   // Timeseries query results
   private static final String TIMESERIES_QUERY_RESULTS =
           "[  "
-          + "{   "
-          + " \"timestamp\": \"2012-01-01T00:00:00.000Z\",   "
-          + " \"result\": { \"sample_name1\": 0, \"sample_name2\": 1.0, \"sample_divide\": 2.2222 }   "
-          + "},  "
-          + "{   "
-          + " \"timestamp\": \"2012-01-02T00:00:00.000Z\",   "
-          + " \"result\": { \"sample_name1\": 2, \"sample_name2\": 3.32, \"sample_divide\": 4 }  "
-          + "}]";
+                  + "{   "
+                  + " \"timestamp\": \"2012-01-01T00:00:00.000Z\",   "
+                  + " \"result\": { \"sample_name1\": 0, \"sample_name2\": 1.0, \"sample_divide\": 2.2222 }   "
+                  + "},  "
+                  + "{   "
+                  + " \"timestamp\": \"2012-01-02T00:00:00.000Z\",   "
+                  + " \"result\": { \"sample_name1\": 2, \"sample_name2\": 3.32, \"sample_divide\": 4 }  "
+                  + "}]";
+
   // Timeseries query results as records
   private static final Object[][] TIMESERIES_QUERY_RESULTS_RECORDS = new Object[][] {
-    new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new LongWritable(0), new FloatWritable(1.0F), new FloatWritable(2.2222F) } ,
-    new Object[] { new TimestampWritable(new Timestamp(1325462400000L)), new LongWritable(2), new FloatWritable(3.32F), new FloatWritable(4F)}
+          new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new LongWritable(0),
+                  new FloatWritable(1.0F), new FloatWritable(2.2222F) },
+          new Object[] { new TimestampWritable(new Timestamp(1325462400000L)), new LongWritable(2),
+                  new FloatWritable(3.32F), new FloatWritable(4F) }
   };
 
   // TopN query
   private static final String TOPN_QUERY =
           "{  \"queryType\": \"topN\", "
-          + " \"dataSource\": \"sample_data\", "
-          + " \"dimension\": \"sample_dim\", "
-          + " \"threshold\": 5, "
-          + " \"metric\": \"count\", "
-          + " \"granularity\": \"all\", "
-          + " \"filter\": {  "
-          + "  \"type\": \"and\",  "
-          + "  \"fields\": [   "
-          + "   {    "
-          + "    \"type\": \"selector\",    "
-          + "    \"dimension\": \"dim1\",    "
-          + "    \"value\": \"some_value\"   "
-          + "   },   "
-          + "   {    "
-          + "    \"type\": \"selector\",    "
-          + "    \"dimension\": \"dim2\",    "
-          + "    \"value\": \"some_other_val\"   "
-          + "   }  "
-          + "  ] "
-          + " }, "
-          + " \"aggregations\": [  "
-          + "  {   "
-          + "   \"type\": \"longSum\",   "
-          + "   \"name\": \"count\",   "
-          + "   \"fieldName\": \"count\"  "
-          + "  },  "
-          + "  {   "
-          + "   \"type\": \"doubleSum\",   "
-          + "   \"name\": \"some_metric\",   "
-          + "   \"fieldName\": \"some_metric\"  "
-          + "  } "
-          + " ], "
-          + " \"postAggregations\": [  "
-          + "  {   "
-          + "   \"type\": \"arithmetic\",   "
-          + "   \"name\": \"sample_divide\",   "
-          + "   \"fn\": \"/\",   "
-          + "   \"fields\": [    "
-          + "    {     "
-          + "     \"type\": \"fieldAccess\",     "
-          + "     \"name\": \"some_metric\",     "
-          + "     \"fieldName\": \"some_metric\"    "
-          + "    },    "
-          + "    {     "
-          + "     \"type\": \"fieldAccess\",     "
-          + "     \"name\": \"count\",     "
-          + "     \"fieldName\": \"count\"    "
-          + "    }   "
-          + "   ]  "
-          + "  } "
-          + " ], "
-          + " \"intervals\": [  "
-          + "  \"2013-08-31T00:00:00.000/2013-09-03T00:00:00.000\" "
-          + " ]}";
+                  + " \"dataSource\": \"sample_data\", "
+                  + " \"dimension\": \"sample_dim\", "
+                  + " \"threshold\": 5, "
+                  + " \"metric\": \"count\", "
+                  + " \"granularity\": \"all\", "
+                  + " \"filter\": {  "
+                  + "  \"type\": \"and\",  "
+                  + "  \"fields\": [   "
+                  + "   {    "
+                  + "    \"type\": \"selector\",    "
+                  + "    \"dimension\": \"dim1\",    "
+                  + "    \"value\": \"some_value\"   "
+                  + "   },   "
+                  + "   {    "
+                  + "    \"type\": \"selector\",    "
+                  + "    \"dimension\": \"dim2\",    "
+                  + "    \"value\": \"some_other_val\"   "
+                  + "   }  "
+                  + "  ] "
+                  + " }, "
+                  + " \"aggregations\": [  "
+                  + "  {   "
+                  + "   \"type\": \"longSum\",   "
+                  + "   \"name\": \"count\",   "
+                  + "   \"fieldName\": \"count\"  "
+                  + "  },  "
+                  + "  {   "
+                  + "   \"type\": \"doubleSum\",   "
+                  + "   \"name\": \"some_metric\",   "
+                  + "   \"fieldName\": \"some_metric\"  "
+                  + "  } "
+                  + " ], "
+                  + " \"postAggregations\": [  "
+                  + "  {   "
+                  + "   \"type\": \"arithmetic\",   "
+                  + "   \"name\": \"sample_divide\",   "
+                  + "   \"fn\": \"/\",   "
+                  + "   \"fields\": [    "
+                  + "    {     "
+                  + "     \"type\": \"fieldAccess\",     "
+                  + "     \"name\": \"some_metric\",     "
+                  + "     \"fieldName\": \"some_metric\"    "
+                  + "    },    "
+                  + "    {     "
+                  + "     \"type\": \"fieldAccess\",     "
+                  + "     \"name\": \"count\",     "
+                  + "     \"fieldName\": \"count\"    "
+                  + "    }   "
+                  + "   ]  "
+                  + "  } "
+                  + " ], "
+                  + " \"intervals\": [  "
+                  + "  \"2013-08-31T00:00:00.000/2013-09-03T00:00:00.000\" "
+                  + " ]}";
+
   // TopN query results
   private static final String TOPN_QUERY_RESULTS =
           "[ "
-          + " {  "
-          + "  \"timestamp\": \"2013-08-31T00:00:00.000Z\",  "
-          + "  \"result\": [   "
-          + "   {   "
-          + "     \"sample_dim\": \"dim1_val\",   "
-          + "     \"count\": 111,   "
-          + "     \"some_metric\": 10669,   "
-          + "     \"sample_divide\": 96.11711711711712   "
-          + "   },   "
-          + "   {   "
-          + "     \"sample_dim\": \"another_dim1_val\",   "
-          + "     \"count\": 88,   "
-          + "     \"some_metric\": 28344,   "
-          + "     \"sample_divide\": 322.09090909090907   "
-          + "   },   "
-          + "   {   "
-          + "     \"sample_dim\": \"dim1_val3\",   "
-          + "     \"count\": 70,   "
-          + "     \"some_metric\": 871,   "
-          + "     \"sample_divide\": 12.442857142857143   "
-          + "   },   "
-          + "   {   "
-          + "     \"sample_dim\": \"dim1_val4\",   "
-          + "     \"count\": 62,   "
-          + "     \"some_metric\": 815,   "
-          + "     \"sample_divide\": 13.14516129032258   "
-          + "   },   "
-          + "   {   "
-          + "     \"sample_dim\": \"dim1_val5\",   "
-          + "     \"count\": 60,   "
-          + "     \"some_metric\": 2787,   "
-          + "     \"sample_divide\": 46.45   "
-          + "   }  "
-          + "  ] "
-          + " }]";
+                  + " {  "
+                  + "  \"timestamp\": \"2013-08-31T00:00:00.000Z\",  "
+                  + "  \"result\": [   "
+                  + "   {   "
+                  + "     \"sample_dim\": \"dim1_val\",   "
+                  + "     \"count\": 111,   "
+                  + "     \"some_metric\": 10669,   "
+                  + "     \"sample_divide\": 96.11711711711712   "
+                  + "   },   "
+                  + "   {   "
+                  + "     \"sample_dim\": \"another_dim1_val\",   "
+                  + "     \"count\": 88,   "
+                  + "     \"some_metric\": 28344,   "
+                  + "     \"sample_divide\": 322.09090909090907   "
+                  + "   },   "
+                  + "   {   "
+                  + "     \"sample_dim\": \"dim1_val3\",   "
+                  + "     \"count\": 70,   "
+                  + "     \"some_metric\": 871,   "
+                  + "     \"sample_divide\": 12.442857142857143   "
+                  + "   },   "
+                  + "   {   "
+                  + "     \"sample_dim\": \"dim1_val4\",   "
+                  + "     \"count\": 62,   "
+                  + "     \"some_metric\": 815,   "
+                  + "     \"sample_divide\": 13.14516129032258   "
+                  + "   },   "
+                  + "   {   "
+                  + "     \"sample_dim\": \"dim1_val5\",   "
+                  + "     \"count\": 60,   "
+                  + "     \"some_metric\": 2787,   "
+                  + "     \"sample_divide\": 46.45   "
+                  + "   }  "
+                  + "  ] "
+                  + " }]";
+
   // TopN query results as records
   private static final Object[][] TOPN_QUERY_RESULTS_RECORDS = new Object[][] {
-    new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val"), new LongWritable(111), new FloatWritable(10669F), new FloatWritable(96.11711711711712F) } ,
-    new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("another_dim1_val"), new LongWritable(88), new FloatWritable(28344F), new FloatWritable(322.09090909090907F) } ,
-    new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val3"), new LongWritable(70), new FloatWritable(871F), new FloatWritable(12.442857142857143F) } ,
-    new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val4"), new LongWritable(62), new FloatWritable(815F), new FloatWritable(13.14516129032258F) } ,
-    new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val5"), new LongWritable(60), new FloatWritable(2787F), new FloatWritable(46.45F) }
+          new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val"),
+                  new LongWritable(111), new FloatWritable(10669F),
+                  new FloatWritable(96.11711711711712F) },
+          new Object[] { new TimestampWritable(new Timestamp(1377907200000L)),
+                  new Text("another_dim1_val"), new LongWritable(88), new FloatWritable(28344F),
+                  new FloatWritable(322.09090909090907F) },
+          new Object[] { new TimestampWritable(new Timestamp(1377907200000L)),
+                  new Text("dim1_val3"), new LongWritable(70), new FloatWritable(871F),
+                  new FloatWritable(12.442857142857143F) },
+          new Object[] { new TimestampWritable(new Timestamp(1377907200000L)),
+                  new Text("dim1_val4"), new LongWritable(62), new FloatWritable(815F),
+                  new FloatWritable(13.14516129032258F) },
+          new Object[] { new TimestampWritable(new Timestamp(1377907200000L)),
+                  new Text("dim1_val5"), new LongWritable(60), new FloatWritable(2787F),
+                  new FloatWritable(46.45F) }
   };
 
   // GroupBy query
   private static final String GROUP_BY_QUERY =
           "{ "
-          + " \"queryType\": \"groupBy\", "
-          + " \"dataSource\": \"sample_datasource\", "
-          + " \"granularity\": \"day\", "
-          + " \"dimensions\": [\"country\", \"device\"], "
-          + " \"limitSpec\": {"
-          + " \"type\": \"default\","
-          + " \"limit\": 5000,"
-          + " \"columns\": [\"country\", \"data_transfer\"] }, "
-          + " \"filter\": {  "
-          + "  \"type\": \"and\",  "
-          + "  \"fields\": [   "
-          + "   { \"type\": \"selector\", \"dimension\": \"carrier\", \"value\": \"AT&T\" },   "
-          + "   { \"type\": \"or\",     "
-          + "    \"fields\": [     "
-          + "     { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Apple\" },     "
-          + "     { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Samsung\" }    "
-          + "    ]   "
-          + "   }  "
-          + "  ] "
-          + " }, "
-          + " \"aggregations\": [  "
-          + "  { \"type\": \"longSum\", \"name\": \"total_usage\", \"fieldName\": \"user_count\" },  "
-          + "  { \"type\": \"doubleSum\", \"name\": \"data_transfer\", \"fieldName\": \"data_transfer\" } "
-          + " ], "
-          + " \"postAggregations\": [  "
-          + "  { \"type\": \"arithmetic\",  "
-          + "    \"name\": \"avg_usage\",  "
-          + "    \"fn\": \"/\",  "
-          + "    \"fields\": [   "
-          + "     { \"type\": \"fieldAccess\", \"fieldName\": \"data_transfer\" },   "
-          + "     { \"type\": \"fieldAccess\", \"fieldName\": \"total_usage\" }  "
-          + "    ]  "
-          + "  } "
-          + " ], "
-          + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ], "
-          + " \"having\": {  "
-          + "  \"type\": \"greaterThan\",  "
-          + "  \"aggregation\": \"total_usage\",  "
-          + "  \"value\": 100 "
-          + " }}";
+                  + " \"queryType\": \"groupBy\", "
+                  + " \"dataSource\": \"sample_datasource\", "
+                  + " \"granularity\": \"day\", "
+                  + " \"dimensions\": [\"country\", \"device\"], "
+                  + " \"limitSpec\": {"
+                  + " \"type\": \"default\","
+                  + " \"limit\": 5000,"
+                  + " \"columns\": [\"country\", \"data_transfer\"] }, "
+                  + " \"filter\": {  "
+                  + "  \"type\": \"and\",  "
+                  + "  \"fields\": [   "
+                  + "   { \"type\": \"selector\", \"dimension\": \"carrier\", \"value\": \"AT&T\" },   "
+                  + "   { \"type\": \"or\",     "
+                  + "    \"fields\": [     "
+                  + "     { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Apple\" },     "
+                  + "     { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Samsung\" }    "
+                  + "    ]   "
+                  + "   }  "
+                  + "  ] "
+                  + " }, "
+                  + " \"aggregations\": [  "
+                  + "  { \"type\": \"longSum\", \"name\": \"total_usage\", \"fieldName\": \"user_count\" },  "
+                  + "  { \"type\": \"doubleSum\", \"name\": \"data_transfer\", \"fieldName\": \"data_transfer\" } "
+                  + " ], "
+                  + " \"postAggregations\": [  "
+                  + "  { \"type\": \"arithmetic\",  "
+                  + "    \"name\": \"avg_usage\",  "
+                  + "    \"fn\": \"/\",  "
+                  + "    \"fields\": [   "
+                  + "     { \"type\": \"fieldAccess\", \"fieldName\": \"data_transfer\" },   "
+                  + "     { \"type\": \"fieldAccess\", \"fieldName\": \"total_usage\" }  "
+                  + "    ]  "
+                  + "  } "
+                  + " ], "
+                  + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ], "
+                  + " \"having\": {  "
+                  + "  \"type\": \"greaterThan\",  "
+                  + "  \"aggregation\": \"total_usage\",  "
+                  + "  \"value\": 100 "
+                  + " }}";
+
   // GroupBy query results
   private static final String GROUP_BY_QUERY_RESULTS =
           "[  "
-          + " {  "
-          + "  \"version\" : \"v1\",  "
-          + "  \"timestamp\" : \"2012-01-01T00:00:00.000Z\",  "
-          + "  \"event\" : {   "
-          + "   \"country\" : \"India\",   "
-          + "   \"device\" : \"phone\",   "
-          + "   \"total_usage\" : 88,   "
-          + "   \"data_transfer\" : 29.91233453,   "
-          + "   \"avg_usage\" : 60.32  "
-          + "  } "
-          + " },  "
-          + " {  "
-          + "  \"version\" : \"v1\",  "
-          + "  \"timestamp\" : \"2012-01-01T00:00:12.000Z\",  "
-          + "  \"event\" : {   "
-          + "   \"country\" : \"Spain\",   "
-          + "   \"device\" : \"pc\",   "
-          + "   \"total_usage\" : 16,   "
-          + "   \"data_transfer\" : 172.93494959,   "
-          + "   \"avg_usage\" : 6.333333  "
-          + "  } "
-          + " }]";
+                  + " {  "
+                  + "  \"version\" : \"v1\",  "
+                  + "  \"timestamp\" : \"2012-01-01T00:00:00.000Z\",  "
+                  + "  \"event\" : {   "
+                  + "   \"country\" : \"India\",   "
+                  + "   \"device\" : \"phone\",   "
+                  + "   \"total_usage\" : 88,   "
+                  + "   \"data_transfer\" : 29.91233453,   "
+                  + "   \"avg_usage\" : 60.32  "
+                  + "  } "
+                  + " },  "
+                  + " {  "
+                  + "  \"version\" : \"v1\",  "
+                  + "  \"timestamp\" : \"2012-01-01T00:00:12.000Z\",  "
+                  + "  \"event\" : {   "
+                  + "   \"country\" : \"Spain\",   "
+                  + "   \"device\" : \"pc\",   "
+                  + "   \"total_usage\" : 16,   "
+                  + "   \"data_transfer\" : 172.93494959,   "
+                  + "   \"avg_usage\" : 6.333333  "
+                  + "  } "
+                  + " }]";
+
   // GroupBy query results as records
   private static final Object[][] GROUP_BY_QUERY_RESULTS_RECORDS = new Object[][] {
-    new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new Text("India"), new Text("phone"), new LongWritable(88), new FloatWritable(29.91233453F), new FloatWritable(60.32F) } ,
-    new Object[] { new TimestampWritable(new Timestamp(1325376012000L)), new Text("Spain"), new Text("pc"), new LongWritable(16), new FloatWritable(172.93494959F), new FloatWritable(6.333333F) }
+          new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new Text("India"),
+                  new Text("phone"), new LongWritable(88), new FloatWritable(29.91233453F),
+                  new FloatWritable(60.32F) },
+          new Object[] { new TimestampWritable(new Timestamp(1325376012000L)), new Text("Spain"),
+                  new Text("pc"), new LongWritable(16), new FloatWritable(172.93494959F),
+                  new FloatWritable(6.333333F) }
   };
 
   // Select query
   private static final String SELECT_QUERY =
           "{   \"queryType\": \"select\",  "
-          + " \"dataSource\": \"wikipedia\",   \"descending\": \"false\",  "
-          + " \"dimensions\":[\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\",\"newpage\",\"user\"],  "
-          + " \"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"],  "
-          + " \"granularity\": \"all\",  "
-          + " \"intervals\": [     \"2013-01-01/2013-01-02\"   ],  "
-          + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5} }";
+                  + " \"dataSource\": \"wikipedia\",   \"descending\": \"false\",  "
+                  + " \"dimensions\":[\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\",\"newpage\",\"user\"],  "
+                  + " \"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"],  "
+                  + " \"granularity\": \"all\",  "
+                  + " \"intervals\": [     \"2013-01-01/2013-01-02\"   ],  "
+                  + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5} }";
+
   // Select query results
   private static final String SELECT_QUERY_RESULTS =
           "[{ "
-          + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", "
-          + " \"result\" : {  "
-          + "  \"pagingIdentifiers\" : {   "
-          + "   \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\" : 4    }, "
-          + "   \"events\" : [ {  "
-          + "    \"segmentId\" : \"wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",  "
-          + "    \"offset\" : 0,  "
-          + "    \"event\" : {   "
-          + "     \"timestamp\" : \"2013-01-01T00:00:00.000Z\",   "
-          + "     \"robot\" : \"1\",   "
-          + "     \"namespace\" : \"article\",   "
-          + "     \"anonymous\" : \"0\",   "
-          + "     \"unpatrolled\" : \"0\",   "
-          + "     \"page\" : \"11._korpus_(NOVJ)\",   "
-          + "     \"language\" : \"sl\",   "
-          + "     \"newpage\" : \"0\",   "
-          + "     \"user\" : \"EmausBot\",   "
-          + "     \"count\" : 1.0,   "
-          + "     \"added\" : 39.0,   "
-          + "     \"delta\" : 39.0,   "
-          + "     \"variation\" : 39.0,   "
-          + "     \"deleted\" : 0.0  "
-          + "    } "
-          + "   }, {  "
-          + "    \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",  "
-          + "    \"offset\" : 1,  "
-          + "    \"event\" : {   "
-          + "     \"timestamp\" : \"2013-01-01T00:00:00.000Z\",   "
-          + "     \"robot\" : \"0\",   "
-          + "     \"namespace\" : \"article\",   "
-          + "     \"anonymous\" : \"0\",   "
-          + "     \"unpatrolled\" : \"0\",   "
-          + "     \"page\" : \"112_U.S._580\",   "
-          + "     \"language\" : \"en\",   "
-          + "     \"newpage\" : \"1\",   "
-          + "     \"user\" : \"MZMcBride\",   "
-          + "     \"count\" : 1.0,   "
-          + "     \"added\" : 70.0,   "
-          + "     \"delta\" : 70.0,   "
-          + "     \"variation\" : 70.0,   "
-          + "     \"deleted\" : 0.0  "
-          + "    } "
-          + "   }, {  "
-          + "    \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",  "
-          + "    \"offset\" : 2,  "
-          + "    \"event\" : {   "
-          + "     \"timestamp\" : \"2013-01-01T00:00:12.000Z\",   "
-          + "     \"robot\" : \"0\",   "
-          + "     \"namespace\" : \"article\",   "
-          + "     \"anonymous\" : \"0\",   "
-          + "     \"unpatrolled\" : \"0\",   "
-          + "     \"page\" : \"113_U.S._243\",   "
-          + "     \"language\" : \"en\",   "
-          + "     \"newpage\" : \"1\",   "
-          + "     \"user\" : \"MZMcBride\",   "
-          + "     \"count\" : 1.0,   "
-          + "     \"added\" : 77.0,   "
-          + "     \"delta\" : 77.0,   "
-          + "     \"variation\" : 77.0,   "
-          + "     \"deleted\" : 0.0  "
-          + "    } "
-          + "   }, {  "
-          + "    \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",  "
-          + "    \"offset\" : 3,  "
-          + "    \"event\" : {   "
-          + "     \"timestamp\" : \"2013-01-01T00:00:12.000Z\",   "
-          + "     \"robot\" : \"0\",   "
-          + "     \"namespace\" : \"article\",   "
-          + "     \"anonymous\" : \"0\",   "
-          + "     \"unpatrolled\" : \"0\",   "
-          + "     \"page\" : \"113_U.S._73\",   "
-          + "     \"language\" : \"en\",   "
-          + "     \"newpage\" : \"1\",   "
-          + "     \"user\" : \"MZMcBride\",   "
-          + "     \"count\" : 1.0,   "
-          + "     \"added\" : 70.0,   "
-          + "     \"delta\" : 70.0,   "
-          + "     \"variation\" : 70.0,   "
-          + "     \"deleted\" : 0.0  "
-          + "    } "
-          + "   }, {  "
-          + "    \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",  "
-          + "    \"offset\" : 4,  "
-          + "    \"event\" : {   "
-          + "     \"timestamp\" : \"2013-01-01T00:00:12.000Z\",   "
-          + "     \"robot\" : \"0\",   "
-          + "     \"namespace\" : \"article\",   "
-          + "     \"anonymous\" : \"0\",   "
-          + "     \"unpatrolled\" : \"0\",   "
-          + "     \"page\" : \"113_U.S._756\",   "
-          + "     \"language\" : \"en\",   "
-          + "     \"newpage\" : \"1\",   "
-          + "     \"user\" : \"MZMcBride\",   "
-          + "     \"count\" : 1.0,   "
-          + "     \"added\" : 68.0,   "
-          + "     \"delta\" : 68.0,   "
-          + "     \"variation\" : 68.0,   "
-          + "     \"deleted\" : 0.0  "
-          + "    } "
-          + "   } ]  }} ]";
+                  + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", "
+                  + " \"result\" : {  "
+                  + "  \"pagingIdentifiers\" : {   "
+                  + "   \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\" : 4    }, "
+                  + "   \"events\" : [ {  "
+                  + "    \"segmentId\" : \"wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",  "
+                  + "    \"offset\" : 0,  "
+                  + "    \"event\" : {   "
+                  + "     \"timestamp\" : \"2013-01-01T00:00:00.000Z\",   "
+                  + "     \"robot\" : \"1\",   "
+                  + "     \"namespace\" : \"article\",   "
+                  + "     \"anonymous\" : \"0\",   "
+                  + "     \"unpatrolled\" : \"0\",   "
+                  + "     \"page\" : \"11._korpus_(NOVJ)\",   "
+                  + "     \"language\" : \"sl\",   "
+                  + "     \"newpage\" : \"0\",   "
+                  + "     \"user\" : \"EmausBot\",   "
+                  + "     \"count\" : 1.0,   "
+                  + "     \"added\" : 39.0,   "
+                  + "     \"delta\" : 39.0,   "
+                  + "     \"variation\" : 39.0,   "
+                  + "     \"deleted\" : 0.0  "
+                  + "    } "
+                  + "   }, {  "
+                  + "    \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",  "
+                  + "    \"offset\" : 1,  "
+                  + "    \"event\" : {   "
+                  + "     \"timestamp\" : \"2013-01-01T00:00:00.000Z\",   "
+                  + "     \"robot\" : \"0\",   "
+                  + "     \"namespace\" : \"article\",   "
+                  + "     \"anonymous\" : \"0\",   "
+                  + "     \"unpatrolled\" : \"0\",   "
+                  + "     \"page\" : \"112_U.S._580\",   "
+                  + "     \"language\" : \"en\",   "
+                  + "     \"newpage\" : \"1\",   "
+                  + "     \"user\" : \"MZMcBride\",   "
+                  + "     \"count\" : 1.0,   "
+                  + "     \"added\" : 70.0,   "
+                  + "     \"delta\" : 70.0,   "
+                  + "     \"variation\" : 70.0,   "
+                  + "     \"deleted\" : 0.0  "
+                  + "    } "
+                  + "   }, {  "
+                  + "    \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",  "
+                  + "    \"offset\" : 2,  "
+                  + "    \"event\" : {   "
+                  + "     \"timestamp\" : \"2013-01-01T00:00:12.000Z\",   "
+                  + "     \"robot\" : \"0\",   "
+                  + "     \"namespace\" : \"article\",   "
+                  + "     \"anonymous\" : \"0\",   "
+                  + "     \"unpatrolled\" : \"0\",   "
+                  + "     \"page\" : \"113_U.S._243\",   "
+                  + "     \"language\" : \"en\",   "
+                  + "     \"newpage\" : \"1\",   "
+                  + "     \"user\" : \"MZMcBride\",   "
+                  + "     \"count\" : 1.0,   "
+                  + "     \"added\" : 77.0,   "
+                  + "     \"delta\" : 77.0,   "
+                  + "     \"variation\" : 77.0,   "
+                  + "     \"deleted\" : 0.0  "
+                  + "    } "
+                  + "   }, {  "
+                  + "    \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",  "
+                  + "    \"offset\" : 3,  "
+                  + "    \"event\" : {   "
+                  + "     \"timestamp\" : \"2013-01-01T00:00:12.000Z\",   "
+                  + "     \"robot\" : \"0\",   "
+                  + "     \"namespace\" : \"article\",   "
+                  + "     \"anonymous\" : \"0\",   "
+                  + "     \"unpatrolled\" : \"0\",   "
+                  + "     \"page\" : \"113_U.S._73\",   "
+                  + "     \"language\" : \"en\",   "
+                  + "     \"newpage\" : \"1\",   "
+                  + "     \"user\" : \"MZMcBride\",   "
+                  + "     \"count\" : 1.0,   "
+                  + "     \"added\" : 70.0,   "
+                  + "     \"delta\" : 70.0,   "
+                  + "     \"variation\" : 70.0,   "
+                  + "     \"deleted\" : 0.0  "
+                  + "    } "
+                  + "   }, {  "
+                  + "    \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",  "
+                  + "    \"offset\" : 4,  "
+                  + "    \"event\" : {   "
+                  + "     \"timestamp\" : \"2013-01-01T00:00:12.000Z\",   "
+                  + "     \"robot\" : \"0\",   "
+                  + "     \"namespace\" : \"article\",   "
+                  + "     \"anonymous\" : \"0\",   "
+                  + "     \"unpatrolled\" : \"0\",   "
+                  + "     \"page\" : \"113_U.S._756\",   "
+                  + "     \"language\" : \"en\",   "
+                  + "     \"newpage\" : \"1\",   "
+                  + "     \"user\" : \"MZMcBride\",   "
+                  + "     \"count\" : 1.0,   "
+                  + "     \"added\" : 68.0,   "
+                  + "     \"delta\" : 68.0,   "
+                  + "     \"variation\" : 68.0,   "
+                  + "     \"deleted\" : 0.0  "
+                  + "    } "
+                  + "   } ]  }} ]";
+
   // Select query results as records
   private static final Object[][] SELECT_QUERY_RESULTS_RECORDS = new Object[][] {
-    new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("1"), new Text("article"), new Text("0"), new Text("0"),
-        new Text("11._korpus_(NOVJ)"), new Text("sl"), new Text("0"), new Text("EmausBot"),
-        new FloatWritable(1.0F), new FloatWritable(39.0F), new FloatWritable(39.0F), new FloatWritable(39.0F), new FloatWritable(0.0F) } ,
-    new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"), new Text("article"), new Text("0"), new Text("0"),
-        new Text("112_U.S._580"), new Text("en"), new Text("1"), new Text("MZMcBride"),
-        new FloatWritable(1.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(0.0F) } ,
-    new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"), new Text("article"), new Text("0"), new Text("0"),
-        new Text("113_U.S._243"), new Text("en"), new Text("1"), new Text("MZMcBride"),
-        new FloatWritable(1.0F), new FloatWritable(77.0F), new FloatWritable(77.0F), new FloatWritable(77.0F), new FloatWritable(0.0F) } ,
-    new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"), new Text("article"), new Text("0"), new Text("0"),
-        new Text("113_U.S._73"), new Text("en"), new Text("1"), new Text("MZMcBride"),
-        new FloatWritable(1.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(0.0F) } ,
-    new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"), new Text("article"), new Text("0"), new Text("0"),
-        new Text("113_U.S._756"), new Text("en"), new Text("1"), new Text("MZMcBride"),
-        new FloatWritable(1.0F), new FloatWritable(68.0F), new FloatWritable(68.0F), new FloatWritable(68.0F), new FloatWritable(0.0F) }
+          new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("1"),
+                  new Text("article"), new Text("0"), new Text("0"),
+                  new Text("11._korpus_(NOVJ)"), new Text("sl"), new Text("0"),
+                  new Text("EmausBot"),
+                  new FloatWritable(1.0F), new FloatWritable(39.0F), new FloatWritable(39.0F),
+                  new FloatWritable(39.0F), new FloatWritable(0.0F) },
+          new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"),
+                  new Text("article"), new Text("0"), new Text("0"),
+                  new Text("112_U.S._580"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+                  new FloatWritable(1.0F), new FloatWritable(70.0F), new FloatWritable(70.0F),
+                  new FloatWritable(70.0F), new FloatWritable(0.0F) },
+          new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"),
+                  new Text("article"), new Text("0"), new Text("0"),
+                  new Text("113_U.S._243"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+                  new FloatWritable(1.0F), new FloatWritable(77.0F), new FloatWritable(77.0F),
+                  new FloatWritable(77.0F), new FloatWritable(0.0F) },
+          new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"),
+                  new Text("article"), new Text("0"), new Text("0"),
+                  new Text("113_U.S._73"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+                  new FloatWritable(1.0F), new FloatWritable(70.0F), new FloatWritable(70.0F),
+                  new FloatWritable(70.0F), new FloatWritable(0.0F) },
+          new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"),
+                  new Text("article"), new Text("0"), new Text("0"),
+                  new Text("113_U.S._756"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+                  new FloatWritable(1.0F), new FloatWritable(68.0F), new FloatWritable(68.0F),
+                  new FloatWritable(68.0F), new FloatWritable(0.0F) }
   };
 
-
   /**
    * Test the default behavior of the objects and object inspectors.
-   * @throws IOException 
-   * @throws IllegalAccessException 
-   * @throws IllegalArgumentException 
-   * @throws SecurityException 
-   * @throws NoSuchFieldException 
-   * @throws JsonMappingException 
-   * @throws JsonParseException 
-   * @throws InvocationTargetException 
-   * @throws NoSuchMethodException 
+   * @throws IOException
+   * @throws IllegalAccessException
+   * @throws IllegalArgumentException
+   * @throws SecurityException
+   * @throws NoSuchFieldException
+   * @throws JsonMappingException
+   * @throws JsonParseException
+   * @throws InvocationTargetException
+   * @throws NoSuchMethodException
    */
   @Test
   public void testDruidSerDe()
@@ -457,25 +491,31 @@ public class TestDruidSerDe {
     tbl = createPropertiesQuery("sample_datasource", Query.TIMESERIES, TIMESERIES_QUERY);
     SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
     deserializeQueryResults(serDe, Query.TIMESERIES, TIMESERIES_QUERY,
-            TIMESERIES_QUERY_RESULTS, TIMESERIES_QUERY_RESULTS_RECORDS);
+            TIMESERIES_QUERY_RESULTS, TIMESERIES_QUERY_RESULTS_RECORDS
+    );
     // TopN query
     tbl = createPropertiesQuery("sample_data", Query.TOPN, TOPN_QUERY);
     SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
     deserializeQueryResults(serDe, Query.TOPN, TOPN_QUERY,
-            TOPN_QUERY_RESULTS, TOPN_QUERY_RESULTS_RECORDS);
+            TOPN_QUERY_RESULTS, TOPN_QUERY_RESULTS_RECORDS
+    );
     // GroupBy query
     tbl = createPropertiesQuery("sample_datasource", Query.GROUP_BY, GROUP_BY_QUERY);
     SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
     deserializeQueryResults(serDe, Query.GROUP_BY, GROUP_BY_QUERY,
-            GROUP_BY_QUERY_RESULTS, GROUP_BY_QUERY_RESULTS_RECORDS);
+            GROUP_BY_QUERY_RESULTS, GROUP_BY_QUERY_RESULTS_RECORDS
+    );
     // Select query
     tbl = createPropertiesQuery("wikipedia", Query.SELECT, SELECT_QUERY);
     SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
     deserializeQueryResults(serDe, Query.SELECT, SELECT_QUERY,
-            SELECT_QUERY_RESULTS, SELECT_QUERY_RESULTS_RECORDS);
+            SELECT_QUERY_RESULTS, SELECT_QUERY_RESULTS_RECORDS
+    );
   }
 
-  private static Properties createPropertiesQuery(String dataSource, String queryType, String jsonQuery) {
+  private static Properties createPropertiesQuery(String dataSource, String queryType,
+          String jsonQuery
+  ) {
     Properties tbl = new Properties();
 
     // Set the configuration parameters
@@ -486,14 +526,15 @@ public class TestDruidSerDe {
   }
 
   private static void deserializeQueryResults(DruidSerDe serDe, String queryType, String jsonQuery,
-          String resultString, Object[][] records) throws SerDeException, JsonParseException,
+          String resultString, Object[][] records
+  ) throws SerDeException, JsonParseException,
           JsonMappingException, IOException, NoSuchFieldException, SecurityException,
           IllegalArgumentException, IllegalAccessException, InterruptedException,
           NoSuchMethodException, InvocationTargetException {
 
     // Initialize
     Query<?> query = null;
-    DruidQueryRecordReader<?,?> reader = null;
+    DruidQueryRecordReader<?, ?> reader = null;
     List<?> resultsList = null;
     ObjectMapper mapper = new DefaultObjectMapper();
     switch (queryType) {
@@ -501,25 +542,33 @@ public class TestDruidSerDe {
         query = mapper.readValue(jsonQuery, TimeseriesQuery.class);
         reader = new DruidTimeseriesQueryRecordReader();
         resultsList = mapper.readValue(resultString,
-                new TypeReference<List<Result<TimeseriesResultValue>>>() {});
+                new TypeReference<List<Result<TimeseriesResultValue>>>() {
+                }
+        );
         break;
       case Query.TOPN:
         query = mapper.readValue(jsonQuery, TopNQuery.class);
         reader = new DruidTopNQueryRecordReader();
         resultsList = mapper.readValue(resultString,
-                new TypeReference<List<Result<TopNResultValue>>>() {});
+                new TypeReference<List<Result<TopNResultValue>>>() {
+                }
+        );
         break;
       case Query.GROUP_BY:
         query = mapper.readValue(jsonQuery, GroupByQuery.class);
         reader = new DruidGroupByQueryRecordReader();
         resultsList = mapper.readValue(resultString,
-                new TypeReference<List<Row>>() {});
+                new TypeReference<List<Row>>() {
+                }
+        );
         break;
       case Query.SELECT:
         query = mapper.readValue(jsonQuery, SelectQuery.class);
         reader = new DruidSelectQueryRecordReader();
         resultsList = mapper.readValue(resultString,
-                new TypeReference<List<Result<SelectResultValue>>>() {});
+                new TypeReference<List<Result<SelectResultValue>>>() {
+                }
+        );
         break;
     }
 
@@ -534,7 +583,7 @@ public class TestDruidSerDe {
     }
     Field field2 = DruidQueryRecordReader.class.getDeclaredField("results");
     field2.setAccessible(true);
-    
+
     // Get the row structure
     StructObjectInspector oi = (StructObjectInspector) serDe.getObjectInspector();
     List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
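
For readers following the switch above: the only non-obvious part is that Jackson needs an explicit TypeReference to recover the generic element type of the results list. A minimal sketch of that pattern, assuming the same Druid/Jackson classes already imported in this test (the helper name is illustrative):

  // Parse a Druid timeseries result string into typed Result objects.
  // The anonymous TypeReference preserves the generic type that erasure
  // would otherwise discard.
  private static List<Result<TimeseriesResultValue>> parseTimeseriesResults(String json)
          throws IOException {
    ObjectMapper mapper = new DefaultObjectMapper();
    return mapper.readValue(json,
            new TypeReference<List<Result<TimeseriesResultValue>>>() {
            });
  }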

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
index 9ccd48e..4fde3eb 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
 import org.joda.time.Interval;
 import org.joda.time.chrono.ISOChronology;
 import org.junit.Test;
@@ -29,14 +30,15 @@ import org.junit.Test;
 import junit.framework.TestCase;
 
 public class TestHiveDruidQueryBasedInputFormat extends TestCase {
-  
+
   @SuppressWarnings("unchecked")
   @Test
   public void testCreateSplitsIntervals() throws Exception {
-    HiveDruidQueryBasedInputFormat input = new HiveDruidQueryBasedInputFormat();
+    DruidQueryBasedInputFormat input = new DruidQueryBasedInputFormat();
 
-    Method method1 = HiveDruidQueryBasedInputFormat.class.getDeclaredMethod("createSplitsIntervals",
-            List.class, int.class);
+    Method method1 = DruidQueryBasedInputFormat.class.getDeclaredMethod("createSplitsIntervals",
+            List.class, int.class
+    );
     method1.setAccessible(true);
 
     List<Interval> intervals;
@@ -48,10 +50,14 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
     intervals.add(new Interval(1262304000000L, 1293840000000L, ISOChronology.getInstanceUTC()));
     resultList = (List<List<Interval>>) method1.invoke(input, intervals, 4);
     expectedResultList = new ArrayList<>();
-    expectedResultList.add(Arrays.asList(new Interval(1262304000000L, 1270188000000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays.asList(new Interval(1270188000000L, 1278072000000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays.asList(new Interval(1278072000000L, 1285956000000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays.asList(new Interval(1285956000000L, 1293840000000L, ISOChronology.getInstanceUTC())));
+    expectedResultList.add(Arrays
+            .asList(new Interval(1262304000000L, 1270188000000L, ISOChronology.getInstanceUTC())));
+    expectedResultList.add(Arrays
+            .asList(new Interval(1270188000000L, 1278072000000L, ISOChronology.getInstanceUTC())));
+    expectedResultList.add(Arrays
+            .asList(new Interval(1278072000000L, 1285956000000L, ISOChronology.getInstanceUTC())));
+    expectedResultList.add(Arrays
+            .asList(new Interval(1285956000000L, 1293840000000L, ISOChronology.getInstanceUTC())));
     assertEquals(expectedResultList, resultList);
 
     // Test 2 : two splits, create 4
@@ -60,11 +66,16 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
     intervals.add(new Interval(1325376000000L, 1356998400000L, ISOChronology.getInstanceUTC()));
     resultList = (List<List<Interval>>) method1.invoke(input, intervals, 4);
     expectedResultList = new ArrayList<>();
-    expectedResultList.add(Arrays.asList(new Interval(1262304000000L, 1278093600000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays.asList(new Interval(1278093600000L, 1293840000000L, ISOChronology.getInstanceUTC()),
-            new Interval(1325376000000L, 1325419200000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays.asList(new Interval(1325419200000L, 1341208800000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays.asList(new Interval(1341208800000L, 1356998400000L, ISOChronology.getInstanceUTC())));
+    expectedResultList.add(Arrays
+            .asList(new Interval(1262304000000L, 1278093600000L, ISOChronology.getInstanceUTC())));
+    expectedResultList.add(Arrays
+            .asList(new Interval(1278093600000L, 1293840000000L, ISOChronology.getInstanceUTC()),
+                    new Interval(1325376000000L, 1325419200000L, ISOChronology.getInstanceUTC())
+            ));
+    expectedResultList.add(Arrays
+            .asList(new Interval(1325419200000L, 1341208800000L, ISOChronology.getInstanceUTC())));
+    expectedResultList.add(Arrays
+            .asList(new Interval(1341208800000L, 1356998400000L, ISOChronology.getInstanceUTC())));
     assertEquals(expectedResultList, resultList);
 
     // Test 3 : two splits, create 5
@@ -73,29 +84,49 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
     intervals.add(new Interval(1325376000000L, 1356998400000L, ISOChronology.getInstanceUTC()));
     resultList = (List<List<Interval>>) method1.invoke(input, intervals, 5);
     expectedResultList = new ArrayList<>();
-    expectedResultList.add(Arrays.asList(new Interval(1262304000000L, 1274935680000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays.asList(new Interval(1274935680000L, 1287567360000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays.asList(new Interval(1287567360000L, 1293840000000L, ISOChronology.getInstanceUTC()),
-            new Interval(1325376000000L, 1331735040000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays.asList(new Interval(1331735040000L, 1344366720000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays.asList(new Interval(1344366720000L, 1356998400000L, ISOChronology.getInstanceUTC())));
+    expectedResultList.add(Arrays
+            .asList(new Interval(1262304000000L, 1274935680000L, ISOChronology.getInstanceUTC())));
+    expectedResultList.add(Arrays
+            .asList(new Interval(1274935680000L, 1287567360000L, ISOChronology.getInstanceUTC())));
+    expectedResultList.add(Arrays
+            .asList(new Interval(1287567360000L, 1293840000000L, ISOChronology.getInstanceUTC()),
+                    new Interval(1325376000000L, 1331735040000L, ISOChronology.getInstanceUTC())
+            ));
+    expectedResultList.add(Arrays
+            .asList(new Interval(1331735040000L, 1344366720000L, ISOChronology.getInstanceUTC())));
+    expectedResultList.add(Arrays
+            .asList(new Interval(1344366720000L, 1356998400000L, ISOChronology.getInstanceUTC())));
     assertEquals(expectedResultList, resultList);
 
     // Test 4 : three splits, different ranges, create 6
     intervals = new ArrayList<>();
-    intervals.add(new Interval(1199145600000L, 1201824000000L, ISOChronology.getInstanceUTC())); // one month
-    intervals.add(new Interval(1325376000000L, 1356998400000L, ISOChronology.getInstanceUTC())); // one year
-    intervals.add(new Interval(1407283200000L, 1407888000000L, ISOChronology.getInstanceUTC())); // 7 days
+    intervals.add(new Interval(1199145600000L, 1201824000000L,
+            ISOChronology.getInstanceUTC()
+    )); // one month
+    intervals.add(new Interval(1325376000000L, 1356998400000L,
+            ISOChronology.getInstanceUTC()
+    )); // one year
+    intervals.add(new Interval(1407283200000L, 1407888000000L,
+            ISOChronology.getInstanceUTC()
+    )); // 7 days
     resultList = (List<List<Interval>>) method1.invoke(input, intervals, 6);
     expectedResultList = new ArrayList<>();
-    expectedResultList.add(Arrays.asList(new Interval(1199145600000L, 1201824000000L, ISOChronology.getInstanceUTC()),
-            new Interval(1325376000000L, 1328515200000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays.asList(new Interval(1328515200000L, 1334332800000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays.asList(new Interval(1334332800000L, 1340150400000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays.asList(new Interval(1340150400000L, 1345968000000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays.asList(new Interval(1345968000000L, 1351785600000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays.asList(new Interval(1351785600000L, 1356998400000L, ISOChronology.getInstanceUTC()),
-            new Interval(1407283200000L, 1407888000000L, ISOChronology.getInstanceUTC())));
+    expectedResultList.add(Arrays
+            .asList(new Interval(1199145600000L, 1201824000000L, ISOChronology.getInstanceUTC()),
+                    new Interval(1325376000000L, 1328515200000L, ISOChronology.getInstanceUTC())
+            ));
+    expectedResultList.add(Arrays
+            .asList(new Interval(1328515200000L, 1334332800000L, ISOChronology.getInstanceUTC())));
+    expectedResultList.add(Arrays
+            .asList(new Interval(1334332800000L, 1340150400000L, ISOChronology.getInstanceUTC())));
+    expectedResultList.add(Arrays
+            .asList(new Interval(1340150400000L, 1345968000000L, ISOChronology.getInstanceUTC())));
+    expectedResultList.add(Arrays
+            .asList(new Interval(1345968000000L, 1351785600000L, ISOChronology.getInstanceUTC())));
+    expectedResultList.add(Arrays
+            .asList(new Interval(1351785600000L, 1356998400000L, ISOChronology.getInstanceUTC()),
+                    new Interval(1407283200000L, 1407888000000L, ISOChronology.getInstanceUTC())
+            ));
     assertEquals(expectedResultList, resultList);
   }
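
The expected intervals above come from slicing the total covered time evenly across the requested number of splits. A rough sketch of that arithmetic for Test 1 (plain millisecond endpoints, not the handler's actual implementation), using the Interval and ISOChronology classes already imported here:

  // One input interval, four splits: each slice spans (end - start) / numSplits ms.
  long start = 1262304000000L;  // 2010-01-01T00:00:00.000Z
  long end = 1293840000000L;    // 2011-01-01T00:00:00.000Z
  int numSplits = 4;
  long sliceMs = (end - start) / numSplits;  // 7884000000 ms
  for (int i = 0; i < numSplits; i++) {
    long sliceStart = start + i * sliceMs;
    long sliceEnd = (i == numSplits - 1) ? end : sliceStart + sliceMs;
    // Boundaries land at 1262304000000, 1270188000000, 1278072000000,
    // 1285956000000 and 1293840000000, matching expectedResultList above.
    System.out.println(new Interval(sliceStart, sliceEnd, ISOChronology.getInstanceUTC()));
  }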
 

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
new file mode 100644
index 0000000..a4272ee
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
@@ -0,0 +1,221 @@
+package org.apache.hadoop.hive.ql.io;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.metamx.common.Granularity;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.impl.DimensionSchema;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.InputRowParser;
+import io.druid.data.input.impl.MapInputRowParser;
+import io.druid.data.input.impl.StringDimensionSchema;
+import io.druid.data.input.impl.TimeAndDimsParseSpec;
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.granularity.QueryGranularities;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import io.druid.segment.QueryableIndex;
+import io.druid.segment.QueryableIndexStorageAdapter;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.loading.LocalDataSegmentPuller;
+import io.druid.segment.loading.LocalDataSegmentPusher;
+import io.druid.segment.loading.LocalDataSegmentPusherConfig;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
+import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
+import io.druid.timeline.DataSegment;
+import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.druid.DruidStorageHandler;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.io.DruidRecordWriter;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class DruidRecordWriterTest {
+  private ObjectMapper objectMapper = DruidStorageHandlerUtils.JSON_MAPPER;
+
+  private static final Interval INTERVAL_FULL = new Interval("2014-10-22T00:00:00Z/P1D");
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private DruidRecordWriter druidRecordWriter;
+
+  final List<ImmutableMap<String, Object>> expectedRows = ImmutableList.of(
+          ImmutableMap.<String, Object>of(
+                  DruidTable.DEFAULT_TIMESTAMP_COLUMN,
+                  DateTime.parse("2014-10-22T00:00:00.000Z").getMillis(),
+                  "host", ImmutableList.of("a.example.com"),
+                  "visited_sum", 190L,
+                  "unique_hosts", 1.0d
+          ),
+          ImmutableMap.<String, Object>of(
+                  DruidTable.DEFAULT_TIMESTAMP_COLUMN,
+                  DateTime.parse("2014-10-22T01:00:00.000Z").getMillis(),
+                  "host", ImmutableList.of("b.example.com"),
+                  "visited_sum", 175L,
+                  "unique_hosts", 1.0d
+          ),
+          ImmutableMap.<String, Object>of(
+                  DruidTable.DEFAULT_TIMESTAMP_COLUMN,
+                  DateTime.parse("2014-10-22T02:00:00.000Z").getMillis(),
+                  "host", ImmutableList.of("c.example.com"),
+                  "visited_sum", 270L,
+                  "unique_hosts", 1.0d
+          )
+  );
+
+  // This test needs this patch https://github.com/druid-io/druid/pull/3483
+  @Ignore
+  @Test
+  public void testWrite() throws IOException, SegmentLoadingException {
+
+    final String dataSourceName = "testDataSource";
+    final File segmentOutputDir = temporaryFolder.newFolder();
+    final File workingDir = temporaryFolder.newFolder();
+    Configuration config = new Configuration();
+
+    final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec(
+            new TimestampSpec(DruidTable.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
+            new DimensionsSpec(ImmutableList.<DimensionSchema>of(new StringDimensionSchema("host")),
+                    null, null
+            )
+    ));
+    final Map<String, Object> parserMap = objectMapper.convertValue(inputRowParser, Map.class);
+
+    DataSchema dataSchema = new DataSchema(
+            dataSourceName,
+            parserMap,
+            new AggregatorFactory[] {
+                    new LongSumAggregatorFactory("visited_sum", "visited_sum"),
+                    new HyperUniquesAggregatorFactory("unique_hosts", "unique_hosts")
+            },
+            new UniformGranularitySpec(
+                    Granularity.DAY, QueryGranularities.NONE, ImmutableList.of(INTERVAL_FULL)
+            ),
+            objectMapper
+    );
+
+    RealtimeTuningConfig tuningConfig = RealtimeTuningConfig
+            .makeDefaultTuningConfig(temporaryFolder.newFolder());
+    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+    DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(
+            new LocalDataSegmentPusherConfig() {
+              @Override
+              public File getStorageDirectory() { return segmentOutputDir; }
+            }, objectMapper);
+
+    Path segmentDescriptorPath = new Path(workingDir.getAbsolutePath(),
+            DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME
+    );
+    druidRecordWriter = new DruidRecordWriter(dataSchema, tuningConfig, dataSegmentPusher, 20,
+            segmentDescriptorPath, localFileSystem
+    );
+
+    List<DruidWritable> druidWritables = Lists.transform(expectedRows,
+            new Function<ImmutableMap<String, Object>, DruidWritable>() {
+              @Nullable
+              @Override
+              public DruidWritable apply(@Nullable ImmutableMap<String, Object> input
+              ) {
+                return new DruidWritable(ImmutableMap.<String, Object>builder().putAll(input)
+                        .put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
+                                Granularity.DAY.truncate(
+                                        new DateTime((long) input
+                                                .get(DruidTable.DEFAULT_TIMESTAMP_COLUMN)))
+                                        .getMillis()
+                        ).build());
+              }
+            }
+    );
+    for (DruidWritable druidWritable : druidWritables) {
+      druidRecordWriter.write(druidWritable);
+    }
+    druidRecordWriter.close(false);
+    List<DataSegment> dataSegmentList = DruidStorageHandlerUtils
+            .getPublishedSegments(segmentDescriptorPath, config);
+    Assert.assertEquals(1, dataSegmentList.size());
+    File tmpUnzippedSegmentDir = temporaryFolder.newFolder();
+    new LocalDataSegmentPuller().getSegmentFiles(dataSegmentList.get(0), tmpUnzippedSegmentDir);
+    final QueryableIndex queryableIndex = DruidStorageHandlerUtils.INDEX_IO
+            .loadIndex(tmpUnzippedSegmentDir);
+
+    QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(queryableIndex);
+
+    Firehose firehose = new IngestSegmentFirehose(
+            ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())),
+            ImmutableList.of("host"),
+            ImmutableList.of("visited_sum", "unique_hosts"),
+            null,
+            QueryGranularities.NONE
+    );
+
+    List<InputRow> rows = Lists.newArrayList();
+    while (firehose.hasMore()) {
+      rows.add(firehose.nextRow());
+    }
+
+    verifyRows(expectedRows, rows);
+
+  }
+
+  private void verifyRows(List<ImmutableMap<String, Object>> expectedRows,
+          List<InputRow> actualRows
+  ) {
+    System.out.println("actualRows = " + actualRows);
+    Assert.assertEquals(expectedRows.size(), actualRows.size());
+
+    for (int i = 0; i < expectedRows.size(); i++) {
+      Map<String, Object> expected = expectedRows.get(i);
+      InputRow actual = actualRows.get(i);
+
+      Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions());
+
+      Assert.assertEquals(expected.get(DruidTable.DEFAULT_TIMESTAMP_COLUMN),
+              actual.getTimestamp().getMillis()
+      );
+      Assert.assertEquals(expected.get("host"), actual.getDimension("host"));
+      Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum"));
+      Assert.assertEquals(
+              (Double) expected.get("unique_hosts"),
+              (Double) HyperUniquesAggregatorFactory
+                      .estimateCardinality(actual.getRaw("unique_hosts")),
+              0.001
+      );
+    }
+  }
+
+  @Test
+  public void testSerDeser() throws IOException {
+    String segment = "{\"dataSource\":\"datasource2015\",\"interval\":\"2015-06-01T00:00:00.000-04:00/2015-06-02T00:00:00.000-04:00\",\"version\":\"2016-11-04T19:24:01.732-04:00\",\"loadSpec\":{\"type\":\"hdfs\",\"path\":\"hdfs://cn105-10.l42scl.hortonworks.com:8020/apps/hive/warehouse/druid.db/.hive-staging_hive_2016-11-04_19-23-50_168_1550339856804207572-1/_task_tmp.-ext-10002/_tmp.000000_0/datasource2015/20150601T000000.000-0400_20150602T000000.000-0400/2016-11-04T19_24_01.732-04_00/0/index.zip\"},\"dimensions\":\"dimension1\",\"metrics\":\"bigint\",\"shardSpec\":{\"type\":\"linear\",\"partitionNum\":0},\"binaryVersion\":9,\"size\":1765,\"identifier\":\"datasource2015_2015-06-01T00:00:00.000-04:00_2015-06-02T00:00:00.000-04:00_2016-11-04T19:24:01.732-04:00\"}";
+    DataSegment dataSegment = objectMapper.readerFor(DataSegment.class)
+            .readValue(segment);
+    Assert.assertTrue(dataSegment.getDataSource().equals("datasource2015"));
+  }
+
+}
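
The assertion above only exercises deserialization; a minimal follow-up sketch, reusing the dataSegment and objectMapper from that test, could round-trip the segment metadata as well:

  // Serialize the DataSegment back to JSON and confirm the identifier survives.
  String json = objectMapper.writeValueAsString(dataSegment);
  DataSegment roundTripped = objectMapper.readerFor(DataSegment.class).readValue(json);
  Assert.assertEquals(dataSegment.getIdentifier(), roundTripped.getIdentifier());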

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 87bd5c8..d8689ba 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -17,19 +17,12 @@
  */
 package org.apache.hadoop.hive.llap.daemon.impl;
 
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Stack;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
@@ -50,7 +43,6 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.log4j.MDC;
 import org.apache.log4j.NDC;
 import org.apache.tez.common.CallableWithNdc;
@@ -58,10 +50,7 @@ import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.hadoop.shim.HadoopShim;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -75,15 +64,17 @@ import org.apache.tez.runtime.task.TezTaskRunner2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Stack;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  *
@@ -265,7 +256,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
         } finally {
           FileSystem.closeAllForUGI(taskUgi);
           LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
-              runtimeWatch.stop().elapsedMillis());
+                  runtimeWatch.stop().elapsedMillis());
           if (LOG.isDebugEnabled()) {
             LOG.debug(
                 "canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish());

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3fc35bc..376197e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,7 +136,7 @@
     <derby.version>10.10.2.0</derby.version>
     <dropwizard.version>3.1.0</dropwizard.version>
     <dropwizard-metrics-hadoop-metrics2-reporter.version>0.1.2</dropwizard-metrics-hadoop-metrics2-reporter.version>
-    <druid.version>0.9.1.1</druid.version>
+    <druid.version>0.9.2</druid.version>
     <guava.version>14.0.1</guava.version>
     <groovy.version>2.4.4</groovy.version>
     <hadoop.version>2.7.2</hadoop.version>

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 54d619c..5ef901f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -18,20 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -83,7 +70,19 @@ import org.apache.hive.common.util.HiveStringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
 
 /**
  * File Sink operator implementation.

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 8db833e..37e4b9b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -18,63 +18,10 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import java.beans.DefaultPersistenceDelegate;
-import java.beans.Encoder;
-import java.beans.Expression;
-import java.beans.Statement;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.net.URLDecoder;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.sql.SQLTransientException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.zip.Deflater;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.InflaterInputStream;
-
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.WordUtils;
@@ -198,8 +145,58 @@ import org.apache.hive.common.util.ReflectionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.google.common.base.Preconditions;
+import java.beans.DefaultPersistenceDelegate;
+import java.beans.Encoder;
+import java.beans.Expression;
+import java.beans.Statement;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.net.URLDecoder;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLTransientException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
 
 /**
  * Utilities.
@@ -3095,7 +3092,7 @@ public final class Utilities {
 
     TableDesc tableDesc = work.getAliasToPartnInfo().get(alias).getTableDesc();
     if (tableDesc.isNonNative()) {
-      // if this isn't a hive table we can't create an empty file for it.
+      // if the table does not use native storage, we can't create an empty file for it.
       return null;
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
index 178a2de..c1f6883 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
@@ -18,21 +18,13 @@
 
 package org.apache.hadoop.hive.ql.hooks;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.gson.stream.JsonWriter;
 import org.apache.commons.collections.SetUtils;
 import org.apache.commons.io.output.StringBuilderWriter;
 import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -50,11 +42,18 @@ import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import com.google.gson.stream.JsonWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * Implementation of a post execute hook that logs lineage info to a log file.

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index eaf0abc..d9c2ff4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -122,6 +122,8 @@ public class Optimizer {
       transformations.add(new SortedDynPartitionOptimizer());
     }
 
+    transformations.add(new SortedDynPartitionTimeGranularityOptimizer());
+
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
       transformations.add(new PartitionPruner());
       transformations.add(new PartitionConditionRemover());