You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2018/02/13 21:39:20 UTC

hive git commit: HIVE-17627 : Use druid scan query instead of the select query. (Nishant Bangarwa via Slim B, Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master 5daad4e44 -> cf4114e1b


HIVE-17627 : Use druid scan query instead of the select query. (Nishant Bangarwa via Slim B, Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cf4114e1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cf4114e1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cf4114e1

Branch: refs/heads/master
Commit: cf4114e1b72b0637b92d4d1267ac9b779d48a29a
Parents: 5daad4e
Author: Nishant Bangarwa <ni...@gmail.com>
Authored: Tue Jan 23 11:08:00 2018 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Tue Feb 13 13:38:40 2018 -0800

----------------------------------------------------------------------
 .../druid/io/DruidQueryBasedInputFormat.java    |  98 +++++++++++++-----
 .../druid/serde/DruidScanQueryRecordReader.java | 102 +++++++++++++++++++
 .../hadoop/hive/druid/serde/DruidSerDe.java     |  49 ++++++++-
 3 files changed, 225 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cf4114e1/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index 7bdc172..33f6412 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidScanQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;
@@ -68,6 +69,7 @@ import io.druid.query.Druids.SelectQueryBuilder;
 import io.druid.query.LocatedSegmentDescriptor;
 import io.druid.query.Query;
 import io.druid.query.SegmentDescriptor;
+import io.druid.query.scan.ScanQuery;
 import io.druid.query.select.PagingSpec;
 import io.druid.query.select.SelectQuery;
 import io.druid.query.spec.MultipleSpecificSegmentSpec;
@@ -93,6 +95,8 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
       return new DruidGroupByQueryRecordReader();
     case Query.SELECT:
       return new DruidSelectQueryRecordReader();
+    case Query.SCAN:
+      return new DruidScanQueryRecordReader();
     }
     return null;
   }
@@ -155,7 +159,11 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
         SelectQuery selectQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(
                 druidQuery, SelectQuery.class);
         return distributeSelectQuery(conf, address, selectQuery, paths[0]);
-      default:
+      case Query.SCAN:
+        ScanQuery scanQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(
+            druidQuery, ScanQuery.class);
+        return distributeScanQuery(conf, address, scanQuery, paths[0]);
+    default:
         throw new IOException("Druid query type not recognized");
     }
   }
@@ -186,28 +194,8 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
               new String[]{address} ) };
     }
 
