Posted to commits@hive.apache.org by ha...@apache.org on 2018/04/13 16:58:08 UTC
hive git commit: HIVE-19187 : Update Druid Storage Handler to Druid 0.12.0 (Slim Bouguerra via Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master 6f9090c1d -> 2a3a7d399
HIVE-19187 : Update Druid Storage Handler to Druid 0.12.0 (Slim Bouguerra via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2a3a7d39
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2a3a7d39
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2a3a7d39
Branch: refs/heads/master
Commit: 2a3a7d399e7be6581f1d975bce9a9508a5177ab6
Parents: 6f9090c
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Wed Apr 11 17:56:00 2018 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Fri Apr 13 09:57:34 2018 -0700
----------------------------------------------------------------------
.../hadoop/hive/druid/DruidStorageHandler.java | 4 ++-
.../hive/druid/DruidStorageHandlerUtils.java | 30 +++++++++++++++++---
.../hadoop/hive/druid/io/DruidOutputFormat.java | 4 ++-
.../druid/io/DruidQueryBasedInputFormat.java | 20 +------------
.../hive/druid/json/KafkaTuningConfig.java | 8 ++++--
.../hive/ql/io/TestDruidRecordWriter.java | 6 ++--
pom.xml | 2 +-
7 files changed, 44 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2a3a7d39/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 76540b7..c0feb8d 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -49,6 +49,7 @@ import io.druid.metadata.SQLMetadataConnector;
import io.druid.metadata.storage.derby.DerbyConnector;
import io.druid.metadata.storage.derby.DerbyMetadataStorage;
import io.druid.metadata.storage.mysql.MySQLConnector;
+import io.druid.metadata.storage.mysql.MySQLConnectorConfig;
import io.druid.metadata.storage.postgresql.PostgreSQLConnector;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexSpec;
@@ -335,6 +336,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
inputParser,
dimensionsAndAggregates.rhs,
granularitySpec,
+ null,
DruidStorageHandlerUtils.JSON_MAPPER
);
@@ -880,7 +882,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
if (dbType.equals("mysql")) {
connector = new MySQLConnector(storageConnectorConfigSupplier,
Suppliers.ofInstance(getDruidMetadataStorageTablesConfig())
- );
+ , new MySQLConnectorConfig());
} else if (dbType.equals("postgresql")) {
connector = new PostgreSQLConnector(storageConnectorConfigSupplier,
Suppliers.ofInstance(getDruidMetadataStorageTablesConfig())
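Note on the hunk above: Druid 0.12.0 added a third, MySQL-specific config
argument to the MySQLConnector constructor. A minimal sketch of the new
wiring, assuming default config values suffice (the config construction
below is illustrative; the handler builds its own suppliers from table
properties at runtime):

    import com.google.common.base.Suppliers;
    import io.druid.metadata.MetadataStorageConnectorConfig;
    import io.druid.metadata.MetadataStorageTablesConfig;
    import io.druid.metadata.SQLMetadataConnector;
    import io.druid.metadata.storage.mysql.MySQLConnector;
    import io.druid.metadata.storage.mysql.MySQLConnectorConfig;

    public final class MySqlConnectorSketch {
      public static SQLMetadataConnector build() {
        // Illustrative defaults; real deployments configure host/port/creds.
        MetadataStorageConnectorConfig connectorConfig =
            new MetadataStorageConnectorConfig();
        MetadataStorageTablesConfig tablesConfig =
            MetadataStorageTablesConfig.fromBase("druid");
        // New in 0.12.0: the MySQLConnectorConfig argument carries
        // MySQL-specific options; the no-arg instance keeps defaults.
        return new MySQLConnector(
            Suppliers.ofInstance(connectorConfig),
            Suppliers.ofInstance(tablesConfig),
            new MySQLConnectorConfig());
      }
    }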
http://git-wip-us.apache.org/repos/asf/hive/blob/2a3a7d39/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 1424237..1aef565 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -26,6 +26,7 @@ import io.druid.math.expr.ExprMacroTable;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.SQLMetadataConnector;
import io.druid.metadata.storage.mysql.MySQLConnector;
+import io.druid.query.Druids;
import io.druid.query.expression.LikeExprMacro;
import io.druid.query.expression.RegexpExtractExprMacro;
import io.druid.query.expression.TimestampCeilExprMacro;
@@ -35,7 +36,9 @@ import io.druid.query.expression.TimestampFormatExprMacro;
import io.druid.query.expression.TimestampParseExprMacro;
import io.druid.query.expression.TimestampShiftExprMacro;
import io.druid.query.expression.TrimExprMacro;
+import io.druid.query.select.PagingSpec;
import io.druid.query.select.SelectQueryConfig;
+import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
import io.druid.query.aggregation.AggregatorFactory;
@@ -48,6 +51,7 @@ import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import io.druid.storage.hdfs.HdfsDataSegmentPusher;
import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
import io.druid.timeline.DataSegment;
@@ -124,8 +128,10 @@ import java.net.URL;
import java.net.UnknownHostException;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -168,6 +174,7 @@ public final class DruidStorageHandlerUtils {
* Mapper to use to serialize/deserialize Druid objects (SMILE)
*/
public static final ObjectMapper SMILE_MAPPER = new DefaultObjectMapper(new SmileFactory());
+ private static final int DEFAULT_MAX_TRIES = 10;
static {
// This is needed for serde of PagingSpec as it uses JacksonInject for injecting SelectQueryConfig
@@ -187,7 +194,8 @@ public final class DruidStorageHandlerUtils {
new TrimExprMacro.LeftTrimExprMacro(),
new TrimExprMacro.RightTrimExprMacro()
)))
- .addValue(ObjectMapper.class, JSON_MAPPER);
+ .addValue(ObjectMapper.class, JSON_MAPPER)
+ .addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT);
JSON_MAPPER.setInjectableValues(injectableValues);
SMILE_MAPPER.setInjectableValues(injectableValues);
@@ -214,13 +222,14 @@ public final class DruidStorageHandlerUtils {
/**
* Used by druid to perform IO on indexes
*/
- public static final IndexIO INDEX_IO = new IndexIO(JSON_MAPPER, () -> 0);
+ public static final IndexIO INDEX_IO =
+ new IndexIO(JSON_MAPPER, TmpFileSegmentWriteOutMediumFactory.instance(), () -> 0);
/**
* Used by druid to merge indexes
*/
public static final IndexMergerV9 INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER,
- DruidStorageHandlerUtils.INDEX_IO
+ DruidStorageHandlerUtils.INDEX_IO,TmpFileSegmentWriteOutMediumFactory.instance()
);
/**
@@ -606,7 +615,7 @@ public final class DruidStorageHandlerUtils {
}
}
)
- , 3, SQLMetadataConnector.DEFAULT_MAX_TRIES);
+ , 3, DEFAULT_MAX_TRIES);
return segmentList;
}
@@ -637,6 +646,19 @@ public final class DruidStorageHandlerUtils {
);
}
+ public static String createSelectStarQuery(String dataSource) throws IOException {
+ // Create Select query
+ Druids.SelectQueryBuilder builder = new Druids.SelectQueryBuilder();
+ builder.dataSource(dataSource);
+ final List<Interval> intervals = Arrays.asList(DEFAULT_INTERVAL);
+ builder.intervals(new MultipleIntervalSegmentSpec(intervals));
+ builder.pagingSpec(PagingSpec.newSpec(1));
+ Map<String, Object> context = new HashMap<>();
+ context.put(Constants.DRUID_QUERY_FETCH, false);
+ builder.context(context);
+ return JSON_MAPPER.writeValueAsString(builder.build());
+ }
+
/**
* Simple interface for retry operations
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/2a3a7d39/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
index 15a08eb..ecb4360 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -129,6 +129,7 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl
inputParser,
dimensionsAndAggregates.rhs,
granularitySpec,
+ null,
DruidStorageHandlerUtils.JSON_MAPPER
);
@@ -156,7 +157,8 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl
0,
true,
null,
- 0L
+ 0L,
+ null
);
LOG.debug(String.format("running with Data schema [%s] ", dataSchema));
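Context for the two extra nulls above, as far as I can tell from the Druid
0.12.0 sources: DataSchema gained a TransformSpec parameter before the
ObjectMapper, and RealtimeTuningConfig gained a trailing
SegmentWriteOutMediumFactory parameter; null keeps the defaults (no
transforms, service-default write-out medium) in both cases. A compilable
sketch of the new DataSchema shape, with the inputs left as parameters:

    import java.util.Map;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import io.druid.query.aggregation.AggregatorFactory;
    import io.druid.segment.indexing.DataSchema;
    import io.druid.segment.indexing.granularity.GranularitySpec;

    public final class DataSchemaSketch {
      // The fifth argument is the 0.12.0 TransformSpec slot; null means
      // no row transforms are applied at ingest time.
      public static DataSchema build(String dataSource,
          Map<String, Object> parserMap, AggregatorFactory[] aggregators,
          GranularitySpec granularitySpec, ObjectMapper mapper) {
        return new DataSchema(dataSource, parserMap, aggregators,
            granularitySpec, /* transformSpec */ null, mapper);
      }
    }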
http://git-wip-us.apache.org/repos/asf/hive/blob/2a3a7d39/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index c097a13..c2d3fe5 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -22,9 +22,7 @@ import java.io.InputStream;
import java.net.URL;
import java.net.URLEncoder;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
@@ -53,7 +51,6 @@ import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.joda.time.Interval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,8 +61,6 @@ import com.google.common.collect.Lists;
import com.metamx.http.client.Request;
import io.druid.query.BaseQuery;
-import io.druid.query.Druids;
-import io.druid.query.Druids.SelectQueryBuilder;
import io.druid.query.LocatedSegmentDescriptor;
import io.druid.query.Query;
import io.druid.query.SegmentDescriptor;
@@ -133,7 +128,7 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
throw new IOException("Druid data source cannot be empty or null");
}
//@FIXME https://issues.apache.org/jira/browse/HIVE-19023 use scan instead of Select
- druidQuery = createSelectStarQuery(dataSource);
+ druidQuery = DruidStorageHandlerUtils.createSelectStarQuery(dataSource);
druidQueryType = Query.SELECT;
} else {
druidQueryType = conf.get(Constants.DRUID_QUERY_TYPE);
@@ -169,19 +164,6 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
}
}
- private static String createSelectStarQuery(String dataSource) throws IOException {
- // Create Select query
- SelectQueryBuilder builder = new Druids.SelectQueryBuilder();
- builder.dataSource(dataSource);
- final List<Interval> intervals = Arrays.asList(DruidStorageHandlerUtils.DEFAULT_INTERVAL);
- builder.intervals(intervals);
- builder.pagingSpec(PagingSpec.newSpec(1));
- Map<String, Object> context = new HashMap<>();
- context.put(Constants.DRUID_QUERY_FETCH, false);
- builder.context(context);
- return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(builder.build());
- }
-
/* New method that distributes the Select query by creating splits containing
* information about different Druid nodes that have the data for the given
* query. */
http://git-wip-us.apache.org/repos/asf/hive/blob/2a3a7d39/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
index ea23ddd..1ec8b5c 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
@@ -24,11 +24,11 @@ import io.druid.segment.realtime.appenderator.AppenderatorConfig;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;
+import javax.annotation.Nullable;
import java.io.File;
/**
@@ -131,6 +131,10 @@ public class KafkaTuningConfig implements AppenderatorConfig
return basePersistDirectory;
}
+ @Nullable @Override public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() {
+ return null;
+ }
+
@Override
@JsonProperty
public int getMaxPendingPersists()
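The override added above exists because AppenderatorConfig in Druid 0.12.0
grew a getSegmentWriteOutMediumFactory() method; returning null defers to
Druid's service-wide default medium. A hypothetical variant (not what this
patch does) could instead pin the same tmp-file factory the handler now
passes to IndexIO:

    // Hypothetical alternative to the null return in the patch.
    @Nullable @Override
    public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() {
      return io.druid.segment.writeout
          .TmpFileSegmentWriteOutMediumFactory.instance();
    }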
http://git-wip-us.apache.org/repos/asf/hive/blob/2a3a7d39/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
index c1bd332..cb8fa39 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
@@ -144,13 +144,14 @@ public class TestDruidRecordWriter {
new UniformGranularitySpec(
Granularities.DAY, Granularities.NONE, ImmutableList.of(INTERVAL_FULL)
),
+ null,
objectMapper
);
IndexSpec indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null);
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(null, null, null,
temporaryFolder.newFolder(), null, null, null, null, indexSpec, null, 0, 0, null, null,
- 0L
+ 0L, null
);
LocalFileSystem localFileSystem = FileSystem.getLocal(config);
DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(
@@ -198,6 +199,7 @@ public class TestDruidRecordWriter {
Firehose firehose = new IngestSegmentFirehose(
ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())),
+ null,
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
null
@@ -228,7 +230,7 @@ public class TestDruidRecordWriter {
actual.getTimestamp().getMillis()
);
Assert.assertEquals(expected.get("host"), actual.getDimension("host"));
- Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum"));
+ Assert.assertEquals(expected.get("visited_sum"), actual.getMetric("visited_sum"));
Assert.assertEquals(
(Double) expected.get("unique_hosts"),
(Double) HyperUniquesAggregatorFactory
http://git-wip-us.apache.org/repos/asf/hive/blob/2a3a7d39/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5802bd3..26721ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -140,7 +140,7 @@
<derby.version>10.14.1.0</derby.version>
<dropwizard.version>3.1.0</dropwizard.version>
<dropwizard-metrics-hadoop-metrics2-reporter.version>0.1.2</dropwizard-metrics-hadoop-metrics2-reporter.version>
- <druid.version>0.11.0</druid.version>
+ <druid.version>0.12.0</druid.version>
<guava.version>19.0</guava.version>
<groovy.version>2.4.11</groovy.version>
<h2database.version>1.3.166</h2database.version>