Posted to commits@hive.apache.org by ha...@apache.org on 2017/08/24 16:52:44 UTC
hive git commit: HIVE-17372 : update druid dependency to druid 0.10.1 (Slim Bouguerra via Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master 35e30bff7 -> c284587ff
HIVE-17372 : update druid dependency to druid 0.10.1 (Slim Bouguerra via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c284587f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c284587f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c284587f
Branch: refs/heads/master
Commit: c284587ff59b04abb0866cc8b516051406a654fe
Parents: 35e30bf
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Thu Aug 24 09:52:03 2017 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Thu Aug 24 09:52:03 2017 -0700
----------------------------------------------------------------------
.../hadoop/hive/druid/DruidStorageHandler.java | 57 ++++++++------------
.../hive/druid/DruidStorageHandlerUtils.java | 24 ++++++---
.../hadoop/hive/druid/io/DruidOutputFormat.java | 3 +-
.../hive/druid/TestDruidStorageHandler.java | 12 ++++-
.../hive/ql/io/TestDruidRecordWriter.java | 6 +--
pom.xml | 2 +-
6 files changed, 54 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c284587f/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 60be4ef..da6d493 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -281,7 +281,6 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
if (MetaStoreUtils.isExternalTable(table)) {
return;
}
- Lifecycle lifecycle = new Lifecycle();
LOG.info("Committing table {} to the druid metastore", table.getDbName());
final Path tableDir = getSegmentDescriptorDir();
try {
@@ -310,19 +309,9 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
String coordinatorResponse = null;
try {
- coordinatorResponse = RetryUtils.retry(new Callable<String>() {
- @Override
- public String call() throws Exception {
- return DruidStorageHandlerUtils.getURL(getHttpClient(),
- new URL(String.format("http://%s/status", coordinatorAddress))
- );
- }
- }, new Predicate<Throwable>() {
- @Override
- public boolean apply(@Nullable Throwable input) {
- return input instanceof IOException;
- }
- }, maxTries);
+ coordinatorResponse = RetryUtils.retry(() -> DruidStorageHandlerUtils.getURL(getHttpClient(),
+ new URL(String.format("http://%s/status", coordinatorAddress))
+ ), input -> input instanceof IOException, maxTries);
} catch (Exception e) {
console.printInfo(
"Will skip waiting for data loading");
@@ -338,28 +327,25 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
long passiveWaitTimeMs = HiveConf
.getLongVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_PASSIVE_WAIT_TIME);
ImmutableSet<URL> setOfUrls = FluentIterable.from(segmentList)
- .transform(new Function<DataSegment, URL>() {
- @Override
- public URL apply(DataSegment dataSegment) {
- try {
- //Need to make sure that we are using UTC since most of the druid cluster use UTC by default
- return new URL(String
- .format("http://%s/druid/coordinator/v1/datasources/%s/segments/%s",
- coordinatorAddress, dataSourceName, DataSegment
- .makeDataSegmentIdentifier(dataSegment.getDataSource(),
- new DateTime(dataSegment.getInterval()
- .getStartMillis(), DateTimeZone.UTC),
- new DateTime(dataSegment.getInterval()
- .getEndMillis(), DateTimeZone.UTC),
- dataSegment.getVersion(),
- dataSegment.getShardSpec()
- )
- ));
- } catch (MalformedURLException e) {
- Throwables.propagate(e);
- }
- return null;
+ .transform(dataSegment -> {
+ try {
+ //Need to make sure that we are using UTC since most of the druid cluster use UTC by default
+ return new URL(String
+ .format("http://%s/druid/coordinator/v1/datasources/%s/segments/%s",
+ coordinatorAddress, dataSourceName, DataSegment
+ .makeDataSegmentIdentifier(dataSegment.getDataSource(),
+ new DateTime(dataSegment.getInterval()
+ .getStartMillis(), DateTimeZone.UTC),
+ new DateTime(dataSegment.getInterval()
+ .getEndMillis(), DateTimeZone.UTC),
+ dataSegment.getVersion(),
+ dataSegment.getShardSpec()
+ )
+ ));
+ } catch (MalformedURLException e) {
+ Throwables.propagate(e);
}
+ return null;
}).toSet();
int numRetries = 0;
@@ -399,7 +385,6 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
Throwables.propagate(e);
} finally {
cleanWorkingDir();
- lifecycle.stop();
}
}
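For context on the hunk above: the anonymous Callable<String> and Guava Predicate<Throwable> collapse into lambdas because both are single-method interfaces, so the retry behavior is unchanged. A minimal standalone sketch of the same retry-on-IOException pattern follows; fetchStatus() is a hypothetical stand-in for the DruidStorageHandlerUtils.getURL(...) probe, and the RetryUtils import path is version-dependent (com.metamx.common in older Druid java-util releases):

    import java.io.IOException;
    import io.druid.java.util.common.RetryUtils; // or com.metamx.common.RetryUtils, depending on Druid version

    class CoordinatorProbeSketch {
      // Hypothetical stand-in for the HTTP status call being retried.
      static String fetchStatus() throws IOException { return "ok"; }

      static String probe(int maxTries) throws Exception {
        // Retry the probe, but only when the failure is a transient
        // I/O error; give up after maxTries attempts.
        return RetryUtils.retry(
            () -> fetchStatus(),                        // Callable<String>
            (Throwable t) -> t instanceof IOException,  // Predicate<Throwable>
            maxTries);
      }
    }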
http://git-wip-us.apache.org/repos/asf/hive/blob/c284587f/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 5dd65b3..3eeb0c3 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.druid;
import io.druid.common.utils.JodaUtils;
import io.druid.jackson.DefaultObjectMapper;
+import io.druid.math.expr.ExprMacroTable;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.SQLMetadataConnector;
import io.druid.metadata.storage.mysql.MySQLConnector;
@@ -28,7 +29,6 @@ import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.loading.DataSegmentPusher;
-import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.storage.hdfs.HdfsDataSegmentPusher;
import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
@@ -77,6 +77,7 @@ import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.DateTime;
import org.joda.time.Interval;
+import org.joda.time.format.ISODateTimeFormat;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
@@ -135,10 +136,9 @@ public final class DruidStorageHandlerUtils {
static
{
// This is needed for serde of PagingSpec as it uses JacksonInject for injecting SelectQueryConfig
- InjectableValues.Std injectableValues = new InjectableValues.Std().addValue(
- SelectQueryConfig.class,
- new SelectQueryConfig(false)
- );
+ InjectableValues.Std injectableValues = new InjectableValues.Std()
+ .addValue(SelectQueryConfig.class, new SelectQueryConfig(false))
+ .addValue(ExprMacroTable.class, ExprMacroTable.nil());
JSON_MAPPER.setInjectableValues(injectableValues);
SMILE_MAPPER.setInjectableValues(injectableValues);
HiveDruidSerializationModule hiveDruidSerializationModule = new HiveDruidSerializationModule();
@@ -720,9 +720,17 @@ public final class DruidStorageHandlerUtils {
}
public static Path finalPathForSegment(String segmentDirectory, DataSegment segment) {
- return new Path(
- String.format("%s/%s/index.zip", segmentDirectory,
- DataSegmentPusherUtil.getHdfsStorageDir(segment)));
+ String path = DataSegmentPusher.JOINER.join(
+ segment.getDataSource(),
+ String.format(
+ "%s_%s",
+ segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()),
+ segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime())
+ ),
+ segment.getVersion().replaceAll(":", "_")
+ );
+
+ return new Path(String.format("%s/%s/index.zip", segmentDirectory, path));
}
private static ShardSpec getNextPartitionShardSpec(ShardSpec shardSpec) {
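Net effect of the finalPathForSegment change: with the DataSegmentPusherUtil helper no longer used (its import is dropped above), the method now assembles the HDFS-safe segment directory itself, joining datasource, interval (rendered with Joda's ISODateTimeFormat.basicDateTime()), and version with ':' replaced by '_' (colons are not allowed in HDFS path components). As an illustration with hypothetical values, a segment of datasource wikipedia, interval 2017-08-01/2017-08-02 UTC, version 2017-08-24T16:52:03.000Z would resolve to roughly:

    {segmentDirectory}/wikipedia/20170801T000000.000Z_20170802T000000.000Z/2017-08-24T16_52_03.000Z/index.zip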
http://git-wip-us.apache.org/repos/asf/hive/blob/c284587f/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
index 5e1deac..9d2ec82 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -212,7 +212,8 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl
0,
0,
true,
- null
+ null,
+ 0L
);
LOG.debug(String.format("running with Data schema [%s] ", dataSchema));
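A note on the trailing 0L: Druid 0.10.1 appended a new long parameter to this tuning-config constructor, and the patch passes 0L to preserve the previous default behavior. The parameter name is not visible in the hunk (it appears to be the alertTimeout option added in 0.10.1, but verify against the Druid 0.10.1 RealtimeTuningConfig source before relying on that).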
http://git-wip-us.apache.org/repos/asf/hive/blob/c284587f/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
index 3ba3196..25f96b3 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -21,7 +21,10 @@ package org.apache.hadoop.hive.druid;
import io.druid.indexer.JobHelper;
import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.storage.hdfs.HdfsDataSegmentPusher;
+import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
@@ -202,8 +205,15 @@ public class TestDruidStorageHandler {
LocalFileSystem localFileSystem = FileSystem.getLocal(config);
Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
DataSegment dataSegment = createSegment(new Path(taskDirPath, "index.zip").toString());
+ HdfsDataSegmentPusherConfig hdfsDSPConfig = new HdfsDataSegmentPusherConfig();
+ hdfsDSPConfig.setStorageDirectory(segmentRootPath);
+ HdfsDataSegmentPusher hdfsDataSegmentPusher = new HdfsDataSegmentPusher(hdfsDSPConfig, config,
+ DruidStorageHandlerUtils.JSON_MAPPER
+ );
Path segmentOutputPath = JobHelper
- .makeFileNamePath(new Path(segmentRootPath), localFileSystem, dataSegment, JobHelper.INDEX_ZIP);
+ .makeFileNamePath(new Path(segmentRootPath), localFileSystem, dataSegment,
+ JobHelper.INDEX_ZIP, hdfsDataSegmentPusher
+ );
Path indexPath = new Path(segmentOutputPath, "index.zip");
DataSegment dataSegmentWithLoadspect = DataSegment.builder(dataSegment).loadSpec(
ImmutableMap.<String, Object>of("path", indexPath)).build();
http://git-wip-us.apache.org/repos/asf/hive/blob/c284587f/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
index d5b217a..4962e0b 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
@@ -142,7 +142,8 @@ public class TestDruidRecordWriter {
IndexSpec indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null);
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(null, null, null,
- temporaryFolder.newFolder(), null, null, null, null, indexSpec, null, 0, 0, null, null
+ temporaryFolder.newFolder(), null, null, null, null, indexSpec, null, 0, 0, null, null,
+ 0L
);
LocalFileSystem localFileSystem = FileSystem.getLocal(config);
DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(
@@ -192,8 +193,7 @@ public class TestDruidRecordWriter {
ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
- null,
- Granularities.NONE
+ null
);
List<InputRow> rows = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/hive/blob/c284587f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 40699bc..6b59505 100644
--- a/pom.xml
+++ b/pom.xml
@@ -138,7 +138,7 @@
<derby.version>10.10.2.0</derby.version>
<dropwizard.version>3.1.0</dropwizard.version>
<dropwizard-metrics-hadoop-metrics2-reporter.version>0.1.2</dropwizard-metrics-hadoop-metrics2-reporter.version>
- <druid.version>0.10.0</druid.version>
+ <druid.version>0.10.1</druid.version>
<guava.version>14.0.1</guava.version>
<groovy.version>2.4.11</groovy.version>
<h2database.version>1.3.166</h2database.version>
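Once the property is bumped, every module referencing ${druid.version} resolves 0.10.1 on the next build. A quick way to rebuild the handler and confirm the resolved artifacts, using standard Maven (the module is addressed by its relative directory path, druid-handler, as seen in the diff):

    # Rebuild the Druid handler plus the modules it depends on
    mvn clean install -pl druid-handler -am -DskipTests

    # List the Druid artifacts (group io.druid) on its classpath
    mvn dependency:tree -pl druid-handler | grep io.druid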