-    final String intervals =
-            StringUtils.join(query.getIntervals(), ","); // Comma-separated intervals without brackets
-    final String request = String.format(
-            "http://%s/druid/v2/datasources/%s/candidates?intervals=%s",
-            address, query.getDataSource().getNames().get(0), URLEncoder.encode(intervals, "UTF-8"));
-    LOG.debug("sending request {} to query for segments", request);
-    final InputStream response;
-    try {
-      response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), new Request(HttpMethod.GET, new URL(request)));
-    } catch (Exception e) {
-      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
-
-    // Retrieve results
-    final List<LocatedSegmentDescriptor> segmentDescriptors;
-    try {
-      segmentDescriptors = DruidStorageHandlerUtils.JSON_MAPPER.readValue(response,
-              new TypeReference<List<LocatedSegmentDescriptor>>() {});
-    } catch (Exception e) {
-      response.close();
-      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
+    final List<LocatedSegmentDescriptor> segmentDescriptors = fetchLocatedSegmentDescriptors(
+        address, query);
 
     // Create one input split for each segment
     final int numSplits = segmentDescriptors.size();
@@ -233,6 +221,70 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
     return splits;
   }
 
+  /**
+   * Distributes a Scan query by creating splits containing information about the
+   * different Druid nodes that have the data for the given query.
+   *
+   * <p>When the query context sets {@code Constants.DRUID_QUERY_FETCH}, the query is
+   * not distributed: a single split holding the unmodified query and the given
+   * address is returned. Otherwise one split is created per located segment, each
+   * one carrying a copy of the query restricted to that segment and the hosts
+   * that serve it.
+   *
+   * @param conf job configuration (not read by this method)
+   * @param address address queried for the candidate segments; also the only
+   *        location of the split when the query is not distributed
+   * @param query the Scan query to distribute
+   * @param dummyPath path attached to every generated split
+   * @return the generated splits
+   * @throws IOException if the candidate segments cannot be retrieved or a query
+   *         cannot be serialized
+   */
+  private static HiveDruidSplit[] distributeScanQuery(Configuration conf, String address,
+      ScanQuery query, Path dummyPath) throws IOException {
+    // If it has a limit, we use it and we do not distribute the query
+    final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
+    if (isFetch) {
+      return new HiveDruidSplit[] { new HiveDruidSplit(
+          DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath,
+          new String[]{address} ) };
+    }
+
+    final List<LocatedSegmentDescriptor> segmentDescriptors = fetchLocatedSegmentDescriptors(
+        address, query);
+
+    // Create one input split for each segment
+    final int numSplits = segmentDescriptors.size();
+    final HiveDruidSplit[] splits = new HiveDruidSplit[numSplits];
+    for (int i = 0; i < numSplits; i++) {
+      final LocatedSegmentDescriptor locatedSD = segmentDescriptors.get(i);
+      // Hosts serving this segment become the locations of the split
+      final int numLocations = locatedSD.getLocations().size();
+      final String[] hosts = new String[numLocations];
+      for (int j = 0; j < numLocations; j++) {
+        hosts[j] = locatedSD.getLocations().get(j).getHost();
+      }
+      // Create partial Scan query restricted to this segment
+      final SegmentDescriptor newSD = new SegmentDescriptor(
+          locatedSD.getInterval(), locatedSD.getVersion(), locatedSD.getPartitionNumber());
+      final Query<?> partialQuery = query
+          .withQuerySegmentSpec(new MultipleSpecificSegmentSpec(Lists.newArrayList(newSD)));
+      splits[i] = new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery),
+          dummyPath, hosts);
+    }
+    return splits;
+  }
+
+  /**
+   * Requests from the Druid node at the given address the candidate segments, with
+   * their locations, for the intervals of the query.
+   *
+   * @param address host and port placed in the request URL
+   * @param query query whose first data source name and intervals drive the lookup
+   * @return the located segment descriptors parsed from the JSON response
+   * @throws IOException if the request fails or the response cannot be parsed; the
+   *         message is the stringified original failure, which is also kept as cause
+   */
+  private static List<LocatedSegmentDescriptor> fetchLocatedSegmentDescriptors(String address,
+      BaseQuery<?> query) throws IOException {
+    final String intervals =
+            StringUtils.join(query.getIntervals(), ","); // Comma-separated intervals without brackets
+    final String request = String.format(
+            "http://%s/druid/v2/datasources/%s/candidates?intervals=%s",
+            address, query.getDataSource().getNames().get(0), URLEncoder.encode(intervals, "UTF-8"));
+    LOG.debug("sending request {} to query for segments", request);
+
+    // try-with-resources closes the response on every path; a failure while closing
+    // after a parse error is recorded as suppressed instead of hiding that error
+    try (InputStream response = DruidStorageHandlerUtils
+        .submitRequest(DruidStorageHandler.getHttpClient(), new Request(HttpMethod.GET, new URL(request)))) {
+      // Retrieve results
+      return DruidStorageHandlerUtils.JSON_MAPPER.readValue(response,
+              new TypeReference<List<LocatedSegmentDescriptor>>() {});
+    } catch (Exception e) {
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e), e);
+    }
+  }
+
   private static String deserializeSerialize(String druidQuery)
           throws JsonParseException, JsonMappingException, IOException {
     BaseQuery<?> deserializedQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(

http://git-wip-us.apache.org/repos/asf/hive/blob/cf4114e1/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java
new file mode 100644
index 0000000..cbeac2c
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import io.druid.query.Result;
+import io.druid.query.scan.ScanQuery;
+import io.druid.query.scan.ScanResultValue;
+
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.io.NullWritable;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.google.common.collect.Iterators;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Record reader for results for Druid ScanQuery.
+ *
+ * <p>Every {@link ScanResultValue} read from the query results provides a list of
+ * column names and a batch of events. The reader walks the batches in order and
+ * emits one {@link DruidWritable} per event, mapping each column name to the value
+ * found at the same position in the event.
+ *
+ * <p>NOTE(review): events are cast to {@code List<List<Object>>}, which presumably
+ * requires the query to ask Druid for the compacted list result format; confirm
+ * against the code that builds the query.
+ */
+public class DruidScanQueryRecordReader
+    extends DruidQueryRecordReader<ScanQuery, ScanResultValue> {
+
+  private static final TypeReference<ScanResultValue> TYPE_REFERENCE =
+      new TypeReference<ScanResultValue>() {
+      };
+
+  // Batch the rows in compactedValues were taken from; source of the column names
+  private ScanResultValue current;
+
+  // Rows of the current batch that have not been handed out yet
+  private Iterator<List<Object>> compactedValues = Iterators.emptyIterator();
+
+  @Override
+  protected JavaType getResultTypeDef() {
+    return DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE);
+  }
+
+  /**
+   * Tells whether another row is available, moving on to the next non-empty batch
+   * when the current one is exhausted. The row itself is not consumed here; it is
+   * consumed by {@link #getCurrentValue()} or {@link #next(NullWritable, DruidWritable)}.
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException {
+    // Loop rather than recurse so that a run of empty batches cannot grow the stack
+    while (!compactedValues.hasNext()) {
+      if (!queryResultsIterator.hasNext()) {
+        return false;
+      }
+      current = queryResultsIterator.next();
+      compactedValues = rowsOf(current);
+    }
+    return true;
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException, InterruptedException {
+    return NullWritable.get();
+  }
+
+  /**
+   * Builds a new writable from the next pending row. Every call consumes one row,
+   * so it is meant to be called once after each successful {@link #nextKeyValue()}.
+   */
+  @Override
+  public DruidWritable getCurrentValue() throws IOException, InterruptedException {
+    // Create new value
+    DruidWritable value = new DruidWritable();
+    copyNextRowInto(value);
+    return value;
+  }
+
+  @Override
+  public boolean next(NullWritable key, DruidWritable value) throws IOException {
+    if (!nextKeyValue()) {
+      return false;
+    }
+    // Update value
+    value.getValue().clear();
+    copyNextRowInto(value);
+    return true;
+  }
+
+  @Override
+  public float getProgress() {
+    return queryResultsIterator.hasNext() || compactedValues.hasNext() ? 0 : 1;
+  }
+
+  /* Consumes the next pending row and stores its values in the writable, keyed by
+   * the column names of the current batch. */
+  private void copyNextRowInto(DruidWritable value) {
+    final List<Object> row = compactedValues.next();
+    final List<String> columns = current.getColumns();
+    for (int i = 0; i < columns.size(); i++) {
+      value.getValue().put(columns.get(i), row.get(i));
+    }
+  }
+
+  /* Returns an iterator over the rows of a batch. getEvents() is declared as Object,
+   * hence the unchecked cast to the list-of-rows shape this reader works with. */
+  @SuppressWarnings("unchecked")
+  private static Iterator<List<Object>> rowsOf(ScanResultValue batch) {
+    return ((List<List<Object>>) batch.getEvents()).iterator();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/cf4114e1/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index 3899bff..3696b0f 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -98,6 +98,7 @@ import io.druid.query.groupby.GroupByQuery;
 import io.druid.query.metadata.metadata.ColumnAnalysis;
 import io.druid.query.metadata.metadata.SegmentAnalysis;
 import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+import io.druid.query.scan.ScanQuery;
 import io.druid.query.select.SelectQuery;
 import io.druid.query.timeseries.TimeseriesQuery;
 import io.druid.query.topn.TopNQuery;
@@ -245,6 +246,15 @@ public class DruidSerDe extends AbstractSerDe {
             inferSchema((GroupByQuery) query, tsTZTypeInfo, columnNames, columnTypes,
                     mapColumnNamesTypes.build());
             break;
+          case Query.SCAN:
+            String broker = HiveConf.getVar(configuration,
+                HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
+            if (org.apache.commons.lang3.StringUtils.isEmpty(broker)) {
+              throw new SerDeException("Druid broker address not specified in configuration");
+            }
+            inferSchema((ScanQuery) query, tsTZTypeInfo, columnNames, columnTypes, broker,
+                mapColumnNamesTypes.build());
+            break;
           default:
             throw new SerDeException("Not supported Druid query");
         }
@@ -406,6 +416,43 @@ public class DruidSerDe extends AbstractSerDe {
     }
   }
 
+  /* Scan query: column names come from the query itself; the type of a column is
+   * the Hive type registered for it when there is one, and otherwise it is derived
+   * from the Druid segment metadata of the data source. */
+  private void inferSchema(ScanQuery query, TimestampLocalTZTypeInfo timeColumnTypeInfo,
+      List<String> columnNames, List<PrimitiveTypeInfo> columnTypes,
+      String address, Map<String, PrimitiveTypeInfo> mapColumnNamesTypes)
+      throws SerDeException {
+    // Column types are not part of the Scan query, so a segment metadata query is
+    // sent to Druid to learn them
+    SegmentMetadataQueryBuilder metadataBuilder = new Druids.SegmentMetadataQueryBuilder();
+    metadataBuilder.dataSource(query.getDataSource());
+    metadataBuilder.merge(true);
+    metadataBuilder.analysisTypes();
+    SegmentMetadataQuery segmentMetadataQuery = metadataBuilder.build();
+
+    // Execute query in Druid
+    final SegmentAnalysis datasourceSchema;
+    try {
+      datasourceSchema = submitMetadataRequest(address, segmentMetadataQuery);
+    } catch (IOException e) {
+      throw new SerDeException(e);
+    }
+    if (datasourceSchema == null) {
+      throw new SerDeException("Connected to Druid but could not retrieve datasource information");
+    }
+
+    for (String columnName : query.getColumns()) {
+      columnNames.add(columnName);
+      // If datasource was created by Hive, we consider Hive type
+      PrimitiveTypeInfo hiveType = mapColumnNamesTypes.get(columnName);
+      if (hiveType == null) {
+        ColumnAnalysis druidColumn = datasourceSchema.getColumns().get(columnName);
+        // If column is absent from Druid consider it as a dimension with type string.
+        String druidType = druidColumn != null ? druidColumn.getType() : DruidSerDeUtils.STRING_TYPE;
+        hiveType = DruidSerDeUtils.convertDruidToHiveType(druidType);
+      }
+      columnTypes.add(hiveType);
+    }
+  }
+
   /* GroupBy query */
   private void inferSchema(GroupByQuery query, TimestampLocalTZTypeInfo timeColumnTypeInfo,
           List<String> columnNames, List<PrimitiveTypeInfo> columnTypes,
@@ -543,7 +590,7 @@ public class DruidSerDe extends AbstractSerDe {
               new TimestampLocalTZWritable(
                   new TimestampTZ(
                       ZonedDateTime.ofInstant(
-                          Instant.ofEpochMilli((Long) value),
+                          Instant.ofEpochMilli(((Number) value).longValue()),
                           ((TimestampLocalTZTypeInfo) types[i]).timeZone()))));
           break;
         case BYTE: