You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2019/09/09 09:32:23 UTC

[GitHub] [hive] b-slim commented on a change in pull request #723: [HIVE-20683] Add the Ability to push Dynamic Between and Bloom filters to Druid

b-slim commented on a change in pull request #723: [HIVE-20683] Add the Ability to push Dynamic Between and Bloom filters to Druid
URL: https://github.com/apache/hive/pull/723#discussion_r322146950
 
 

 ##########
 File path: druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
 ##########
 @@ -894,4 +945,255 @@ public static IndexSpec getIndexSpec(Configuration jc) {
     ImmutableList<AggregatorFactory> aggregatorFactories = aggregatorFactoryBuilder.build();
     return Pair.of(dimensions, aggregatorFactories.toArray(new AggregatorFactory[0]));
   }
+
+  // Druid only supports String,Long,Float,Double selectors
+  private static Set<TypeInfo> druidSupportedTypeInfos = ImmutableSet.<TypeInfo>of(
+      TypeInfoFactory.stringTypeInfo, TypeInfoFactory.charTypeInfo,
+      TypeInfoFactory.varcharTypeInfo, TypeInfoFactory.byteTypeInfo,
+      TypeInfoFactory.intTypeInfo, TypeInfoFactory.longTypeInfo,
+      TypeInfoFactory.shortTypeInfo, TypeInfoFactory.doubleTypeInfo
+  );
+
+  private static Set<TypeInfo> stringTypeInfos = ImmutableSet.<TypeInfo>of(
+      TypeInfoFactory.stringTypeInfo,
+      TypeInfoFactory.charTypeInfo, TypeInfoFactory.varcharTypeInfo
+  );
+
+
+  /**
+   * Pushes a dynamic (runtime) filter expression into an existing Druid query.
+   *
+   * @param query                the Druid query to augment
+   * @param filterExpr           Hive filter expression to translate and AND into the query
+   * @param conf                 configuration used to resolve dynamic values
+   * @param resolveDynamicValues when false (e.g. EXPLAIN), dynamic values are not
+   *                             materialized and placeholders are used instead
+   * @return the original query when nothing could be pushed, otherwise a copy of the
+   *         query with the translated filter ANDed to its existing filter
+   * @throws UnsupportedOperationException for query types that cannot carry a filter
+   */
+  public static org.apache.druid.query.Query addDynamicFilters(org.apache.druid.query.Query query,
+      ExprNodeGenericFuncDesc filterExpr, Configuration conf, boolean resolveDynamicValues
+  ) {
+    List<VirtualColumn> virtualColumns = Arrays
+        .asList(getVirtualColumns(query).getVirtualColumns());
+    org.apache.druid.query.Query rv = query;
+    DimFilter joinReductionFilter = toDruidFilter(filterExpr, conf, virtualColumns,
+        resolveDynamicValues
+    );
+    if (joinReductionFilter != null) {
+      String type = query.getType();
+      // Guard against a null existing filter: Druid's AndDimFilter does not accept
+      // null children, so only wrap in AND when the query already has a filter.
+      DimFilter filter = query.getFilter() == null
+          ? joinReductionFilter
+          : new AndDimFilter(joinReductionFilter, query.getFilter());
+      switch (type) {
+      case org.apache.druid.query.Query.TIMESERIES:
+        rv = Druids.TimeseriesQueryBuilder.copy((TimeseriesQuery) query)
+            .filters(filter)
+            .virtualColumns(VirtualColumns.create(virtualColumns))
+            .build();
+        break;
+      case org.apache.druid.query.Query.TOPN:
+        rv = new TopNQueryBuilder((TopNQuery) query)
+            .filters(filter)
+            .virtualColumns(VirtualColumns.create(virtualColumns))
+            .build();
+        break;
+      case org.apache.druid.query.Query.GROUP_BY:
+        rv = new GroupByQuery.Builder((GroupByQuery) query)
+            .setDimFilter(filter)
+            .setVirtualColumns(VirtualColumns.create(virtualColumns))
+            .build();
+        break;
+      case org.apache.druid.query.Query.SCAN:
+        rv = ScanQuery.ScanQueryBuilder.copy((ScanQuery) query)
+            .filters(filter)
+            .virtualColumns(VirtualColumns.create(virtualColumns))
+            .build();
+        break;
+      case org.apache.druid.query.Query.SELECT:
+        rv = Druids.SelectQueryBuilder.copy((SelectQuery) query)
+            .filters(filter)
+            .virtualColumns(VirtualColumns.create(virtualColumns))
+            .build();
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported Query type " + type);
+      }
+    }
+    return rv;
+  }
+
+  /**
+   * Translates a Hive filter expression into a Druid {@link DimFilter}.
+   *
+   * <p>Supported shapes: AND/OR trees, BETWEEN, and IN_BLOOM_FILTER. Anything else
+   * yields {@code null}, meaning the predicate (or sub-predicate) is not pushed.
+   *
+   * @return the translated filter, or null when the expression cannot be pushed down
+   */
+  @Nullable
+  private static DimFilter toDruidFilter(ExprNodeDesc filterExpr, Configuration configuration,
+      List<VirtualColumn> virtualColumns, boolean resolveDynamicValues
+  ) {
+    if (filterExpr == null) {
+      return null;
+    }
+    Class<? extends GenericUDF> genericUDFClass = getGenericUDFClassFromExprDesc(filterExpr);
+    if (FunctionRegistry.isOpAnd(filterExpr)) {
+      // A partial conjunction is sound to push: any subset of AND children is implied
+      // by the full predicate, so untranslatable children may simply be dropped.
+      List<DimFilter> delegates = Lists.newArrayList();
+      for (ExprNodeDesc child : filterExpr.getChildren()) {
+        DimFilter filter = toDruidFilter(child, configuration, virtualColumns,
+            resolveDynamicValues
+        );
+        if (filter != null) {
+          delegates.add(filter);
+        }
+      }
+      if (!delegates.isEmpty()) {
+        return new AndDimFilter(delegates);
+      }
+    } else if (FunctionRegistry.isOpOr(filterExpr)) {
+      // Unlike AND, an OR may only be pushed when EVERY child translates: dropping a
+      // disjunct would make the pushed filter stricter than the original predicate and
+      // could incorrectly eliminate matching rows.
+      List<DimFilter> delegates = Lists.newArrayList();
+      for (ExprNodeDesc child : filterExpr.getChildren()) {
+        DimFilter filter = toDruidFilter(child, configuration, virtualColumns,
+            resolveDynamicValues
+        );
+        if (filter == null) {
+          return null;
+        }
+        delegates.add(filter);
+      }
+      if (!delegates.isEmpty()) {
+        return new OrDimFilter(delegates);
+      }
+    } else if (GenericUDFBetween.class == genericUDFClass) {
+      // child(1) is the column; child(2)/child(3) are the lower/upper bounds.
+      // NOTE(review): child(0) is presumably the BETWEEN invert flag and is ignored
+      // here — confirm a NOT BETWEEN never reaches this path, otherwise the pushed
+      // bound filter would be inverted incorrectly.
+      List<ExprNodeDesc> child = filterExpr.getChildren();
+      String col = extractColName(child.get(1), virtualColumns);
+      if (col != null) {
+        try {
+          // String-like columns compare lexicographically; everything else numerically.
+          StringComparator comparator = stringTypeInfos.contains(child.get(1).getTypeInfo())
+              ? StringComparators.LEXICOGRAPHIC
+              : StringComparators.NUMERIC;
+          String lower = evaluate(child.get(2), configuration, resolveDynamicValues);
+          String upper = evaluate(child.get(3), configuration, resolveDynamicValues);
+          // BETWEEN is inclusive on both ends, hence lowerStrict/upperStrict = false.
+          return new BoundDimFilter(col, lower, upper, false, false, null, null, comparator);
+        } catch (HiveException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    } else if (GenericUDFInBloomFilter.class == genericUDFClass) {
+      // child(0) is the probed column, child(1) carries the serialized bloom filter.
+      List<ExprNodeDesc> child = filterExpr.getChildren();
+      String col = extractColName(child.get(0), virtualColumns);
+      if (col != null) {
+        try {
+          BloomKFilter bloomFilter = evaluateBloomFilter(child.get(1), configuration,
+              resolveDynamicValues
+          );
+          return new BloomDimFilter(col, BloomKFilterHolder.fromBloomKFilter(bloomFilter), null);
+        } catch (HiveException | IOException e) {
+          // Both mean the runtime bloom filter could not be materialized.
+          throw new RuntimeException(e);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Evaluates a (possibly dynamic) constant expression to its string form, for use as
+   * a Druid bound value.
+   *
+   * <p>When the expression is a dynamic value and {@code resolveDynamicValue} is false
+   * (e.g. EXPLAIN), its explain-string placeholder is returned instead of the value.
+   *
+   * @throws HiveException if expression evaluation fails
+   */
+  private static String evaluate(ExprNodeDesc desc, Configuration configuration,
+      boolean resolveDynamicValue
+  ) throws HiveException {
+    ExprNodeEvaluator exprNodeEvaluator = ExprNodeEvaluatorFactory.get(desc, configuration);
+    if (exprNodeEvaluator instanceof ExprNodeDynamicValueEvaluator && !resolveDynamicValue) {
+      return desc.getExprStringForExplain();
+    } else {
+      // NOTE(review): evaluate(null) may yield null for a null-valued expression,
+      // which would NPE on toString() — confirm callers never pass such expressions.
+      return exprNodeEvaluator.evaluate(null).toString();
+    }
+  }
+
+  /**
+   * Materializes the bloom filter carried by {@code desc}.
+   *
+   * <p>When {@code resolveDynamicValue} is false (e.g. EXPLAIN), the runtime bloom
+   * filter is not available yet, so a trivial one-entry dummy filter is returned; it
+   * is only used for plan display, never applied to data.
+   *
+   * @throws HiveException if expression evaluation fails
+   * @throws IOException   if the serialized bloom filter cannot be deserialized
+   */
+  private static BloomKFilter evaluateBloomFilter(ExprNodeDesc desc, Configuration configuration,
+      boolean resolveDynamicValue
+  ) throws HiveException, IOException {
+    if (!resolveDynamicValue) {
+      // Return a dummy bloom filter for explain.
+      return new BloomKFilter(1);
+    }
+    BytesWritable bw = (BytesWritable) ExprNodeEvaluatorFactory.get(desc, configuration)
+        .evaluate(null);
+    // NOTE(review): BytesWritable.getBytes() returns the backing array, which may be
+    // longer than getLength() — confirm BloomKFilter.deserialize tolerates trailing
+    // padding, or wrap with (bytes, 0, bw.getLength()) instead.
+    return BloomKFilter.deserialize(ByteBuffer.wrap(bw.getBytes()));
+  }
+
+  public static String extractColName(ExprNodeDesc expr, List<VirtualColumn> virtualColumns) {
+    if(!druidSupportedTypeInfos.contains(expr.getTypeInfo())) {
+      // This column type is currently not supported in druid.(e.g boolean)
+      // We cannot pass the bloom filter to druid since bloom filter tests for exact object bytes.
+      return null;
+    }
+    if(expr instanceof ExprNodeColumnDesc) {
 
 Review comment:
   wrong formatting all over, `if(` -> `if (`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org