You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2018/04/13 16:58:08 UTC

hive git commit: HIVE-19187 : Update Druid Storage Handler to Druid 0.12.0 (Slim Bouguerra via Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master 6f9090c1d -> 2a3a7d399


HIVE-19187 : Update Druid Storage Handler to Druid 0.12.0 (Slim Bouguerra via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2a3a7d39
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2a3a7d39
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2a3a7d39

Branch: refs/heads/master
Commit: 2a3a7d399e7be6581f1d975bce9a9508a5177ab6
Parents: 6f9090c
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Wed Apr 11 17:56:00 2018 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Fri Apr 13 09:57:34 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/druid/DruidStorageHandler.java  |  4 ++-
 .../hive/druid/DruidStorageHandlerUtils.java    | 30 +++++++++++++++++---
 .../hadoop/hive/druid/io/DruidOutputFormat.java |  4 ++-
 .../druid/io/DruidQueryBasedInputFormat.java    | 20 +------------
 .../hive/druid/json/KafkaTuningConfig.java      |  8 ++++--
 .../hive/ql/io/TestDruidRecordWriter.java       |  6 ++--
 pom.xml                                         |  2 +-
 7 files changed, 44 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2a3a7d39/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 76540b7..c0feb8d 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -49,6 +49,7 @@ import io.druid.metadata.SQLMetadataConnector;
 import io.druid.metadata.storage.derby.DerbyConnector;
 import io.druid.metadata.storage.derby.DerbyMetadataStorage;
 import io.druid.metadata.storage.mysql.MySQLConnector;
+import io.druid.metadata.storage.mysql.MySQLConnectorConfig;
 import io.druid.metadata.storage.postgresql.PostgreSQLConnector;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.IndexSpec;
@@ -335,6 +336,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
         inputParser,
         dimensionsAndAggregates.rhs,
         granularitySpec,
+        null,
         DruidStorageHandlerUtils.JSON_MAPPER
     );
 
@@ -880,7 +882,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
     if (dbType.equals("mysql")) {
       connector = new MySQLConnector(storageConnectorConfigSupplier,
               Suppliers.ofInstance(getDruidMetadataStorageTablesConfig())
-      );
+          , new MySQLConnectorConfig());
     } else if (dbType.equals("postgresql")) {
       connector = new PostgreSQLConnector(storageConnectorConfigSupplier,
               Suppliers.ofInstance(getDruidMetadataStorageTablesConfig())

http://git-wip-us.apache.org/repos/asf/hive/blob/2a3a7d39/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 1424237..1aef565 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -26,6 +26,7 @@ import io.druid.math.expr.ExprMacroTable;
 import io.druid.metadata.MetadataStorageTablesConfig;
 import io.druid.metadata.SQLMetadataConnector;
 import io.druid.metadata.storage.mysql.MySQLConnector;
+import io.druid.query.Druids;
 import io.druid.query.expression.LikeExprMacro;
 import io.druid.query.expression.RegexpExtractExprMacro;
 import io.druid.query.expression.TimestampCeilExprMacro;
@@ -35,7 +36,9 @@ import io.druid.query.expression.TimestampFormatExprMacro;
 import io.druid.query.expression.TimestampParseExprMacro;
 import io.druid.query.expression.TimestampShiftExprMacro;
 import io.druid.query.expression.TrimExprMacro;
+import io.druid.query.select.PagingSpec;
 import io.druid.query.select.SelectQueryConfig;
+import io.druid.query.spec.MultipleIntervalSegmentSpec;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMergerV9;
 import io.druid.query.aggregation.AggregatorFactory;
@@ -48,6 +51,7 @@ import io.druid.segment.indexing.granularity.GranularitySpec;
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import io.druid.storage.hdfs.HdfsDataSegmentPusher;
 import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
 import io.druid.timeline.DataSegment;
@@ -124,8 +128,10 @@ import java.net.URL;
 import java.net.UnknownHostException;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -168,6 +174,7 @@ public final class DruidStorageHandlerUtils {
    * Mapper to use to serialize/deserialize Druid objects (SMILE)
    */
   public static final ObjectMapper SMILE_MAPPER = new DefaultObjectMapper(new SmileFactory());
+  private static final int DEFAULT_MAX_TRIES = 10;
 
   static {
     // This is needed for serde of PagingSpec as it uses JacksonInject for injecting SelectQueryConfig
@@ -187,7 +194,8 @@ public final class DruidStorageHandlerUtils {
                 new TrimExprMacro.LeftTrimExprMacro(),
                 new TrimExprMacro.RightTrimExprMacro()
             )))
-            .addValue(ObjectMapper.class, JSON_MAPPER);
+        .addValue(ObjectMapper.class, JSON_MAPPER)
+        .addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT);
 
     JSON_MAPPER.setInjectableValues(injectableValues);
     SMILE_MAPPER.setInjectableValues(injectableValues);
@@ -214,13 +222,14 @@ public final class DruidStorageHandlerUtils {
   /**
    * Used by druid to perform IO on indexes
    */
-  public static final IndexIO INDEX_IO = new IndexIO(JSON_MAPPER, () -> 0);
+  public static final IndexIO INDEX_IO =
+      new IndexIO(JSON_MAPPER, TmpFileSegmentWriteOutMediumFactory.instance(), () -> 0);
 
   /**
    * Used by druid to merge indexes
    */
   public static final IndexMergerV9 INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER,
-          DruidStorageHandlerUtils.INDEX_IO
+          DruidStorageHandlerUtils.INDEX_IO,TmpFileSegmentWriteOutMediumFactory.instance()
   );
 
   /**
@@ -606,7 +615,7 @@ public final class DruidStorageHandlerUtils {
                               }
                             }
                     )
-            , 3, SQLMetadataConnector.DEFAULT_MAX_TRIES);
+            , 3, DEFAULT_MAX_TRIES);
     return segmentList;
   }
 
@@ -637,6 +646,19 @@ public final class DruidStorageHandlerUtils {
     );
   }
 
+  public static String createSelectStarQuery(String dataSource) throws IOException {
+    // Create Select query
+    Druids.SelectQueryBuilder builder = new Druids.SelectQueryBuilder();
+    builder.dataSource(dataSource);
+    final List<Interval> intervals = Arrays.asList(DEFAULT_INTERVAL);
+    builder.intervals(new MultipleIntervalSegmentSpec(intervals));
+    builder.pagingSpec(PagingSpec.newSpec(1));
+    Map<String, Object> context = new HashMap<>();
+    context.put(Constants.DRUID_QUERY_FETCH, false);
+    builder.context(context);
+    return JSON_MAPPER.writeValueAsString(builder.build());
+  }
+
   /**
    * Simple interface for retry operations
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/2a3a7d39/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
index 15a08eb..ecb4360 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -129,6 +129,7 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl
             inputParser,
             dimensionsAndAggregates.rhs,
             granularitySpec,
+            null,
             DruidStorageHandlerUtils.JSON_MAPPER
     );
 
@@ -156,7 +157,8 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl
             0,
             true,
             null,
-            0L
+            0L,
+        null
     );
 
     LOG.debug(String.format("running with Data schema [%s] ", dataSchema));

http://git-wip-us.apache.org/repos/asf/hive/blob/2a3a7d39/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index c097a13..c2d3fe5 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -22,9 +22,7 @@ import java.io.InputStream;
 import java.net.URL;
 import java.net.URLEncoder;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -53,7 +51,6 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.joda.time.Interval;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,8 +61,6 @@ import com.google.common.collect.Lists;
 import com.metamx.http.client.Request;
 
 import io.druid.query.BaseQuery;
-import io.druid.query.Druids;
-import io.druid.query.Druids.SelectQueryBuilder;
 import io.druid.query.LocatedSegmentDescriptor;
 import io.druid.query.Query;
 import io.druid.query.SegmentDescriptor;
@@ -133,7 +128,7 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
         throw new IOException("Druid data source cannot be empty or null");
       }
       //@FIXME https://issues.apache.org/jira/browse/HIVE-19023 use scan instead of Select
-      druidQuery = createSelectStarQuery(dataSource);
+      druidQuery = DruidStorageHandlerUtils.createSelectStarQuery(dataSource);
       druidQueryType = Query.SELECT;
     } else {
       druidQueryType = conf.get(Constants.DRUID_QUERY_TYPE);
@@ -169,19 +164,6 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
     }
   }
 
-  private static String createSelectStarQuery(String dataSource) throws IOException {
-    // Create Select query
-    SelectQueryBuilder builder = new Druids.SelectQueryBuilder();
-    builder.dataSource(dataSource);
-    final List<Interval> intervals = Arrays.asList(DruidStorageHandlerUtils.DEFAULT_INTERVAL);
-    builder.intervals(intervals);
-    builder.pagingSpec(PagingSpec.newSpec(1));
-    Map<String, Object> context = new HashMap<>();
-    context.put(Constants.DRUID_QUERY_FETCH, false);
-    builder.context(context);
-    return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(builder.build());
-  }
-
   /* New method that distributes the Select query by creating splits containing
    * information about different Druid nodes that have the data for the given
    * query. */

http://git-wip-us.apache.org/repos/asf/hive/blob/2a3a7d39/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
index ea23ddd..1ec8b5c 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
@@ -24,11 +24,11 @@ import io.druid.segment.realtime.appenderator.AppenderatorConfig;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
+import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Period;
 
+import javax.annotation.Nullable;
 import java.io.File;
 
 /**
@@ -131,6 +131,10 @@ public class KafkaTuningConfig implements AppenderatorConfig
     return basePersistDirectory;
   }
 
+  @Nullable @Override public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() {
+    return null;
+  }
+
   @Override
   @JsonProperty
   public int getMaxPendingPersists()

http://git-wip-us.apache.org/repos/asf/hive/blob/2a3a7d39/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
index c1bd332..cb8fa39 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
@@ -144,13 +144,14 @@ public class TestDruidRecordWriter {
             new UniformGranularitySpec(
                     Granularities.DAY, Granularities.NONE, ImmutableList.of(INTERVAL_FULL)
             ),
+        null,
             objectMapper
     );
 
     IndexSpec indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null);
     RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(null, null, null,
             temporaryFolder.newFolder(), null, null, null, null, indexSpec, null, 0, 0, null, null,
-            0L
+            0L, null
     );
     LocalFileSystem localFileSystem = FileSystem.getLocal(config);
     DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(
@@ -198,6 +199,7 @@ public class TestDruidRecordWriter {
 
     Firehose firehose = new IngestSegmentFirehose(
             ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())),
+            null,
             ImmutableList.of("host"),
             ImmutableList.of("visited_sum", "unique_hosts"),
             null
@@ -228,7 +230,7 @@ public class TestDruidRecordWriter {
               actual.getTimestamp().getMillis()
       );
       Assert.assertEquals(expected.get("host"), actual.getDimension("host"));
-      Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum"));
+      Assert.assertEquals(expected.get("visited_sum"), actual.getMetric("visited_sum"));
       Assert.assertEquals(
               (Double) expected.get("unique_hosts"),
               (Double) HyperUniquesAggregatorFactory

http://git-wip-us.apache.org/repos/asf/hive/blob/2a3a7d39/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5802bd3..26721ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -140,7 +140,7 @@
     <derby.version>10.14.1.0</derby.version>
     <dropwizard.version>3.1.0</dropwizard.version>
     <dropwizard-metrics-hadoop-metrics2-reporter.version>0.1.2</dropwizard-metrics-hadoop-metrics2-reporter.version>
-    <druid.version>0.11.0</druid.version>
+    <druid.version>0.12.0</druid.version>
     <guava.version>19.0</guava.version>
     <groovy.version>2.4.11</groovy.version>
     <h2database.version>1.3.166</h2database.version>