Posted to commits@hive.apache.org by ha...@apache.org on 2017/08/24 16:52:44 UTC

hive git commit: HIVE-17372 : update druid dependency to druid 0.10.1 (Slim Bouguerra via Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master 35e30bff7 -> c284587ff


HIVE-17372 : update druid dependency to druid 0.10.1 (Slim Bouguerra via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c284587f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c284587f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c284587f

Branch: refs/heads/master
Commit: c284587ff59b04abb0866cc8b516051406a654fe
Parents: 35e30bf
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Thu Aug 24 09:52:03 2017 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Thu Aug 24 09:52:03 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/druid/DruidStorageHandler.java  | 57 ++++++++------------
 .../hive/druid/DruidStorageHandlerUtils.java    | 24 ++++++---
 .../hadoop/hive/druid/io/DruidOutputFormat.java |  3 +-
 .../hive/druid/TestDruidStorageHandler.java     | 12 ++++-
 .../hive/ql/io/TestDruidRecordWriter.java       |  6 +--
 pom.xml                                         |  2 +-
 6 files changed, 54 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c284587f/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 60be4ef..da6d493 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -281,7 +281,6 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
     if (MetaStoreUtils.isExternalTable(table)) {
       return;
     }
-    Lifecycle lifecycle = new Lifecycle();
     LOG.info("Committing table {} to the druid metastore", table.getDbName());
     final Path tableDir = getSegmentDescriptorDir();
     try {
@@ -310,19 +309,9 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
 
       String coordinatorResponse = null;
       try {
-        coordinatorResponse = RetryUtils.retry(new Callable<String>() {
-          @Override
-          public String call() throws Exception {
-            return DruidStorageHandlerUtils.getURL(getHttpClient(),
-                    new URL(String.format("http://%s/status", coordinatorAddress))
-            );
-          }
-        }, new Predicate<Throwable>() {
-          @Override
-          public boolean apply(@Nullable Throwable input) {
-            return input instanceof IOException;
-          }
-        }, maxTries);
+        coordinatorResponse = RetryUtils.retry(() -> DruidStorageHandlerUtils.getURL(getHttpClient(),
+                new URL(String.format("http://%s/status", coordinatorAddress))
+        ), input -> input instanceof IOException, maxTries);
       } catch (Exception e) {
         console.printInfo(
                 "Will skip waiting for data loading");
@@ -338,28 +327,25 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
       long passiveWaitTimeMs = HiveConf
               .getLongVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_PASSIVE_WAIT_TIME);
       ImmutableSet<URL> setOfUrls = FluentIterable.from(segmentList)
-              .transform(new Function<DataSegment, URL>() {
-                @Override
-                public URL apply(DataSegment dataSegment) {
-                  try {
-                    //Need to make sure that we are using UTC since most of the druid cluster use UTC by default
-                    return new URL(String
-                            .format("http://%s/druid/coordinator/v1/datasources/%s/segments/%s",
-                                    coordinatorAddress, dataSourceName, DataSegment
-                                            .makeDataSegmentIdentifier(dataSegment.getDataSource(),
-                                                    new DateTime(dataSegment.getInterval()
-                                                            .getStartMillis(), DateTimeZone.UTC),
-                                                    new DateTime(dataSegment.getInterval()
-                                                            .getEndMillis(), DateTimeZone.UTC),
-                                                    dataSegment.getVersion(),
-                                                    dataSegment.getShardSpec()
-                                            )
-                            ));
-                  } catch (MalformedURLException e) {
-                    Throwables.propagate(e);
-                  }
-                  return null;
+              .transform(dataSegment -> {
+                try {
+                  //Need to make sure that we are using UTC since most of the druid cluster use UTC by default
+                  return new URL(String
+                          .format("http://%s/druid/coordinator/v1/datasources/%s/segments/%s",
+                                  coordinatorAddress, dataSourceName, DataSegment
+                                          .makeDataSegmentIdentifier(dataSegment.getDataSource(),
+                                                  new DateTime(dataSegment.getInterval()
+                                                          .getStartMillis(), DateTimeZone.UTC),
+                                                  new DateTime(dataSegment.getInterval()
+                                                          .getEndMillis(), DateTimeZone.UTC),
+                                                  dataSegment.getVersion(),
+                                                  dataSegment.getShardSpec()
+                                          )
+                          ));
+                } catch (MalformedURLException e) {
+                  Throwables.propagate(e);
                 }
+                return null;
               }).toSet();
 
       int numRetries = 0;
@@ -399,7 +385,6 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
       Throwables.propagate(e);
     } finally {
       cleanWorkingDir();
-      lifecycle.stop();
     }
   }
 

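A note on the hunks above: the anonymous Callable/Predicate classes passed to RetryUtils.retry are collapsed into lambdas with identical semantics, and the Lifecycle object that was constructed and stopped but never otherwise used in this method is dropped. As a minimal, self-contained sketch of the retry idiom (the helper below is an illustrative stand-in, not Druid's RetryUtils, and the fetched value is faked):

    import java.io.IOException;
    import java.util.concurrent.Callable;
    import java.util.function.Predicate;

    public class RetrySketch {
      // Illustrative stand-in for a RetryUtils.retry-style helper.
      static <T> T retry(Callable<T> task, Predicate<Throwable> shouldRetry, int maxTries)
              throws Exception {
        for (int attempt = 1; ; attempt++) {
          try {
            return task.call();
          } catch (Exception e) {
            // Give up once attempts are exhausted or the failure is not retryable.
            if (attempt >= maxTries || !shouldRetry.test(e)) {
              throw e;
            }
          }
        }
      }

      public static void main(String[] args) throws Exception {
        // Same shape as the call site above: lambdas instead of anonymous
        // Callable/Predicate classes, retrying only on IOException.
        String status = retry(() -> "ok",                    // stand-in for the coordinator /status fetch
                              t -> t instanceof IOException, // retry only I/O failures
                              3);
        System.out.println(status);
      }
    }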
http://git-wip-us.apache.org/repos/asf/hive/blob/c284587f/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 5dd65b3..3eeb0c3 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.druid;
 
 import io.druid.common.utils.JodaUtils;
 import io.druid.jackson.DefaultObjectMapper;
+import io.druid.math.expr.ExprMacroTable;
 import io.druid.metadata.MetadataStorageTablesConfig;
 import io.druid.metadata.SQLMetadataConnector;
 import io.druid.metadata.storage.mysql.MySQLConnector;
@@ -28,7 +29,6 @@ import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMergerV9;
 import io.druid.segment.column.ColumnConfig;
 import io.druid.segment.loading.DataSegmentPusher;
-import io.druid.segment.loading.DataSegmentPusherUtil;
 import io.druid.segment.realtime.appenderator.SegmentIdentifier;
 import io.druid.storage.hdfs.HdfsDataSegmentPusher;
 import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
@@ -77,6 +77,7 @@ import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
+import org.joda.time.format.ISODateTimeFormat;
 import org.skife.jdbi.v2.FoldController;
 import org.skife.jdbi.v2.Folder3;
 import org.skife.jdbi.v2.Handle;
@@ -135,10 +136,9 @@ public final class DruidStorageHandlerUtils {
   static
   {
     // This is needed for serde of PagingSpec as it uses JacksonInject for injecting SelectQueryConfig
-    InjectableValues.Std injectableValues = new InjectableValues.Std().addValue(
-        SelectQueryConfig.class,
-        new SelectQueryConfig(false)
-    );
+    InjectableValues.Std injectableValues = new InjectableValues.Std()
+            .addValue(SelectQueryConfig.class, new SelectQueryConfig(false))
+            .addValue(ExprMacroTable.class, ExprMacroTable.nil());
     JSON_MAPPER.setInjectableValues(injectableValues);
     SMILE_MAPPER.setInjectableValues(injectableValues);
     HiveDruidSerializationModule hiveDruidSerializationModule = new HiveDruidSerializationModule();
@@ -720,9 +720,17 @@ public final class DruidStorageHandlerUtils {
   }
 
   public static Path finalPathForSegment(String segmentDirectory, DataSegment segment) {
-    return new Path(
-            String.format("%s/%s/index.zip", segmentDirectory,
-                    DataSegmentPusherUtil.getHdfsStorageDir(segment)));
+    String path = DataSegmentPusher.JOINER.join(
+            segment.getDataSource(),
+            String.format(
+                    "%s_%s",
+                    segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()),
+                    segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime())
+            ),
+            segment.getVersion().replaceAll(":", "_")
+    );
+
+    return new Path(String.format("%s/%s/index.zip", segmentDirectory, path));
   }
 
   private static ShardSpec getNextPartitionShardSpec(ShardSpec shardSpec) {
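For context on the InjectableValues hunk above: Jackson's InjectableValues.Std supplies values for @JacksonInject members that are absent from the JSON itself, keyed here by type; the commit additionally registers ExprMacroTable.nil(), presumably so that deserializing Druid 0.10.1 query types that inject an ExprMacroTable keeps working. A minimal, self-contained illustration of the mechanism (the bean and injected value are hypothetical):

    import com.fasterxml.jackson.annotation.JacksonInject;
    import com.fasterxml.jackson.databind.InjectableValues;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class InjectDemo {
      // Hypothetical bean: "config" never appears in the JSON; Jackson injects it.
      static class QueryHolder {
        @JacksonInject
        public String config;
        public String name;
      }

      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Register the injectable by type, as the static block above does.
        mapper.setInjectableValues(new InjectableValues.Std()
                .addValue(String.class, "injected-config"));
        QueryHolder q = mapper.readValue("{\"name\":\"q1\"}", QueryHolder.class);
        System.out.println(q.name + " / " + q.config); // prints: q1 / injected-config
      }
    }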

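The finalPathForSegment rewrite inlines the segment directory layout that DataSegmentPusherUtil.getHdfsStorageDir used to provide (its import is dropped by this commit): dataSource/<start>_<end>/<version with ':' replaced by '_'>. A runnable sketch of the resulting layout, with String.join standing in for Druid's DataSegmentPusher.JOINER and all segment coordinates invented for illustration:

    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;
    import org.joda.time.Interval;
    import org.joda.time.format.ISODateTimeFormat;

    public class SegmentPathSketch {
      public static void main(String[] args) {
        // Invented segment coordinates, purely for illustration.
        String dataSource = "wikipedia";
        Interval interval = new Interval(
                new DateTime(2017, 8, 24, 0, 0, DateTimeZone.UTC),
                new DateTime(2017, 8, 25, 0, 0, DateTimeZone.UTC));
        String version = "2017-08-24T10:00:00.000Z";
        String segmentDirectory = "/druid/segments";

        // Same layout the new finalPathForSegment builds by hand:
        // <dataSource>/<start>_<end>/<version with ':' -> '_'>
        String relative = String.join("/",
                dataSource,
                String.format("%s_%s",
                        interval.getStart().toString(ISODateTimeFormat.basicDateTime()),
                        interval.getEnd().toString(ISODateTimeFormat.basicDateTime())),
                version.replaceAll(":", "_"));

        System.out.println(String.format("%s/%s/index.zip", segmentDirectory, relative));
        // -> /druid/segments/wikipedia/20170824T000000.000Z_20170825T000000.000Z/2017-08-24T10_00_00.000Z/index.zip
      }
    }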
http://git-wip-us.apache.org/repos/asf/hive/blob/c284587f/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
index 5e1deac..9d2ec82 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -212,7 +212,8 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl
             0,
             0,
             true,
-            null
+            null,
+            0L
     );
 
     LOG.debug(String.format("running with Data schema [%s] ", dataSchema));

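The only change here is a new trailing 0L argument required by the RealtimeTuningConfig constructor in Druid 0.10.1 (the same extra argument shows up in TestDruidRecordWriter below). Positional-constructor churn like this is a recurring cost of upstream bumps; one mitigation, sketched with a hypothetical helper whose argument list is copied from the test diff, is to funnel construction through a single method so the next signature change touches one call site:

    import java.io.File;

    import io.druid.segment.IndexSpec;
    import io.druid.segment.indexing.RealtimeTuningConfig;

    // Hypothetical helper, assuming Druid 0.10.1's RealtimeTuningConfig:
    // centralizing construction makes the next upstream signature change a one-line fix.
    static RealtimeTuningConfig defaultTuningConfig(File basePersistDirectory, IndexSpec indexSpec) {
      return new RealtimeTuningConfig(null, null, null, basePersistDirectory,
              null, null, null, null, indexSpec, null, 0, 0, null, null, 0L);
    }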
http://git-wip-us.apache.org/repos/asf/hive/blob/c284587f/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
index 3ba3196..25f96b3 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -21,7 +21,10 @@ package org.apache.hadoop.hive.druid;
 import io.druid.indexer.JobHelper;
 import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
 import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.storage.hdfs.HdfsDataSegmentPusher;
+import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.LinearShardSpec;
 import io.druid.timeline.partition.NoneShardSpec;
@@ -202,8 +205,15 @@ public class TestDruidStorageHandler {
     LocalFileSystem localFileSystem = FileSystem.getLocal(config);
     Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
     DataSegment dataSegment = createSegment(new Path(taskDirPath, "index.zip").toString());
+    HdfsDataSegmentPusherConfig hdfsDSPConfig = new HdfsDataSegmentPusherConfig();
+    hdfsDSPConfig.setStorageDirectory(segmentRootPath);
+    HdfsDataSegmentPusher hdfsDataSegmentPusher = new HdfsDataSegmentPusher(hdfsDSPConfig, config,
+            DruidStorageHandlerUtils.JSON_MAPPER
+    );
     Path segmentOutputPath = JobHelper
-            .makeFileNamePath(new Path(segmentRootPath), localFileSystem, dataSegment, JobHelper.INDEX_ZIP);
+            .makeFileNamePath(new Path(segmentRootPath), localFileSystem, dataSegment,
+                    JobHelper.INDEX_ZIP, hdfsDataSegmentPusher
+            );
     Path indexPath = new Path(segmentOutputPath, "index.zip");
     DataSegment dataSegmentWithLoadspect = DataSegment.builder(dataSegment).loadSpec(
             ImmutableMap.<String, Object>of("path", indexPath)).build();

http://git-wip-us.apache.org/repos/asf/hive/blob/c284587f/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
index d5b217a..4962e0b 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
@@ -142,7 +142,8 @@ public class TestDruidRecordWriter {
 
     IndexSpec indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null);
     RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(null, null, null,
-            temporaryFolder.newFolder(), null, null, null, null, indexSpec, null, 0, 0, null, null
+            temporaryFolder.newFolder(), null, null, null, null, indexSpec, null, 0, 0, null, null,
+            0L
     );
     LocalFileSystem localFileSystem = FileSystem.getLocal(config);
     DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(
@@ -192,8 +193,7 @@ public class TestDruidRecordWriter {
             ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())),
             ImmutableList.of("host"),
             ImmutableList.of("visited_sum", "unique_hosts"),
-            null,
-            Granularities.NONE
+            null
     );
 
     List<InputRow> rows = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/hive/blob/c284587f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 40699bc..6b59505 100644
--- a/pom.xml
+++ b/pom.xml
@@ -138,7 +138,7 @@
     <derby.version>10.10.2.0</derby.version>
     <dropwizard.version>3.1.0</dropwizard.version>
     <dropwizard-metrics-hadoop-metrics2-reporter.version>0.1.2</dropwizard-metrics-hadoop-metrics2-reporter.version>
-    <druid.version>0.10.0</druid.version>
+    <druid.version>0.10.1</druid.version>
     <guava.version>14.0.1</guava.version>
     <groovy.version>2.4.11</groovy.version>
     <h2database.version>1.3.166</h2database.version>