Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/10/14 08:06:29 UTC

[GitHub] [druid] imply-cheddar commented on a diff in pull request #13085: Refactoring the data source before unnest

imply-cheddar commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r995464088


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java:
##########
@@ -81,6 +92,29 @@ public boolean isConcrete()
     return false;
   }
 
+  @Override
+  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
+      Query query,
+      AtomicLong cpuTime
+  )
+  {
+    final DataSource dataSourceWithInlinedChannelData = broadcastJoinHelper.inlineChannelData(query.getDataSource());

Review Comment:
   Probably nice to do a null check on the `broadcastJoinHelper` here and produce an error message that's meaningful to the developer.
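
   A minimal sketch of the kind of check meant here. `Helper`, `InputSource` and `createMapFunction` are hypothetical stand-ins, not the actual Druid classes, and the message wording is only a suggestion:

```java
import java.util.function.Function;

// Hypothetical stand-ins for illustration only; these are not Druid's classes.
class NullCheckSketch
{
  interface Helper
  {
    String inline(String dataSource);
  }

  static class InputSource
  {
    // May be null if nothing ever supplied it (for example, a missing injection).
    private final Helper helper;

    InputSource(Helper helper)
    {
      this.helper = helper;
    }

    Function<String, String> createMapFunction(String dataSource)
    {
      // Fail fast with a message that names what is missing, instead of letting
      // a bare NullPointerException surface from the call below.
      if (helper == null) {
        throw new IllegalStateException(
            "helper is null for dataSource [" + dataSource + "]; it must be set before createMapFunction() is called"
        );
      }
      final String inlined = helper.inline(dataSource);
      return segment -> segment + "@" + inlined;
    }
  }

  public static void main(String[] args)
  {
    try {
      new InputSource(null).createMapFunction("foo");
    }
    catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

   The point is only that the exception says which collaborator was missing and for which data source, so the developer does not have to infer it from a stack trace.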



##########
core/src/main/java/org/apache/druid/guice/GuiceInjectableValues.java:
##########
@@ -50,7 +53,15 @@ public Object findInjectableValue(
     // Currently we should only be dealing with `Key` instances, and anything more advanced should be handled with
     // great care
     if (valueId instanceof Key) {
-      return injector.getInstance((Key) valueId);
+      try {
+        return injector.getInstance((Key) valueId);
+      }
+      catch (ConfigurationException ce) {
+        //check if annotation is nullable
+        if (forProperty.getAnnotation(Nullable.class).annotationType().isAnnotation()) {

Review Comment:
   Are you sure that `forProperty.getAnnotation()` never returns null? If it does, I fear this code will turn an actually useful `ConfigurationException` into a completely uninterpretable `NullPointerException`.
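
   To illustrate the concern, here is a self-contained sketch that uses plain reflection rather than the Jackson/Guice types in the diff. The `Nullable` annotation, `Config`, `lookup` and `findValue` are all hypothetical stand-ins. `java.lang.reflect.Field#getAnnotation` returns null when the annotation is absent, so the check compares against null and rethrows the original exception otherwise:

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Field;

// Hypothetical stand-ins for illustration only; these are not Druid's classes.
class NullableLookupSketch
{
  // Marker annotation defined locally so the sketch has no external dependencies.
  @Retention(RetentionPolicy.RUNTIME)
  @interface Nullable {}

  static class Config
  {
    @Nullable
    String optional;
    String required;
  }

  // Stand-in for a lookup that always fails, e.g. because of a missing binding.
  static Object lookup(String name)
  {
    throw new IllegalStateException("no binding for [" + name + "]");
  }

  static Object findValue(Field forProperty)
  {
    try {
      return lookup(forProperty.getName());
    }
    catch (IllegalStateException e) {
      // getAnnotation returns null when the annotation is absent, so compare
      // against null instead of calling a method on the result.
      if (forProperty.getAnnotation(Nullable.class) != null) {
        return null;
      }
      // Not nullable: keep the original, informative exception.
      throw e;
    }
  }

  // Wraps the checked exception so callers can stay free of throws clauses.
  static Field field(String name)
  {
    try {
      return Config.class.getDeclaredField(name);
    }
    catch (NoSuchFieldException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args)
  {
    System.out.println(findValue(field("optional")));
    try {
      findValue(field("required"));
    }
    catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

   With this shape, a property without the annotation still surfaces the original failure rather than a `NullPointerException` raised inside the catch block.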



##########
benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java:
##########
@@ -88,8 +86,8 @@
  */
 @State(Scope.Benchmark)
 @Fork(value = 1)
-@Warmup(iterations = 5)
-@Measurement(iterations = 15)
+@Warmup(iterations = 3)
+@Measurement(iterations = 10)

Review Comment:
   Why the changes in this file?



##########
processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java:
##########
@@ -77,144 +55,13 @@ public JoinableFactoryWrapper(final JoinableFactory joinableFactory)
     this.joinableFactory = Preconditions.checkNotNull(joinableFactory, "joinableFactory");
   }
 
-  public JoinableFactory getJoinableFactory()
-  {
-    return joinableFactory;
-  }
-
-  /**
-   * Creates a Function that maps base segments to {@link HashJoinSegment} if needed (i.e. if the number of join
-   * clauses is > 0). If mapping is not needed, this method will return {@link Function#identity()}.
-   *
-   * @param baseFilter         Filter to apply before the join takes place
-   * @param clauses            Pre-joinable clauses
-   * @param cpuTimeAccumulator An accumulator that we will add CPU nanos to; this is part of the function to encourage
-   *                           callers to remember to track metrics on CPU time required for creation of Joinables
-   * @param query              The query that will be run on the mapped segments. Usually this should be
-   *                           {@code analysis.getBaseQuery().orElse(query)}, where "analysis" is a
-   *                           {@link DataSourceAnalysis} and "query" is the original
-   *                           query from the end user.
-   */
-  public Function<SegmentReference, SegmentReference> createSegmentMapFn(
-      @Nullable final Filter baseFilter,
-      final List<PreJoinableClause> clauses,
-      final AtomicLong cpuTimeAccumulator,
-      final Query<?> query
-  )
-  {
-    // compute column correlations here and RHS correlated values
-    return JvmUtils.safeAccumulateThreadCpuTime(
-        cpuTimeAccumulator,
-        () -> {
-          if (clauses.isEmpty()) {
-            return Function.identity();
-          } else {
-            final JoinableClauses joinableClauses = JoinableClauses.createClauses(clauses, joinableFactory);
-            final JoinFilterRewriteConfig filterRewriteConfig = JoinFilterRewriteConfig.forQuery(query);
-
-            // Pick off any join clauses that can be converted into filters.
-            final Set<String> requiredColumns = query.getRequiredColumns();
-            final Filter baseFilterToUse;
-            final List<JoinableClause> clausesToUse;
-
-            if (requiredColumns != null && filterRewriteConfig.isEnableRewriteJoinToFilter()) {
-              final Pair<List<Filter>, List<JoinableClause>> conversionResult = convertJoinsToFilters(
-                  joinableClauses.getJoinableClauses(),
-                  requiredColumns,
-                  Ints.checkedCast(Math.min(filterRewriteConfig.getFilterRewriteMaxSize(), Integer.MAX_VALUE))
-              );
-
-              baseFilterToUse =
-                  Filters.maybeAnd(
-                      Lists.newArrayList(
-                          Iterables.concat(
-                              Collections.singleton(baseFilter),
-                              conversionResult.lhs
-                          )
-                      )
-                  ).orElse(null);
-              clausesToUse = conversionResult.rhs;
-            } else {
-              baseFilterToUse = baseFilter;
-              clausesToUse = joinableClauses.getJoinableClauses();
-            }
-
-            // Analyze remaining join clauses to see if filters on them can be pushed down.
-            final JoinFilterPreAnalysis joinFilterPreAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
-                new JoinFilterPreAnalysisKey(
-                    filterRewriteConfig,
-                    clausesToUse,
-                    query.getVirtualColumns(),
-                    Filters.maybeAnd(Arrays.asList(baseFilterToUse, Filters.toFilter(query.getFilter())))
-                           .orElse(null)
-                )
-            );
-
-            return baseSegment ->
-                new HashJoinSegment(
-                    baseSegment,
-                    baseFilterToUse,
-                    GuavaUtils.firstNonNull(clausesToUse, ImmutableList.of()),
-                    joinFilterPreAnalysis
-                );
-          }
-        }
-    );
-  }
-
-  /**
-   * Compute a cache key prefix for a join data source. This includes the data sources that participate in the RHS of a
-   * join as well as any query specific constructs associated with join data source such as base table filter. This key prefix
-   * can be used in segment level cache or result level cache. The function can return following wrapped in an
-   * Optional
-   * - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
-   * join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
-   * - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
-   * in the JOIN is not cacheable.
-   *
-   * @param dataSourceAnalysis for the join datasource
-   *
-   * @return the optional cache key to be used as part of query cache key
-   *
-   * @throws {@link IAE} if this operation is called on a non-join data source
-   */
-  public Optional<byte[]> computeJoinDataSourceCacheKey(
-      final DataSourceAnalysis dataSourceAnalysis
-  )
-  {
-    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
-    if (clauses.isEmpty()) {
-      throw new IAE("No join clauses to build the cache key for data source [%s]", dataSourceAnalysis.getDataSource());
-    }
-
-    final CacheKeyBuilder keyBuilder;
-    keyBuilder = new CacheKeyBuilder(JOIN_OPERATION);
-    if (dataSourceAnalysis.getJoinBaseTableFilter().isPresent()) {
-      keyBuilder.appendCacheable(dataSourceAnalysis.getJoinBaseTableFilter().get());
-    }
-    for (PreJoinableClause clause : clauses) {
-      Optional<byte[]> bytes = joinableFactory.computeJoinCacheKey(clause.getDataSource(), clause.getCondition());
-      if (!bytes.isPresent()) {
-        // Encountered a data source which didn't support cache yet
-        log.debug("skipping caching for join since [%s] does not support caching", clause.getDataSource());
-        return Optional.empty();
-      }
-      keyBuilder.appendByteArray(bytes.get());
-      keyBuilder.appendString(clause.getCondition().getOriginalExpression());
-      keyBuilder.appendString(clause.getPrefix());
-      keyBuilder.appendString(clause.getJoinType().name());
-    }
-    return Optional.of(keyBuilder.build());
-  }
-
-
   /**
    * Converts any join clauses to filters that can be converted, and returns the rest as-is.
-   *
+   * <p>
    * See {@link #convertJoinToFilter} for details on the logic.
    */
   @VisibleForTesting

Review Comment:
   Don't really need this annotation on a public method :)



##########
services/src/main/java/org/apache/druid/cli/CliRouter.java:
##########
@@ -112,7 +114,8 @@ protected List<? extends Module> getModules()
           binder.bind(new TypeLiteral<List<TieredBrokerSelectorStrategy>>() {})
                 .toProvider(TieredBrokerSelectorStrategiesProvider.class)
                 .in(LazySingleton.class);
-
+          
+          binder.bind(JoinableFactory.class).to(NoopJoinableFactory.class).in(LazySingleton.class);

Review Comment:
   This shouldn't be necessary anymore, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

