Posted to commits@hive.apache.org by jc...@apache.org on 2016/12/15 20:51:35 UTC
[1/4] hive git commit: HIVE-15277: Teach Hive how to create/delete Druid segments (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)
Repository: hive
Updated Branches:
refs/heads/master 89362a14d -> 590687bc4
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
new file mode 100644
index 0000000..7ea4754
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.Utilities.ReduceField;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorDay;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorHour;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorMinute;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorMonth;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorSecond;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorWeek;
+import org.apache.hadoop.hive.ql.udf.UDFDateFloorYear;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.parquet.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+/**
+ * Introduces a RS before FS to partition data by the time granularity
+ * specified in the configuration.
+ */
+public class SortedDynPartitionTimeGranularityOptimizer extends Transform {
+
+ @Override
+ public ParseContext transform(ParseContext pCtx) throws SemanticException {
+ // create a walker which walks the tree in a DFS manner while maintaining the
+ // operator stack. The dispatcher generates the plan from the operator tree
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+
+ String FS = FileSinkOperator.getOperatorName() + "%";
+
+ opRules.put(new RuleRegExp("Sorted Dynamic Partition Time Granularity", FS), getSortDynPartProc(pCtx));
+
+ Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pCtx.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+
+ return pCtx;
+ }
+
+ private NodeProcessor getSortDynPartProc(ParseContext pCtx) {
+ return new SortedDynamicPartitionProc(pCtx);
+ }
+
+ class SortedDynamicPartitionProc implements NodeProcessor {
+
+ private final Logger LOG = LoggerFactory.getLogger(SortedDynPartitionTimeGranularityOptimizer.class);
+ protected ParseContext parseCtx;
+
+ public SortedDynamicPartitionProc(ParseContext pCtx) {
+ this.parseCtx = pCtx;
+ }
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+
+ // introduce RS and EX before FS
+ FileSinkOperator fsOp = (FileSinkOperator) nd;
+ final String sh = fsOp.getConf().getTableInfo().getOutputFileFormatClassName();
+ if (parseCtx.getQueryProperties().isQuery() || sh == null || !sh
+ .equals(Constants.DRUID_HIVE_OUTPUT_FORMAT)) {
+ // Bail out, nothing to do
+ return null;
+ }
+ String segmentGranularity = parseCtx.getCreateTable().getTblProps()
+ .get(Constants.DRUID_SEGMENT_GRANULARITY);
+ segmentGranularity = !Strings.isNullOrEmpty(segmentGranularity)
+ ? segmentGranularity
+ : HiveConf.getVar(parseCtx.getConf(),
+ HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY
+ );
+ LOG.info("Sorted dynamic partitioning on time granularity optimization kicked in...");
+
+ // unlink connection between FS and its parent
+ Operator<? extends OperatorDesc> fsParent = fsOp.getParentOperators().get(0);
+ fsParent.getChildOperators().clear();
+
+ // Create SelectOp with granularity column
+ Operator<? extends OperatorDesc> granularitySelOp = getGranularitySelOp(fsParent,
+ segmentGranularity
+ );
+
+ // Create ReduceSinkOp operator
+ ArrayList<ColumnInfo> parentCols = Lists.newArrayList(granularitySelOp.getSchema().getSignature());
+ ArrayList<ExprNodeDesc> allRSCols = Lists.newArrayList();
+ for (ColumnInfo ci : parentCols) {
+ allRSCols.add(new ExprNodeColumnDesc(ci));
+ }
+ // Get the key positions
+ List<Integer> keyPositions = new ArrayList<>();
+ keyPositions.add(allRSCols.size() - 1);
+ List<Integer> sortOrder = new ArrayList<Integer>(1);
+ sortOrder.add(1); // asc
+ List<Integer> sortNullOrder = new ArrayList<Integer>(1);
+ sortNullOrder.add(0); // nulls first
+ ReduceSinkOperator rsOp = getReduceSinkOp(keyPositions, sortOrder,
+ sortNullOrder, allRSCols, granularitySelOp, fsOp.getConf().getWriteType());
+
+ // Create backtrack SelectOp
+ List<ExprNodeDesc> descs = new ArrayList<ExprNodeDesc>(allRSCols.size());
+ List<String> colNames = new ArrayList<String>();
+ String colName;
+ for (int i = 0; i < allRSCols.size(); i++) {
+ ExprNodeDesc col = allRSCols.get(i);
+ colName = col.getExprString();
+ colNames.add(colName);
+ if (keyPositions.contains(i)) {
+ descs.add(new ExprNodeColumnDesc(col.getTypeInfo(), ReduceField.KEY.toString()+"."+colName, null, false));
+ } else {
+ descs.add(new ExprNodeColumnDesc(col.getTypeInfo(), ReduceField.VALUE.toString()+"."+colName, null, false));
+ }
+ }
+ RowSchema selRS = new RowSchema(granularitySelOp.getSchema());
+ SelectDesc selConf = new SelectDesc(descs, colNames);
+ SelectOperator backtrackSelOp = (SelectOperator) OperatorFactory.getAndMakeChild(
+ selConf, selRS, rsOp);
+
+ // Link backtrack SelectOp to FileSinkOp
+ fsOp.getParentOperators().clear();
+ fsOp.getParentOperators().add(backtrackSelOp);
+ backtrackSelOp.getChildOperators().add(fsOp);
+
+ // Update file sink descriptor
+ fsOp.getConf().setDpSortState(FileSinkDesc.DPSortState.PARTITION_SORTED);
+ fsOp.getConf().setPartitionCols(rsOp.getConf().getPartitionCols());
+ ColumnInfo ci = new ColumnInfo(granularitySelOp.getSchema().getSignature().get(
+ granularitySelOp.getSchema().getSignature().size() - 1)); // granularity column
+ fsOp.getSchema().getSignature().add(ci);
+
+ LOG.info("Inserted " + granularitySelOp.getOperatorId() + ", " + rsOp.getOperatorId() + " and "
+ + backtrackSelOp.getOperatorId() + " as parent of " + fsOp.getOperatorId()
+ + " and child of " + fsParent.getOperatorId());
+
+ parseCtx.setReduceSinkAddedBySortedDynPartition(true);
+ return null;
+ }
+
+ private Operator<? extends OperatorDesc> getGranularitySelOp(
+ Operator<? extends OperatorDesc> fsParent, String segmentGranularity
+ ) throws SemanticException {
+ ArrayList<ColumnInfo> parentCols = Lists.newArrayList(fsParent.getSchema().getSignature());
+ ArrayList<ExprNodeDesc> descs = Lists.newArrayList();
+ List<String> colNames = Lists.newArrayList();
+ int timestampPos = -1;
+ for (int i = 0; i < parentCols.size(); i++) {
+ ColumnInfo ci = parentCols.get(i);
+ ExprNodeColumnDesc columnDesc = new ExprNodeColumnDesc(ci);
+ descs.add(columnDesc);
+ colNames.add(columnDesc.getExprString());
+ if (columnDesc.getTypeInfo().getCategory() == ObjectInspector.Category.PRIMITIVE
+ && ((PrimitiveTypeInfo) columnDesc.getTypeInfo()).getPrimitiveCategory() == PrimitiveCategory.TIMESTAMP) {
+ if (timestampPos != -1) {
+ throw new SemanticException("Multiple columns with timestamp type on query result; "
+ + "could not resolve which one is the timestamp column");
+ }
+ timestampPos = i;
+ }
+ }
+ if (timestampPos == -1) {
+ throw new SemanticException("No column with timestamp type on query result; "
+ + "one column should be of timestamp type");
+ }
+ RowSchema selRS = new RowSchema(fsParent.getSchema());
+ // Granularity (partition) column
+ String udfName;
+
+ Class<? extends UDF> udfClass;
+ switch (segmentGranularity) {
+ case "YEAR":
+ udfName = "floor_year";
+ udfClass = UDFDateFloorYear.class;
+ break;
+ case "MONTH":
+ udfName = "floor_month";
+ udfClass = UDFDateFloorMonth.class;
+ break;
+ case "WEEK":
+ udfName = "floor_week";
+ udfClass = UDFDateFloorWeek.class;
+ break;
+ case "DAY":
+ udfName = "floor_day";
+ udfClass = UDFDateFloorDay.class;
+ break;
+ case "HOUR":
+ udfName = "floor_hour";
+ udfClass = UDFDateFloorHour.class;
+ break;
+ case "MINUTE":
+ udfName = "floor_minute";
+ udfClass = UDFDateFloorMinute.class;
+ break;
+ case "SECOND":
+ udfName = "floor_second";
+ udfClass = UDFDateFloorSecond.class;
+ break;
+ default:
+ throw new SemanticException("Granularity for Druid segment not recognized");
+ }
+ ExprNodeDesc expr = new ExprNodeColumnDesc(parentCols.get(timestampPos));
+ descs.add(new ExprNodeGenericFuncDesc(
+ TypeInfoFactory.timestampTypeInfo,
+ new GenericUDFBridge(udfName, false, udfClass.getName()),
+ Lists.newArrayList(expr)));
+ colNames.add(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME);
+ // Add granularity to the row schema
+ ColumnInfo ci = new ColumnInfo(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME, TypeInfoFactory.timestampTypeInfo,
+ selRS.getSignature().get(0).getTabAlias(), false, false);
+ selRS.getSignature().add(ci);
+
+ // Create SelectDesc
+ SelectDesc selConf = new SelectDesc(descs, colNames);
+
+ // Create Select Operator
+ SelectOperator selOp = (SelectOperator) OperatorFactory.getAndMakeChild(
+ selConf, selRS, fsParent);
+
+ return selOp;
+ }
+
+ private ReduceSinkOperator getReduceSinkOp(List<Integer> keyPositions, List<Integer> sortOrder,
+ List<Integer> sortNullOrder, ArrayList<ExprNodeDesc> allCols, Operator<? extends OperatorDesc> parent,
+ AcidUtils.Operation writeType) throws SemanticException {
+
+ ArrayList<ExprNodeDesc> keyCols = Lists.newArrayList();
+ // clone the columns here, as the RS will update the bucket column keys with
+ // their corresponding bucket numbers and hence change their OIs
+ for (Integer idx : keyPositions) {
+ keyCols.add(allCols.get(idx).clone());
+ }
+
+ ArrayList<ExprNodeDesc> valCols = Lists.newArrayList();
+ for (int i = 0; i < allCols.size(); i++) {
+ if (!keyPositions.contains(i)) {
+ valCols.add(allCols.get(i).clone());
+ }
+ }
+
+ ArrayList<ExprNodeDesc> partCols = Lists.newArrayList();
+ for (Integer idx : keyPositions) {
+ partCols.add(allCols.get(idx).clone());
+ }
+
+ // map _col0 to KEY._col0, etc
+ Map<String, ExprNodeDesc> colExprMap = Maps.newHashMap();
+ Map<String, String> nameMapping = new HashMap<>();
+ ArrayList<String> keyColNames = Lists.newArrayList();
+ for (ExprNodeDesc keyCol : keyCols) {
+ String keyColName = keyCol.getExprString();
+ keyColNames.add(keyColName);
+ colExprMap.put(Utilities.ReduceField.KEY + "." +keyColName, keyCol);
+ nameMapping.put(keyColName, Utilities.ReduceField.KEY + "." + keyColName);
+ }
+ ArrayList<String> valColNames = Lists.newArrayList();
+ for (ExprNodeDesc valCol : valCols) {
+ String colName = valCol.getExprString();
+ valColNames.add(colName);
+ colExprMap.put(Utilities.ReduceField.VALUE + "." + colName, valCol);
+ nameMapping.put(colName, Utilities.ReduceField.VALUE + "." + colName);
+ }
+
+ // order and null order
+ String orderStr = StringUtils.repeat("+", sortOrder.size());
+ String nullOrderStr = StringUtils.repeat("a", sortNullOrder.size());
+
+ // Create Key/Value TableDesc. When the operator plan is split into MR tasks,
+ // the reduce operator will initialize Extract operator with information
+ // from Key and Value TableDesc
+ List<FieldSchema> fields = PlanUtils.getFieldSchemasFromColumnList(keyCols,
+ keyColNames, 0, "");
+ TableDesc keyTable = PlanUtils.getReduceKeyTableDesc(fields, orderStr, nullOrderStr);
+ List<FieldSchema> valFields = PlanUtils.getFieldSchemasFromColumnList(valCols,
+ valColNames, 0, "");
+ TableDesc valueTable = PlanUtils.getReduceValueTableDesc(valFields);
+ List<List<Integer>> distinctColumnIndices = Lists.newArrayList();
+
+ // Number of reducers is set to default (-1)
+ ReduceSinkDesc rsConf = new ReduceSinkDesc(keyCols, keyCols.size(), valCols,
+ keyColNames, distinctColumnIndices, valColNames, -1, partCols, -1, keyTable,
+ valueTable, writeType);
+
+ ArrayList<ColumnInfo> signature = new ArrayList<>();
+ for (int index = 0; index < parent.getSchema().getSignature().size(); index++) {
+ ColumnInfo colInfo = new ColumnInfo(parent.getSchema().getSignature().get(index));
+ colInfo.setInternalName(nameMapping.get(colInfo.getInternalName()));
+ signature.add(colInfo);
+ }
+ ReduceSinkOperator op = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
+ rsConf, new RowSchema(signature), parent);
+ op.setColumnExprMap(colExprMap);
+ return op;
+ }
+
+ }
+
+}
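The optimizer above appends a __time_granularity column by applying one of the floor_* UDFs to the single timestamp column of the query result, and then sorts and partitions the rows on that column before the FileSink, so each reducer writes rows belonging to one granularity bucket. As a rough standalone illustration of what the added column evaluates to, here is a sketch in plain java.time (not the Hive UDF code), assuming the DAY granularity:

// Illustration only: roughly what the generated __time_granularity column
// evaluates to for a row under the "DAY" granularity (mirrors floor_day).
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

public class GranularityFloorSketch {
  static LocalDateTime floorDay(LocalDateTime ts) {
    // floor_day truncates a timestamp to the start of its day
    return ts.truncatedTo(ChronoUnit.DAYS);
  }

  public static void main(String[] args) {
    LocalDateTime ts = LocalDateTime.parse("2016-12-15T20:43:45");
    // Rows whose timestamps floor to the same value land in the same reducer
    // partition and, ultimately, in the same Druid segment interval.
    System.out.println(floorDay(ts)); // prints 2016-12-15T00:00
  }
}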
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index def1a7d..7e6dcf3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -11803,10 +11803,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
storageFormat.fillDefaultStorageFormat(isExt, false);
- if ((command_type == CTAS) && (storageFormat.getStorageHandler() != null)) {
- throw new SemanticException(ErrorMsg.CREATE_NON_NATIVE_AS.getMsg());
- }
-
// check for existence of table
if (ifNotExists) {
try {
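The check removed above is what previously blocked CREATE TABLE ... AS SELECT for tables backed by a storage handler; with it gone, a CTAS into a Druid-backed table becomes possible. A hedged usage sketch over JDBC follows; the table names, columns, and connection URL are placeholders, not anything defined by this patch:

// Hypothetical CTAS into a Druid-backed table, now accepted after the
// CREATE_NON_NATIVE_AS restriction was lifted. Names and URL are examples.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class DruidCtasExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn =
             DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
         Statement stmt = conn.createStatement()) {
      stmt.execute(
          "CREATE TABLE druid_pageviews "
          + "STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' "
          + "TBLPROPERTIES (\"druid.segment.granularity\" = \"DAY\") "
          + "AS SELECT `__time`, page, added FROM pageviews_src");
    }
  }
}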
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index 1c0ac1e..d3a1528 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -18,21 +18,11 @@
package org.apache.hadoop.hive.ql.plan;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapOutputFormat;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -46,7 +36,6 @@ import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
-import org.apache.hadoop.hive.llap.LlapOutputFormat;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
@@ -71,12 +60,24 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
/**
* PlanUtils.
*
@@ -306,17 +307,26 @@ public final class PlanUtils {
public static TableDesc getTableDesc(CreateTableDesc crtTblDesc, String cols,
String colTypes) {
- Class<? extends Deserializer> serdeClass = LazySimpleSerDe.class;
- String separatorCode = Integer.toString(Utilities.ctrlaCode);
- String columns = cols;
- String columnTypes = colTypes;
- boolean lastColumnTakesRestOfTheLine = false;
TableDesc ret;
+ // Resolve storage handler (if any)
try {
- if (crtTblDesc.getSerName() != null) {
- Class c = JavaUtils.loadClass(crtTblDesc.getSerName());
- serdeClass = c;
+ HiveStorageHandler storageHandler = null;
+ if (crtTblDesc.getStorageHandler() != null) {
+ storageHandler = HiveUtils.getStorageHandler(
+ SessionState.getSessionConf(), crtTblDesc.getStorageHandler());
+ }
+
+ Class<? extends Deserializer> serdeClass = LazySimpleSerDe.class;
+ String separatorCode = Integer.toString(Utilities.ctrlaCode);
+ String columns = cols;
+ String columnTypes = colTypes;
+ boolean lastColumnTakesRestOfTheLine = false;
+
+ if (storageHandler != null) {
+ serdeClass = storageHandler.getSerDeClass();
+ } else if (crtTblDesc.getSerName() != null) {
+ serdeClass = JavaUtils.loadClass(crtTblDesc.getSerName());
}
if (crtTblDesc.getFieldDelim() != null) {
@@ -329,6 +339,12 @@ public final class PlanUtils {
// set other table properties
Properties properties = ret.getProperties();
+ if (crtTblDesc.getStorageHandler() != null) {
+ properties.setProperty(
+ org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE,
+ crtTblDesc.getStorageHandler());
+ }
+
if (crtTblDesc.getCollItemDelim() != null) {
properties.setProperty(serdeConstants.COLLECTION_DELIM, crtTblDesc
.getCollItemDelim());
@@ -367,15 +383,24 @@ public final class PlanUtils {
// replace the default input & output file format with those found in
// crtTblDesc
- Class c1 = JavaUtils.loadClass(crtTblDesc.getInputFormat());
- Class c2 = JavaUtils.loadClass(crtTblDesc.getOutputFormat());
- Class<? extends InputFormat> in_class = c1;
- Class<? extends HiveOutputFormat> out_class = c2;
-
+ Class<? extends InputFormat> in_class;
+ if (storageHandler != null) {
+ in_class = storageHandler.getInputFormatClass();
+ } else {
+ in_class = JavaUtils.loadClass(crtTblDesc.getInputFormat());
+ }
+ Class<? extends OutputFormat> out_class;
+ if (storageHandler != null) {
+ out_class = storageHandler.getOutputFormatClass();
+ } else {
+ out_class = JavaUtils.loadClass(crtTblDesc.getOutputFormat());
+ }
ret.setInputFileFormatClass(in_class);
ret.setOutputFileFormatClass(out_class);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Unable to find class in getTableDesc: " + e.getMessage(), e);
+ } catch (HiveException e) {
+ throw new RuntimeException("Error loading storage handler in getTableDesc: " + e.getMessage(), e);
}
return ret;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
index 4f053d8..63dc94c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
@@ -18,12 +18,6 @@
package org.apache.hadoop.hive.ql.plan;
-import java.io.Serializable;
-import java.util.Enumeration;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Properties;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -34,15 +28,25 @@ import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.OutputFormat;
-
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.hive.common.util.ReflectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Enumeration;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Properties;
/**
* TableDesc.
*
*/
public class TableDesc implements Serializable, Cloneable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TableDesc.class);
+
private static final long serialVersionUID = 1L;
private Class<? extends InputFormat> inputFileFormatClass;
private Class<? extends OutputFormat> outputFileFormatClass;
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/test/queries/clientnegative/druid_external.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/druid_external.q b/ql/src/test/queries/clientnegative/druid_external.q
deleted file mode 100644
index 2de04db..0000000
--- a/ql/src/test/queries/clientnegative/druid_external.q
+++ /dev/null
@@ -1,5 +0,0 @@
-set hive.druid.broker.address.default=localhost.test;
-
-CREATE TABLE druid_table_1
-STORED BY 'org.apache.hadoop.hive.druid.QTestDruidStorageHandler'
-TBLPROPERTIES ("druid.datasource" = "wikipedia");
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/test/results/clientnegative/druid_external.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/druid_external.q.out b/ql/src/test/results/clientnegative/druid_external.q.out
deleted file mode 100644
index e5fac51..0000000
--- a/ql/src/test/results/clientnegative/druid_external.q.out
+++ /dev/null
@@ -1,7 +0,0 @@
-PREHOOK: query: CREATE TABLE druid_table_1
-STORED BY 'org.apache.hadoop.hive.druid.QTestDruidStorageHandler'
-TBLPROPERTIES ("druid.datasource" = "wikipedia")
-PREHOOK: type: CREATETABLE
-PREHOOK: Output: database:default
-PREHOOK: Output: default@druid_table_1
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:Table in Druid needs to be declared as EXTERNAL)
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/test/results/clientpositive/druid_basic2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid_basic2.q.out b/ql/src/test/results/clientpositive/druid_basic2.q.out
index 5c4359b..48de99a 100644
--- a/ql/src/test/results/clientpositive/druid_basic2.q.out
+++ b/ql/src/test/results/clientpositive/druid_basic2.q.out
@@ -269,8 +269,8 @@ STAGE PLANS:
#### A masked pattern was here ####
Partition
base file name: druid_table_1
- input format: org.apache.hadoop.hive.druid.HiveDruidQueryBasedInputFormat
- output format: org.apache.hadoop.hive.druid.HiveDruidOutputFormat
+ input format: org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat
+ output format: org.apache.hadoop.hive.druid.io.DruidOutputFormat
properties:
COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
EXTERNAL TRUE
@@ -294,8 +294,8 @@ STAGE PLANS:
#### A masked pattern was here ####
serde: org.apache.hadoop.hive.druid.QTestDruidSerDe
- input format: org.apache.hadoop.hive.druid.HiveDruidQueryBasedInputFormat
- output format: org.apache.hadoop.hive.druid.HiveDruidOutputFormat
+ input format: org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat
+ output format: org.apache.hadoop.hive.druid.io.DruidOutputFormat
properties:
COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
EXTERNAL TRUE
@@ -435,8 +435,8 @@ STAGE PLANS:
#### A masked pattern was here ####
Partition
base file name: druid_table_1
- input format: org.apache.hadoop.hive.druid.HiveDruidQueryBasedInputFormat
- output format: org.apache.hadoop.hive.druid.HiveDruidOutputFormat
+ input format: org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat
+ output format: org.apache.hadoop.hive.druid.io.DruidOutputFormat
properties:
COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
EXTERNAL TRUE
@@ -460,8 +460,8 @@ STAGE PLANS:
#### A masked pattern was here ####
serde: org.apache.hadoop.hive.druid.QTestDruidSerDe
- input format: org.apache.hadoop.hive.druid.HiveDruidQueryBasedInputFormat
- output format: org.apache.hadoop.hive.druid.HiveDruidOutputFormat
+ input format: org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat
+ output format: org.apache.hadoop.hive.druid.io.DruidOutputFormat
properties:
COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
EXTERNAL TRUE
[4/4] hive git commit: HIVE-15277: Teach Hive how to create/delete Druid segments (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)
Posted by jc...@apache.org.
HIVE-15277: Teach Hive how to create/delete Druid segments (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/590687bc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/590687bc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/590687bc
Branch: refs/heads/master
Commit: 590687bc4cb97cdf4aa95ba94f28949986d1b3e8
Parents: 89362a1
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Thu Dec 15 20:43:45 2016 +0000
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Thu Dec 15 20:43:45 2016 +0000
----------------------------------------------------------------------
.../hadoop/hive/common/JvmPauseMonitor.java | 3 +-
.../org/apache/hadoop/hive/conf/Constants.java | 7 +
.../org/apache/hadoop/hive/conf/HiveConf.java | 38 +-
druid-handler/README.md | 3 +
druid-handler/pom.xml | 145 +++-
.../hadoop/hive/druid/DruidStorageHandler.java | 311 +++++++-
.../hive/druid/DruidStorageHandlerUtils.java | 351 ++++++++-
.../hive/druid/HiveDruidOutputFormat.java | 55 --
.../druid/HiveDruidQueryBasedInputFormat.java | 376 ----------
.../hadoop/hive/druid/HiveDruidSplit.java | 83 ---
.../hadoop/hive/druid/io/DruidOutputFormat.java | 204 ++++++
.../druid/io/DruidQueryBasedInputFormat.java | 397 ++++++++++
.../hadoop/hive/druid/io/DruidRecordWriter.java | 260 +++++++
.../hadoop/hive/druid/io/HiveDruidSplit.java | 84 +++
.../serde/DruidGroupByQueryRecordReader.java | 15 +-
.../druid/serde/DruidQueryRecordReader.java | 42 +-
.../serde/DruidSelectQueryRecordReader.java | 8 +-
.../hadoop/hive/druid/serde/DruidSerDe.java | 286 +++++---
.../hive/druid/serde/DruidSerDeUtils.java | 6 +-
.../serde/DruidTimeseriesQueryRecordReader.java | 7 +-
.../druid/serde/DruidTopNQueryRecordReader.java | 8 +-
.../hive/druid/DruidStorageHandlerTest.java | 181 +++++
.../hadoop/hive/druid/QTestDruidSerDe.java | 60 +-
.../hadoop/hive/druid/TestDerbyConnector.java | 108 +++
.../hadoop/hive/druid/TestDruidSerDe.java | 731 ++++++++++---------
.../TestHiveDruidQueryBasedInputFormat.java | 91 ++-
.../hive/ql/io/DruidRecordWriterTest.java | 221 ++++++
.../llap/daemon/impl/TaskRunnerCallable.java | 45 +-
pom.xml | 2 +-
.../hadoop/hive/ql/exec/FileSinkOperator.java | 29 +-
.../apache/hadoop/hive/ql/exec/Utilities.java | 113 ++-
.../hadoop/hive/ql/hooks/LineageLogger.java | 31 +-
.../hadoop/hive/ql/optimizer/Optimizer.java | 2 +
...tedDynPartitionTimeGranularityOptimizer.java | 363 +++++++++
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 4 -
.../apache/hadoop/hive/ql/plan/PlanUtils.java | 75 +-
.../apache/hadoop/hive/ql/plan/TableDesc.java | 18 +-
.../queries/clientnegative/druid_external.q | 5 -
.../results/clientnegative/druid_external.q.out | 7 -
.../results/clientpositive/druid_basic2.q.out | 16 +-
40 files changed, 3534 insertions(+), 1257 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
index 5d475f4..cf080e3 100644
--- a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
+++ b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
@@ -37,6 +37,7 @@ import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
/**
* Based on the JvmPauseMonitor from Hadoop.
@@ -181,7 +182,7 @@ public class JvmPauseMonitor {
} catch (InterruptedException ie) {
return;
}
- long extraSleepTime = sw.elapsedMillis() - SLEEP_INTERVAL_MS;
+ long extraSleepTime = sw.elapsed(TimeUnit.MILLISECONDS) - SLEEP_INTERVAL_MS;
Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
if (extraSleepTime > warnThresholdMs) {
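The one-line change above moves to Guava's Stopwatch.elapsed(TimeUnit), the replacement for elapsedMillis(), which newer Guava releases no longer provide. A minimal standalone sketch of the new call, assuming Guava 15 or later on the classpath:

// Minimal sketch of the updated Stopwatch usage; elapsed(TimeUnit) replaces
// the removed elapsedMillis() accessor.
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;

public class StopwatchExample {
  public static void main(String[] args) throws InterruptedException {
    Stopwatch sw = Stopwatch.createStarted();
    Thread.sleep(500);
    long extraSleepTime = sw.elapsed(TimeUnit.MILLISECONDS) - 500;
    System.out.println("extra sleep time: " + extraSleepTime + " ms");
  }
}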
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/common/src/java/org/apache/hadoop/hive/conf/Constants.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 6c42163..ea7864a 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -26,10 +26,17 @@ public class Constants {
/* Constants for Druid storage handler */
public static final String DRUID_HIVE_STORAGE_HANDLER_ID =
"org.apache.hadoop.hive.druid.DruidStorageHandler";
+ public static final String DRUID_HIVE_OUTPUT_FORMAT =
+ "org.apache.hadoop.hive.druid.io.DruidOutputFormat";
public static final String DRUID_DATA_SOURCE = "druid.datasource";
+ public static final String DRUID_SEGMENT_GRANULARITY = "druid.segment.granularity";
+ public static final String DRUID_TIMESTAMP_GRANULARITY_COL_NAME = "__time_granularity";
public static final String DRUID_QUERY_JSON = "druid.query.json";
public static final String DRUID_QUERY_TYPE = "druid.query.type";
public static final String DRUID_QUERY_FETCH = "druid.query.fetch";
+ public static final String DRUID_SEGMENT_DIRECTORY = "druid.storage.storageDirectory";
+ public static final String DRUID_SEGMENT_VERSION = "druid.segment.version";
+ public static final String DRUID_JOB_WORKING_DIRECTORY = "druid.job.workingDirectory";
public static final String HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR = "HIVE_JOB_CREDSTORE_PASSWORD";
public static final String HADOOP_CREDENTIAL_PASSWORD_ENVVAR = "HADOOP_CREDSTORE_PASSWORD";
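The new constants above are the property keys used on the write path: druid.datasource and druid.segment.granularity come from the table properties, and the optimizer falls back to the hive.druid.indexer.segments.granularity ConfVar when the table property is absent. A small hypothetical sketch of that lookup; the values are placeholders, not defaults from this patch:

// Hypothetical sketch of how the segment granularity is resolved: the table
// property wins, otherwise the HiveConf default applies. Values are examples.
import java.util.Properties;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;

public class GranularityLookupSketch {
  public static void main(String[] args) {
    Properties tableProps = new Properties();
    tableProps.setProperty(Constants.DRUID_DATA_SOURCE, "wikipedia");
    tableProps.setProperty(Constants.DRUID_SEGMENT_GRANULARITY, "HOUR");

    HiveConf conf = new HiveConf();
    String granularity = tableProps.getProperty(Constants.DRUID_SEGMENT_GRANULARITY);
    if (granularity == null || granularity.isEmpty()) {
      // falls back to hive.druid.indexer.segments.granularity (default DAY)
      granularity = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY);
    }
    System.out.println("segment granularity: " + granularity);
  }
}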
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 032ff0c..dcb383d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1936,9 +1936,21 @@ public class HiveConf extends Configuration {
new TimeValidator(TimeUnit.MILLISECONDS), "Frequency of WriteSet reaper runs"),
// For Druid storage handler
+ HIVE_DRUID_INDEXING_GRANULARITY("hive.druid.indexer.segments.granularity", "DAY",
+ new PatternSet("YEAR", "MONTH", "WEEK", "DAY", "HOUR", "MINUTE", "SECOND"),
+ "Granularity for the segments created by the Druid storage handler"
+ ),
+ HIVE_DRUID_MAX_PARTITION_SIZE("hive.druid.indexer.partition.size.max", 5000000,
+ "Maximum number of records per segment partition"
+ ),
+ HIVE_DRUID_MAX_ROW_IN_MEMORY("hive.druid.indexer.memory.rownum.max", 75000,
+ "Maximum number of records in memory while storing data in Druid"
+ ),
HIVE_DRUID_BROKER_DEFAULT_ADDRESS("hive.druid.broker.address.default", "localhost:8082",
- "Address of the Druid broker. If we are querying Druid from Hive, this address needs to be\n" +
- "declared"),
+ "Address of the Druid broker. If we are querying Druid from Hive, this address needs to be\n"
+ +
+ "declared"
+ ),
HIVE_DRUID_SELECT_THRESHOLD("hive.druid.select.threshold", 10000,
"When we can split a Select query, this is the maximum number of rows that we try to retrieve\n" +
"per query. In order to do that, we obtain the estimated size for the complete result. If the\n" +
@@ -1949,7 +1961,27 @@ public class HiveConf extends Configuration {
"the HTTP client."),
HIVE_DRUID_HTTP_READ_TIMEOUT("hive.druid.http.read.timeout", "PT1M", "Read timeout period for the HTTP\n" +
"client in ISO8601 format (for example P2W, P3M, PT1H30M, PT0.750S), default is period of 1 minute."),
-
+ HIVE_DRUID_BASE_PERSIST_DIRECTORY("hive.druid.basePersistDirectory", "/tmp",
+ "Local temporary directory used to persist intermediate indexing state."
+ ),
+ DRUID_SEGMENT_DIRECTORY("hive.druid.storage.storageDirectory", "/druid/segments"
+ , "druid deep storage location."),
+ DRUID_METADATA_BASE("hive.druid.metadata.base", "druid", "Default prefix for metadata tables"),
+ DRUID_METADATA_DB_TYPE("hive.druid.metadata.db.type", "mysql",
+ new PatternSet("mysql", "postgres"), "Type of the metadata database."
+ ),
+ DRUID_METADATA_DB_USERNAME("hive.druid.metadata.username", "",
+ "Username to connect to Type of the metadata DB."
+ ),
+ DRUID_METADATA_DB_PASSWORD("hive.druid.metadata.password", "",
+ "Password to connect to Type of the metadata DB."
+ ),
+ DRUID_METADATA_DB_URI("hive.druid.metadata.uri", "",
+ "URI to connect to the database (for example jdbc:mysql://hostname:port/DBName)."
+ ),
+ DRUID_WORKING_DIR("hive.druid.working.directory", "/tmp/workingDirectory",
+ "Default hdfs working directory used to store some intermediate metadata"
+ ),
// For HBase storage handler
HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true,
"Whether writes to HBase should be forced to the write-ahead log. \n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/README.md
----------------------------------------------------------------------
diff --git a/druid-handler/README.md b/druid-handler/README.md
new file mode 100644
index 0000000..b548567
--- /dev/null
+++ b/druid-handler/README.md
@@ -0,0 +1,3 @@
+# Druid Storage Handler
+
+[Link for documentation]( https://cwiki.apache.org/confluence/display/Hive/Druid+Integration)
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/pom.xml
----------------------------------------------------------------------
diff --git a/druid-handler/pom.xml b/druid-handler/pom.xml
index e4fa8fd..f691a2c 100644
--- a/druid-handler/pom.xml
+++ b/druid-handler/pom.xml
@@ -29,6 +29,8 @@
<properties>
<hive.path.to.root>..</hive.path.to.root>
+ <druid.metamx.util.version>0.27.10</druid.metamx.util.version>
+ <druid.guava.version>16.0.1</druid.guava.version>
</properties>
<dependencies>
@@ -47,6 +49,10 @@
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<!-- inter-project -->
@@ -56,37 +62,47 @@
<version>${commons-lang.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <optional>true</optional>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commmons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${druid.guava.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <version>${hadoop.version}</version>
- <optional>true</optional>
+ <groupId>com.metamx</groupId>
+ <artifactId>java-util</artifactId>
+ <version>${druid.metamx.util.version}</version>
<exclusions>
<exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
- <groupId>org.apache.calcite</groupId>
- <artifactId>calcite-druid</artifactId>
- <version>${calcite.version}</version>
+ <groupId>io.druid</groupId>
+ <artifactId>druid-server</artifactId>
+ <version>${druid.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>io.druid</groupId>
@@ -107,7 +123,72 @@
</exclusion>
</exclusions>
</dependency>
-
+ <dependency>
+ <groupId>io.druid.extensions</groupId>
+ <artifactId>druid-hdfs-storage</artifactId>
+ <version>${druid.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.druid.extensions</groupId>
+ <artifactId>mysql-metadata-storage</artifactId>
+ <version>${druid.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.druid.extensions</groupId>
+ <artifactId>postgresql-metadata-storage</artifactId>
+ <version>${druid.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ <version>${hadoop.version}</version>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commmons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <scope>provided</scope>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.inject</groupId>
+ <artifactId>guice</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.inject.extensions</groupId>
+ <artifactId>guice-servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-druid</artifactId>
+ <version>${calcite.version}</version>
+ </dependency>
<!-- test inter-project -->
<dependency>
<groupId>junit</groupId>
@@ -115,6 +196,12 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.druid</groupId>
+ <artifactId>druid-indexing-hadoop</artifactId>
+ <version>${druid.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -170,15 +257,25 @@
<pattern>com.fasterxml.jackson</pattern>
<shadedPattern>org.apache.hive.druid.com.fasterxml.jackson</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>org.apache.hive.druid.com.google.common</shadedPattern>
+ </relocation>
</relocations>
<artifactSet>
<includes>
<include>io.druid:*</include>
+ <include>io.druid.extensions:*</include>
<include>com.metamx:*</include>
<include>io.netty:*</include>
<include>com.fasterxml.jackson.core:*</include>
<include>com.fasterxml.jackson.datatype:*</include>
<include>com.fasterxml.jackson.dataformat:*</include>
+ <include>com.google.guava:*</include>
+ <include>it.unimi.dsi:*</include>
+ <include>org.jdbi:*</include>
+ <include>net.jpountz.lz4:*</include>
+ <include>org.apache.commons:*</include>
</includes>
</artifactSet>
<filters>
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 8242385..a08a4e3 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,36 +17,131 @@
*/
package org.apache.hadoop.hive.druid;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
+import io.druid.metadata.MetadataStorageConnectorConfig;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.metadata.SQLMetadataConnector;
+import io.druid.metadata.storage.mysql.MySQLConnector;
+import io.druid.metadata.storage.postgresql.PostgreSQLConnector;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.io.DruidOutputFormat;
+import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
import org.apache.hadoop.hive.druid.serde.DruidSerDe;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.OutputFormat;
+import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
/**
* DruidStorageHandler provides a HiveStorageHandler implementation for Druid.
*/
-@SuppressWarnings({"deprecation","rawtypes"})
+@SuppressWarnings({ "deprecation", "rawtypes" })
public class DruidStorageHandler extends DefaultStorageHandler implements HiveMetaHook {
protected static final Logger LOG = LoggerFactory.getLogger(DruidStorageHandler.class);
+ public static final String SEGMENTS_DESCRIPTOR_DIR_NAME = "segmentsDescriptorDir";
+
+ private final SQLMetadataConnector connector;
+
+ private final SQLMetadataStorageUpdaterJobHandler druidSqlMetadataStorageUpdaterJobHandler;
+
+ private final MetadataStorageTablesConfig druidMetadataStorageTablesConfig;
+
+ private String uniqueId = null;
+
+ private String rootWorkingDir = null;
+
+ public DruidStorageHandler() {
+ //this is the default value in druid
+ final String base = HiveConf
+ .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_BASE);
+ final String dbType = HiveConf
+ .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_TYPE);
+ final String username = HiveConf
+ .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_USERNAME);
+ final String password = HiveConf
+ .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_PASSWORD);
+ final String uri = HiveConf
+ .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_URI);
+ druidMetadataStorageTablesConfig = MetadataStorageTablesConfig.fromBase(base);
+
+ final Supplier<MetadataStorageConnectorConfig> storageConnectorConfigSupplier = Suppliers.<MetadataStorageConnectorConfig>ofInstance(
+ new MetadataStorageConnectorConfig() {
+ @Override
+ public String getConnectURI() {
+ return uri;
+ }
+
+ @Override
+ public String getUser() {
+ return username;
+ }
+
+ @Override
+ public String getPassword() {
+ return password;
+ }
+ });
+
+ if (dbType.equals("mysql")) {
+ connector = new MySQLConnector(storageConnectorConfigSupplier,
+ Suppliers.ofInstance(druidMetadataStorageTablesConfig)
+ );
+ } else if (dbType.equals("postgres")) {
+ connector = new PostgreSQLConnector(storageConnectorConfigSupplier,
+ Suppliers.ofInstance(druidMetadataStorageTablesConfig)
+ );
+ } else {
+ throw new IllegalStateException(String.format("Unknown metadata storage type [%s]", dbType));
+ }
+ druidSqlMetadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler(connector);
+ }
+
+ @VisibleForTesting
+ public DruidStorageHandler(SQLMetadataConnector connector,
+ SQLMetadataStorageUpdaterJobHandler druidSqlMetadataStorageUpdaterJobHandler,
+ MetadataStorageTablesConfig druidMetadataStorageTablesConfig
+ ) {
+ this.connector = connector;
+ this.druidSqlMetadataStorageUpdaterJobHandler = druidSqlMetadataStorageUpdaterJobHandler;
+ this.druidMetadataStorageTablesConfig = druidMetadataStorageTablesConfig;
+ }
+
@Override
public Class<? extends InputFormat> getInputFormatClass() {
- return HiveDruidQueryBasedInputFormat.class;
+ return DruidQueryBasedInputFormat.class;
}
@Override
public Class<? extends OutputFormat> getOutputFormatClass() {
- return HiveDruidOutputFormat.class;
+ return DruidOutputFormat.class;
}
@Override
@@ -62,28 +157,141 @@ public class DruidStorageHandler extends DefaultStorageHandler implements HiveMe
@Override
public void preCreateTable(Table table) throws MetaException {
// Do safety checks
- if (!MetaStoreUtils.isExternalTable(table)) {
- throw new MetaException("Table in Druid needs to be declared as EXTERNAL");
- }
- if (!StringUtils.isEmpty(table.getSd().getLocation())) {
+ if (MetaStoreUtils.isExternalTable(table) && !StringUtils
+ .isEmpty(table.getSd().getLocation())) {
throw new MetaException("LOCATION may not be specified for Druid");
}
+
if (table.getPartitionKeysSize() != 0) {
throw new MetaException("PARTITIONED BY may not be specified for Druid");
}
if (table.getSd().getBucketColsSize() != 0) {
throw new MetaException("CLUSTERED BY may not be specified for Druid");
}
+ String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
+ if (MetaStoreUtils.isExternalTable(table)) {
+ return;
+ }
+ // If it is not an external table we need to check the metadata
+ try {
+ connector.createSegmentTable();
+ } catch (Exception e) {
+ LOG.error("Exception while trying to create druid segments table", e);
+ throw new MetaException(e.getMessage());
+ }
+ Collection<String> existingDataSources = DruidStorageHandlerUtils
+ .getAllDataSourceNames(connector, druidMetadataStorageTablesConfig);
+ LOG.debug(String.format("pre-create data source with name [%s]", dataSourceName));
+ if (existingDataSources.contains(dataSourceName)) {
+ throw new MetaException(String.format("Data source [%s] already existing", dataSourceName));
+ }
}
@Override
public void rollbackCreateTable(Table table) throws MetaException {
- // Nothing to do
+ if (MetaStoreUtils.isExternalTable(table)) {
+ return;
+ }
+ final Path segmentDescriptorDir = getSegmentDescriptorDir();
+ try {
+ List<DataSegment> dataSegmentList = DruidStorageHandlerUtils
+ .getPublishedSegments(segmentDescriptorDir, getConf());
+ for (DataSegment dataSegment : dataSegmentList) {
+ try {
+ deleteSegment(dataSegment);
+ } catch (SegmentLoadingException e) {
+ LOG.error(String.format("Error while trying to clean the segment [%s]", dataSegment), e);
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Exception while rollback", e);
+ throw Throwables.propagate(e);
+ } finally {
+ cleanWorkingDir();
+ }
}
@Override
public void commitCreateTable(Table table) throws MetaException {
- // Nothing to do
+ if (MetaStoreUtils.isExternalTable(table)) {
+ return;
+ }
+ LOG.info(String.format("Committing table [%s] to the druid metastore", table.getDbName()));
+ final Path tableDir = getSegmentDescriptorDir();
+ try {
+ List<DataSegment> segmentList = DruidStorageHandlerUtils
+ .getPublishedSegments(tableDir, getConf());
+ LOG.info(String.format("Found [%d] segments under path [%s]", segmentList.size(), tableDir));
+ druidSqlMetadataStorageUpdaterJobHandler.publishSegments(
+ druidMetadataStorageTablesConfig.getSegmentsTable(),
+ segmentList,
+ DruidStorageHandlerUtils.JSON_MAPPER
+ );
+ } catch (IOException e) {
+ LOG.error("Exception while commit", e);
+ Throwables.propagate(e);
+ } finally {
+ cleanWorkingDir();
+ }
+ }
+
+ @VisibleForTesting
+ protected void deleteSegment(DataSegment segment) throws SegmentLoadingException {
+
+ final Path path = getPath(segment);
+ LOG.info(String.format("removing segment[%s], located at path[%s]", segment.getIdentifier(),
+ path
+ ));
+
+ try {
+ if (path.getName().endsWith(".zip")) {
+
+ final FileSystem fs = path.getFileSystem(getConf());
+
+ if (!fs.exists(path)) {
+ LOG.warn(String.format(
+ "Segment Path [%s] does not exist. It appears to have been deleted already.",
+ path
+ ));
+ return;
+ }
+
+ // path format -- > .../dataSource/interval/version/partitionNum/xxx.zip
+ Path partitionNumDir = path.getParent();
+ if (!fs.delete(partitionNumDir, true)) {
+ throw new SegmentLoadingException(
+ "Unable to kill segment, failed to delete dir [%s]",
+ partitionNumDir.toString()
+ );
+ }
+
+ //try to delete other directories if possible
+ Path versionDir = partitionNumDir.getParent();
+ if (safeNonRecursiveDelete(fs, versionDir)) {
+ Path intervalDir = versionDir.getParent();
+ if (safeNonRecursiveDelete(fs, intervalDir)) {
+ Path dataSourceDir = intervalDir.getParent();
+ safeNonRecursiveDelete(fs, dataSourceDir);
+ }
+ }
+ } else {
+ throw new SegmentLoadingException("Unknown file type[%s]", path);
+ }
+ } catch (IOException e) {
+ throw new SegmentLoadingException(e, "Unable to kill segment");
+ }
+ }
+
+ private static Path getPath(DataSegment dataSegment) {
+ return new Path(String.valueOf(dataSegment.getLoadSpec().get("path")));
+ }
+
+ private static boolean safeNonRecursiveDelete(FileSystem fs, Path path) {
+ try {
+ return fs.delete(path, false);
+ } catch (Exception ex) {
+ return false;
+ }
}
@Override
@@ -98,7 +306,45 @@ public class DruidStorageHandler extends DefaultStorageHandler implements HiveMe
@Override
public void commitDropTable(Table table, boolean deleteData) throws MetaException {
- // Nothing to do
+ if (MetaStoreUtils.isExternalTable(table)) {
+ return;
+ }
+ String dataSourceName = Preconditions
+ .checkNotNull(table.getParameters().get(Constants.DRUID_DATA_SOURCE),
+ "DataSource name is null !"
+ );
+
+ if (deleteData) {
+ LOG.info(String.format("Dropping with purge all the data for data source [%s]",
+ dataSourceName
+ ));
+ List<DataSegment> dataSegmentList = DruidStorageHandlerUtils
+ .getDataSegmentList(connector, druidMetadataStorageTablesConfig, dataSourceName);
+ if (dataSegmentList.isEmpty()) {
+ LOG.info(String.format("Nothing to delete for data source [%s]", dataSourceName));
+ return;
+ }
+ for (DataSegment dataSegment : dataSegmentList) {
+ try {
+ deleteSegment(dataSegment);
+ } catch (SegmentLoadingException e) {
+ LOG.error(String.format("Error while deleting segment [%s]", dataSegment.getIdentifier()),
+ e
+ );
+ }
+ }
+ }
+ if (DruidStorageHandlerUtils
+ .disableDataSource(connector, druidMetadataStorageTablesConfig, dataSourceName)) {
+ LOG.info(String.format("Successfully dropped druid data source [%s]", dataSourceName));
+ }
+ }
+
+ @Override
+ public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties
+ ) {
+ jobProperties.put(Constants.DRUID_SEGMENT_VERSION, new DateTime().toString());
+ jobProperties.put(Constants.DRUID_JOB_WORKING_DIRECTORY, getStagingWorkingDir().toString());
}
@Override
@@ -106,4 +352,43 @@ public class DruidStorageHandler extends DefaultStorageHandler implements HiveMe
return Constants.DRUID_HIVE_STORAGE_HANDLER_ID;
}
+ public String getUniqueId() {
+ if (uniqueId == null) {
+ uniqueId = Preconditions.checkNotNull(
+ Strings.emptyToNull(HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVEQUERYID)),
+ "Hive query id is null"
+ );
+ }
+ return uniqueId;
+ }
+
+ private Path getStagingWorkingDir() {
+ return new Path(getRootWorkingDir(), makeStagingName());
+ }
+
+ @VisibleForTesting
+ protected String makeStagingName() {
+ return ".staging-".concat(getUniqueId().replace(":", ""));
+ }
+
+ private Path getSegmentDescriptorDir() {
+ return new Path(getStagingWorkingDir(), SEGMENTS_DESCRIPTOR_DIR_NAME);
+ }
+
+ private void cleanWorkingDir() {
+ final FileSystem fileSystem;
+ try {
+ fileSystem = getStagingWorkingDir().getFileSystem(getConf());
+ fileSystem.delete(getStagingWorkingDir(), true);
+ } catch (IOException e) {
+ LOG.error("Got Exception while cleaning working directory", e);
+ }
+ }
+
+ private String getRootWorkingDir() {
+ if (Strings.isNullOrEmpty(rootWorkingDir)) {
+ rootWorkingDir = HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_WORKING_DIR);
+ }
+ return rootWorkingDir;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index c6b8024..193e4aa 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,22 +17,62 @@
*/
package org.apache.hadoop.hive.druid;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.util.concurrent.ExecutionException;
-
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+import com.google.common.collect.Lists;
+import com.metamx.common.MapUtils;
+import com.metamx.emitter.EmittingLogger;
+import com.metamx.emitter.core.NoopEmitter;
+import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.InputStreamResponseHandler;
-
import io.druid.jackson.DefaultObjectMapper;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.metadata.SQLMetadataConnector;
+import io.druid.metadata.storage.mysql.MySQLConnector;
import io.druid.query.BaseQuery;
+import io.druid.segment.IndexIO;
+import io.druid.segment.IndexMergerV9;
+import io.druid.segment.column.ColumnConfig;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.LinearShardSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.skife.jdbi.v2.FoldController;
+import org.skife.jdbi.v2.Folder3;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.TransactionCallback;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.skife.jdbi.v2.util.ByteArrayMapper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
/**
* Utils class for Druid storage handler.
@@ -51,12 +91,61 @@ public final class DruidStorageHandlerUtils {
*/
public static final ObjectMapper SMILE_MAPPER = new DefaultObjectMapper(new SmileFactory());
+ private static final int NUM_RETRIES = 8;
+
+ private static final int SECONDS_BETWEEN_RETRIES = 2;
+
+ private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB
+
+ private static final int DEFAULT_STREAMING_RESULT_SIZE = 100;
+
+ /**
+ * Used by Druid to perform I/O on indexes.
+ */
+ public static final IndexIO INDEX_IO = new IndexIO(JSON_MAPPER, new ColumnConfig() {
+ @Override
+ public int columnCacheSizeBytes() {
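+ // A size of 0 effectively disables Druid's column cache, which is not needed for this batch indexing path.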
+ return 0;
+ }
+ });
+
+ /**
+ * Used by Druid to merge indexes.
+ */
+ public static final IndexMergerV9 INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER,
+ DruidStorageHandlerUtils.INDEX_IO
+ );
+
+ /**
+ * Generic Interner used to deduplicate DataSegment objects read from the metadata storage.
+ */
+ public static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner();
+
+ static {
+ // Register the shard spec subtype so the mapper can deserialize it
+ JSON_MAPPER.registerSubtypes(new NamedType(LinearShardSpec.class, "linear"));
+ // Set the timezone of the object mapper.
+ // NOTE: this alone is not effective; as a workaround the JVM timezone must also be set via -Duser.timezone="UTC".
+ JSON_MAPPER.setTimeZone(TimeZone.getTimeZone("UTC"));
+ try {
+ // No operation emitter will be used by some internal druid classes.
+ EmittingLogger.registerEmitter(
+ new ServiceEmitter("druid-hive-indexer", InetAddress.getLocalHost().getHostName(),
+ new NoopEmitter()
+ ));
+ } catch (UnknownHostException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
/**
* Method that creates a request for Druid JSON query (using SMILE).
- * @param mapper
+ *
* @param address address of the Druid node the query is sent to
* @param query Druid query to serialize
+ *
* @return request object carrying the serialized query
+ *
* @throws IOException if the query cannot be serialized
*/
public static Request createRequest(String address, BaseQuery<?> query)
@@ -69,9 +158,12 @@ public final class DruidStorageHandlerUtils {
/**
* Method that submits a request to an Http address and retrieves the result.
* The caller is responsible for closing the stream once it finishes consuming it.
+ *
* @param client HTTP client used to submit the request
* @param request request to submit
+ *
* @return the response as an input stream
+ *
* @throws IOException if the request fails
*/
public static InputStream submitRequest(HttpClient client, Request request)
@@ -87,4 +179,237 @@ public final class DruidStorageHandlerUtils {
return response;
}
+ /**
+ * @param taskDir path to the directory containing the segments descriptor info
+ * the descriptor path will be .../workingPath/task_id/{@link DruidStorageHandler#SEGMENTS_DESCRIPTOR_DIR_NAME}/*.json
+ * @param conf hadoop conf to get the file system
+ *
+ * @return List of DataSegments
+ *
+ * @throws IOException for instance when the job did not produce any data.
+ */
+ public static List<DataSegment> getPublishedSegments(Path taskDir, Configuration conf)
+ throws IOException {
+ ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
+ FileSystem fs = taskDir.getFileSystem(conf);
+ for (FileStatus fileStatus : fs.listStatus(taskDir)) {
+ final DataSegment segment = JSON_MAPPER
+ .readValue(fs.open(fileStatus.getPath()), DataSegment.class);
+ publishedSegmentsBuilder.add(segment);
+ }
+ return publishedSegmentsBuilder.build();
+ }
+
+ /**
+ * Writes the serialized form of the segment descriptor to the filesystem;
+ * if a descriptor file already exists, it will be replaced.
+ *
+ * @param outputFS filesystem
+ * @param segment DataSegment object
+ * @param descriptorPath path
+ *
+ * @throws IOException
+ */
+ public static void writeSegmentDescriptor(
+ final FileSystem outputFS,
+ final DataSegment segment,
+ final Path descriptorPath
+ )
+ throws IOException {
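+ // Wrap the write in a Hadoop RetryProxy so transient filesystem errors are retried with exponential backoff (up to NUM_RETRIES attempts).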
+ final DataPusher descriptorPusher = (DataPusher) RetryProxy.create(
+ DataPusher.class, new DataPusher() {
+ @Override
+ public long push() throws IOException {
+ try {
+ if (outputFS.exists(descriptorPath)) {
+ if (!outputFS.delete(descriptorPath, false)) {
+ throw new IOException(
+ String.format("Failed to delete descriptor at [%s]", descriptorPath));
+ }
+ }
+ try (final OutputStream descriptorOut = outputFS.create(
+ descriptorPath,
+ true,
+ DEFAULT_FS_BUFFER_SIZE
+ )) {
+ JSON_MAPPER.writeValue(descriptorOut, segment);
+ descriptorOut.flush();
+ }
+ } catch (RuntimeException | IOException ex) {
+ throw ex;
+ }
+ return -1;
+ }
+ },
+ RetryPolicies
+ .exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
+ );
+ descriptorPusher.push();
+ }
+
+ /**
+ * @param connector SQL metadata connector to the metadata storage
+ * @param metadataStorageTablesConfig Table config
+ *
+ * @return all the active data sources in the metadata storage
+ */
+ public static Collection<String> getAllDataSourceNames(SQLMetadataConnector connector,
+ final MetadataStorageTablesConfig metadataStorageTablesConfig
+ ) {
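+ // Only data sources that still have at least one used segment show up in the segments table query below.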
+ return connector.getDBI().withHandle(
+ new HandleCallback<List<String>>() {
+ @Override
+ public List<String> withHandle(Handle handle) throws Exception {
+ return handle.createQuery(
+ String.format("SELECT DISTINCT(datasource) FROM %s WHERE used = true",
+ metadataStorageTablesConfig.getSegmentsTable()
+ ))
+ .fold(Lists.<String>newArrayList(),
+ new Folder3<ArrayList<String>, Map<String, Object>>() {
+ @Override
+ public ArrayList<String> fold(ArrayList<String> druidDataSources,
+ Map<String, Object> stringObjectMap,
+ FoldController foldController,
+ StatementContext statementContext) throws SQLException {
+ druidDataSources.add(
+ MapUtils.getString(stringObjectMap, "datasource")
+ );
+ return druidDataSources;
+ }
+ }
+ );
+
+ }
+ }
+ );
+ }
+
+ /**
+ * @param connector SQL connector to metadata
+ * @param metadataStorageTablesConfig Tables configuration
+ * @param dataSource Name of data source
+ *
+ * @return true if the data source was successfully disabled, false otherwise
+ */
+ public static boolean disableDataSource(SQLMetadataConnector connector,
+ final MetadataStorageTablesConfig metadataStorageTablesConfig, final String dataSource
+ ) {
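+ // Disabling only flips used=false on the segments; nothing is removed from deep storage here.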
+ try {
+ if (!getAllDataSourceNames(connector, metadataStorageTablesConfig).contains(dataSource)) {
+ DruidStorageHandler.LOG
+ .warn(String.format("Cannot delete data source [%s], does not exist", dataSource));
+ return false;
+ }
+
+ connector.getDBI().withHandle(
+ new HandleCallback<Void>() {
+ @Override
+ public Void withHandle(Handle handle) throws Exception {
+ handle.createStatement(
+ String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource",
+ metadataStorageTablesConfig.getSegmentsTable()
+ )
+ )
+ .bind("dataSource", dataSource)
+ .execute();
+
+ return null;
+ }
+ }
+ );
+
+ } catch (Exception e) {
+ DruidStorageHandler.LOG.error(String.format("Error removing dataSource %s", dataSource), e);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * @param connector SQL connector to metadata
+ * @param metadataStorageTablesConfig Tables configuration
+ * @param dataSource Name of data source
+ *
+ * @return list of all data segments belonging to the given data source
+ */
+ public static List<DataSegment> getDataSegmentList(final SQLMetadataConnector connector,
+ final MetadataStorageTablesConfig metadataStorageTablesConfig, final String dataSource
+ ) {
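+ // Stream the payload column and intern the deserialized DataSegment objects so identical payloads share one instance in memory.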
+ List<DataSegment> segmentList = connector.retryTransaction(
+ new TransactionCallback<List<DataSegment>>() {
+ @Override
+ public List<DataSegment> inTransaction(
+ Handle handle, TransactionStatus status
+ ) throws Exception {
+ return handle
+ .createQuery(String.format(
+ "SELECT payload FROM %s WHERE dataSource = :dataSource",
+ metadataStorageTablesConfig.getSegmentsTable()
+ ))
+ .setFetchSize(getStreamingFetchSize(connector))
+ .bind("dataSource", dataSource)
+ .map(ByteArrayMapper.FIRST)
+ .fold(
+ new ArrayList<DataSegment>(),
+ new Folder3<List<DataSegment>, byte[]>() {
+ @Override
+ public List<DataSegment> fold(List<DataSegment> accumulator,
+ byte[] payload, FoldController control,
+ StatementContext ctx
+ ) throws SQLException {
+ try {
+ final DataSegment segment = DATA_SEGMENT_INTERNER.intern(
+ JSON_MAPPER.readValue(
+ payload,
+ DataSegment.class
+ ));
+
+ accumulator.add(segment);
+ return accumulator;
+ } catch (Exception e) {
+ throw new SQLException(e.toString());
+ }
+ }
+ }
+ );
+ }
+ }
+ , 3, SQLMetadataConnector.DEFAULT_MAX_TRIES);
+ return segmentList;
+ }
+
+ /**
+ * @param connector SQL metadata connector
+ *
+ * @return fetch size to use when streaming results from the metadata storage.
+ */
+ private static int getStreamingFetchSize(SQLMetadataConnector connector) {
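+ // MySQL's JDBC driver streams results row by row only when the fetch size is Integer.MIN_VALUE; other connectors use a regular fetch size.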
+ if (connector instanceof MySQLConnector) {
+ return Integer.MIN_VALUE;
+ }
+ return DEFAULT_STREAMING_RESULT_SIZE;
+ }
+
+ /**
+ * @param pushedSegment segment that was pushed to deep storage
+ * @param segmentsDescriptorDir directory holding the segment descriptor files
+ *
+ * @return descriptor path with a sanitized file name (colons stripped from the segment identifier)
+ */
+ public static Path makeSegmentDescriptorOutputPath(DataSegment pushedSegment,
+ Path segmentsDescriptorDir
+ ) {
+ return new Path(
+ segmentsDescriptorDir,
+ String.format("%s.json", pushedSegment.getIdentifier().replace(":", ""))
+ );
+ }
+
+ /**
+ * Simple interface for retry operations
+ */
+ public interface DataPusher {
+ long push() throws IOException;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java
deleted file mode 100644
index 45e31d6..0000000
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.druid;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.util.Progressable;
-
-/**
- * Place holder for Druid output format. Currently not implemented.
- */
-@SuppressWarnings("rawtypes")
-public class HiveDruidOutputFormat implements HiveOutputFormat {
-
- @Override
- public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
- Progressable progress) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
- Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress)
- throws IOException {
- throw new UnsupportedOperationException();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
deleted file mode 100644
index 612f853..0000000
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
+++ /dev/null
@@ -1,376 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.druid;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.calcite.adapter.druid.DruidDateTimeUtils;
-import org.apache.calcite.adapter.druid.DruidTable;
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.Constants;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidWritable;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.joda.time.Interval;
-import org.joda.time.Period;
-import org.joda.time.chrono.ISOChronology;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;
-
-import io.druid.query.Druids;
-import io.druid.query.Druids.SegmentMetadataQueryBuilder;
-import io.druid.query.Druids.SelectQueryBuilder;
-import io.druid.query.Druids.TimeBoundaryQueryBuilder;
-import io.druid.query.Query;
-import io.druid.query.Result;
-import io.druid.query.metadata.metadata.SegmentAnalysis;
-import io.druid.query.metadata.metadata.SegmentMetadataQuery;
-import io.druid.query.select.PagingSpec;
-import io.druid.query.select.SelectQuery;
-import io.druid.query.spec.MultipleIntervalSegmentSpec;
-import io.druid.query.timeboundary.TimeBoundaryQuery;
-import io.druid.query.timeboundary.TimeBoundaryResultValue;
-
-/**
- * Druid query based input format.
- *
- * Given a query and the Druid broker address, it will send it, and retrieve
- * and parse the results.
- */
-public class HiveDruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidWritable>
- implements org.apache.hadoop.mapred.InputFormat<NullWritable, DruidWritable> {
-
- protected static final Logger LOG = LoggerFactory.getLogger(HiveDruidQueryBasedInputFormat.class);
-
- @Override
- public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
- throws IOException {
- return getInputSplits(job);
- }
-
- @Override
- public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
- return Arrays.<InputSplit> asList(getInputSplits(context.getConfiguration()));
- }
-
- @SuppressWarnings("deprecation")
- private HiveDruidSplit[] getInputSplits(Configuration conf) throws IOException {
- String address = HiveConf.getVar(conf,
- HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
- if (StringUtils.isEmpty(address)) {
- throw new IOException("Druid broker address not specified in configuration");
- }
- String druidQuery = StringEscapeUtils.unescapeJava(conf.get(Constants.DRUID_QUERY_JSON));
- String druidQueryType;
- if (StringUtils.isEmpty(druidQuery)) {
- // Empty, maybe because CBO did not run; we fall back to
- // full Select query
- if (LOG.isWarnEnabled()) {
- LOG.warn("Druid query is empty; creating Select query");
- }
- String dataSource = conf.get(Constants.DRUID_DATA_SOURCE);
- if (dataSource == null) {
- throw new IOException("Druid data source cannot be empty");
- }
- druidQuery = createSelectStarQuery(address, dataSource);
- druidQueryType = Query.SELECT;
- } else {
- druidQueryType = conf.get(Constants.DRUID_QUERY_TYPE);
- if (druidQueryType == null) {
- throw new IOException("Druid query type not recognized");
- }
- }
-
- // hive depends on FileSplits
- Job job = new Job(conf);
- JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
- Path [] paths = FileInputFormat.getInputPaths(jobContext);
-
- switch (druidQueryType) {
- case Query.TIMESERIES:
- case Query.TOPN:
- case Query.GROUP_BY:
- return new HiveDruidSplit[] { new HiveDruidSplit(address, druidQuery, paths[0]) };
- case Query.SELECT:
- return splitSelectQuery(conf, address, druidQuery, paths[0]);
- default:
- throw new IOException("Druid query type not recognized");
- }
- }
-
- private static String createSelectStarQuery(String address, String dataSource) throws IOException {
- // Create Select query
- SelectQueryBuilder builder = new Druids.SelectQueryBuilder();
- builder.dataSource(dataSource);
- builder.intervals(Arrays.asList(DruidTable.DEFAULT_INTERVAL));
- builder.pagingSpec(PagingSpec.newSpec(1));
- Map<String, Object> context = new HashMap<>();
- context.put(Constants.DRUID_QUERY_FETCH, false);
- builder.context(context);
- return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(builder.build());
- }
-
- /* Method that splits Select query depending on the threshold so read can be
- * parallelized */
- private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
- String druidQuery, Path dummyPath) throws IOException {
- final int selectThreshold = (int) HiveConf.getIntVar(
- conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);
- final int numConnection = HiveConf
- .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
- final Period readTimeout = new Period(
- HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
- SelectQuery query;
- try {
- query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class);
- } catch (Exception e) {
- throw new IOException(e);
- }
-
- final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
- if (isFetch) {
- // If it has a limit, we use it and we do not split the query
- return new HiveDruidSplit[] { new HiveDruidSplit(
- address, DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
- }
-
- // We do not have the number of rows, thus we need to execute a
- // Segment Metadata query to obtain number of rows
- SegmentMetadataQueryBuilder metadataBuilder = new Druids.SegmentMetadataQueryBuilder();
- metadataBuilder.dataSource(query.getDataSource());
- metadataBuilder.intervals(query.getIntervals());
- metadataBuilder.merge(true);
- metadataBuilder.analysisTypes();
- SegmentMetadataQuery metadataQuery = metadataBuilder.build();
-
- HttpClient client = HttpClientInit.createClient(
- HttpClientConfig.builder().withNumConnections(numConnection)
- .withReadTimeout(readTimeout.toStandardDuration()).build(), new Lifecycle());
- InputStream response;
- try {
- response = DruidStorageHandlerUtils.submitRequest(client,
- DruidStorageHandlerUtils.createRequest(address, metadataQuery));
- } catch (Exception e) {
- throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
- }
-
- // Retrieve results
- List<SegmentAnalysis> metadataList;
- try {
- metadataList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
- new TypeReference<List<SegmentAnalysis>>() {});
- } catch (Exception e) {
- response.close();
- throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
- }
- if (metadataList == null || metadataList.isEmpty()) {
- throw new IOException("Connected to Druid but could not retrieve datasource information");
- }
- if (metadataList.size() != 1) {
- throw new IOException("Information about segments should have been merged");
- }
-
- final long numRows = metadataList.get(0).getNumRows();
-
- query = query.withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE));
- if (numRows <= selectThreshold) {
- // We are not going to split it
- return new HiveDruidSplit[] { new HiveDruidSplit(address,
- DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
- }
-
- // If the query does not specify a timestamp, we obtain the total time using
- // a Time Boundary query. Then, we use the information to split the query
- // following the Select threshold configuration property
- final List<Interval> intervals = new ArrayList<>();
- if (query.getIntervals().size() == 1 && query.getIntervals().get(0).withChronology(
- ISOChronology.getInstanceUTC()).equals(DruidTable.DEFAULT_INTERVAL)) {
- // Default max and min, we should execute a time boundary query to get a
- // more precise range
- TimeBoundaryQueryBuilder timeBuilder = new Druids.TimeBoundaryQueryBuilder();
- timeBuilder.dataSource(query.getDataSource());
- TimeBoundaryQuery timeQuery = timeBuilder.build();
-
- try {
- response = DruidStorageHandlerUtils.submitRequest(client,
- DruidStorageHandlerUtils.createRequest(address, timeQuery));
- } catch (Exception e) {
- throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
- }
-
- // Retrieve results
- List<Result<TimeBoundaryResultValue>> timeList;
- try {
- timeList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
- new TypeReference<List<Result<TimeBoundaryResultValue>>>() {});
- } catch (Exception e) {
- response.close();
- throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
- }
- if (timeList == null || timeList.isEmpty()) {
- throw new IOException("Connected to Druid but could not retrieve time boundary information");
- }
- if (timeList.size() != 1) {
- throw new IOException("We should obtain a single time boundary");
- }
-
- intervals.add(new Interval(timeList.get(0).getValue().getMinTime().getMillis(),
- timeList.get(0).getValue().getMaxTime().getMillis(), ISOChronology.getInstanceUTC()));
- } else {
- intervals.addAll(query.getIntervals());
- }
-
- // Create (numRows/default threshold) input splits
- int numSplits = (int) Math.ceil((double) numRows / selectThreshold);
- List<List<Interval>> newIntervals = createSplitsIntervals(intervals, numSplits);
- HiveDruidSplit[] splits = new HiveDruidSplit[numSplits];
- for (int i = 0; i < numSplits; i++) {
- // Create partial Select query
- final SelectQuery partialQuery = query.withQuerySegmentSpec(
- new MultipleIntervalSegmentSpec(newIntervals.get(i)));
- splits[i] = new HiveDruidSplit(address,
- DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath);
- }
- return splits;
- }
-
- private static List<List<Interval>> createSplitsIntervals(List<Interval> intervals, int numSplits) {
- final long totalTime = DruidDateTimeUtils.extractTotalTime(intervals);
- long startTime = intervals.get(0).getStartMillis();
- long endTime = startTime;
- long currTime = 0;
- List<List<Interval>> newIntervals = new ArrayList<>();
- for (int i = 0, posIntervals = 0; i < numSplits; i++) {
- final long rangeSize = Math.round( (double) (totalTime * (i + 1)) / numSplits) -
- Math.round( (double) (totalTime * i) / numSplits);
- // Create the new interval(s)
- List<Interval> currentIntervals = new ArrayList<>();
- while (posIntervals < intervals.size()) {
- final Interval interval = intervals.get(posIntervals);
- final long expectedRange = rangeSize - currTime;
- if (interval.getEndMillis() - startTime >= expectedRange) {
- endTime = startTime + expectedRange;
- currentIntervals.add(new Interval(startTime, endTime, ISOChronology.getInstanceUTC()));
- startTime = endTime;
- currTime = 0;
- break;
- }
- endTime = interval.getEndMillis();
- currentIntervals.add(new Interval(startTime, endTime, ISOChronology.getInstanceUTC()));
- currTime += (endTime - startTime);
- startTime = intervals.get(++posIntervals).getStartMillis();
- }
- newIntervals.add(currentIntervals);
- }
- assert endTime == intervals.get(intervals.size()-1).getEndMillis();
- return newIntervals;
- }
-
- @Override
- public org.apache.hadoop.mapred.RecordReader<NullWritable, DruidWritable> getRecordReader(
- org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter)
- throws IOException {
- // We need to provide a different record reader for every type of Druid query.
- // The reason is that Druid results format is different for each type.
- final DruidQueryRecordReader<?,?> reader;
- final String druidQueryType = job.get(Constants.DRUID_QUERY_TYPE);
- if (druidQueryType == null) {
- reader = new DruidSelectQueryRecordReader(); // By default
- reader.initialize((HiveDruidSplit)split, job);
- return reader;
- }
- switch (druidQueryType) {
- case Query.TIMESERIES:
- reader = new DruidTimeseriesQueryRecordReader();
- break;
- case Query.TOPN:
- reader = new DruidTopNQueryRecordReader();
- break;
- case Query.GROUP_BY:
- reader = new DruidGroupByQueryRecordReader();
- break;
- case Query.SELECT:
- reader = new DruidSelectQueryRecordReader();
- break;
- default:
- throw new IOException("Druid query type not recognized");
- }
- reader.initialize((HiveDruidSplit)split, job);
- return reader;
- }
-
- @Override
- public RecordReader<NullWritable, DruidWritable> createRecordReader(InputSplit split,
- TaskAttemptContext context) throws IOException, InterruptedException {
- // We need to provide a different record reader for every type of Druid query.
- // The reason is that Druid results format is different for each type.
- final String druidQueryType = context.getConfiguration().get(Constants.DRUID_QUERY_TYPE);
- if (druidQueryType == null) {
- return new DruidSelectQueryRecordReader(); // By default
- }
- final DruidQueryRecordReader<?,?> reader;
- switch (druidQueryType) {
- case Query.TIMESERIES:
- reader = new DruidTimeseriesQueryRecordReader();
- break;
- case Query.TOPN:
- reader = new DruidTopNQueryRecordReader();
- break;
- case Query.GROUP_BY:
- reader = new DruidGroupByQueryRecordReader();
- break;
- case Query.SELECT:
- reader = new DruidSelectQueryRecordReader();
- break;
- default:
- throw new IOException("Druid query type not recognized");
- }
- return reader;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java
deleted file mode 100644
index 3fba5d0..0000000
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.druid;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-
-/**
- * Druid split. Its purpose is to trigger query execution in Druid.
- */
-public class HiveDruidSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit {
-
- private String address;
- private String druidQuery;
-
- // required for deserialization
- public HiveDruidSplit() {
- super((Path) null, 0, 0, (String[]) null);
- }
-
- public HiveDruidSplit(String address, String druidQuery, Path dummyPath) {
- super(dummyPath, 0, 0, (String[]) null);
- this.address = address;
- this.druidQuery = druidQuery;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- out.writeUTF(address);
- out.writeUTF(druidQuery);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- address = in.readUTF();
- druidQuery = in.readUTF();
- }
-
- @Override
- public long getLength() {
- return 0L;
- }
-
- @Override
- public String[] getLocations() {
- return new String[] {""} ;
- }
-
- public String getAddress() {
- return address;
- }
-
- public String getDruidQuery() {
- return druidQuery;
- }
-
- @Override
- public String toString() {
- return "HiveDruidSplit{" + address + ", " + druidQuery + "}";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
new file mode 100644
index 0000000..86ddca8
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.io;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.metamx.common.Granularity;
+import io.druid.data.input.impl.DimensionSchema;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.InputRowParser;
+import io.druid.data.input.impl.MapInputRowParser;
+import io.druid.data.input.impl.StringDimensionSchema;
+import io.druid.data.input.impl.TimeAndDimsParseSpec;
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.DoubleSumAggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.realtime.plumber.CustomVersioningPolicy;
+import io.druid.storage.hdfs.HdfsDataSegmentPusher;
+import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
+import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.hadoop.hive.druid.DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME;
+
+public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritable> {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DruidOutputFormat.class);
+
+ @Override
+ public FileSinkOperator.RecordWriter getHiveRecordWriter(
+ JobConf jc,
+ Path finalOutPath,
+ Class<? extends Writable> valueClass,
+ boolean isCompressed,
+ Properties tableProperties,
+ Progressable progress
+ ) throws IOException {
+
+ final String segmentGranularity =
+ tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) != null ?
+ tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) :
+ HiveConf.getVar(jc, HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY);
+ final String dataSource = tableProperties.getProperty(Constants.DRUID_DATA_SOURCE);
+ final String segmentDirectory =
+ tableProperties.getProperty(Constants.DRUID_SEGMENT_DIRECTORY) != null
+ ? tableProperties.getProperty(Constants.DRUID_SEGMENT_DIRECTORY)
+ : HiveConf.getVar(jc, HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
+
+ final HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
+ hdfsDataSegmentPusherConfig.setStorageDirectory(segmentDirectory);
+ final DataSegmentPusher hdfsDataSegmentPusher = new HdfsDataSegmentPusher(
+ hdfsDataSegmentPusherConfig, jc, DruidStorageHandlerUtils.JSON_MAPPER);
+
+ final GranularitySpec granularitySpec = new UniformGranularitySpec(
+ Granularity.valueOf(segmentGranularity),
+ null,
+ null
+ );
+
+ final String columnNameProperty = tableProperties.getProperty(serdeConstants.LIST_COLUMNS);
+ final String columnTypeProperty = tableProperties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+
+ if (StringUtils.isEmpty(columnNameProperty) || StringUtils.isEmpty(columnTypeProperty)) {
+ throw new IllegalStateException(
+ String.format("List of columns names [%s] or columns type [%s] is/are not present",
+ columnNameProperty, columnTypeProperty
+ ));
+ }
+ ArrayList<String> columnNames = new ArrayList<String>();
+ for (String name : columnNameProperty.split(",")) {
+ columnNames.add(name);
+ }
+ if (!columnNames.contains(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
+ throw new IllegalStateException("Timestamp column (' " + DruidTable.DEFAULT_TIMESTAMP_COLUMN +
+ "') not specified in create table; list of columns is : " +
+ tableProperties.getProperty(serdeConstants.LIST_COLUMNS));
+ }
+ ArrayList<TypeInfo> columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+
+ // Default, all columns that are not metrics or timestamp, are treated as dimensions
+ final List<DimensionSchema> dimensions = new ArrayList<>();
+ ImmutableList.Builder<AggregatorFactory> aggregatorFactoryBuilder = ImmutableList.builder();
+ for (int i = 0; i < columnTypes.size(); i++) {
+ TypeInfo f = columnTypes.get(i);
+ assert f.getCategory() == ObjectInspector.Category.PRIMITIVE;
+ AggregatorFactory af;
+ switch (f.getTypeName()) {
+ case serdeConstants.TINYINT_TYPE_NAME:
+ case serdeConstants.SMALLINT_TYPE_NAME:
+ case serdeConstants.INT_TYPE_NAME:
+ case serdeConstants.BIGINT_TYPE_NAME:
+ af = new LongSumAggregatorFactory(columnNames.get(i), columnNames.get(i));
+ break;
+ case serdeConstants.FLOAT_TYPE_NAME:
+ case serdeConstants.DOUBLE_TYPE_NAME:
+ af = new DoubleSumAggregatorFactory(columnNames.get(i), columnNames.get(i));
+ break;
+ default:
+ // Dimension or timestamp
+ String columnName = columnNames.get(i);
+ if (!columnName.equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN) && !columnName
+ .equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME)) {
+ dimensions.add(new StringDimensionSchema(columnName));
+ }
+ continue;
+ }
+ aggregatorFactoryBuilder.add(af);
+ }
+ List<AggregatorFactory> aggregatorFactories = aggregatorFactoryBuilder.build();
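+ // Map each Hive row into a Druid InputRow: the Hive timestamp column feeds Druid's timestamp spec, and the granularity marker column is excluded from the dimensions.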
+ final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec(
+ new TimestampSpec(DruidTable.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
+ new DimensionsSpec(dimensions,
+ Lists.newArrayList(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME), null
+ )
+ ));
+
+ Map<String, Object> inputParser = DruidStorageHandlerUtils.JSON_MAPPER
+ .convertValue(inputRowParser, Map.class);
+
+ final DataSchema dataSchema = new DataSchema(
+ Preconditions.checkNotNull(dataSource, "Data source name is null"),
+ inputParser,
+ aggregatorFactories.toArray(new AggregatorFactory[aggregatorFactories.size()]),
+ granularitySpec,
+ DruidStorageHandlerUtils.JSON_MAPPER
+ );
+
+ final String workingPath = jc.get(Constants.DRUID_JOB_WORKING_DIRECTORY);
+ final String version = jc.get(Constants.DRUID_SEGMENT_VERSION);
+ Integer maxPartitionSize = HiveConf
+ .getIntVar(jc, HiveConf.ConfVars.HIVE_DRUID_MAX_PARTITION_SIZE);
+ String basePersistDirectory = HiveConf
+ .getVar(jc, HiveConf.ConfVars.HIVE_DRUID_BASE_PERSIST_DIRECTORY);
+ final RealtimeTuningConfig realtimeTuningConfig = RealtimeTuningConfig
+ .makeDefaultTuningConfig(new File(
+ basePersistDirectory))
+ .withVersioningPolicy(new CustomVersioningPolicy(version));
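+ // CustomVersioningPolicy pins every segment produced by this writer to the version chosen in configureOutputJobProperties.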
+
+ LOG.debug(String.format("running with Data schema [%s] ", dataSchema));
+ return new DruidRecordWriter(dataSchema, realtimeTuningConfig, hdfsDataSegmentPusher,
+ maxPartitionSize, new Path(workingPath, SEGMENTS_DESCRIPTOR_DIR_NAME),
+ finalOutPath.getFileSystem(jc)
+ );
+ }
+
+ @Override
+ public RecordWriter<K, DruidWritable> getRecordWriter(
+ FileSystem ignored, JobConf job, String name, Progressable progress
+ ) throws IOException {
+ throw new UnsupportedOperationException("please implement me !");
+ }
+
+ @Override
+ public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+ throw new UnsupportedOperationException("not implemented yet");
+ }
+}
[3/4] hive git commit: HIVE-15277: Teach Hive how to create/delete
Druid segments (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)
Posted by jc...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
new file mode 100644
index 0000000..7ac52c6
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.io;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+import io.druid.query.Druids;
+import io.druid.query.Druids.SegmentMetadataQueryBuilder;
+import io.druid.query.Druids.SelectQueryBuilder;
+import io.druid.query.Druids.TimeBoundaryQueryBuilder;
+import io.druid.query.Query;
+import io.druid.query.Result;
+import io.druid.query.metadata.metadata.SegmentAnalysis;
+import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+import io.druid.query.select.PagingSpec;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.spec.MultipleIntervalSegmentSpec;
+import io.druid.query.timeboundary.TimeBoundaryQuery;
+import io.druid.query.timeboundary.TimeBoundaryResultValue;
+import org.apache.calcite.adapter.druid.DruidDateTimeUtils;
+import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.joda.time.chrono.ISOChronology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Druid query based input format.
+ *
+ * Given a query and the Druid broker address, it will send it, and retrieve
+ * and parse the results.
+ */
+public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidWritable>
+ implements org.apache.hadoop.mapred.InputFormat<NullWritable, DruidWritable> {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DruidQueryBasedInputFormat.class);
+
+ @Override
+ public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
+ throws IOException {
+ return getInputSplits(job);
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+ return Arrays.<InputSplit>asList(getInputSplits(context.getConfiguration()));
+ }
+
+ @SuppressWarnings("deprecation")
+ private HiveDruidSplit[] getInputSplits(Configuration conf) throws IOException {
+ String address = HiveConf.getVar(conf,
+ HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS
+ );
+ if (StringUtils.isEmpty(address)) {
+ throw new IOException("Druid broker address not specified in configuration");
+ }
+ String druidQuery = StringEscapeUtils.unescapeJava(conf.get(Constants.DRUID_QUERY_JSON));
+ String druidQueryType;
+ if (StringUtils.isEmpty(druidQuery)) {
+ // Empty, maybe because CBO did not run; we fall back to
+ // full Select query
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Druid query is empty; creating Select query");
+ }
+ String dataSource = conf.get(Constants.DRUID_DATA_SOURCE);
+ if (dataSource == null) {
+ throw new IOException("Druid data source cannot be empty");
+ }
+ druidQuery = createSelectStarQuery(dataSource);
+ druidQueryType = Query.SELECT;
+ } else {
+ druidQueryType = conf.get(Constants.DRUID_QUERY_TYPE);
+ if (druidQueryType == null) {
+ throw new IOException("Druid query type not recognized");
+ }
+ }
+
+ // hive depends on FileSplits
+ Job job = new Job(conf);
+ JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
+ Path[] paths = FileInputFormat.getInputPaths(jobContext);
+
+ switch (druidQueryType) {
+ case Query.TIMESERIES:
+ case Query.TOPN:
+ case Query.GROUP_BY:
+ return new HiveDruidSplit[] { new HiveDruidSplit(address, druidQuery, paths[0]) };
+ case Query.SELECT:
+ return splitSelectQuery(conf, address, druidQuery, paths[0]);
+ default:
+ throw new IOException("Druid query type not recognized");
+ }
+ }
+
+ private static String createSelectStarQuery(String dataSource) throws IOException {
+ // Create Select query
+ SelectQueryBuilder builder = new Druids.SelectQueryBuilder();
+ builder.dataSource(dataSource);
+ builder.intervals(Arrays.asList(DruidTable.DEFAULT_INTERVAL));
+ builder.pagingSpec(PagingSpec.newSpec(1));
+ Map<String, Object> context = new HashMap<>();
+ context.put(Constants.DRUID_QUERY_FETCH, false);
+ builder.context(context);
+ return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(builder.build());
+ }
+
+ /* Method that splits Select query depending on the threshold so read can be
+ * parallelized */
+ private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
+ String druidQuery, Path dummyPath
+ ) throws IOException {
+ final int selectThreshold = (int) HiveConf.getIntVar(
+ conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);
+ final int numConnection = HiveConf
+ .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
+ final Period readTimeout = new Period(
+ HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
+ SelectQuery query;
+ try {
+ query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
+ final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
+ if (isFetch) {
+ // If it has a limit, we use it and we do not split the query
+ return new HiveDruidSplit[] { new HiveDruidSplit(
+ address, DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
+ }
+
+ // We do not have the number of rows, thus we need to execute a
+ // Segment Metadata query to obtain number of rows
+ SegmentMetadataQueryBuilder metadataBuilder = new Druids.SegmentMetadataQueryBuilder();
+ metadataBuilder.dataSource(query.getDataSource());
+ metadataBuilder.intervals(query.getIntervals());
+ metadataBuilder.merge(true);
+ metadataBuilder.analysisTypes();
+ SegmentMetadataQuery metadataQuery = metadataBuilder.build();
+ final Lifecycle lifecycle = new Lifecycle();
+ HttpClient client = HttpClientInit.createClient(
+ HttpClientConfig.builder().withNumConnections(numConnection)
+ .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
+ try {
+ lifecycle.start();
+ } catch (Exception e) {
+ LOG.error("Lifecycle start issue", e);
+ }
+ InputStream response;
+ try {
+ response = DruidStorageHandlerUtils.submitRequest(client,
+ DruidStorageHandlerUtils.createRequest(address, metadataQuery)
+ );
+ } catch (Exception e) {
+ throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ } finally {
+ lifecycle.stop();
+ }
+
+ // Retrieve results
+ List<SegmentAnalysis> metadataList;
+ try {
+ metadataList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
+ new TypeReference<List<SegmentAnalysis>>() {
+ }
+ );
+ } catch (Exception e) {
+ response.close();
+ throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+ if (metadataList == null || metadataList.isEmpty()) {
+ throw new IOException("Connected to Druid but could not retrieve datasource information");
+ }
+ if (metadataList.size() != 1) {
+ throw new IOException("Information about segments should have been merged");
+ }
+
+ final long numRows = metadataList.get(0).getNumRows();
+
+ query = query.withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE));
+ if (numRows <= selectThreshold) {
+ // We are not going to split it
+ return new HiveDruidSplit[] { new HiveDruidSplit(address,
+ DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath
+ ) };
+ }
+
+ // If the query does not specify a timestamp, we obtain the total time using
+ // a Time Boundary query. Then, we use the information to split the query
+ // following the Select threshold configuration property
+ final List<Interval> intervals = new ArrayList<>();
+ if (query.getIntervals().size() == 1 && query.getIntervals().get(0).withChronology(
+ ISOChronology.getInstanceUTC()).equals(DruidTable.DEFAULT_INTERVAL)) {
+ // Default max and min, we should execute a time boundary query to get a
+ // more precise range
+ TimeBoundaryQueryBuilder timeBuilder = new Druids.TimeBoundaryQueryBuilder();
+ timeBuilder.dataSource(query.getDataSource());
+ TimeBoundaryQuery timeQuery = timeBuilder.build();
+
+ try {
+ response = DruidStorageHandlerUtils.submitRequest(client,
+ DruidStorageHandlerUtils.createRequest(address, timeQuery)
+ );
+ } catch (Exception e) {
+ throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+
+ // Retrieve results
+ List<Result<TimeBoundaryResultValue>> timeList;
+ try {
+ timeList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
+ new TypeReference<List<Result<TimeBoundaryResultValue>>>() {
+ }
+ );
+ } catch (Exception e) {
+ response.close();
+ throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+ if (timeList == null || timeList.isEmpty()) {
+ throw new IOException(
+ "Connected to Druid but could not retrieve time boundary information");
+ }
+ if (timeList.size() != 1) {
+ throw new IOException("We should obtain a single time boundary");
+ }
+
+ intervals.add(new Interval(timeList.get(0).getValue().getMinTime().getMillis(),
+ timeList.get(0).getValue().getMaxTime().getMillis(), ISOChronology.getInstanceUTC()
+ ));
+ } else {
+ intervals.addAll(query.getIntervals());
+ }
+
+ // Create (numRows/default threshold) input splits
+ int numSplits = (int) Math.ceil((double) numRows / selectThreshold);
+ List<List<Interval>> newIntervals = createSplitsIntervals(intervals, numSplits);
+ HiveDruidSplit[] splits = new HiveDruidSplit[numSplits];
+ for (int i = 0; i < numSplits; i++) {
+ // Create partial Select query
+ final SelectQuery partialQuery = query.withQuerySegmentSpec(
+ new MultipleIntervalSegmentSpec(newIntervals.get(i)));
+ splits[i] = new HiveDruidSplit(address,
+ DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath
+ );
+ }
+ return splits;
+ }
+
+ private static List<List<Interval>> createSplitsIntervals(List<Interval> intervals, int numSplits
+ ) {
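+ // Spread the total covered time evenly over numSplits: walk the input intervals and cut a new boundary whenever the running range reaches totalTime/numSplits.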
+ final long totalTime = DruidDateTimeUtils.extractTotalTime(intervals);
+ long startTime = intervals.get(0).getStartMillis();
+ long endTime = startTime;
+ long currTime = 0;
+ List<List<Interval>> newIntervals = new ArrayList<>();
+ for (int i = 0, posIntervals = 0; i < numSplits; i++) {
+ final long rangeSize = Math.round((double) (totalTime * (i + 1)) / numSplits) -
+ Math.round((double) (totalTime * i) / numSplits);
+ // Create the new interval(s)
+ List<Interval> currentIntervals = new ArrayList<>();
+ while (posIntervals < intervals.size()) {
+ final Interval interval = intervals.get(posIntervals);
+ final long expectedRange = rangeSize - currTime;
+ if (interval.getEndMillis() - startTime >= expectedRange) {
+ endTime = startTime + expectedRange;
+ currentIntervals.add(new Interval(startTime, endTime, ISOChronology.getInstanceUTC()));
+ startTime = endTime;
+ currTime = 0;
+ break;
+ }
+ endTime = interval.getEndMillis();
+ currentIntervals.add(new Interval(startTime, endTime, ISOChronology.getInstanceUTC()));
+ currTime += (endTime - startTime);
+ startTime = intervals.get(++posIntervals).getStartMillis();
+ }
+ newIntervals.add(currentIntervals);
+ }
+ assert endTime == intervals.get(intervals.size() - 1).getEndMillis();
+ return newIntervals;
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.RecordReader<NullWritable, DruidWritable> getRecordReader(
+ org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter
+ )
+ throws IOException {
+ // We need to provide a different record reader for every type of Druid query.
+ // The reason is that the Druid result format differs for each query type.
+ final DruidQueryRecordReader<?, ?> reader;
+ final String druidQueryType = job.get(Constants.DRUID_QUERY_TYPE);
+ if (druidQueryType == null) {
+ reader = new DruidSelectQueryRecordReader(); // By default
+ reader.initialize((HiveDruidSplit) split, job);
+ return reader;
+ }
+ switch (druidQueryType) {
+ case Query.TIMESERIES:
+ reader = new DruidTimeseriesQueryRecordReader();
+ break;
+ case Query.TOPN:
+ reader = new DruidTopNQueryRecordReader();
+ break;
+ case Query.GROUP_BY:
+ reader = new DruidGroupByQueryRecordReader();
+ break;
+ case Query.SELECT:
+ reader = new DruidSelectQueryRecordReader();
+ break;
+ default:
+ throw new IOException("Druid query type not recognized");
+ }
+ reader.initialize((HiveDruidSplit) split, job);
+ return reader;
+ }
+
+ @Override
+ public RecordReader<NullWritable, DruidWritable> createRecordReader(InputSplit split,
+ TaskAttemptContext context
+ ) throws IOException, InterruptedException {
+ // We need to provide a different record reader for every type of Druid query.
+ // The reason is that the Druid result format differs for each query type.
+ final String druidQueryType = context.getConfiguration().get(Constants.DRUID_QUERY_TYPE);
+ if (druidQueryType == null) {
+ return new DruidSelectQueryRecordReader(); // By default
+ }
+ final DruidQueryRecordReader<?, ?> reader;
+ switch (druidQueryType) {
+ case Query.TIMESERIES:
+ reader = new DruidTimeseriesQueryRecordReader();
+ break;
+ case Query.TOPN:
+ reader = new DruidTopNQueryRecordReader();
+ break;
+ case Query.GROUP_BY:
+ reader = new DruidGroupByQueryRecordReader();
+ break;
+ case Query.SELECT:
+ reader = new DruidSelectQueryRecordReader();
+ break;
+ default:
+ throw new IOException("Druid query type not recognized");
+ }
+ return reader;
+ }
+
+}
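
A quick illustration of the split arithmetic above (the row count, threshold and interval bounds are invented values, not taken from the patch): 25,000 rows against a 10,000-row Select threshold yield ceil(25000/10000) = 3 splits, and the covered time range is cut into three contiguous sub-intervals of roughly equal duration. The sketch below mirrors the rounding done by createSplitsIntervals() for the single-interval case.

import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;

import java.util.ArrayList;
import java.util.List;

public class SplitIntervalSketch {
  // Divide one interval into numSplits contiguous, equally sized sub-intervals,
  // using the same rounding as createSplitsIntervals() above.
  static List<Interval> split(Interval interval, int numSplits) {
    final long total = interval.getEndMillis() - interval.getStartMillis();
    final List<Interval> result = new ArrayList<>();
    long start = interval.getStartMillis();
    for (int i = 0; i < numSplits; i++) {
      final long rangeSize = Math.round((double) (total * (i + 1)) / numSplits)
          - Math.round((double) (total * i) / numSplits);
      result.add(new Interval(start, start + rangeSize, ISOChronology.getInstanceUTC()));
      start += rangeSize;
    }
    return result;
  }

  public static void main(String[] args) {
    int numSplits = (int) Math.ceil(25000d / 10000d); // 3 splits for the assumed numbers
    for (Interval i : split(new Interval(0L, 3000000L, ISOChronology.getInstanceUTC()), numSplits)) {
      System.out.println(i); // three 1,000,000 ms sub-intervals
    }
  }
}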
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
new file mode 100644
index 0000000..1601a9a
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -0,0 +1,260 @@
+package org.apache.hadoop.hive.druid.io;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.metamx.common.Granularity;
+import io.druid.data.input.Committer;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.appenderator.Appenderator;
+import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.appenderator.SegmentNotWritableException;
+import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.segment.realtime.plumber.Committers;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.LinearShardSpec;
+import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritable>,
+ org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter {
+ protected static final Logger LOG = LoggerFactory.getLogger(DruidRecordWriter.class);
+
+ private final DataSchema dataSchema;
+
+ private final Appenderator appenderator;
+
+ private final RealtimeTuningConfig tuningConfig;
+
+ private final Path segmentsDescriptorDir;
+
+ private SegmentIdentifier currentOpenSegment = null;
+
+ private final Integer maxPartitionSize;
+
+ private final FileSystem fileSystem;
+
+ private final Supplier<Committer> committerSupplier;
+
+ public DruidRecordWriter(
+ DataSchema dataSchema,
+ RealtimeTuningConfig realtimeTuningConfig,
+ DataSegmentPusher dataSegmentPusher,
+ int maxPartitionSize,
+ final Path segmentsDescriptorsDir,
+ final FileSystem fileSystem
+ ) {
+ this.tuningConfig = Preconditions
+ .checkNotNull(realtimeTuningConfig, "realtimeTuningConfig is null");
+ this.dataSchema = Preconditions.checkNotNull(dataSchema, "data schema is null");
+ appenderator = Appenderators
+ .createOffline(this.dataSchema,
+ tuningConfig,
+ new FireDepartmentMetrics(), dataSegmentPusher,
+ DruidStorageHandlerUtils.JSON_MAPPER,
+ DruidStorageHandlerUtils.INDEX_IO,
+ DruidStorageHandlerUtils.INDEX_MERGER_V9
+ );
+ Preconditions.checkArgument(maxPartitionSize > 0, "maxPartitionSize needs to be greater than 0");
+ this.maxPartitionSize = maxPartitionSize;
+ appenderator.startJob(); // maybe we need to move this out of the constructor
+ this.segmentsDescriptorDir = Preconditions
+ .checkNotNull(segmentsDescriptorsDir, "segmentsDescriptorsDir is null");
+ this.fileSystem = Preconditions.checkNotNull(fileSystem, "file system is null");
+ committerSupplier = Suppliers.ofInstance(Committers.nil());
+ }
+
+ /**
+ * This function computes the segment identifier and pushes the current open segment if needed.
+ * The push occurs if the maximum partition size is reached or the event belongs to the next interval.
+ * Note that this function assumes that timestamps are pseudo-sorted.
+ * This function will close the current segment and move to the next segment granularity as soon as
+ * an event from the next interval appears. The sorting is done by the previous stage.
+ *
+ * @return the segment identifier for the truncated timestamp; may also push the current open segment.
+ */
+ private SegmentIdentifier getSegmentIdentifierAndMaybePush(long truncatedTime) {
+
+ final Granularity segmentGranularity = dataSchema.getGranularitySpec()
+ .getSegmentGranularity();
+
+ final Interval interval = new Interval(
+ new DateTime(truncatedTime),
+ segmentGranularity.increment(new DateTime(truncatedTime))
+ );
+
+ SegmentIdentifier retVal;
+ if (currentOpenSegment == null) {
+ retVal = new SegmentIdentifier(
+ dataSchema.getDataSource(),
+ interval,
+ tuningConfig.getVersioningPolicy().getVersion(interval),
+ new LinearShardSpec(0)
+ );
+ currentOpenSegment = retVal;
+ return retVal;
+ } else if (currentOpenSegment.getInterval().equals(interval)) {
+ retVal = currentOpenSegment;
+ int rowCount = appenderator.getRowCount(retVal);
+ if (rowCount < maxPartitionSize) {
+ return retVal;
+ } else {
+ retVal = new SegmentIdentifier(
+ dataSchema.getDataSource(),
+ interval,
+ tuningConfig.getVersioningPolicy().getVersion(interval),
+ new LinearShardSpec(currentOpenSegment.getShardSpec().getPartitionNum() + 1)
+ );
+ pushSegments(Lists.newArrayList(currentOpenSegment));
+ currentOpenSegment = retVal;
+ return retVal;
+ }
+ } else {
+ retVal = new SegmentIdentifier(
+ dataSchema.getDataSource(),
+ interval,
+ tuningConfig.getVersioningPolicy().getVersion(interval),
+ new LinearShardSpec(0)
+ );
+ pushSegments(Lists.newArrayList(currentOpenSegment));
+ currentOpenSegment = retVal;
+ return retVal;
+ }
+ }
+
+ private void pushSegments(List<SegmentIdentifier> segmentsToPush) {
+ try {
+ SegmentsAndMetadata segmentsAndMetadata = appenderator
+ .push(segmentsToPush, committerSupplier.get()).get();
+ final HashSet<String> pushedSegmentIdentifierHashSet = new HashSet<>();
+
+ for (DataSegment pushedSegment : segmentsAndMetadata.getSegments()) {
+ pushedSegmentIdentifierHashSet
+ .add(SegmentIdentifier.fromDataSegment(pushedSegment).getIdentifierAsString());
+ final Path segmentDescriptorOutputPath = DruidStorageHandlerUtils
+ .makeSegmentDescriptorOutputPath(pushedSegment, segmentsDescriptorDir);
+ DruidStorageHandlerUtils
+ .writeSegmentDescriptor(fileSystem, pushedSegment, segmentDescriptorOutputPath);
+
+ LOG.info(
+ String.format(
+ "Pushed the segment [%s] and persisted the descriptor located at [%s]",
+ pushedSegment,
+ segmentDescriptorOutputPath
+ )
+ );
+ }
+
+ final HashSet<String> toPushSegmentsHashSet = new HashSet<>(
+ FluentIterable.from(segmentsToPush)
+ .transform(new Function<SegmentIdentifier, String>() {
+ @Nullable
+ @Override
+ public String apply(
+ @Nullable SegmentIdentifier input
+ ) {
+ return input.getIdentifierAsString();
+ }
+ })
+ .toList());
+
+ if (!pushedSegmentIdentifierHashSet.equals(toPushSegmentsHashSet)) {
+ throw new IllegalStateException(String.format(
+ "was asked to publish [%s] but was able to publish only [%s]",
+ Joiner.on(", ").join(toPushSegmentsHashSet),
+ Joiner.on(", ").join(pushedSegmentIdentifierHashSet)
+ ));
+ }
+
+ LOG.info(String.format("Published [%,d] segments.", segmentsToPush.size()));
+ } catch (InterruptedException e) {
+ LOG.error(String.format("got interrupted, failed to push [%,d] segments.",
+ segmentsToPush.size()
+ ), e);
+ Thread.currentThread().interrupt();
+ } catch (IOException | ExecutionException e) {
+ LOG.error(String.format("Failed to push [%,d] segments.", segmentsToPush.size()), e);
+ Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void write(Writable w) throws IOException {
+ DruidWritable record = (DruidWritable) w;
+ final long timestamp = (long) record.getValue().get(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
+ final long truncatedTime = (long) record.getValue()
+ .get(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME);
+
+ InputRow inputRow = new MapBasedInputRow(
+ timestamp,
+ dataSchema.getParser()
+ .getParseSpec()
+ .getDimensionsSpec()
+ .getDimensionNames(),
+ record.getValue()
+ );
+
+ try {
+ appenderator
+ .add(getSegmentIdentifierAndMaybePush(truncatedTime), inputRow, committerSupplier);
+ } catch (SegmentNotWritableException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void close(boolean abort) throws IOException {
+ try {
+ if (!abort) {
+ final List<SegmentIdentifier> segmentsToPush = Lists.newArrayList();
+ segmentsToPush.addAll(appenderator.getSegments());
+ pushSegments(segmentsToPush);
+ }
+ appenderator.clear();
+ } catch (InterruptedException e) {
+ Throwables.propagate(e);
+ } finally {
+ appenderator.close();
+ }
+ }
+
+ @Override
+ public void write(NullWritable key, DruidWritable value) throws IOException {
+ this.write(value);
+ }
+
+ @Override
+ public void close(Reporter reporter) throws IOException {
+ this.close(true);
+ }
+
+}
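
To make the rollover rule in getSegmentIdentifierAndMaybePush() above concrete, here is a hedged, self-contained sketch of the decision it takes for each incoming row; the granularity, row counts and timestamps are made-up values. A row stays in the open segment while its truncated timestamp falls in the same interval and the segment holds fewer than maxPartitionSize rows; otherwise the open segment is pushed and a new one is opened, either as the next LinearShardSpec partition of the same interval or for the next interval.

import org.joda.time.DateTime;
import org.joda.time.Interval;

public class SegmentRolloverSketch {
  // Decide what happens to an event whose timestamp was already truncated to the
  // segment granularity (e.g. the start of its day for DAY granularity).
  static String decide(Interval openInterval, int openRowCount, int maxPartitionSize,
      long truncatedTime, long granularityMillis) {
    Interval eventInterval = new Interval(truncatedTime, truncatedTime + granularityMillis);
    if (openInterval == null) {
      return "open first segment for " + eventInterval;
    } else if (openInterval.equals(eventInterval)) {
      return openRowCount < maxPartitionSize
          ? "append to open segment"
          : "push open segment, open next partition of " + eventInterval;
    } else {
      return "push open segment, open new segment for " + eventInterval;
    }
  }

  public static void main(String[] args) {
    long day = 24L * 60 * 60 * 1000; // DAY segment granularity, assumed
    long day1 = new DateTime(2016, 1, 1, 0, 0).getMillis();
    Interval open = new Interval(day1, day1 + day);
    System.out.println(decide(open, 10, 5000, day1, day));       // append
    System.out.println(decide(open, 5000, 5000, day1, day));     // next partition, same interval
    System.out.println(decide(open, 10, 5000, day1 + day, day)); // next interval
  }
}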
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
new file mode 100644
index 0000000..861075d
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+
+/**
+ * Druid split. Its purpose is to trigger query execution in Druid.
+ */
+public class HiveDruidSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit {
+
+ private String address;
+
+ private String druidQuery;
+
+ // required for deserialization
+ public HiveDruidSplit() {
+ super((Path) null, 0, 0, (String[]) null);
+ }
+
+ public HiveDruidSplit(String address, String druidQuery, Path dummyPath) {
+ super(dummyPath, 0, 0, (String[]) null);
+ this.address = address;
+ this.druidQuery = druidQuery;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ out.writeUTF(address);
+ out.writeUTF(druidQuery);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ address = in.readUTF();
+ druidQuery = in.readUTF();
+ }
+
+ @Override
+ public long getLength() {
+ return 0L;
+ }
+
+ @Override
+ public String[] getLocations() {
+ return new String[] { "" };
+ }
+
+ public String getAddress() {
+ return address;
+ }
+
+ public String getDruidQuery() {
+ return druidQuery;
+ }
+
+ @Override
+ public String toString() {
+ return "HiveDruidSplit{" + address + ", " + druidQuery + "}";
+ }
+
+}
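
A small round-trip check (illustrative only; the broker address and query string are placeholders) showing that the address and serialized Druid query survive the write()/readFields() cycle implemented above:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.druid.io.HiveDruidSplit;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class HiveDruidSplitRoundTrip {
  public static void main(String[] args) throws Exception {
    // Placeholder broker address and query; dummyPath only anchors the FileSplit.
    HiveDruidSplit out = new HiveDruidSplit("localhost:8082",
        "{\"queryType\":\"select\"}", new Path("/tmp/dummy"));

    DataOutputBuffer buffer = new DataOutputBuffer();
    out.write(buffer);

    DataInputBuffer in = new DataInputBuffer();
    in.reset(buffer.getData(), buffer.getLength());
    HiveDruidSplit back = new HiveDruidSplit();
    back.readFields(in);

    System.out.println(back.getAddress());    // localhost:8082
    System.out.println(back.getDruidQuery()); // {"queryType":"select"}
  }
}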
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
index f97f820..9e8b439 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
@@ -42,7 +42,9 @@ public class DruidGroupByQueryRecordReader
extends DruidQueryRecordReader<GroupByQuery, Row> {
private Row current;
+
private int[] indexes = new int[0];
+
// Row objects returned by GroupByQuery have different access paths depending on
// whether the result for the metric is a Float or a Long, thus we keep track
// using these converters
@@ -62,11 +64,14 @@ public class DruidGroupByQueryRecordReader
@Override
protected List<Row> createResultsList(InputStream content) throws IOException {
return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
- new TypeReference<List<Row>>(){});
+ new TypeReference<List<Row>>() {
+ }
+ );
}
private void initExtractors() throws IOException {
- extractors = new Extract[query.getAggregatorSpecs().size() + query.getPostAggregatorSpecs().size()];
+ extractors = new Extract[query.getAggregatorSpecs().size() + query.getPostAggregatorSpecs()
+ .size()];
int counter = 0;
for (int i = 0; i < query.getAggregatorSpecs().size(); i++, counter++) {
AggregatorFactory af = query.getAggregatorSpecs().get(i);
@@ -103,7 +108,7 @@ public class DruidGroupByQueryRecordReader
if (results.hasNext()) {
current = results.next();
indexes = new int[query.getDimensions().size()];
- for (int i=0; i < query.getDimensions().size(); i++) {
+ for (int i = 0; i < query.getDimensions().size(); i++) {
DimensionSpec ds = query.getDimensions().get(i);
indexes[i] = current.getDimension(ds.getDimension()).size() - 1;
}
@@ -124,7 +129,7 @@ public class DruidGroupByQueryRecordReader
// 1) The timestamp column
value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
// 2) The dimension columns
- for (int i=0; i < query.getDimensions().size(); i++) {
+ for (int i = 0; i < query.getDimensions().size(); i++) {
DimensionSpec ds = query.getDimensions().get(i);
List<String> dims = current.getDimension(ds.getDimension());
if (dims.size() == 0) {
@@ -163,7 +168,7 @@ public class DruidGroupByQueryRecordReader
// 1) The timestamp column
value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
// 2) The dimension columns
- for (int i=0; i < query.getDimensions().size(); i++) {
+ for (int i = 0; i < query.getDimensions().size(); i++) {
DimensionSpec ds = query.getDimensions().get(i);
List<String> dims = current.getDimension(ds.getDimension());
if (dims.size() == 0) {
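
For context on the Float/Long converters mentioned in the comment above: Druid GroupBy rows expose numeric metrics through type-specific accessors, so the reader keeps one Extract marker per aggregator to know which accessor to call. A minimal sketch of that dispatch (the metric names in the comments are assumptions):

import io.druid.data.input.Row;

public final class GroupByMetricAccess {
  enum Extract { FLOAT, LONG }

  // Pick the metric accessor based on the extractor recorded for the aggregator,
  // mirroring what DruidGroupByQueryRecordReader does when it emits a record.
  static Object metricValue(Row row, String metricName, Extract extract) {
    switch (extract) {
      case FLOAT:
        return row.getFloatMetric(metricName); // e.g. a doubleSum metric such as "added"
      case LONG:
        return row.getLongMetric(metricName);  // e.g. a longSum metric such as "count"
      default:
        throw new IllegalArgumentException("Unknown extractor " + extract);
    }
  }
}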
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index fe6213b..dc9d6a0 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -17,15 +17,16 @@
*/
package org.apache.hadoop.hive.druid.serde;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Iterator;
-import java.util.List;
-
+import com.google.common.collect.Iterators;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+import io.druid.query.BaseQuery;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
-import org.apache.hadoop.hive.druid.HiveDruidSplit;
+import org.apache.hadoop.hive.druid.io.HiveDruidSplit;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -34,24 +35,21 @@ import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Iterators;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;
-
-import io.druid.query.BaseQuery;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
/**
* Base record reader for a given Druid query. This class contains the logic to
* send the query to the broker and retrieve the results. The transformation to
* emit records needs to be done by the classes that extend the reader.
- *
+ *
* The key for each record will be a NullWritable, while the value will be a
* DruidWritable containing the timestamp as well as all values resulting from
* the query.
*/
-public abstract class DruidQueryRecordReader<T extends BaseQuery<R>,R extends Comparable<R>>
+public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends Comparable<R>>
extends RecordReader<NullWritable, DruidWritable>
implements org.apache.hadoop.mapred.RecordReader<NullWritable, DruidWritable> {
@@ -83,6 +81,7 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>,R extends Co
LOG.info("Retrieving from druid using query:\n " + query);
}
+ final Lifecycle lifecycle = new Lifecycle();
final int numConnection = HiveConf
.getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
final Period readTimeout = new Period(
@@ -90,10 +89,17 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>,R extends Co
HttpClient client = HttpClientInit.createClient(
HttpClientConfig.builder().withReadTimeout(readTimeout.toStandardDuration())
- .withNumConnections(numConnection).build(), new Lifecycle());
- InputStream response = DruidStorageHandlerUtils.submitRequest(client,
- DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query));
+ .withNumConnections(numConnection).build(), lifecycle);
+ try {
+ lifecycle.start();
+ } catch (Exception e) {
+ LOG.error("Issues with lifecycle start", e);
+ }
+ InputStream response = DruidStorageHandlerUtils.submitRequest(client,
+ DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query)
+ );
+ lifecycle.stop();
// Retrieve results
List<R> resultsList;
try {
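
The hunk above ties the HTTP client to an explicit Lifecycle that is started before the request is submitted and stopped afterwards. One way to write that bracket, condensed into a sketch (timeout and connection count are placeholder values):

import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import org.joda.time.Period;

public class LifecycleManagedClientSketch {
  // Bracket the HTTP call with lifecycle.start()/stop(), as the record reader does.
  static void queryBroker() throws Exception {
    final Lifecycle lifecycle = new Lifecycle();
    final HttpClient client = HttpClientInit.createClient(
        HttpClientConfig.builder()
            .withReadTimeout(new Period("PT60S").toStandardDuration()) // placeholder timeout
            .withNumConnections(20)                                    // placeholder count
            .build(),
        lifecycle);
    lifecycle.start();
    try {
      // submit the Druid query with 'client' here (omitted)
    } finally {
      lifecycle.stop();
    }
  }
}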
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
index c30ac56..8a41e91 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
@@ -41,6 +41,7 @@ public class DruidSelectQueryRecordReader
extends DruidQueryRecordReader<SelectQuery, Result<SelectResultValue>> {
private Result<SelectResultValue> current;
+
private Iterator<EventHolder> values = Iterators.emptyIterator();
@Override
@@ -49,9 +50,12 @@ public class DruidSelectQueryRecordReader
}
@Override
- protected List<Result<SelectResultValue>> createResultsList(InputStream content) throws IOException {
+ protected List<Result<SelectResultValue>> createResultsList(InputStream content)
+ throws IOException {
return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
- new TypeReference<List<Result<SelectResultValue>>>(){});
+ new TypeReference<List<Result<SelectResultValue>>>() {
+ }
+ );
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index eb78a70..2e90df1 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -17,19 +17,33 @@
*/
package org.apache.hadoop.hive.druid.serde;
-import java.io.IOException;
-import java.io.InputStream;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Properties;
-
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+import io.druid.query.Druids;
+import io.druid.query.Druids.SegmentMetadataQueryBuilder;
+import io.druid.query.Query;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.groupby.GroupByQuery;
+import io.druid.query.metadata.metadata.ColumnAnalysis;
+import io.druid.query.metadata.metadata.SegmentAnalysis;
+import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.topn.TopNQuery;
import org.apache.calcite.adapter.druid.DruidTable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeSpec;
@@ -37,12 +51,19 @@ import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
@@ -50,43 +71,34 @@ import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.collect.Lists;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;
-
-import io.druid.query.Druids;
-import io.druid.query.Druids.SegmentMetadataQueryBuilder;
-import io.druid.query.Query;
-import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.PostAggregator;
-import io.druid.query.dimension.DimensionSpec;
-import io.druid.query.groupby.GroupByQuery;
-import io.druid.query.metadata.metadata.ColumnAnalysis;
-import io.druid.query.metadata.metadata.SegmentAnalysis;
-import io.druid.query.metadata.metadata.SegmentMetadataQuery;
-import io.druid.query.select.SelectQuery;
-import io.druid.query.timeseries.TimeseriesQuery;
-import io.druid.query.topn.TopNQuery;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
/**
* DruidSerDe that is used to deserialize objects from a Druid data source.
*/
-@SerDeSpec(schemaProps = {Constants.DRUID_DATA_SOURCE})
+@SerDeSpec(schemaProps = { Constants.DRUID_DATA_SOURCE })
public class DruidSerDe extends AbstractSerDe {
protected static final Logger LOG = LoggerFactory.getLogger(DruidSerDe.class);
private String[] columns;
+
private PrimitiveTypeInfo[] types;
- private ObjectInspector inspector;
private int numConnection;
private Period readTimeout;
+ private ObjectInspector inspector;
+
@Override
public void initialize(Configuration configuration, Properties properties) throws SerDeException {
final List<String> columnNames = new ArrayList<>();
@@ -96,56 +108,93 @@ public class DruidSerDe extends AbstractSerDe {
// Druid query
String druidQuery = properties.getProperty(Constants.DRUID_QUERY_JSON);
if (druidQuery == null) {
- // No query. We need to create a Druid Segment Metadata query that retrieves all
- // columns present in the data source (dimensions and metrics).
- // Create Segment Metadata Query
- String dataSource = properties.getProperty(Constants.DRUID_DATA_SOURCE);
- if (dataSource == null) {
- throw new SerDeException("Druid data source not specified; use " +
- Constants.DRUID_DATA_SOURCE + " in table properties");
- }
- SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder();
- builder.dataSource(dataSource);
- builder.merge(true);
- builder.analysisTypes();
- SegmentMetadataQuery query = builder.build();
+ // No query. Either it is a CTAS, or we need to create a Druid
+ // Segment Metadata query that retrieves all columns present in
+ // the data source (dimensions and metrics).
+ if (!org.apache.commons.lang3.StringUtils
+ .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMNS))
+ && !org.apache.commons.lang3.StringUtils
+ .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES))) {
+ columnNames.addAll(Utilities.getColumnNames(properties));
+ if (!columnNames.contains(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
+ throw new SerDeException("Timestamp column (' " + DruidTable.DEFAULT_TIMESTAMP_COLUMN +
+ "') not specified in create table; list of columns is : " +
+ properties.getProperty(serdeConstants.LIST_COLUMNS));
+ }
+ columnTypes.addAll(Lists.transform(Utilities.getColumnTypes(properties),
+ new Function<String, PrimitiveTypeInfo>() {
+ @Override
+ public PrimitiveTypeInfo apply(String type) {
+ return TypeInfoFactory.getPrimitiveTypeInfo(type);
+ }
+ }
+ ));
+ inspectors.addAll(Lists.transform(columnTypes,
+ new Function<PrimitiveTypeInfo, ObjectInspector>() {
+ @Override
+ public ObjectInspector apply(PrimitiveTypeInfo type) {
+ return PrimitiveObjectInspectorFactory
+ .getPrimitiveWritableObjectInspector(type);
+ }
+ }
+ ));
+ columns = columnNames.toArray(new String[columnNames.size()]);
+ types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
+ inspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(columnNames, inspectors);
+ } else {
+ String dataSource = properties.getProperty(Constants.DRUID_DATA_SOURCE);
+ if (dataSource == null) {
+ throw new SerDeException("Druid data source not specified; use " +
+ Constants.DRUID_DATA_SOURCE + " in table properties");
+ }
+ SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder();
+ builder.dataSource(dataSource);
+ builder.merge(true);
+ builder.analysisTypes();
+ SegmentMetadataQuery query = builder.build();
- // Execute query in Druid
- String address = HiveConf.getVar(configuration,
- HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
- if (org.apache.commons.lang3.StringUtils.isEmpty(address)) {
- throw new SerDeException("Druid broker address not specified in configuration");
- }
+ // Execute query in Druid
+ String address = HiveConf.getVar(configuration,
+ HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS
+ );
+ if (org.apache.commons.lang3.StringUtils.isEmpty(address)) {
+ throw new SerDeException("Druid broker address not specified in configuration");
+ }
numConnection = HiveConf
.getIntVar(configuration, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
readTimeout = new Period(
HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
- // Infer schema
- SegmentAnalysis schemaInfo;
- try {
- schemaInfo = submitMetadataRequest(address, query);
- } catch (IOException e) {
- throw new SerDeException(e);
- }
- for (Entry<String,ColumnAnalysis> columnInfo : schemaInfo.getColumns().entrySet()) {
- if (columnInfo.getKey().equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
- // Special handling for timestamp column
+
+ // Infer schema
+ SegmentAnalysis schemaInfo;
+ try {
+ schemaInfo = submitMetadataRequest(address, query);
+ } catch (IOException e) {
+ throw new SerDeException(e);
+ }
+ for (Entry<String, ColumnAnalysis> columnInfo : schemaInfo.getColumns().entrySet()) {
+ if (columnInfo.getKey().equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
+ // Special handling for timestamp column
+ columnNames.add(columnInfo.getKey()); // field name
+ PrimitiveTypeInfo type = TypeInfoFactory.timestampTypeInfo; // field type
+ columnTypes.add(type);
+ inspectors
+ .add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
+ continue;
+ }
columnNames.add(columnInfo.getKey()); // field name
- PrimitiveTypeInfo type = TypeInfoFactory.timestampTypeInfo; // field type
+ PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType(
+ columnInfo.getValue().getType()); // field type
columnTypes.add(type);
inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
- continue;
}
- columnNames.add(columnInfo.getKey()); // field name
- PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType(
- columnInfo.getValue().getType()); // field type
- columnTypes.add(type);
- inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
+ columns = columnNames.toArray(new String[columnNames.size()]);
+ types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
+ inspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(columnNames, inspectors);
}
- columns = columnNames.toArray(new String[columnNames.size()]);
- types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
- inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
} else {
// Query is specified, we can extract the results schema from the query
Query<?> query;
@@ -171,13 +220,14 @@ public class DruidSerDe extends AbstractSerDe {
default:
throw new SerDeException("Not supported Druid query");
}
-
+
columns = new String[columnNames.size()];
types = new PrimitiveTypeInfo[columnNames.size()];
for (int i = 0; i < columnTypes.size(); ++i) {
columns[i] = columnNames.get(i);
types[i] = columnTypes.get(i);
- inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(types[i]));
+ inspectors
+ .add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(types[i]));
}
inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
}
@@ -192,22 +242,29 @@ public class DruidSerDe extends AbstractSerDe {
/* Submits the request and returns */
protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
throws SerDeException, IOException {
+ final Lifecycle lifecycle = new Lifecycle();
HttpClient client = HttpClientInit.createClient(
HttpClientConfig.builder().withNumConnections(numConnection)
- .withReadTimeout(readTimeout.toStandardDuration()).build(), new Lifecycle());
+ .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
InputStream response;
try {
+ lifecycle.start();
response = DruidStorageHandlerUtils.submitRequest(client,
- DruidStorageHandlerUtils.createRequest(address, query));
+ DruidStorageHandlerUtils.createRequest(address, query)
+ );
} catch (Exception e) {
throw new SerDeException(StringUtils.stringifyException(e));
+ } finally {
+ lifecycle.stop();
}
// Retrieve results
List<SegmentAnalysis> resultsList;
try {
resultsList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
- new TypeReference<List<SegmentAnalysis>>() {});
+ new TypeReference<List<SegmentAnalysis>>() {
+ }
+ );
} catch (Exception e) {
response.close();
throw new SerDeException(StringUtils.stringifyException(e));
@@ -224,7 +281,8 @@ public class DruidSerDe extends AbstractSerDe {
/* Timeseries query */
private void inferSchema(TimeseriesQuery query, List<String> columnNames,
- List<PrimitiveTypeInfo> columnTypes) {
+ List<PrimitiveTypeInfo> columnTypes
+ ) {
// Timestamp column
columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -241,7 +299,9 @@ public class DruidSerDe extends AbstractSerDe {
}
/* TopN query */
- private void inferSchema(TopNQuery query, List<String> columnNames, List<PrimitiveTypeInfo> columnTypes) {
+ private void inferSchema(TopNQuery query, List<String> columnNames,
+ List<PrimitiveTypeInfo> columnTypes
+ ) {
// Timestamp column
columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -262,7 +322,8 @@ public class DruidSerDe extends AbstractSerDe {
/* Select query */
private void inferSchema(SelectQuery query, List<String> columnNames,
- List<PrimitiveTypeInfo> columnTypes) {
+ List<PrimitiveTypeInfo> columnTypes
+ ) {
// Timestamp column
columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -279,7 +340,9 @@ public class DruidSerDe extends AbstractSerDe {
}
/* GroupBy query */
- private void inferSchema(GroupByQuery query, List<String> columnNames, List<PrimitiveTypeInfo> columnTypes) {
+ private void inferSchema(GroupByQuery query, List<String> columnNames,
+ List<PrimitiveTypeInfo> columnTypes
+ ) {
// Timestamp column
columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -302,17 +365,67 @@ public class DruidSerDe extends AbstractSerDe {
@Override
public Class<? extends Writable> getSerializedClass() {
- return NullWritable.class;
+ return DruidWritable.class;
}
@Override
public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
- return NullWritable.get();
+ if (objectInspector.getCategory() != ObjectInspector.Category.STRUCT) {
+ throw new SerDeException(getClass().toString()
+ + " can only serialize struct types, but we got: "
+ + objectInspector.getTypeName());
+ }
+
+ // Prepare the field ObjectInspectors
+ StructObjectInspector soi = (StructObjectInspector) objectInspector;
+ List<? extends StructField> fields = soi.getAllStructFieldRefs();
+ List<Object> values = soi.getStructFieldsDataAsList(o);
+ // Serialize the row as a map of column name -> value
+ Map<String, Object> value = new HashMap<>();
+ for (int i = 0; i < columns.length; i++) {
+ if (values.get(i) == null) {
+ // null, we just add it
+ value.put(columns[i], null);
+ continue;
+ }
+ final Object res;
+ switch (types[i].getPrimitiveCategory()) {
+ case TIMESTAMP:
+ res = ((TimestampObjectInspector) fields.get(i).getFieldObjectInspector())
+ .getPrimitiveJavaObject(
+ values.get(i)).getTime();
+ break;
+ case LONG:
+ res = ((LongObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+ break;
+ case FLOAT:
+ res = ((FloatObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+ break;
+ case DOUBLE:
+ res = ((DoubleObjectInspector) fields.get(i).getFieldObjectInspector())
+ .get(values.get(i));
+ break;
+ case STRING:
+ res = ((StringObjectInspector) fields.get(i).getFieldObjectInspector())
+ .getPrimitiveJavaObject(
+ values.get(i));
+ break;
+ default:
+ throw new SerDeException("Unknown type: " + types[i].getPrimitiveCategory());
+ }
+ value.put(columns[i], res);
+ }
+ value.put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
+ ((TimestampObjectInspector) fields.get(columns.length).getFieldObjectInspector())
+ .getPrimitiveJavaObject(values.get(columns.length)).getTime()
+ );
+ return new DruidWritable(value);
}
@Override
public SerDeStats getSerDeStats() {
- throw new UnsupportedOperationException("SerdeStats not supported.");
+ // no support for statistics
+ return null;
}
@Override
@@ -327,13 +440,16 @@ public class DruidSerDe extends AbstractSerDe {
}
switch (types[i].getPrimitiveCategory()) {
case TIMESTAMP:
- output.add(new TimestampWritable(new Timestamp((Long)value)));
+ output.add(new TimestampWritable(new Timestamp((Long) value)));
break;
case LONG:
- output.add(new LongWritable(((Number)value).longValue()));
+ output.add(new LongWritable(((Number) value).longValue()));
break;
case FLOAT:
- output.add(new FloatWritable(((Number)value).floatValue()));
+ output.add(new FloatWritable(((Number) value).floatValue()));
+ break;
+ case DOUBLE:
+ output.add(new DoubleWritable(((Number) value).doubleValue()));
break;
case STRING:
output.add(new Text(value.toString()));
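
To make the new serialize() path above concrete: each Hive row is flattened into a map keyed by column name, with the timestamp converted to epoch milliseconds and the granularity column appended last, before being wrapped in a DruidWritable. A rough illustration of the resulting map for one invented row of a table with columns (page STRING, added DOUBLE); the column names, values and the literal name of the granularity column are assumptions:

import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class SerializedRowExample {
  public static void main(String[] args) {
    Map<String, Object> value = ImmutableMap.<String, Object>of(
        "__time", 1420070400000L,             // DruidTable.DEFAULT_TIMESTAMP_COLUMN, epoch millis
        "page", "some_wiki_page",             // dimension value from the Hive row
        "added", 57.0d,                       // metric value from the Hive row
        "__time_granularity", 1420070400000L  // granularity column (literal name assumed)
    );
    System.out.println(value); // this map is what DruidWritable wraps
  }
}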
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
index 29b8845..64a19f6 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
@@ -31,14 +31,16 @@ public final class DruidSerDeUtils {
private static final Logger LOG = LoggerFactory.getLogger(DruidSerDeUtils.class);
protected static final String FLOAT_TYPE = "FLOAT";
+
protected static final String LONG_TYPE = "LONG";
+
protected static final String STRING_TYPE = "STRING";
/* This method converts from the String representation of Druid type
* to the corresponding Hive type */
public static PrimitiveTypeInfo convertDruidToHiveType(String typeName) {
typeName = typeName.toUpperCase();
- switch(typeName) {
+ switch (typeName) {
case FLOAT_TYPE:
return TypeInfoFactory.floatTypeInfo;
case LONG_TYPE:
@@ -61,7 +63,7 @@ public final class DruidSerDeUtils {
* to the String representation of the corresponding Hive type */
public static String convertDruidToHiveTypeString(String typeName) {
typeName = typeName.toUpperCase();
- switch(typeName) {
+ switch (typeName) {
case FLOAT_TYPE:
return serdeConstants.FLOAT_TYPE_NAME;
case LONG_TYPE:
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
index b91178c..8c2fb10 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
@@ -45,9 +45,12 @@ public class DruidTimeseriesQueryRecordReader
}
@Override
- protected List<Result<TimeseriesResultValue>> createResultsList(InputStream content) throws IOException {
+ protected List<Result<TimeseriesResultValue>> createResultsList(InputStream content)
+ throws IOException {
return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
- new TypeReference<List<Result<TimeseriesResultValue>>>(){});
+ new TypeReference<List<Result<TimeseriesResultValue>>>() {
+ }
+ );
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
index 22599c3..d431925 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
@@ -41,6 +41,7 @@ public class DruidTopNQueryRecordReader
extends DruidQueryRecordReader<TopNQuery, Result<TopNResultValue>> {
private Result<TopNResultValue> current;
+
private Iterator<DimensionAndMetricValueExtractor> values = Iterators.emptyIterator();
@Override
@@ -49,9 +50,12 @@ public class DruidTopNQueryRecordReader
}
@Override
- protected List<Result<TopNResultValue>> createResultsList(InputStream content) throws IOException {
+ protected List<Result<TopNResultValue>> createResultsList(InputStream content)
+ throws IOException {
return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
- new TypeReference<List<Result<TopNResultValue>>>(){});
+ new TypeReference<List<Result<TopNResultValue>>>() {
+ }
+ );
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
new file mode 100644
index 0000000..8770749
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
@@ -0,0 +1,181 @@
+package org.apache.hadoop.hive.druid;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.druid.indexer.JobHelper;
+import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.skife.jdbi.v2.Handle;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
+
+public class DruidStorageHandlerTest {
+
+ @Rule
+ public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
+
+ @Rule
+ public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private static final String DATA_SOURCE_NAME = "testName";
+
+ private String segmentsTable;
+
+ private String tablePath;
+
+ private DataSegment dataSegment = DataSegment.builder().dataSource(DATA_SOURCE_NAME).version("v1")
+ .interval(new Interval(100, 170)).shardSpec(NoneShardSpec.instance()).build();
+
+ @Before
+ public void before() throws Throwable {
+ tablePath = temporaryFolder.newFolder().getAbsolutePath();
+ segmentsTable = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
+ Map<String, String> mockMap = ImmutableMap.of(Constants.DRUID_DATA_SOURCE, DATA_SOURCE_NAME);
+ Mockito.when(tableMock.getParameters()).thenReturn(mockMap);
+ Mockito.when(tableMock.getPartitionKeysSize()).thenReturn(0);
+ StorageDescriptor storageDes = Mockito.mock(StorageDescriptor.class);
+ Mockito.when(storageDes.getBucketColsSize()).thenReturn(0);
+ Mockito.when(tableMock.getSd()).thenReturn(storageDes);
+ Mockito.when(tableMock.getDbName()).thenReturn(DATA_SOURCE_NAME);
+ }
+
+ Table tableMock = Mockito.mock(Table.class);
+
+ @Test
+ public void testPreCreateTableWillCreateSegmentsTable() throws MetaException {
+ DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+ derbyConnectorRule.getConnector(),
+ new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
+ );
+
+ try (Handle handle = derbyConnectorRule.getConnector().getDBI().open()) {
+ Assert.assertFalse(derbyConnectorRule.getConnector()
+ .tableExists(handle,
+ segmentsTable
+ ));
+ druidStorageHandler.preCreateTable(tableMock);
+ Assert.assertTrue(derbyConnectorRule.getConnector()
+ .tableExists(handle,
+ segmentsTable
+ ));
+ }
+
+ }
+
+ @Test(expected = MetaException.class)
+ public void testPreCreateTableWhenDataSourceExists() throws MetaException {
+ derbyConnectorRule.getConnector().createSegmentTable();
+ SQLMetadataStorageUpdaterJobHandler sqlMetadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler(
+ derbyConnectorRule.getConnector());
+ sqlMetadataStorageUpdaterJobHandler.publishSegments(segmentsTable, Arrays.asList(dataSegment),
+ DruidStorageHandlerUtils.JSON_MAPPER
+ );
+ DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+ derbyConnectorRule.getConnector(),
+ new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
+ );
+ druidStorageHandler.preCreateTable(tableMock);
+ }
+
+ @Test
+ public void testCommitCreateTablePlusCommitDropTableWithoutPurge()
+ throws MetaException, IOException {
+ DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+ derbyConnectorRule.getConnector(),
+ new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
+ );
+ druidStorageHandler.preCreateTable(tableMock);
+ Configuration config = new Configuration();
+ config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
+ config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tablePath);
+ druidStorageHandler.setConf(config);
+ LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+ /*
+ final descriptor path is in the form tablePath/taskId_Attempt_ID/segmentDescriptorDir/segmentIdentifier.json
+ UUID.randomUUID() will fake the taskId_attemptID
+ */
+ Path taskDirPath = new Path(tablePath, druidStorageHandler.makeStagingName());
+ Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
+ new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+ );
+ DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath);
+ druidStorageHandler.commitCreateTable(tableMock);
+ Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+ DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
+ )).toArray());
+ druidStorageHandler.commitDropTable(tableMock, false);
+ Assert.assertArrayEquals(Lists.newArrayList().toArray(), Lists.newArrayList(
+ DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
+ )).toArray());
+
+ }
+
+ @Test
+ public void testDeleteSegment() throws IOException, SegmentLoadingException {
+ DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+ derbyConnectorRule.getConnector(),
+ new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
+ );
+
+ String segmentRootPath = temporaryFolder.newFolder().getAbsolutePath();
+ Configuration config = new Configuration();
+ druidStorageHandler.setConf(config);
+ LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+
+ Path segmentOutputPath = JobHelper
+ .makeSegmentOutputPath(new Path(segmentRootPath), localFileSystem, dataSegment);
+ Path indexPath = new Path(segmentOutputPath, "index.zip");
+ DataSegment dataSegmentWithLoadspect = DataSegment.builder(dataSegment).loadSpec(
+ ImmutableMap.<String, Object>of("path", indexPath)).build();
+ OutputStream outputStream = localFileSystem.create(indexPath, true);
+ outputStream.close();
+ Assert.assertTrue("index file is not created ??", localFileSystem.exists(indexPath));
+ Assert.assertTrue(localFileSystem.exists(segmentOutputPath));
+
+ druidStorageHandler.deleteSegment(dataSegmentWithLoadspect);
+ // path format -- > .../dataSource/interval/version/partitionNum/xxx.zip
+ Assert.assertFalse("Index file still there ??", localFileSystem.exists(indexPath));
+ // path format of segmentOutputPath -- > .../dataSource/interval/version/partitionNum/
+ Assert.assertFalse("PartitionNum directory still there ??",
+ localFileSystem.exists(segmentOutputPath)
+ );
+ Assert.assertFalse("Version directory still there ??",
+ localFileSystem.exists(segmentOutputPath.getParent())
+ );
+ Assert.assertFalse("Interval directory still there ??",
+ localFileSystem.exists(segmentOutputPath.getParent().getParent())
+ );
+ Assert.assertFalse("Data source directory still there ??",
+ localFileSystem.exists(segmentOutputPath.getParent().getParent().getParent())
+ );
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
index 2b4df78..8dc8091 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
@@ -41,34 +41,34 @@ public class QTestDruidSerDe extends DruidSerDe {
// + "\"usingDefaultInterval\":true,\"lenientAggregatorMerge\":false,\"descending\":false}";
private static final String RESPONSE =
"[ {\r\n "
- + " \"id\" : \"merged\",\r\n "
- + " \"intervals\" : [ \"2010-01-01T00:00:00.000Z/2015-12-31T00:00:00.000Z\" ],\r\n "
- + " \"columns\" : {\r\n "
- + " \"__time\" : { \"type\" : \"LONG\", \"hasMultipleValues\" : false, \"size\" : 407240380, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
- + " \"robot\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
- + " \"namespace\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : true, \"size\" : 100000, \"cardinality\" : 1504, \"errorMessage\" : null },\r\n "
- + " \"anonymous\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
- + " \"unpatrolled\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
- + " \"page\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
- + " \"language\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
- + " \"newpage\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
- + " \"user\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
- + " \"count\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
- + " \"added\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
- + " \"delta\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
- + " \"variation\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
- + " \"deleted\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null }\r\n "
- + " },\r\n "
- + " \"aggregators\" : {\r\n "
- + " \"count\" : { \"type\" : \"longSum\", \"name\" : \"count\", \"fieldName\" : \"count\" },\r\n "
- + " \"added\" : { \"type\" : \"doubleSum\", \"name\" : \"added\", \"fieldName\" : \"added\" },\r\n "
- + " \"delta\" : { \"type\" : \"doubleSum\", \"name\" : \"delta\", \"fieldName\" : \"delta\" },\r\n "
- + " \"variation\" : { \"type\" : \"doubleSum\", \"name\" : \"variation\", \"fieldName\" : \"variation\" },\r\n "
- + " \"deleted\" : { \"type\" : \"doubleSum\", \"name\" : \"deleted\", \"fieldName\" : \"deleted\" }\r\n "
- + " },\r\n "
- + " \"queryGranularity\" : {\r\n \"type\": \"none\"\r\n },\r\n "
- + " \"size\" : 300000,\r\n "
- + " \"numRows\" : 5000000\r\n} ]";
+ + " \"id\" : \"merged\",\r\n "
+ + " \"intervals\" : [ \"2010-01-01T00:00:00.000Z/2015-12-31T00:00:00.000Z\" ],\r\n "
+ + " \"columns\" : {\r\n "
+ + " \"__time\" : { \"type\" : \"LONG\", \"hasMultipleValues\" : false, \"size\" : 407240380, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
+ + " \"robot\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"namespace\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : true, \"size\" : 100000, \"cardinality\" : 1504, \"errorMessage\" : null },\r\n "
+ + " \"anonymous\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"unpatrolled\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"page\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"language\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"newpage\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"user\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"count\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
+ + " \"added\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
+ + " \"delta\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
+ + " \"variation\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
+ + " \"deleted\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null }\r\n "
+ + " },\r\n "
+ + " \"aggregators\" : {\r\n "
+ + " \"count\" : { \"type\" : \"longSum\", \"name\" : \"count\", \"fieldName\" : \"count\" },\r\n "
+ + " \"added\" : { \"type\" : \"doubleSum\", \"name\" : \"added\", \"fieldName\" : \"added\" },\r\n "
+ + " \"delta\" : { \"type\" : \"doubleSum\", \"name\" : \"delta\", \"fieldName\" : \"delta\" },\r\n "
+ + " \"variation\" : { \"type\" : \"doubleSum\", \"name\" : \"variation\", \"fieldName\" : \"variation\" },\r\n "
+ + " \"deleted\" : { \"type\" : \"doubleSum\", \"name\" : \"deleted\", \"fieldName\" : \"deleted\" }\r\n "
+ + " },\r\n "
+ + " \"queryGranularity\" : {\r\n \"type\": \"none\"\r\n },\r\n "
+ + " \"size\" : 300000,\r\n "
+ + " \"numRows\" : 5000000\r\n} ]";
/* Submits the request and returns */
@Override
@@ -78,7 +78,9 @@ public class QTestDruidSerDe extends DruidSerDe {
List<SegmentAnalysis> resultsList;
try {
resultsList = DruidStorageHandlerUtils.JSON_MAPPER.readValue(RESPONSE,
- new TypeReference<List<SegmentAnalysis>>() {});
+ new TypeReference<List<SegmentAnalysis>>() {
+ }
+ );
} catch (Exception e) {
throw new SerDeException(StringUtils.stringifyException(e));
}
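
For readers unfamiliar with the pattern used in the hunk above, the anonymous TypeReference subclass is what lets Jackson keep the full generic element type (List<SegmentAnalysis>) at runtime. A minimal, self-contained sketch of the same pattern follows; it uses plain Jackson types only and is not part of the patch:

// Illustrative Jackson TypeReference usage, analogous to the hunk above; not part of the patch.
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;

public class TypeReferenceExample {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    String json = "[{\"id\":\"merged\",\"size\":300000}]";
    // The anonymous TypeReference subclass carries the full generic type,
    // so Jackson binds each array element to Map<String, Object> explicitly
    // instead of an erased raw List.
    List<Map<String, Object>> parsed = mapper.readValue(json,
        new TypeReference<List<Map<String, Object>>>() {
        });
    System.out.println(parsed.get(0).get("id")); // prints "merged"
  }
}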
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
new file mode 100644
index 0000000..75c0129
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
@@ -0,0 +1,108 @@
+package org.apache.hadoop.hive.druid;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import io.druid.metadata.MetadataStorageConnectorConfig;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.metadata.storage.derby.DerbyConnector;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
+
+import java.sql.SQLException;
+import java.util.UUID;
+
+public class TestDerbyConnector extends DerbyConnector {
+ private final String jdbcUri;
+
+ public TestDerbyConnector(
+ Supplier<MetadataStorageConnectorConfig> config,
+ Supplier<MetadataStorageTablesConfig> dbTables
+ ) {
+ this(config, dbTables, "jdbc:derby:memory:druidTest" + dbSafeUUID());
+ }
+
+ protected TestDerbyConnector(
+ Supplier<MetadataStorageConnectorConfig> config,
+ Supplier<MetadataStorageTablesConfig> dbTables,
+ String jdbcUri
+ ) {
+ super(config, dbTables, new DBI(jdbcUri + ";create=true"));
+ this.jdbcUri = jdbcUri;
+ }
+
+ public void tearDown() {
+ try {
+ new DBI(jdbcUri + ";drop=true").open().close();
+ } catch (UnableToObtainConnectionException e) {
+ SQLException cause = (SQLException) e.getCause();
+ // error code "08006" indicates proper shutdown
+ Assert.assertEquals(String.format("Derby not shutdown: [%s]", cause.toString()), "08006",
+ cause.getSQLState()
+ );
+ }
+ }
+
+ public static String dbSafeUUID() {
+ return UUID.randomUUID().toString().replace("-", "");
+ }
+
+ public String getJdbcUri() {
+ return jdbcUri;
+ }
+
+ public static class DerbyConnectorRule extends ExternalResource {
+ private TestDerbyConnector connector;
+
+ private final Supplier<MetadataStorageTablesConfig> dbTables;
+
+ private final MetadataStorageConnectorConfig connectorConfig;
+
+ public DerbyConnectorRule() {
+ this("druidTest" + dbSafeUUID());
+ }
+
+ private DerbyConnectorRule(
+ final String defaultBase
+ ) {
+ this(Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase(defaultBase)));
+ }
+
+ public DerbyConnectorRule(
+ Supplier<MetadataStorageTablesConfig> dbTables
+ ) {
+ this.dbTables = dbTables;
+ this.connectorConfig = new MetadataStorageConnectorConfig() {
+ @Override
+ public String getConnectURI() {
+ return connector.getJdbcUri();
+ }
+ };
+ }
+
+ @Override
+ protected void before() throws Throwable {
+ connector = new TestDerbyConnector(Suppliers.ofInstance(connectorConfig), dbTables);
+ connector.getDBI().open().close(); // create db
+ }
+
+ @Override
+ protected void after() {
+ connector.tearDown();
+ }
+
+ public TestDerbyConnector getConnector() {
+ return connector;
+ }
+
+ public MetadataStorageConnectorConfig getMetadataConnectorConfig() {
+ return connectorConfig;
+ }
+
+ public Supplier<MetadataStorageTablesConfig> metadataTablesConfigSupplier() {
+ return dbTables;
+ }
+ }
+
+}
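
As a rough illustration of how the rule added above could be consumed, the following JUnit 4 sketch wires DerbyConnectorRule into a test class. The test class and assertion are hypothetical and not part of the patch; only DerbyConnectorRule, getConnector() and getJdbcUri() come from the file above:

// Hypothetical JUnit 4 usage of DerbyConnectorRule; this class is not part of the patch.
import org.apache.hadoop.hive.druid.TestDerbyConnector;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class ExampleDerbyBackedTest {
  // ExternalResource#before() creates a fresh in-memory Derby database for each
  // test method, and after() drops it via TestDerbyConnector#tearDown().
  @Rule
  public TestDerbyConnector.DerbyConnectorRule derbyRule =
      new TestDerbyConnector.DerbyConnectorRule();

  @Test
  public void testMetadataStoreIsReachable() {
    TestDerbyConnector connector = derbyRule.getConnector();
    // The JDBC URI embeds a random UUID (dbSafeUUID()), so concurrent tests do not collide.
    Assert.assertTrue(connector.getJdbcUri().startsWith("jdbc:derby:memory:druidTest"));
  }
}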
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
index 1343939..a495165 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
@@ -74,374 +74,408 @@ public class TestDruidSerDe {
// Timeseries query
private static final String TIMESERIES_QUERY =
"{ \"queryType\": \"timeseries\", "
- + " \"dataSource\": \"sample_datasource\", "
- + " \"granularity\": \"day\", "
- + " \"descending\": \"true\", "
- + " \"filter\": { "
- + " \"type\": \"and\", "
- + " \"fields\": [ "
- + " { \"type\": \"selector\", \"dimension\": \"sample_dimension1\", \"value\": \"sample_value1\" }, "
- + " { \"type\": \"or\", "
- + " \"fields\": [ "
- + " { \"type\": \"selector\", \"dimension\": \"sample_dimension2\", \"value\": \"sample_value2\" }, "
- + " { \"type\": \"selector\", \"dimension\": \"sample_dimension3\", \"value\": \"sample_value3\" } "
- + " ] "
- + " } "
- + " ] "
- + " }, "
- + " \"aggregations\": [ "
- + " { \"type\": \"longSum\", \"name\": \"sample_name1\", \"fieldName\": \"sample_fieldName1\" }, "
- + " { \"type\": \"doubleSum\", \"name\": \"sample_name2\", \"fieldName\": \"sample_fieldName2\" } "
- + " ], "
- + " \"postAggregations\": [ "
- + " { \"type\": \"arithmetic\", "
- + " \"name\": \"sample_divide\", "
- + " \"fn\": \"/\", "
- + " \"fields\": [ "
- + " { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name1\", \"fieldName\": \"sample_name1\" }, "
- + " { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name2\", \"fieldName\": \"sample_name2\" } "
- + " ] "
- + " } "
- + " ], "
- + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ]}";
+ + " \"dataSource\": \"sample_datasource\", "
+ + " \"granularity\": \"day\", "
+ + " \"descending\": \"true\", "
+ + " \"filter\": { "
+ + " \"type\": \"and\", "
+ + " \"fields\": [ "
+ + " { \"type\": \"selector\", \"dimension\": \"sample_dimension1\", \"value\": \"sample_value1\" }, "
+ + " { \"type\": \"or\", "
+ + " \"fields\": [ "
+ + " { \"type\": \"selector\", \"dimension\": \"sample_dimension2\", \"value\": \"sample_value2\" }, "
+ + " { \"type\": \"selector\", \"dimension\": \"sample_dimension3\", \"value\": \"sample_value3\" } "
+ + " ] "
+ + " } "
+ + " ] "
+ + " }, "
+ + " \"aggregations\": [ "
+ + " { \"type\": \"longSum\", \"name\": \"sample_name1\", \"fieldName\": \"sample_fieldName1\" }, "
+ + " { \"type\": \"doubleSum\", \"name\": \"sample_name2\", \"fieldName\": \"sample_fieldName2\" } "
+ + " ], "
+ + " \"postAggregations\": [ "
+ + " { \"type\": \"arithmetic\", "
+ + " \"name\": \"sample_divide\", "
+ + " \"fn\": \"/\", "
+ + " \"fields\": [ "
+ + " { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name1\", \"fieldName\": \"sample_name1\" }, "
+ + " { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name2\", \"fieldName\": \"sample_name2\" } "
+ + " ] "
+ + " } "
+ + " ], "
+ + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ]}";
+
// Timeseries query results
private static final String TIMESERIES_QUERY_RESULTS =
"[ "
- + "{ "
- + " \"timestamp\": \"2012-01-01T00:00:00.000Z\", "
- + " \"result\": { \"sample_name1\": 0, \"sample_name2\": 1.0, \"sample_divide\": 2.2222 } "
- + "}, "
- + "{ "
- + " \"timestamp\": \"2012-01-02T00:00:00.000Z\", "
- + " \"result\": { \"sample_name1\": 2, \"sample_name2\": 3.32, \"sample_divide\": 4 } "
- + "}]";
+ + "{ "
+ + " \"timestamp\": \"2012-01-01T00:00:00.000Z\", "
+ + " \"result\": { \"sample_name1\": 0, \"sample_name2\": 1.0, \"sample_divide\": 2.2222 } "
+ + "}, "
+ + "{ "
+ + " \"timestamp\": \"2012-01-02T00:00:00.000Z\", "
+ + " \"result\": { \"sample_name1\": 2, \"sample_name2\": 3.32, \"sample_divide\": 4 } "
+ + "}]";
+
// Timeseries query results as records
private static final Object[][] TIMESERIES_QUERY_RESULTS_RECORDS = new Object[][] {
- new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new LongWritable(0), new FloatWritable(1.0F), new FloatWritable(2.2222F) } ,
- new Object[] { new TimestampWritable(new Timestamp(1325462400000L)), new LongWritable(2), new FloatWritable(3.32F), new FloatWritable(4F)}
+ new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new LongWritable(0),
+ new FloatWritable(1.0F), new FloatWritable(2.2222F) },
+ new Object[] { new TimestampWritable(new Timestamp(1325462400000L)), new LongWritable(2),
+ new FloatWritable(3.32F), new FloatWritable(4F) }
};
// TopN query
private static final String TOPN_QUERY =
"{ \"queryType\": \"topN\", "
- + " \"dataSource\": \"sample_data\", "
- + " \"dimension\": \"sample_dim\", "
- + " \"threshold\": 5, "
- + " \"metric\": \"count\", "
- + " \"granularity\": \"all\", "
- + " \"filter\": { "
- + " \"type\": \"and\", "
- + " \"fields\": [ "
- + " { "
- + " \"type\": \"selector\", "
- + " \"dimension\": \"dim1\", "
- + " \"value\": \"some_value\" "
- + " }, "
- + " { "
- + " \"type\": \"selector\", "
- + " \"dimension\": \"dim2\", "
- + " \"value\": \"some_other_val\" "
- + " } "
- + " ] "
- + " }, "
- + " \"aggregations\": [ "
- + " { "
- + " \"type\": \"longSum\", "
- + " \"name\": \"count\", "
- + " \"fieldName\": \"count\" "
- + " }, "
- + " { "
- + " \"type\": \"doubleSum\", "
- + " \"name\": \"some_metric\", "
- + " \"fieldName\": \"some_metric\" "
- + " } "
- + " ], "
- + " \"postAggregations\": [ "
- + " { "
- + " \"type\": \"arithmetic\", "
- + " \"name\": \"sample_divide\", "
- + " \"fn\": \"/\", "
- + " \"fields\": [ "
- + " { "
- + " \"type\": \"fieldAccess\", "
- + " \"name\": \"some_metric\", "
- + " \"fieldName\": \"some_metric\" "
- + " }, "
- + " { "
- + " \"type\": \"fieldAccess\", "
- + " \"name\": \"count\", "
- + " \"fieldName\": \"count\" "
- + " } "
- + " ] "
- + " } "
- + " ], "
- + " \"intervals\": [ "
- + " \"2013-08-31T00:00:00.000/2013-09-03T00:00:00.000\" "
- + " ]}";
+ + " \"dataSource\": \"sample_data\", "
+ + " \"dimension\": \"sample_dim\", "
+ + " \"threshold\": 5, "
+ + " \"metric\": \"count\", "
+ + " \"granularity\": \"all\", "
+ + " \"filter\": { "
+ + " \"type\": \"and\", "
+ + " \"fields\": [ "
+ + " { "
+ + " \"type\": \"selector\", "
+ + " \"dimension\": \"dim1\", "
+ + " \"value\": \"some_value\" "
+ + " }, "
+ + " { "
+ + " \"type\": \"selector\", "
+ + " \"dimension\": \"dim2\", "
+ + " \"value\": \"some_other_val\" "
+ + " } "
+ + " ] "
+ + " }, "
+ + " \"aggregations\": [ "
+ + " { "
+ + " \"type\": \"longSum\", "
+ + " \"name\": \"count\", "
+ + " \"fieldName\": \"count\" "
+ + " }, "
+ + " { "
+ + " \"type\": \"doubleSum\", "
+ + " \"name\": \"some_metric\", "
+ + " \"fieldName\": \"some_metric\" "
+ + " } "
+ + " ], "
+ + " \"postAggregations\": [ "
+ + " { "
+ + " \"type\": \"arithmetic\", "
+ + " \"name\": \"sample_divide\", "
+ + " \"fn\": \"/\", "
+ + " \"fields\": [ "
+ + " { "
+ + " \"type\": \"fieldAccess\", "
+ + " \"name\": \"some_metric\", "
+ + " \"fieldName\": \"some_metric\" "
+ + " }, "
+ + " { "
+ + " \"type\": \"fieldAccess\", "
+ + " \"name\": \"count\", "
+ + " \"fieldName\": \"count\" "
+ + " } "
+ + " ] "
+ + " } "
+ + " ], "
+ + " \"intervals\": [ "
+ + " \"2013-08-31T00:00:00.000/2013-09-03T00:00:00.000\" "
+ + " ]}";
+
// TopN query results
private static final String TOPN_QUERY_RESULTS =
"[ "
- + " { "
- + " \"timestamp\": \"2013-08-31T00:00:00.000Z\", "
- + " \"result\": [ "
- + " { "
- + " \"sample_dim\": \"dim1_val\", "
- + " \"count\": 111, "
- + " \"some_metric\": 10669, "
- + " \"sample_divide\": 96.11711711711712 "
- + " }, "
- + " { "
- + " \"sample_dim\": \"another_dim1_val\", "
- + " \"count\": 88, "
- + " \"some_metric\": 28344, "
- + " \"sample_divide\": 322.09090909090907 "
- + " }, "
- + " { "
- + " \"sample_dim\": \"dim1_val3\", "
- + " \"count\": 70, "
- + " \"some_metric\": 871, "
- + " \"sample_divide\": 12.442857142857143 "
- + " }, "
- + " { "
- + " \"sample_dim\": \"dim1_val4\", "
- + " \"count\": 62, "
- + " \"some_metric\": 815, "
- + " \"sample_divide\": 13.14516129032258 "
- + " }, "
- + " { "
- + " \"sample_dim\": \"dim1_val5\", "
- + " \"count\": 60, "
- + " \"some_metric\": 2787, "
- + " \"sample_divide\": 46.45 "
- + " } "
- + " ] "
- + " }]";
+ + " { "
+ + " \"timestamp\": \"2013-08-31T00:00:00.000Z\", "
+ + " \"result\": [ "
+ + " { "
+ + " \"sample_dim\": \"dim1_val\", "
+ + " \"count\": 111, "
+ + " \"some_metric\": 10669, "
+ + " \"sample_divide\": 96.11711711711712 "
+ + " }, "
+ + " { "
+ + " \"sample_dim\": \"another_dim1_val\", "
+ + " \"count\": 88, "
+ + " \"some_metric\": 28344, "
+ + " \"sample_divide\": 322.09090909090907 "
+ + " }, "
+ + " { "
+ + " \"sample_dim\": \"dim1_val3\", "
+ + " \"count\": 70, "
+ + " \"some_metric\": 871, "
+ + " \"sample_divide\": 12.442857142857143 "
+ + " }, "
+ + " { "
+ + " \"sample_dim\": \"dim1_val4\", "
+ + " \"count\": 62, "
+ + " \"some_metric\": 815, "
+ + " \"sample_divide\": 13.14516129032258 "
+ + " }, "
+ + " { "
+ + " \"sample_dim\": \"dim1_val5\", "
+ + " \"count\": 60, "
+ + " \"some_metric\": 2787, "
+ + " \"sample_divide\": 46.45 "
+ + " } "
+ + " ] "
+ + " }]";
+
// TopN query results as records
private static final Object[][] TOPN_QUERY_RESULTS_RECORDS = new Object[][] {
- new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val"), new LongWritable(111), new FloatWritable(10669F), new FloatWritable(96.11711711711712F) } ,
- new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("another_dim1_val"), new LongWritable(88), new FloatWritable(28344F), new FloatWritable(322.09090909090907F) } ,
- new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val3"), new LongWritable(70), new FloatWritable(871F), new FloatWritable(12.442857142857143F) } ,
- new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val4"), new LongWritable(62), new FloatWritable(815F), new FloatWritable(13.14516129032258F) } ,
- new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val5"), new LongWritable(60), new FloatWritable(2787F), new FloatWritable(46.45F) }
+ new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val"),
+ new LongWritable(111), new FloatWritable(10669F),
+ new FloatWritable(96.11711711711712F) },
+ new Object[] { new TimestampWritable(new Timestamp(1377907200000L)),
+ new Text("another_dim1_val"), new LongWritable(88), new FloatWritable(28344F),
+ new FloatWritable(322.09090909090907F) },
+ new Object[] { new TimestampWritable(new Timestamp(1377907200000L)),
+ new Text("dim1_val3"), new LongWritable(70), new FloatWritable(871F),
+ new FloatWritable(12.442857142857143F) },
+ new Object[] { new TimestampWritable(new Timestamp(1377907200000L)),
+ new Text("dim1_val4"), new LongWritable(62), new FloatWritable(815F),
+ new FloatWritable(13.14516129032258F) },
+ new Object[] { new TimestampWritable(new Timestamp(1377907200000L)),
+ new Text("dim1_val5"), new LongWritable(60), new FloatWritable(2787F),
+ new FloatWritable(46.45F) }
};
// GroupBy query
private static final String GROUP_BY_QUERY =
"{ "
- + " \"queryType\": \"groupBy\", "
- + " \"dataSource\": \"sample_datasource\", "
- + " \"granularity\": \"day\", "
- + " \"dimensions\": [\"country\", \"device\"], "
- + " \"limitSpec\": {"
- + " \"type\": \"default\","
- + " \"limit\": 5000,"
- + " \"columns\": [\"country\", \"data_transfer\"] }, "
- + " \"filter\": { "
- + " \"type\": \"and\", "
- + " \"fields\": [ "
- + " { \"type\": \"selector\", \"dimension\": \"carrier\", \"value\": \"AT&T\" }, "
- + " { \"type\": \"or\", "
- + " \"fields\": [ "
- + " { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Apple\" }, "
- + " { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Samsung\" } "
- + " ] "
- + " } "
- + " ] "
- + " }, "
- + " \"aggregations\": [ "
- + " { \"type\": \"longSum\", \"name\": \"total_usage\", \"fieldName\": \"user_count\" }, "
- + " { \"type\": \"doubleSum\", \"name\": \"data_transfer\", \"fieldName\": \"data_transfer\" } "
- + " ], "
- + " \"postAggregations\": [ "
- + " { \"type\": \"arithmetic\", "
- + " \"name\": \"avg_usage\", "
- + " \"fn\": \"/\", "
- + " \"fields\": [ "
- + " { \"type\": \"fieldAccess\", \"fieldName\": \"data_transfer\" }, "
- + " { \"type\": \"fieldAccess\", \"fieldName\": \"total_usage\" } "
- + " ] "
- + " } "
- + " ], "
- + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ], "
- + " \"having\": { "
- + " \"type\": \"greaterThan\", "
- + " \"aggregation\": \"total_usage\", "
- + " \"value\": 100 "
- + " }}";
+ + " \"queryType\": \"groupBy\", "
+ + " \"dataSource\": \"sample_datasource\", "
+ + " \"granularity\": \"day\", "
+ + " \"dimensions\": [\"country\", \"device\"], "
+ + " \"limitSpec\": {"
+ + " \"type\": \"default\","
+ + " \"limit\": 5000,"
+ + " \"columns\": [\"country\", \"data_transfer\"] }, "
+ + " \"filter\": { "
+ + " \"type\": \"and\", "
+ + " \"fields\": [ "
+ + " { \"type\": \"selector\", \"dimension\": \"carrier\", \"value\": \"AT&T\" }, "
+ + " { \"type\": \"or\", "
+ + " \"fields\": [ "
+ + " { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Apple\" }, "
+ + " { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Samsung\" } "
+ + " ] "
+ + " } "
+ + " ] "
+ + " }, "
+ + " \"aggregations\": [ "
+ + " { \"type\": \"longSum\", \"name\": \"total_usage\", \"fieldName\": \"user_count\" }, "
+ + " { \"type\": \"doubleSum\", \"name\": \"data_transfer\", \"fieldName\": \"data_transfer\" } "
+ + " ], "
+ + " \"postAggregations\": [ "
+ + " { \"type\": \"arithmetic\", "
+ + " \"name\": \"avg_usage\", "
+ + " \"fn\": \"/\", "
+ + " \"fields\": [ "
+ + " { \"type\": \"fieldAccess\", \"fieldName\": \"data_transfer\" }, "
+ + " { \"type\": \"fieldAccess\", \"fieldName\": \"total_usage\" } "
+ + " ] "
+ + " } "
+ + " ], "
+ + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ], "
+ + " \"having\": { "
+ + " \"type\": \"greaterThan\", "
+ + " \"aggregation\": \"total_usage\", "
+ + " \"value\": 100 "
+ + " }}";
+
// GroupBy query results
private static final String GROUP_BY_QUERY_RESULTS =
"[ "
- + " { "
- + " \"version\" : \"v1\", "
- + " \"timestamp\" : \"2012-01-01T00:00:00.000Z\", "
- + " \"event\" : { "
- + " \"country\" : \"India\", "
- + " \"device\" : \"phone\", "
- + " \"total_usage\" : 88, "
- + " \"data_transfer\" : 29.91233453, "
- + " \"avg_usage\" : 60.32 "
- + " } "
- + " }, "
- + " { "
- + " \"version\" : \"v1\", "
- + " \"timestamp\" : \"2012-01-01T00:00:12.000Z\", "
- + " \"event\" : { "
- + " \"country\" : \"Spain\", "
- + " \"device\" : \"pc\", "
- + " \"total_usage\" : 16, "
- + " \"data_transfer\" : 172.93494959, "
- + " \"avg_usage\" : 6.333333 "
- + " } "
- + " }]";
+ + " { "
+ + " \"version\" : \"v1\", "
+ + " \"timestamp\" : \"2012-01-01T00:00:00.000Z\", "
+ + " \"event\" : { "
+ + " \"country\" : \"India\", "
+ + " \"device\" : \"phone\", "
+ + " \"total_usage\" : 88, "
+ + " \"data_transfer\" : 29.91233453, "
+ + " \"avg_usage\" : 60.32 "
+ + " } "
+ + " }, "
+ + " { "
+ + " \"version\" : \"v1\", "
+ + " \"timestamp\" : \"2012-01-01T00:00:12.000Z\", "
+ + " \"event\" : { "
+ + " \"country\" : \"Spain\", "
+ + " \"device\" : \"pc\", "
+ + " \"total_usage\" : 16, "
+ + " \"data_transfer\" : 172.93494959, "
+ + " \"avg_usage\" : 6.333333 "
+ + " } "
+ + " }]";
+
// GroupBy query results as records
private static final Object[][] GROUP_BY_QUERY_RESULTS_RECORDS = new Object[][] {
- new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new Text("India"), new Text("phone"), new LongWritable(88), new FloatWritable(29.91233453F), new FloatWritable(60.32F) } ,
- new Object[] { new TimestampWritable(new Timestamp(1325376012000L)), new Text("Spain"), new Text("pc"), new LongWritable(16), new FloatWritable(172.93494959F), new FloatWritable(6.333333F) }
+ new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new Text("India"),
+ new Text("phone"), new LongWritable(88), new FloatWritable(29.91233453F),
+ new FloatWritable(60.32F) },
+ new Object[] { new TimestampWritable(new Timestamp(1325376012000L)), new Text("Spain"),
+ new Text("pc"), new LongWritable(16), new FloatWritable(172.93494959F),
+ new FloatWritable(6.333333F) }
};
// Select query
private static final String SELECT_QUERY =
"{ \"queryType\": \"select\", "
- + " \"dataSource\": \"wikipedia\", \"descending\": \"false\", "
- + " \"dimensions\":[\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\",\"newpage\",\"user\"], "
- + " \"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"], "
- + " \"granularity\": \"all\", "
- + " \"intervals\": [ \"2013-01-01/2013-01-02\" ], "
- + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5} }";
+ + " \"dataSource\": \"wikipedia\", \"descending\": \"false\", "
+ + " \"dimensions\":[\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\",\"newpage\",\"user\"], "
+ + " \"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"], "
+ + " \"granularity\": \"all\", "
+ + " \"intervals\": [ \"2013-01-01/2013-01-02\" ], "
+ + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5} }";
+
// Select query results
private static final String SELECT_QUERY_RESULTS =
"[{ "
- + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", "
- + " \"result\" : { "
- + " \"pagingIdentifiers\" : { "
- + " \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\" : 4 }, "
- + " \"events\" : [ { "
- + " \"segmentId\" : \"wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", "
- + " \"offset\" : 0, "
- + " \"event\" : { "
- + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", "
- + " \"robot\" : \"1\", "
- + " \"namespace\" : \"article\", "
- + " \"anonymous\" : \"0\", "
- + " \"unpatrolled\" : \"0\", "
- + " \"page\" : \"11._korpus_(NOVJ)\", "
- + " \"language\" : \"sl\", "
- + " \"newpage\" : \"0\", "
- + " \"user\" : \"EmausBot\", "
- + " \"count\" : 1.0, "
- + " \"added\" : 39.0, "
- + " \"delta\" : 39.0, "
- + " \"variation\" : 39.0, "
- + " \"deleted\" : 0.0 "
- + " } "
- + " }, { "
- + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", "
- + " \"offset\" : 1, "
- + " \"event\" : { "
- + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", "
- + " \"robot\" : \"0\", "
- + " \"namespace\" : \"article\", "
- + " \"anonymous\" : \"0\", "
- + " \"unpatrolled\" : \"0\", "
- + " \"page\" : \"112_U.S._580\", "
- + " \"language\" : \"en\", "
- + " \"newpage\" : \"1\", "
- + " \"user\" : \"MZMcBride\", "
- + " \"count\" : 1.0, "
- + " \"added\" : 70.0, "
- + " \"delta\" : 70.0, "
- + " \"variation\" : 70.0, "
- + " \"deleted\" : 0.0 "
- + " } "
- + " }, { "
- + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", "
- + " \"offset\" : 2, "
- + " \"event\" : { "
- + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", "
- + " \"robot\" : \"0\", "
- + " \"namespace\" : \"article\", "
- + " \"anonymous\" : \"0\", "
- + " \"unpatrolled\" : \"0\", "
- + " \"page\" : \"113_U.S._243\", "
- + " \"language\" : \"en\", "
- + " \"newpage\" : \"1\", "
- + " \"user\" : \"MZMcBride\", "
- + " \"count\" : 1.0, "
- + " \"added\" : 77.0, "
- + " \"delta\" : 77.0, "
- + " \"variation\" : 77.0, "
- + " \"deleted\" : 0.0 "
- + " } "
- + " }, { "
- + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", "
- + " \"offset\" : 3, "
- + " \"event\" : { "
- + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", "
- + " \"robot\" : \"0\", "
- + " \"namespace\" : \"article\", "
- + " \"anonymous\" : \"0\", "
- + " \"unpatrolled\" : \"0\", "
- + " \"page\" : \"113_U.S._73\", "
- + " \"language\" : \"en\", "
- + " \"newpage\" : \"1\", "
- + " \"user\" : \"MZMcBride\", "
- + " \"count\" : 1.0, "
- + " \"added\" : 70.0, "
- + " \"delta\" : 70.0, "
- + " \"variation\" : 70.0, "
- + " \"deleted\" : 0.0 "
- + " } "
- + " }, { "
- + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", "
- + " \"offset\" : 4, "
- + " \"event\" : { "
- + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", "
- + " \"robot\" : \"0\", "
- + " \"namespace\" : \"article\", "
- + " \"anonymous\" : \"0\", "
- + " \"unpatrolled\" : \"0\", "
- + " \"page\" : \"113_U.S._756\", "
- + " \"language\" : \"en\", "
- + " \"newpage\" : \"1\", "
- + " \"user\" : \"MZMcBride\", "
- + " \"count\" : 1.0, "
- + " \"added\" : 68.0, "
- + " \"delta\" : 68.0, "
- + " \"variation\" : 68.0, "
- + " \"deleted\" : 0.0 "
- + " } "
- + " } ] }} ]";
+ + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", "
+ + " \"result\" : { "
+ + " \"pagingIdentifiers\" : { "
+ + " \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\" : 4 }, "
+ + " \"events\" : [ { "
+ + " \"segmentId\" : \"wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", "
+ + " \"offset\" : 0, "
+ + " \"event\" : { "
+ + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", "
+ + " \"robot\" : \"1\", "
+ + " \"namespace\" : \"article\", "
+ + " \"anonymous\" : \"0\", "
+ + " \"unpatrolled\" : \"0\", "
+ + " \"page\" : \"11._korpus_(NOVJ)\", "
+ + " \"language\" : \"sl\", "
+ + " \"newpage\" : \"0\", "
+ + " \"user\" : \"EmausBot\", "
+ + " \"count\" : 1.0, "
+ + " \"added\" : 39.0, "
+ + " \"delta\" : 39.0, "
+ + " \"variation\" : 39.0, "
+ + " \"deleted\" : 0.0 "
+ + " } "
+ + " }, { "
+ + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", "
+ + " \"offset\" : 1, "
+ + " \"event\" : { "
+ + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", "
+ + " \"robot\" : \"0\", "
+ + " \"namespace\" : \"article\", "
+ + " \"anonymous\" : \"0\", "
+ + " \"unpatrolled\" : \"0\", "
+ + " \"page\" : \"112_U.S._580\", "
+ + " \"language\" : \"en\", "
+ + " \"newpage\" : \"1\", "
+ + " \"user\" : \"MZMcBride\", "
+ + " \"count\" : 1.0, "
+ + " \"added\" : 70.0, "
+ + " \"delta\" : 70.0, "
+ + " \"variation\" : 70.0, "
+ + " \"deleted\" : 0.0 "
+ + " } "
+ + " }, { "
+ + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", "
+ + " \"offset\" : 2, "
+ + " \"event\" : { "
+ + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", "
+ + " \"robot\" : \"0\", "
+ + " \"namespace\" : \"article\", "
+ + " \"anonymous\" : \"0\", "
+ + " \"unpatrolled\" : \"0\", "
+ + " \"page\" : \"113_U.S._243\", "
+ + " \"language\" : \"en\", "
+ + " \"newpage\" : \"1\", "
+ + " \"user\" : \"MZMcBride\", "
+ + " \"count\" : 1.0, "
+ + " \"added\" : 77.0, "
+ + " \"delta\" : 77.0, "
+ + " \"variation\" : 77.0, "
+ + " \"deleted\" : 0.0 "
+ + " } "
+ + " }, { "
+ + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", "
+ + " \"offset\" : 3, "
+ + " \"event\" : { "
+ + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", "
+ + " \"robot\" : \"0\", "
+ + " \"namespace\" : \"article\", "
+ + " \"anonymous\" : \"0\", "
+ + " \"unpatrolled\" : \"0\", "
+ + " \"page\" : \"113_U.S._73\", "
+ + " \"language\" : \"en\", "
+ + " \"newpage\" : \"1\", "
+ + " \"user\" : \"MZMcBride\", "
+ + " \"count\" : 1.0, "
+ + " \"added\" : 70.0, "
+ + " \"delta\" : 70.0, "
+ + " \"variation\" : 70.0, "
+ + " \"deleted\" : 0.0 "
+ + " } "
+ + " }, { "
+ + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", "
+ + " \"offset\" : 4, "
+ + " \"event\" : { "
+ + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", "
+ + " \"robot\" : \"0\", "
+ + " \"namespace\" : \"article\", "
+ + " \"anonymous\" : \"0\", "
+ + " \"unpatrolled\" : \"0\", "
+ + " \"page\" : \"113_U.S._756\", "
+ + " \"language\" : \"en\", "
+ + " \"newpage\" : \"1\", "
+ + " \"user\" : \"MZMcBride\", "
+ + " \"count\" : 1.0, "
+ + " \"added\" : 68.0, "
+ + " \"delta\" : 68.0, "
+ + " \"variation\" : 68.0, "
+ + " \"deleted\" : 0.0 "
+ + " } "
+ + " } ] }} ]";
+
// Select query results as records
private static final Object[][] SELECT_QUERY_RESULTS_RECORDS = new Object[][] {
- new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("1"), new Text("article"), new Text("0"), new Text("0"),
- new Text("11._korpus_(NOVJ)"), new Text("sl"), new Text("0"), new Text("EmausBot"),
- new FloatWritable(1.0F), new FloatWritable(39.0F), new FloatWritable(39.0F), new FloatWritable(39.0F), new FloatWritable(0.0F) } ,
- new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"), new Text("article"), new Text("0"), new Text("0"),
- new Text("112_U.S._580"), new Text("en"), new Text("1"), new Text("MZMcBride"),
- new FloatWritable(1.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(0.0F) } ,
- new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"), new Text("article"), new Text("0"), new Text("0"),
- new Text("113_U.S._243"), new Text("en"), new Text("1"), new Text("MZMcBride"),
- new FloatWritable(1.0F), new FloatWritable(77.0F), new FloatWritable(77.0F), new FloatWritable(77.0F), new FloatWritable(0.0F) } ,
- new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"), new Text("article"), new Text("0"), new Text("0"),
- new Text("113_U.S._73"), new Text("en"), new Text("1"), new Text("MZMcBride"),
- new FloatWritable(1.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(0.0F) } ,
- new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"), new Text("article"), new Text("0"), new Text("0"),
- new Text("113_U.S._756"), new Text("en"), new Text("1"), new Text("MZMcBride"),
- new FloatWritable(1.0F), new FloatWritable(68.0F), new FloatWritable(68.0F), new FloatWritable(68.0F), new FloatWritable(0.0F) }
+ new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("1"),
+ new Text("article"), new Text("0"), new Text("0"),
+ new Text("11._korpus_(NOVJ)"), new Text("sl"), new Text("0"),
+ new Text("EmausBot"),
+ new FloatWritable(1.0F), new FloatWritable(39.0F), new FloatWritable(39.0F),
+ new FloatWritable(39.0F), new FloatWritable(0.0F) },
+ new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"),
+ new Text("article"), new Text("0"), new Text("0"),
+ new Text("112_U.S._580"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+ new FloatWritable(1.0F), new FloatWritable(70.0F), new FloatWritable(70.0F),
+ new FloatWritable(70.0F), new FloatWritable(0.0F) },
+ new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"),
+ new Text("article"), new Text("0"), new Text("0"),
+ new Text("113_U.S._243"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+ new FloatWritable(1.0F), new FloatWritable(77.0F), new FloatWritable(77.0F),
+ new FloatWritable(77.0F), new FloatWritable(0.0F) },
+ new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"),
+ new Text("article"), new Text("0"), new Text("0"),
+ new Text("113_U.S._73"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+ new FloatWritable(1.0F), new FloatWritable(70.0F), new FloatWritable(70.0F),
+ new FloatWritable(70.0F), new FloatWritable(0.0F) },
+ new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"),
+ new Text("article"), new Text("0"), new Text("0"),
+ new Text("113_U.S._756"), new Text("en"), new Text("1"), new Text("MZMcBride"),
+ new FloatWritable(1.0F), new FloatWritable(68.0F), new FloatWritable(68.0F),
+ new FloatWritable(68.0F), new FloatWritable(0.0F) }
};
-
/**
* Test the default behavior of the objects and object inspectors.
- * @throws IOException
- * @throws IllegalAccessException
- * @throws IllegalArgumentException
- * @throws SecurityException
- * @throws NoSuchFieldException
- * @throws JsonMappingException
- * @throws JsonParseException
- * @throws InvocationTargetException
- * @throws NoSuchMethodException
+ * @throws IOException
+ * @throws IllegalAccessException
+ * @throws IllegalArgumentException
+ * @throws SecurityException
+ * @throws NoSuchFieldException
+ * @throws JsonMappingException
+ * @throws JsonParseException
+ * @throws InvocationTargetException
+ * @throws NoSuchMethodException
*/
@Test
public void testDruidSerDe()
@@ -457,25 +491,31 @@ public class TestDruidSerDe {
tbl = createPropertiesQuery("sample_datasource", Query.TIMESERIES, TIMESERIES_QUERY);
SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
deserializeQueryResults(serDe, Query.TIMESERIES, TIMESERIES_QUERY,
- TIMESERIES_QUERY_RESULTS, TIMESERIES_QUERY_RESULTS_RECORDS);
+ TIMESERIES_QUERY_RESULTS, TIMESERIES_QUERY_RESULTS_RECORDS
+ );
// TopN query
tbl = createPropertiesQuery("sample_data", Query.TOPN, TOPN_QUERY);
SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
deserializeQueryResults(serDe, Query.TOPN, TOPN_QUERY,
- TOPN_QUERY_RESULTS, TOPN_QUERY_RESULTS_RECORDS);
+ TOPN_QUERY_RESULTS, TOPN_QUERY_RESULTS_RECORDS
+ );
// GroupBy query
tbl = createPropertiesQuery("sample_datasource", Query.GROUP_BY, GROUP_BY_QUERY);
SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
deserializeQueryResults(serDe, Query.GROUP_BY, GROUP_BY_QUERY,
- GROUP_BY_QUERY_RESULTS, GROUP_BY_QUERY_RESULTS_RECORDS);
+ GROUP_BY_QUERY_RESULTS, GROUP_BY_QUERY_RESULTS_RECORDS
+ );
// Select query
tbl = createPropertiesQuery("wikipedia", Query.SELECT, SELECT_QUERY);
SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
deserializeQueryResults(serDe, Query.SELECT, SELECT_QUERY,
- SELECT_QUERY_RESULTS, SELECT_QUERY_RESULTS_RECORDS);
+ SELECT_QUERY_RESULTS, SELECT_QUERY_RESULTS_RECORDS
+ );
}
- private static Properties createPropertiesQuery(String dataSource, String queryType, String jsonQuery) {
+ private static Properties createPropertiesQuery(String dataSource, String queryType,
+ String jsonQuery
+ ) {
Properties tbl = new Properties();
// Set the configuration parameters
@@ -486,14 +526,15 @@ public class TestDruidSerDe {
}
private static void deserializeQueryResults(DruidSerDe serDe, String queryType, String jsonQuery,
- String resultString, Object[][] records) throws SerDeException, JsonParseException,
+ String resultString, Object[][] records
+ ) throws SerDeException, JsonParseException,
JsonMappingException, IOException, NoSuchFieldException, SecurityException,
IllegalArgumentException, IllegalAccessException, InterruptedException,
NoSuchMethodException, InvocationTargetException {
// Initialize
Query<?> query = null;
- DruidQueryRecordReader<?,?> reader = null;
+ DruidQueryRecordReader<?, ?> reader = null;
List<?> resultsList = null;
ObjectMapper mapper = new DefaultObjectMapper();
switch (queryType) {
@@ -501,25 +542,33 @@ public class TestDruidSerDe {
query = mapper.readValue(jsonQuery, TimeseriesQuery.class);
reader = new DruidTimeseriesQueryRecordReader();
resultsList = mapper.readValue(resultString,
- new TypeReference<List<Result<TimeseriesResultValue>>>() {});
+ new TypeReference<List<Result<TimeseriesResultValue>>>() {
+ }
+ );
break;
case Query.TOPN:
query = mapper.readValue(jsonQuery, TopNQuery.class);
reader = new DruidTopNQueryRecordReader();
resultsList = mapper.readValue(resultString,
- new TypeReference<List<Result<TopNResultValue>>>() {});
+ new TypeReference<List<Result<TopNResultValue>>>() {
+ }
+ );
break;
case Query.GROUP_BY:
query = mapper.readValue(jsonQuery, GroupByQuery.class);
reader = new DruidGroupByQueryRecordReader();
resultsList = mapper.readValue(resultString,
- new TypeReference<List<Row>>() {});
+ new TypeReference<List<Row>>() {
+ }
+ );
break;
case Query.SELECT:
query = mapper.readValue(jsonQuery, SelectQuery.class);
reader = new DruidSelectQueryRecordReader();
resultsList = mapper.readValue(resultString,
- new TypeReference<List<Result<SelectResultValue>>>() {});
+ new TypeReference<List<Result<SelectResultValue>>>() {
+ }
+ );
break;
}
@@ -534,7 +583,7 @@ public class TestDruidSerDe {
}
Field field2 = DruidQueryRecordReader.class.getDeclaredField("results");
field2.setAccessible(true);
-
+
// Get the row structure
StructObjectInspector oi = (StructObjectInspector) serDe.getObjectInspector();
List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
index 9ccd48e..4fde3eb 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
import org.junit.Test;
@@ -29,14 +30,15 @@ import org.junit.Test;
import junit.framework.TestCase;
public class TestHiveDruidQueryBasedInputFormat extends TestCase {
-
+
@SuppressWarnings("unchecked")
@Test
public void testCreateSplitsIntervals() throws Exception {
- HiveDruidQueryBasedInputFormat input = new HiveDruidQueryBasedInputFormat();
+ DruidQueryBasedInputFormat input = new DruidQueryBasedInputFormat();
- Method method1 = HiveDruidQueryBasedInputFormat.class.getDeclaredMethod("createSplitsIntervals",
- List.class, int.class);
+ Method method1 = DruidQueryBasedInputFormat.class.getDeclaredMethod("createSplitsIntervals",
+ List.class, int.class
+ );
method1.setAccessible(true);
List<Interval> intervals;
@@ -48,10 +50,14 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
intervals.add(new Interval(1262304000000L, 1293840000000L, ISOChronology.getInstanceUTC()));
resultList = (List<List<Interval>>) method1.invoke(input, intervals, 4);
expectedResultList = new ArrayList<>();
- expectedResultList.add(Arrays.asList(new Interval(1262304000000L, 1270188000000L, ISOChronology.getInstanceUTC())));
- expectedResultList.add(Arrays.asList(new Interval(1270188000000L, 1278072000000L, ISOChronology.getInstanceUTC())));
- expectedResultList.add(Arrays.asList(new Interval(1278072000000L, 1285956000000L, ISOChronology.getInstanceUTC())));
- expectedResultList.add(Arrays.asList(new Interval(1285956000000L, 1293840000000L, ISOChronology.getInstanceUTC())));
+ expectedResultList.add(Arrays
+ .asList(new Interval(1262304000000L, 1270188000000L, ISOChronology.getInstanceUTC())));
+ expectedResultList.add(Arrays
+ .asList(new Interval(1270188000000L, 1278072000000L, ISOChronology.getInstanceUTC())));
+ expectedResultList.add(Arrays
+ .asList(new Interval(1278072000000L, 1285956000000L, ISOChronology.getInstanceUTC())));
+ expectedResultList.add(Arrays
+ .asList(new Interval(1285956000000L, 1293840000000L, ISOChronology.getInstanceUTC())));
assertEquals(expectedResultList, resultList);
// Test 2 : two splits, create 4
@@ -60,11 +66,16 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
intervals.add(new Interval(1325376000000L, 1356998400000L, ISOChronology.getInstanceUTC()));
resultList = (List<List<Interval>>) method1.invoke(input, intervals, 4);
expectedResultList = new ArrayList<>();
- expectedResultList.add(Arrays.asList(new Interval(1262304000000L, 1278093600000L, ISOChronology.getInstanceUTC())));
- expectedResultList.add(Arrays.asList(new Interval(1278093600000L, 1293840000000L, ISOChronology.getInstanceUTC()),
- new Interval(1325376000000L, 1325419200000L, ISOChronology.getInstanceUTC())));
- expectedResultList.add(Arrays.asList(new Interval(1325419200000L, 1341208800000L, ISOChronology.getInstanceUTC())));
- expectedResultList.add(Arrays.asList(new Interval(1341208800000L, 1356998400000L, ISOChronology.getInstanceUTC())));
+ expectedResultList.add(Arrays
+ .asList(new Interval(1262304000000L, 1278093600000L, ISOChronology.getInstanceUTC())));
+ expectedResultList.add(Arrays
+ .asList(new Interval(1278093600000L, 1293840000000L, ISOChronology.getInstanceUTC()),
+ new Interval(1325376000000L, 1325419200000L, ISOChronology.getInstanceUTC())
+ ));
+ expectedResultList.add(Arrays
+ .asList(new Interval(1325419200000L, 1341208800000L, ISOChronology.getInstanceUTC())));
+ expectedResultList.add(Arrays
+ .asList(new Interval(1341208800000L, 1356998400000L, ISOChronology.getInstanceUTC())));
assertEquals(expectedResultList, resultList);
// Test 3 : two splits, create 5
@@ -73,29 +84,49 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
intervals.add(new Interval(1325376000000L, 1356998400000L, ISOChronology.getInstanceUTC()));
resultList = (List<List<Interval>>) method1.invoke(input, intervals, 5);
expectedResultList = new ArrayList<>();
- expectedResultList.add(Arrays.asList(new Interval(1262304000000L, 1274935680000L, ISOChronology.getInstanceUTC())));
- expectedResultList.add(Arrays.asList(new Interval(1274935680000L, 1287567360000L, ISOChronology.getInstanceUTC())));
- expectedResultList.add(Arrays.asList(new Interval(1287567360000L, 1293840000000L, ISOChronology.getInstanceUTC()),
- new Interval(1325376000000L, 1331735040000L, ISOChronology.getInstanceUTC())));
- expectedResultList.add(Arrays.asList(new Interval(1331735040000L, 1344366720000L, ISOChronology.getInstanceUTC())));
- expectedResultList.add(Arrays.asList(new Interval(1344366720000L, 1356998400000L, ISOChronology.getInstanceUTC())));
+ expectedResultList.add(Arrays
+ .asList(new Interval(1262304000000L, 1274935680000L, ISOChronology.getInstanceUTC())));
+ expectedResultList.add(Arrays
+ .asList(new Interval(1274935680000L, 1287567360000L, ISOChronology.getInstanceUTC())));
+ expectedResultList.add(Arrays
+ .asList(new Interval(1287567360000L, 1293840000000L, ISOChronology.getInstanceUTC()),
+ new Interval(1325376000000L, 1331735040000L, ISOChronology.getInstanceUTC())
+ ));
+ expectedResultList.add(Arrays
+ .asList(new Interval(1331735040000L, 1344366720000L, ISOChronology.getInstanceUTC())));
+ expectedResultList.add(Arrays
+ .asList(new Interval(1344366720000L, 1356998400000L, ISOChronology.getInstanceUTC())));
assertEquals(expectedResultList, resultList);
// Test 4 : three splits, different ranges, create 6
intervals = new ArrayList<>();
- intervals.add(new Interval(1199145600000L, 1201824000000L, ISOChronology.getInstanceUTC())); // one month
- intervals.add(new Interval(1325376000000L, 1356998400000L, ISOChronology.getInstanceUTC())); // one year
- intervals.add(new Interval(1407283200000L, 1407888000000L, ISOChronology.getInstanceUTC())); // 7 days
+ intervals.add(new Interval(1199145600000L, 1201824000000L,
+ ISOChronology.getInstanceUTC()
+ )); // one month
+ intervals.add(new Interval(1325376000000L, 1356998400000L,
+ ISOChronology.getInstanceUTC()
+ )); // one year
+ intervals.add(new Interval(1407283200000L, 1407888000000L,
+ ISOChronology.getInstanceUTC()
+ )); // 7 days
resultList = (List<List<Interval>>) method1.invoke(input, intervals, 6);
expectedResultList = new ArrayList<>();
- expectedResultList.add(Arrays.asList(new Interval(1199145600000L, 1201824000000L, ISOChronology.getInstanceUTC()),
- new Interval(1325376000000L, 1328515200000L, ISOChronology.getInstanceUTC())));
- expectedResultList.add(Arrays.asList(new Interval(1328515200000L, 1334332800000L, ISOChronology.getInstanceUTC())));
- expectedResultList.add(Arrays.asList(new Interval(1334332800000L, 1340150400000L, ISOChronology.getInstanceUTC())));
- expectedResultList.add(Arrays.asList(new Interval(1340150400000L, 1345968000000L, ISOChronology.getInstanceUTC())));
- expectedResultList.add(Arrays.asList(new Interval(1345968000000L, 1351785600000L, ISOChronology.getInstanceUTC())));
- expectedResultList.add(Arrays.asList(new Interval(1351785600000L, 1356998400000L, ISOChronology.getInstanceUTC()),
- new Interval(1407283200000L, 1407888000000L, ISOChronology.getInstanceUTC())));
+ expectedResultList.add(Arrays
+ .asList(new Interval(1199145600000L, 1201824000000L, ISOChronology.getInstanceUTC()),
+ new Interval(1325376000000L, 1328515200000L, ISOChronology.getInstanceUTC())
+ ));
+ expectedResultList.add(Arrays
+ .asList(new Interval(1328515200000L, 1334332800000L, ISOChronology.getInstanceUTC())));
+ expectedResultList.add(Arrays
+ .asList(new Interval(1334332800000L, 1340150400000L, ISOChronology.getInstanceUTC())));
+ expectedResultList.add(Arrays
+ .asList(new Interval(1340150400000L, 1345968000000L, ISOChronology.getInstanceUTC())));
+ expectedResultList.add(Arrays
+ .asList(new Interval(1345968000000L, 1351785600000L, ISOChronology.getInstanceUTC())));
+ expectedResultList.add(Arrays
+ .asList(new Interval(1351785600000L, 1356998400000L, ISOChronology.getInstanceUTC()),
+ new Interval(1407283200000L, 1407888000000L, ISOChronology.getInstanceUTC())
+ ));
assertEquals(expectedResultList, resultList);
}
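
The expected values above amount to dividing the total covered time evenly across the requested number of splits, carrying partial chunks across the original interval boundaries. The following standalone sketch reproduces that behaviour; it is consistent with the expectations in this test but is not taken from DruidQueryBasedInputFormat itself:

// Illustrative only: evenly divide a list of Joda-Time intervals into numSplits
// groups of (roughly) equal total duration. Not the Hive implementation.
import java.util.ArrayList;
import java.util.List;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;

public class IntervalSplitSketch {
  public static List<List<Interval>> split(List<Interval> intervals, int numSplits) {
    long total = 0;
    for (Interval interval : intervals) {
      total += interval.getEndMillis() - interval.getStartMillis();
    }
    long perSplit = total / numSplits;
    List<List<Interval>> result = new ArrayList<>();
    List<Interval> current = new ArrayList<>();
    long needed = perSplit;
    for (Interval interval : intervals) {
      long start = interval.getStartMillis();
      long end = interval.getEndMillis();
      while (start < end) {
        // The last group absorbs whatever remains, including rounding leftovers.
        long take = result.size() == numSplits - 1 ? end - start : Math.min(needed, end - start);
        current.add(new Interval(start, start + take, ISOChronology.getInstanceUTC()));
        start += take;
        needed -= take;
        if (needed == 0 && result.size() < numSplits - 1) {
          result.add(current);
          current = new ArrayList<>();
          needed = perSplit;
        }
      }
    }
    if (!current.isEmpty()) {
      result.add(current);
    }
    return result;
  }
}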
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
new file mode 100644
index 0000000..a4272ee
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
@@ -0,0 +1,221 @@
+package org.apache.hadoop.hive.ql.io;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.metamx.common.Granularity;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.impl.DimensionSchema;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.InputRowParser;
+import io.druid.data.input.impl.MapInputRowParser;
+import io.druid.data.input.impl.StringDimensionSchema;
+import io.druid.data.input.impl.TimeAndDimsParseSpec;
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.granularity.QueryGranularities;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import io.druid.segment.QueryableIndex;
+import io.druid.segment.QueryableIndexStorageAdapter;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.loading.LocalDataSegmentPuller;
+import io.druid.segment.loading.LocalDataSegmentPusher;
+import io.druid.segment.loading.LocalDataSegmentPusherConfig;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
+import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
+import io.druid.timeline.DataSegment;
+import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.druid.DruidStorageHandler;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.io.DruidRecordWriter;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class DruidRecordWriterTest {
+ private ObjectMapper objectMapper = DruidStorageHandlerUtils.JSON_MAPPER;
+
+ private static final Interval INTERVAL_FULL = new Interval("2014-10-22T00:00:00Z/P1D");
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private DruidRecordWriter druidRecordWriter;
+
+ final List<ImmutableMap<String, Object>> expectedRows = ImmutableList.of(
+ ImmutableMap.<String, Object>of(
+ DruidTable.DEFAULT_TIMESTAMP_COLUMN,
+ DateTime.parse("2014-10-22T00:00:00.000Z").getMillis(),
+ "host", ImmutableList.of("a.example.com"),
+ "visited_sum", 190L,
+ "unique_hosts", 1.0d
+ ),
+ ImmutableMap.<String, Object>of(
+ DruidTable.DEFAULT_TIMESTAMP_COLUMN,
+ DateTime.parse("2014-10-22T01:00:00.000Z").getMillis(),
+ "host", ImmutableList.of("b.example.com"),
+ "visited_sum", 175L,
+ "unique_hosts", 1.0d
+ ),
+ ImmutableMap.<String, Object>of(
+ DruidTable.DEFAULT_TIMESTAMP_COLUMN,
+ DateTime.parse("2014-10-22T02:00:00.000Z").getMillis(),
+ "host", ImmutableList.of("c.example.com"),
+ "visited_sum", 270L,
+ "unique_hosts", 1.0d
+ )
+ );
+
+ // This test needs this patch: https://github.com/druid-io/druid/pull/3483
+ @Ignore
+ @Test
+ public void testWrite() throws IOException, SegmentLoadingException {
+
+ final String dataSourceName = "testDataSource";
+ final File segmentOutputDir = temporaryFolder.newFolder();
+ final File workingDir = temporaryFolder.newFolder();
+ Configuration config = new Configuration();
+
+ final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec(
+ new TimestampSpec(DruidTable.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
+ new DimensionsSpec(ImmutableList.<DimensionSchema>of(new StringDimensionSchema("host")),
+ null, null
+ )
+ ));
+ final Map<String, Object> parserMap = objectMapper.convertValue(inputRowParser, Map.class);
+
+ DataSchema dataSchema = new DataSchema(
+ dataSourceName,
+ parserMap,
+ new AggregatorFactory[] {
+ new LongSumAggregatorFactory("visited_sum", "visited_sum"),
+ new HyperUniquesAggregatorFactory("unique_hosts", "unique_hosts")
+ },
+ new UniformGranularitySpec(
+ Granularity.DAY, QueryGranularities.NONE, ImmutableList.of(INTERVAL_FULL)
+ ),
+ objectMapper
+ );
+
+ RealtimeTuningConfig tuningConfig = RealtimeTuningConfig
+ .makeDefaultTuningConfig(temporaryFolder.newFolder());
+ LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+ DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(
+ new LocalDataSegmentPusherConfig() {
+ @Override
+ public File getStorageDirectory() {return segmentOutputDir;}
+ }, objectMapper);
+
+ Path segmentDescriptorPath = new Path(workingDir.getAbsolutePath(),
+ DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME
+ );
+ druidRecordWriter = new DruidRecordWriter(dataSchema, tuningConfig, dataSegmentPusher, 20,
+ segmentDescriptorPath, localFileSystem
+ );
+
+ List<DruidWritable> druidWritables = Lists.transform(expectedRows,
+ new Function<ImmutableMap<String, Object>, DruidWritable>() {
+ @Nullable
+ @Override
+ public DruidWritable apply(@Nullable ImmutableMap<String, Object> input
+ ) {
+ return new DruidWritable(ImmutableMap.<String, Object>builder().putAll(input)
+ .put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
+ Granularity.DAY.truncate(
+ new DateTime((long) input
+ .get(DruidTable.DEFAULT_TIMESTAMP_COLUMN)))
+ .getMillis()
+ ).build());
+ }
+ }
+ );
+ for (DruidWritable druidWritable : druidWritables) {
+ druidRecordWriter.write(druidWritable);
+ }
+ druidRecordWriter.close(false);
+ List<DataSegment> dataSegmentList = DruidStorageHandlerUtils
+ .getPublishedSegments(segmentDescriptorPath, config);
+ Assert.assertEquals(1, dataSegmentList.size());
+ File tmpUnzippedSegmentDir = temporaryFolder.newFolder();
+ new LocalDataSegmentPuller().getSegmentFiles(dataSegmentList.get(0), tmpUnzippedSegmentDir);
+ final QueryableIndex queryableIndex = DruidStorageHandlerUtils.INDEX_IO
+ .loadIndex(tmpUnzippedSegmentDir);
+
+ QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(queryableIndex);
+
+ Firehose firehose = new IngestSegmentFirehose(
+ ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())),
+ ImmutableList.of("host"),
+ ImmutableList.of("visited_sum", "unique_hosts"),
+ null,
+ QueryGranularities.NONE
+ );
+
+ List<InputRow> rows = Lists.newArrayList();
+ while (firehose.hasMore()) {
+ rows.add(firehose.nextRow());
+ }
+
+ verifyRows(expectedRows, rows);
+
+ }
+
+ private void verifyRows(List<ImmutableMap<String, Object>> expectedRows,
+ List<InputRow> actualRows
+ ) {
+ System.out.println("actualRows = " + actualRows);
+ Assert.assertEquals(expectedRows.size(), actualRows.size());
+
+ for (int i = 0; i < expectedRows.size(); i++) {
+ Map<String, Object> expected = expectedRows.get(i);
+ InputRow actual = actualRows.get(i);
+
+ Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions());
+
+ Assert.assertEquals(expected.get(DruidTable.DEFAULT_TIMESTAMP_COLUMN),
+ actual.getTimestamp().getMillis()
+ );
+ Assert.assertEquals(expected.get("host"), actual.getDimension("host"));
+ Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum"));
+ Assert.assertEquals(
+ (Double) expected.get("unique_hosts"),
+ (Double) HyperUniquesAggregatorFactory
+ .estimateCardinality(actual.getRaw("unique_hosts")),
+ 0.001
+ );
+ }
+ }
+
+ @Test
+ public void testSerDeser() throws IOException {
+ String segment = "{\"dataSource\":\"datasource2015\",\"interval\":\"2015-06-01T00:00:00.000-04:00/2015-06-02T00:00:00.000-04:00\",\"version\":\"2016-11-04T19:24:01.732-04:00\",\"loadSpec\":{\"type\":\"hdfs\",\"path\":\"hdfs://cn105-10.l42scl.hortonworks.com:8020/apps/hive/warehouse/druid.db/.hive-staging_hive_2016-11-04_19-23-50_168_1550339856804207572-1/_task_tmp.-ext-10002/_tmp.000000_0/datasource2015/20150601T000000.000-0400_20150602T000000.000-0400/2016-11-04T19_24_01.732-04_00/0/index.zip\"},\"dimensions\":\"dimension1\",\"metrics\":\"bigint\",\"shardSpec\":{\"type\":\"linear\",\"partitionNum\":0},\"binaryVersion\":9,\"size\":1765,\"identifier\":\"datasource2015_2015-06-01T00:00:00.000-04:00_2015-06-02T00:00:00.000-04:00_2016-11-04T19:24:01.732-04:00\"}";
+ DataSegment dataSegment = objectMapper.readerFor(DataSegment.class)
+ .readValue(segment);
+ Assert.assertEquals("datasource2015", dataSegment.getDataSource());
+ }
+
+}
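A note on the test above: every row handed to DruidRecordWriter carries an extra column, Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME, holding the row timestamp truncated to the target segment granularity; that is what the Lists.transform step is building before the write loop. Below is a minimal sketch of producing one such record outside the test. The helper name toDruidWritable and the column values are hypothetical, and the imports (Guava ImmutableMap, Joda-Time DateTime, Druid Granularity, and the Hive Constants/DruidTable/DruidWritable classes) are assumed to be the same ones the test class uses.

    // Sketch only: builds a single DruidWritable carrying the day-truncated
    // granularity column that DruidRecordWriter uses to bucket rows into segments.
    // Helper name and column values are illustrative, not part of the patch.
    private DruidWritable toDruidWritable(long timestampMillis, String host, long visitedSum) {
      final long dayBucket = Granularity.DAY.truncate(new DateTime(timestampMillis)).getMillis();
      return new DruidWritable(ImmutableMap.<String, Object>builder()
          .put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, timestampMillis)
          .put("host", host)
          .put("visited_sum", visitedSum)
          .put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME, dayBucket)
          .build());
    }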
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 87bd5c8..d8689ba 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -17,19 +17,12 @@
*/
package org.apache.hadoop.hive.llap.daemon.impl;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Stack;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
@@ -50,7 +43,6 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.log4j.MDC;
import org.apache.log4j.NDC;
import org.apache.tez.common.CallableWithNdc;
@@ -58,10 +50,7 @@ import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -75,15 +64,17 @@ import org.apache.tez.runtime.task.TezTaskRunner2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Stack;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
/**
*
@@ -265,7 +256,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
} finally {
FileSystem.closeAllForUGI(taskUgi);
LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
- runtimeWatch.stop().elapsedMillis());
+ runtimeWatch.stop().elapsedMillis());
if (LOG.isDebugEnabled()) {
LOG.debug(
"canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3fc35bc..376197e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,7 +136,7 @@
<derby.version>10.10.2.0</derby.version>
<dropwizard.version>3.1.0</dropwizard.version>
<dropwizard-metrics-hadoop-metrics2-reporter.version>0.1.2</dropwizard-metrics-hadoop-metrics2-reporter.version>
- <druid.version>0.9.1.1</druid.version>
+ <druid.version>0.9.2</druid.version>
<guava.version>14.0.1</guava.version>
<groovy.version>2.4.4</groovy.version>
<hadoop.version>2.7.2</hadoop.version>
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 54d619c..5ef901f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -18,20 +18,7 @@
package org.apache.hadoop.hive.ql.exec;
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -83,7 +70,19 @@ import org.apache.hive.common.util.HiveStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
/**
* File Sink operator implementation.
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 8db833e..37e4b9b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -18,63 +18,10 @@
package org.apache.hadoop.hive.ql.exec;
+import com.esotericsoftware.kryo.Kryo;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import java.beans.DefaultPersistenceDelegate;
-import java.beans.Encoder;
-import java.beans.Expression;
-import java.beans.Statement;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.net.URLDecoder;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.sql.SQLTransientException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.zip.Deflater;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.InflaterInputStream;
-
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.WordUtils;
@@ -198,8 +145,58 @@ import org.apache.hive.common.util.ReflectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.esotericsoftware.kryo.Kryo;
-import com.google.common.base.Preconditions;
+import java.beans.DefaultPersistenceDelegate;
+import java.beans.Encoder;
+import java.beans.Expression;
+import java.beans.Statement;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.net.URLDecoder;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLTransientException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
/**
* Utilities.
@@ -3095,7 +3092,7 @@ public final class Utilities {
TableDesc tableDesc = work.getAliasToPartnInfo().get(alias).getTableDesc();
if (tableDesc.isNonNative()) {
- // if this isn't a hive table we can't create an empty file for it.
+ // if the table does not use native Hive storage, we can't create an empty file for it.
return null;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
index 178a2de..c1f6883 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
@@ -18,21 +18,13 @@
package org.apache.hadoop.hive.ql.hooks;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.gson.stream.JsonWriter;
import org.apache.commons.collections.SetUtils;
import org.apache.commons.io.output.StringBuilderWriter;
import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -50,11 +42,18 @@ import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import com.google.gson.stream.JsonWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* Implementation of a post execute hook that logs lineage info to a log file.
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index eaf0abc..d9c2ff4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -122,6 +122,8 @@ public class Optimizer {
transformations.add(new SortedDynPartitionOptimizer());
}
+ transformations.add(new SortedDynPartitionTimeGranularityOptimizer());
+
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
transformations.add(new PartitionPruner());
transformations.add(new PartitionConditionRemover());
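The Optimizer change above registers the new SortedDynPartitionTimeGranularityOptimizer unconditionally in the transformation list. For readers unfamiliar with Hive's logical optimizer, such rules generally implement Transform and rewrite the operator tree through a rule-driven graph walk. The skeleton below only illustrates that general pattern, assuming Transform is still the abstract base class for optimizer rules; ExampleTransform and its rule name are made-up, and this is not the implementation of the new optimizer.

    import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
    import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
    import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
    import org.apache.hadoop.hive.ql.lib.Dispatcher;
    import org.apache.hadoop.hive.ql.lib.GraphWalker;
    import org.apache.hadoop.hive.ql.lib.Node;
    import org.apache.hadoop.hive.ql.lib.NodeProcessor;
    import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
    import org.apache.hadoop.hive.ql.lib.Rule;
    import org.apache.hadoop.hive.ql.lib.RuleRegExp;
    import org.apache.hadoop.hive.ql.optimizer.Transform;
    import org.apache.hadoop.hive.ql.parse.ParseContext;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Stack;

    // Illustrative skeleton only: walks the plan and fires a processor at each FileSinkOperator.
    public class ExampleTransform extends Transform {

      @Override
      public ParseContext transform(ParseContext pctx) throws SemanticException {
        Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
        // Fire whenever the walker reaches a file sink ("FS%") anywhere in the operator tree.
        opRules.put(new RuleRegExp("Example rule", FileSinkOperator.getOperatorName() + "%"),
            new NodeProcessor() {
              @Override
              public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
                  Object... nodeOutputs) throws SemanticException {
                // The actual plan rewrite (e.g. inserting operators above the sink) would go here.
                return null;
              }
            });
        Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
        GraphWalker walker = new DefaultGraphWalker(disp);
        List<Node> topNodes = new ArrayList<Node>(pctx.getTopOps().values());
        walker.startWalking(topNodes, null);
        return pctx;
      }
    }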