You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/09/14 02:27:21 UTC

[GitHub] [druid] somu-imply opened a new pull request, #13085: Unnest v1

somu-imply opened a new pull request, #13085:
URL: https://github.com/apache/druid/pull/13085

   Draft PR for refactoring before unnest
   
   This PR has:
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997159469


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -146,19 +147,31 @@ protected SegmentReference mapSegment(final Segment segment)
 
   private boolean initializeSegmentMapFn(final IntSet readableInputs)
   {
+    final AtomicLong cpuAccumulator = new AtomicLong();
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
       segmentMapFn = Function.identity();
       return true;
     } else {
-      final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
-      if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+      if (query.getDataSource() instanceof InputNumberDataSource) {
+        final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
+        if (retVal) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
+          // segment map function.  It would be a lot better if the InputNumberDataSource actually
+          // had a way to get that injected into it on its own, but the relationship between these objects
+          // was figured out during a refactor and using a setter here seemed like the least-bad way to
+          // make progress on the refactor without breaking functionality.  Hopefully, some future
+          // developer will move this away from a setter.
+          inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
+          segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
+        }
+        return retVal;
+      } else {
+        segmentMapFn = Function.identity();

Review Comment:
   To continue with Abhishek's point if the parent datasource is a `JoinDataSource` with child data sources being `InputNumberDataSource`, this would basically set the map function to an identity function which is not right. One way is to first construct the data source using the broadcastJoinHelper and call createSegmentMapFunction of that. I'll discuss this with @imply-cheddar today. I believe the else part can be simplified as
   ```
   else {
         final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
         if (retVal) {
           final DataSource dataSourceWithInlinedChannelData = broadcastJoinHelper.inlineChannelData(query.getDataSource());
           segmentMapFn = dataSourceWithInlinedChannelData.createSegmentMapFunction(query, cpuAccumulator);
         }
         else {
           segmentMapFn = Function.identity();
         }
         return true;
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r998548500


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -146,19 +147,31 @@ protected SegmentReference mapSegment(final Segment segment)
 
   private boolean initializeSegmentMapFn(final IntSet readableInputs)
   {
+    final AtomicLong cpuAccumulator = new AtomicLong();
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
       segmentMapFn = Function.identity();
       return true;
     } else {
-      final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
-      if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+      if (query.getDataSource() instanceof InputNumberDataSource) {
+        final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
+        if (retVal) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
+          // segment map function.  It would be a lot better if the InputNumberDataSource actually
+          // had a way to get that injected into it on its own, but the relationship between these objects
+          // was figured out during a refactor and using a setter here seemed like the least-bad way to
+          // make progress on the refactor without breaking functionality.  Hopefully, some future
+          // developer will move this away from a setter.
+          inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
+          segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
+        }
+        return retVal;
+      } else {
+        segmentMapFn = Function.identity();

Review Comment:
   This follows case 1. The new test case has been added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] imply-cheddar commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
imply-cheddar commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r983131194


##########
processing/src/main/java/org/apache/druid/query/Queries.java:
##########
@@ -191,11 +191,19 @@ public static <T> Query<T> withBaseDataSource(final Query<T> query, final DataSo
   {
     final Query<T> retVal;
 
-    if (query.getDataSource() instanceof QueryDataSource) {
-      final Query<?> subQuery = ((QueryDataSource) query.getDataSource()).getQuery();
+    /*
+     * Currently, this method is implemented in terms of a static walk doing a bunch of instanceof checks.
+     * We should likely look into moving this functionality into the DataSource object itself so that they
+     * can walk and create new objects on their own.  This will be necessary as we expand the set of DataSources
+     * that do actual work, as each of them will need to show up in this if/then waterfall.
+     */

Review Comment:
   I'm pretty sure it should be safe to move something logically equivalent to this method into a method on `DataSource` itself.  Let's do that inside of this refactor, so that we leave this refactor with hopefully never needing to have random methods of `instanceof` checks again.



##########
processing/src/main/java/org/apache/druid/query/JoinDataSource.java:
##########
@@ -127,12 +155,22 @@ public static JoinDataSource create(
       final String rightPrefix,
       final JoinConditionAnalysis conditionAnalysis,
       final JoinType joinType,
-      final DimFilter leftFilter
+      final DimFilter leftFilter,
+      @Nullable @JacksonInject final JoinableFactoryWrapper joinableFactoryWrapper

Review Comment:
   Having `@JacksonInject` on a method that isn't used by Jackson can be confusing (it will make future developers try to figure out how Jackson uses this method).  I don't think it needs to be there.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -152,13 +153,23 @@ private boolean initializeSegmentMapFn(final IntSet readableInputs)
       segmentMapFn = Function.identity();
       return true;
     } else {
-      final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
-      if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+      if (query.getDataSource() instanceof InputNumberDataSource) {
+        final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
+        if (retVal) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberData source was going through the broadcastJoinHelper which
+          // was using the JoinableFactoryWrapper to create segment map function.
+          // After refactoring, the segment map function creation is moved to data source
+          // Hence for InputNumberDataSource we are setting the broadcast join helper for the data source
+          // and moving the segment map function creation there

Review Comment:
   Sorry to pick on a comment, but this comment does what most comments in code do: it talks from the context of the developer who is writing the code.  Any new developer who comes and reads the code for the first time doesn't even know that this code was refactored, let alone know why it was refactored or what the state of the code was before the refactor.  Instead of explain the change of the refactor, the comment should attempt to help explain what the current, post-refactor code is attempting to accomplish.  I.e. something like
   
   ```
   // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
   // segment map function.  It would be a lot better if the InputNumberDataSource actually
   // had a way to get that injected into it on its own, but the relationship between these objects
   // was figured out during a refactor and using a setter here seemed like the least-bad way to
   // make progress on the refactor without breaking functionality.  Hopefully, some future 
   // developer will move this away from a setter.
   ```
   
   Or something like that.



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java:
##########
@@ -565,6 +572,8 @@ private ObjectMapper setupObjectMapper(Injector injector)
                                                                "compaction"
                                                            )
                                                        ).registerSubtypes(ExternalDataSource.class));
+    DruidSecondaryModule.setupJackson(injector, mapper);
+/*

Review Comment:
   Delete the code instead of comment it out please.



##########
server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java:
##########
@@ -94,12 +93,12 @@ public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final
 
     final AtomicLong cpuAccumulator = new AtomicLong(0L);
 
-    final Function<SegmentReference, SegmentReference> segmentMapFn = joinableFactoryWrapper.createSegmentMapFn(
-        analysis.getJoinBaseTableFilter().map(Filters::toFilter).orElse(null),
-        analysis.getPreJoinableClauses(),
-        cpuAccumulator,
-        analysis.getBaseQuery().orElse(query)
-    );
+    Function<SegmentReference, SegmentReference> segmentMapFn = analysis.getDataSource()
+                                                                        .createSegmentMapFunction(
+                                                                            query,
+                                                                            cpuAccumulator
+                                                                        );
+

Review Comment:
   Now that there's a lot less arguments here, I'm pretty sure this formatting is no longer the best way to represent this code.  Please condense things.



##########
processing/src/main/java/org/apache/druid/query/InlineDataSource.java:
##########
@@ -102,6 +105,55 @@ public static InlineDataSource fromIterable(
     return new InlineDataSource(rows, signature);
   }
 
+  /**
+   * A very zealous equality checker for "rows" that respects deep equality of arrays, but nevertheless refrains
+   * from materializing things needlessly. Useful for unit tests that want to compare equality of different
+   * InlineDataSource instances.
+   */
+  private static boolean rowsEqual(final Iterable<Object[]> rowsA, final Iterable<Object[]> rowsB)

Review Comment:
   These private methods seem to have moved above some public methods, was it just your IDE doing stuff or was there a specific reason?  Generally speaking, the flow is always public methods first.



##########
processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java:
##########
@@ -340,12 +192,60 @@ static JoinClauseToFilterConversion convertJoinToFilter(
     return new JoinClauseToFilterConversion(null, false);
   }
 
+  public JoinableFactory getJoinableFactory()
+  {
+    return joinableFactory;
+  }
+
+  /**
+   * Compute a cache key prefix for a join data source. This includes the data sources that participate in the RHS of a
+   * join as well as any query specific constructs associated with join data source such as base table filter. This key prefix
+   * can be used in segment level cache or result level cache. The function can return following wrapped in an

Review Comment:
   A bunch of this code looks like it changed, but I think it just had methods re-ordered?  Was that intentional?



##########
processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java:
##########
@@ -340,12 +192,60 @@ static JoinClauseToFilterConversion convertJoinToFilter(
     return new JoinClauseToFilterConversion(null, false);
   }
 
+  public JoinableFactory getJoinableFactory()
+  {
+    return joinableFactory;
+  }
+
+  /**
+   * Compute a cache key prefix for a join data source. This includes the data sources that participate in the RHS of a
+   * join as well as any query specific constructs associated with join data source such as base table filter. This key prefix
+   * can be used in segment level cache or result level cache. The function can return following wrapped in an
+   * Optional
+   * - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
+   * join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
+   * - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
+   * in the JOIN is not cacheable.
+   *
+   * @param dataSourceAnalysis for the join datasource
+   * @return the optional cache key to be used as part of query cache key
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   */
+  public Optional<byte[]> computeJoinDataSourceCacheKey(

Review Comment:
   The existence of this method is another point where things seem a little suspect.  I think the `DataSource` object needs a `public byte[] getCacheKey()` method.  Most of the current implementations would just return `new byte[]{}`, but the join one should return this (or null if not cacheable).  The unnest implementation will need to override the method as well. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r993824681


##########
processing/src/main/java/org/apache/druid/query/Queries.java:
##########
@@ -191,11 +191,19 @@ public static <T> Query<T> withBaseDataSource(final Query<T> query, final DataSo
   {
     final Query<T> retVal;
 
-    if (query.getDataSource() instanceof QueryDataSource) {
-      final Query<?> subQuery = ((QueryDataSource) query.getDataSource()).getQuery();
+    /*
+     * Currently, this method is implemented in terms of a static walk doing a bunch of instanceof checks.
+     * We should likely look into moving this functionality into the DataSource object itself so that they
+     * can walk and create new objects on their own.  This will be necessary as we expand the set of DataSources
+     * that do actual work, as each of them will need to show up in this if/then waterfall.
+     */

Review Comment:
   @cheddar this was moved to the `DataSource` , no more `instanceof` checks here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis merged pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
clintropolis merged PR #13085:
URL: https://github.com/apache/druid/pull/13085


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r998393165


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -146,18 +147,30 @@ protected SegmentReference mapSegment(final Segment segment)
 
   private boolean initializeSegmentMapFn(final IntSet readableInputs)
   {
+    final AtomicLong cpuAccumulator = new AtomicLong();
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
       segmentMapFn = Function.identity();
       return true;
     } else {
       final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
+      DataSource inlineChannelDataSource = broadcastJoinHelper.inlineChannelData(query.getDataSource());
       if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+        if (inlineChannelDataSource instanceof InputNumberDataSource) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
+          // segment map function.  It would be a lot better if the InputNumberDataSource actually
+          // had a way to get that injected into it on its own, but the relationship between these objects
+          // was figured out during a refactor and using a setter here seemed like the least-bad way to
+          // make progress on the refactor without breaking functionality.  Hopefully, some future
+          // developer will move this away from a setter.
+          inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
+          segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
+        } else {
+          segmentMapFn = inlineChannelDataSource.createSegmentMapFunction(query, cpuAccumulator);

Review Comment:
   This example passes with the new code changes. The problem discovered was :
   1. There was a lack of a proper test case for this to be discovered. I am adding the test suggested here 
   2. In the previous case, the identity function in the last else part was incorrect as it would not create the correct segment function for this example. This has been now updated by calling `createSegmentMapFunction` on the correct data source.
   3. All unit tests for MSQ, CalciteJoins now complete successfully



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
clintropolis commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997561473


##########
processing/src/main/java/org/apache/druid/query/DataSource.java:
##########
@@ -83,8 +89,40 @@
    * Returns true if this datasource represents concrete data that can be scanned via a
    * {@link org.apache.druid.segment.Segment} adapter of some kind. True for e.g. 'table' but not for 'query' or 'join'.
    *
-   * @see org.apache.druid.query.planning.DataSourceAnalysis#isConcreteBased() which uses this
-   * @see org.apache.druid.query.planning.DataSourceAnalysis#isConcreteTableBased() which uses this
+   * @see DataSourceAnalysis#isConcreteBased() which uses this
+   * @see DataSourceAnalysis#isConcreteTableBased() which uses this
    */
   boolean isConcrete();
+
+  /**
+   * Returns a segment function on to how to segment should be modified.
+   *
+   * @param query the input query
+   * @param cpuTimeAcc the cpu time accumulator
+   * @return the segment function
+   */
+  Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc);
+
+  /**
+   * Returns an updated datasource based on the specified new source.
+   *
+   * @param newSource the new datasource to be used to update an existing query
+   * @return the updated datasource to be used
+   */
+  DataSource withUpdatedDataSource(DataSource newSource);
+
+  /**
+   * Compute a cache key prefix for a data source. This includes the data sources that participate in the RHS of a
+   * join as well as any query specific constructs associated with join data source such as base table filter. This key prefix
+   * can be used in segment level cache or result level cache. The function can return following
+   * - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
+   * join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
+   * - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
+   * in the JOIN is not cacheable.
+   *
+   * @return the cache key to be used as part of query cache key
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   */
+  byte[] getCacheKey();

Review Comment:
   >throws {@link IAE} if this operation is called on a non-join data source
   the javadocs are maybe not true exactly? it looks like most implementations are currently returning empty bytes.
   
   Im also a bit unsure how great this naming is given that it should only be used for joins afaik, and not used at all for cache keys otherwise. it seems sort of seems like it would be important for all query types, but calling it something specific to joins also seems short sighted... Not sure what exactly to do here.. maybe this needs a wider discussion about what kinds of datasources need to implement this and what do not which would help clear this up. I'm not sure it needs to block anything right now, other than maybe correcting javadocs



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
clintropolis commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997557552


##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java:
##########
@@ -77,6 +78,7 @@
   private final String sql;
   private final DruidOperatorTable operatorTable;
   private final ExprMacroTable macroTable;
+  private final JoinableFactoryWrapper jfw;

Review Comment:
   super super nit: tiny variable name doesn't seem very consistent with other names 😅 (in a few other places too)



##########
server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java:
##########
@@ -94,12 +93,13 @@ public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final
 
     final AtomicLong cpuAccumulator = new AtomicLong(0L);
 
-    final Function<SegmentReference, SegmentReference> segmentMapFn = joinableFactoryWrapper.createSegmentMapFn(
-        analysis.getJoinBaseTableFilter().map(Filters::toFilter).orElse(null),
-        analysis.getPreJoinableClauses(),
-        cpuAccumulator,
-        analysis.getBaseQuery().orElse(query)
-    );
+    Function<SegmentReference, SegmentReference> segmentMapFn = analysis

Review Comment:
   super nit: `final` to match other lines 😅 (few other places too)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997610253


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -146,19 +147,31 @@ protected SegmentReference mapSegment(final Segment segment)
 
   private boolean initializeSegmentMapFn(final IntSet readableInputs)
   {
+    final AtomicLong cpuAccumulator = new AtomicLong();
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
       segmentMapFn = Function.identity();
       return true;
     } else {
-      final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
-      if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+      if (query.getDataSource() instanceof InputNumberDataSource) {
+        final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
+        if (retVal) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
+          // segment map function.  It would be a lot better if the InputNumberDataSource actually
+          // had a way to get that injected into it on its own, but the relationship between these objects
+          // was figured out during a refactor and using a setter here seemed like the least-bad way to
+          // make progress on the refactor without breaking functionality.  Hopefully, some future
+          // developer will move this away from a setter.
+          inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
+          segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
+        }
+        return retVal;
+      } else {
+        segmentMapFn = Function.identity();

Review Comment:
   Yes there is a `MSQSelectTest.joinTest` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r996172042


##########
services/src/main/java/org/apache/druid/cli/CliRouter.java:
##########
@@ -112,7 +114,8 @@ protected List<? extends Module> getModules()
           binder.bind(new TypeLiteral<List<TieredBrokerSelectorStrategy>>() {})
                 .toProvider(TieredBrokerSelectorStrategiesProvider.class)
                 .in(LazySingleton.class);
-
+          
+          binder.bind(JoinableFactory.class).to(NoopJoinableFactory.class).in(LazySingleton.class);

Review Comment:
   removed



##########
core/src/main/java/org/apache/druid/math/expr/ExprMacroTable.java:
##########
@@ -44,6 +44,7 @@
 {
   private static final ExprMacroTable NIL = new ExprMacroTable(Collections.emptyList());
 
+

Review Comment:
   aah new line. addressed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r995222185


##########
processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java:
##########
@@ -340,12 +192,60 @@ static JoinClauseToFilterConversion convertJoinToFilter(
     return new JoinClauseToFilterConversion(null, false);
   }
 
+  public JoinableFactory getJoinableFactory()
+  {
+    return joinableFactory;
+  }
+
+  /**
+   * Compute a cache key prefix for a join data source. This includes the data sources that participate in the RHS of a
+   * join as well as any query specific constructs associated with join data source such as base table filter. This key prefix
+   * can be used in segment level cache or result level cache. The function can return following wrapped in an
+   * Optional
+   * - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
+   * join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
+   * - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
+   * in the JOIN is not cacheable.
+   *
+   * @param dataSourceAnalysis for the join datasource
+   * @return the optional cache key to be used as part of query cache key
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   */
+  public Optional<byte[]> computeJoinDataSourceCacheKey(

Review Comment:
   This has been moved. The test cases in `JoinableFactoryWrapperTest` have been refactored and 
   moved inside `JoinDataSourceTest`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
clintropolis commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997557552


##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java:
##########
@@ -77,6 +78,7 @@
   private final String sql;
   private final DruidOperatorTable operatorTable;
   private final ExprMacroTable macroTable;
+  private final JoinableFactoryWrapper jfw;

Review Comment:
   super super nit: tiny variable name doesn't seem very consistent with other names 😅 



##########
processing/src/main/java/org/apache/druid/query/DataSource.java:
##########
@@ -83,8 +89,40 @@
    * Returns true if this datasource represents concrete data that can be scanned via a
    * {@link org.apache.druid.segment.Segment} adapter of some kind. True for e.g. 'table' but not for 'query' or 'join'.
    *
-   * @see org.apache.druid.query.planning.DataSourceAnalysis#isConcreteBased() which uses this
-   * @see org.apache.druid.query.planning.DataSourceAnalysis#isConcreteTableBased() which uses this
+   * @see DataSourceAnalysis#isConcreteBased() which uses this
+   * @see DataSourceAnalysis#isConcreteTableBased() which uses this
    */
   boolean isConcrete();
+
+  /**
+   * Returns a segment function on to how to segment should be modified.
+   *
+   * @param query the input query
+   * @param cpuTimeAcc the cpu time accumulator
+   * @return the segment function
+   */
+  Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc);
+
+  /**
+   * Returns an updated datasource based on the specified new source.
+   *
+   * @param newSource the new datasource to be used to update an existing query
+   * @return the updated datasource to be used
+   */
+  DataSource withUpdatedDataSource(DataSource newSource);
+
+  /**
+   * Compute a cache key prefix for a data source. This includes the data sources that participate in the RHS of a
+   * join as well as any query specific constructs associated with join data source such as base table filter. This key prefix
+   * can be used in segment level cache or result level cache. The function can return following
+   * - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
+   * join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
+   * - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
+   * in the JOIN is not cacheable.
+   *
+   * @return the cache key to be used as part of query cache key
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   */
+  byte[] getCacheKey();

Review Comment:
   >throws {@link IAE} if this operation is called on a non-join data source
   the javadocs are maybe not true exactly? it looks like most implementations are currently returning empty bytes.
   
   Im also a bit tunsure how great this naming is given that it should only be used for joins afaik, and not used at all for cache keys otherwise. it seems sort of seems like it would be important for all query types, but calling it something specific to joins also seems short sighted... Not sure what exactly to do here.. maybe this needs a wider discussion about what kinds of datasources need to implement this and what do not which would help clear this up



##########
server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java:
##########
@@ -94,12 +93,13 @@ public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final
 
     final AtomicLong cpuAccumulator = new AtomicLong(0L);
 
-    final Function<SegmentReference, SegmentReference> segmentMapFn = joinableFactoryWrapper.createSegmentMapFn(
-        analysis.getJoinBaseTableFilter().map(Filters::toFilter).orElse(null),
-        analysis.getPreJoinableClauses(),
-        cpuAccumulator,
-        analysis.getBaseQuery().orElse(query)
-    );
+    Function<SegmentReference, SegmentReference> segmentMapFn = analysis

Review Comment:
   super nit: `final` to match other lines 😅 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997159469


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -146,19 +147,31 @@ protected SegmentReference mapSegment(final Segment segment)
 
   private boolean initializeSegmentMapFn(final IntSet readableInputs)
   {
+    final AtomicLong cpuAccumulator = new AtomicLong();
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
       segmentMapFn = Function.identity();
       return true;
     } else {
-      final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
-      if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+      if (query.getDataSource() instanceof InputNumberDataSource) {
+        final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
+        if (retVal) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
+          // segment map function.  It would be a lot better if the InputNumberDataSource actually
+          // had a way to get that injected into it on its own, but the relationship between these objects
+          // was figured out during a refactor and using a setter here seemed like the least-bad way to
+          // make progress on the refactor without breaking functionality.  Hopefully, some future
+          // developer will move this away from a setter.
+          inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
+          segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
+        }
+        return retVal;
+      } else {
+        segmentMapFn = Function.identity();

Review Comment:
   To continue with Abhishek's point if the parent datasource is a `JoinDataSource` with child data sources being `InputNumberDataSource`, this would basically set the map function to an identity function which is not right. One way is to first construct the data source using the broadcastJoinHelper and call createSegmentMapFunction of that. This would also remove the `instanceof` check we are doing. I'll discuss this with @imply-cheddar today. I believe the else part can be simplified as
   ```
   else {
         final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
         if (retVal) {
           final DataSource dataSourceWithInlinedChannelData = broadcastJoinHelper.inlineChannelData(query.getDataSource());
           segmentMapFn = dataSourceWithInlinedChannelData.createSegmentMapFunction(query, cpuAccumulator);
         }
         else {
           segmentMapFn = Function.identity();
         }
         return true;
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r996171524


##########
benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java:
##########
@@ -88,8 +86,8 @@
  */
 @State(Scope.Benchmark)
 @Fork(value = 1)
-@Warmup(iterations = 5)
-@Measurement(iterations = 15)
+@Warmup(iterations = 3)
+@Measurement(iterations = 10)

Review Comment:
   This was reverted. Wherever the cpuAccumulators were reused, we passed a variable but where they were not used downstream I have used a `new AtomicLong()`. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java:
##########
@@ -81,6 +92,29 @@ public boolean isConcrete()
     return false;
   }
 
+  @Override
+  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
+      Query query,
+      AtomicLong cpuTime
+  )
+  {
+    final DataSource dataSourceWithInlinedChannelData = broadcastJoinHelper.inlineChannelData(query.getDataSource());

Review Comment:
   I added the check



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] imply-cheddar commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
imply-cheddar commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997604250


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java:
##########
@@ -81,6 +92,36 @@ public boolean isConcrete()
     return false;
   }
 
+  @Override
+  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
+      Query query,
+      AtomicLong cpuTime
+  )
+  {
+    if (broadcastJoinHelper == null) {
+      throw new IAE(
+          "No helper for broadcast join found on data source [%s]. "
+          + "Please make sure to set this before this call. ",
+          query.getDataSource()
+      );
+    }
+    final DataSource dataSourceWithInlinedChannelData = broadcastJoinHelper.inlineChannelData(query.getDataSource());
+    final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(dataSourceWithInlinedChannelData);
+    return analysis.getDataSource().createSegmentMapFunction(query, new AtomicLong());

Review Comment:
   Pass through the cpuTime that was passed into this method please.



##########
processing/src/main/java/org/apache/druid/query/DataSource.java:
##########
@@ -83,8 +89,40 @@
    * Returns true if this datasource represents concrete data that can be scanned via a
    * {@link org.apache.druid.segment.Segment} adapter of some kind. True for e.g. 'table' but not for 'query' or 'join'.
    *
-   * @see org.apache.druid.query.planning.DataSourceAnalysis#isConcreteBased() which uses this
-   * @see org.apache.druid.query.planning.DataSourceAnalysis#isConcreteTableBased() which uses this
+   * @see DataSourceAnalysis#isConcreteBased() which uses this
+   * @see DataSourceAnalysis#isConcreteTableBased() which uses this
    */
   boolean isConcrete();
+
+  /**
+   * Returns a segment function on to how to segment should be modified.
+   *
+   * @param query the input query
+   * @param cpuTimeAcc the cpu time accumulator
+   * @return the segment function
+   */
+  Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc);
+
+  /**
+   * Returns an updated datasource based on the specified new source.
+   *
+   * @param newSource the new datasource to be used to update an existing query
+   * @return the updated datasource to be used
+   */
+  DataSource withUpdatedDataSource(DataSource newSource);
+
+  /**
+   * Compute a cache key prefix for a data source. This includes the data sources that participate in the RHS of a
+   * join as well as any query specific constructs associated with join data source such as base table filter. This key prefix
+   * can be used in segment level cache or result level cache. The function can return following
+   * - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
+   * join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
+   * - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
+   * in the JOIN is not cacheable.
+   *
+   * @return the cache key to be used as part of query cache key
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   */
+  byte[] getCacheKey();

Review Comment:
   Yeah, that javadoc on the `throws` is old.  It should never actually throw.
   
   At this point, this is only for joins, the semantics that we introduced, though, give the different datasources the ability to completely opt-out of caching if a meaningful cache key cannot be created.  The refactor is actually making this explicit and the various implementations are now opting in/out of caching on their own (by either returning `null` or `new byte[]{}`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] imply-cheddar commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
imply-cheddar commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997781533


##########
server/src/main/java/org/apache/druid/server/coordination/ServerManager.java:
##########
@@ -195,18 +194,10 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<Seg
     } else {
       return new ReportTimelineMissingSegmentQueryRunner<>(Lists.newArrayList(specs));
     }
+    Function<SegmentReference, SegmentReference> segmentMapFn = analysis.getDataSource().createSegmentMapFunction(query, cpuTimeAccumulator);

Review Comment:
   does this need to be the `analysis.getDataSource()` instead of `query.getDataSource()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] imply-cheddar commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
imply-cheddar commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r1002843503


##########
core/src/main/java/org/apache/druid/guice/GuiceInjectableValues.java:
##########
@@ -50,7 +59,21 @@ public Object findInjectableValue(
     // Currently we should only be dealing with `Key` instances, and anything more advanced should be handled with
     // great care
     if (valueId instanceof Key) {
-      return injector.getInstance((Key) valueId);
+      try {
+        return injector.getInstance((Key) valueId);
+      }
+      catch (ConfigurationException ce) {
+        // check if nullable annotation is present for this
+        if (nullables.get().contains((Key) valueId)) {

Review Comment:
   The point of caching these is so that we only have Guice generate the exception once.  Putting this check inside of the catch means that we don't get any benefit.  This check needs to be done before we even ask the injector to create the instance.



##########
core/src/main/java/org/apache/druid/guice/GuiceInjectableValues.java:
##########
@@ -50,7 +59,21 @@ public Object findInjectableValue(
     // Currently we should only be dealing with `Key` instances, and anything more advanced should be handled with
     // great care
     if (valueId instanceof Key) {
-      return injector.getInstance((Key) valueId);
+      try {
+        return injector.getInstance((Key) valueId);
+      }
+      catch (ConfigurationException ce) {
+        // check if nullable annotation is present for this
+        if (nullables.get().contains((Key) valueId)) {
+          return null;
+        } else if (forProperty.getAnnotation(Nullable.class) != null) {
+          HashSet<Key> encounteredNullables = nullables.get();
+          encounteredNullables.add((Key) valueId);

Review Comment:
   This is a non-thread-safe mutation of the HashSet.  The goal of the AtomicReference was to make thinds thread-safe.  You must create a brand new Set, add all of the old values, add the new value and then set the new reference on the AtomicReference.



##########
core/src/main/java/org/apache/druid/guice/GuiceInjectableValues.java:
##########
@@ -22,19 +22,28 @@
 import com.fasterxml.jackson.databind.BeanProperty;
 import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.InjectableValues;
+import com.google.inject.ConfigurationException;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import org.apache.druid.java.util.common.IAE;
 
+import javax.annotation.Nullable;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicReference;
+
 /**
+ *
  */
 public class GuiceInjectableValues extends InjectableValues
 {
   private final Injector injector;
+  private final AtomicReference<HashSet<Key>> nullables;
 
   public GuiceInjectableValues(Injector injector)
   {
     this.injector = injector;
+    this.nullables = new AtomicReference<>();
+    this.nullables.set(new HashSet<>());

Review Comment:
   There's an `AtomicReference` constructor that takes an initial value.  It's a nit, but you can remove this line by using it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] abhishekagarwal87 commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r995590968


##########
core/src/main/java/org/apache/druid/math/expr/ExprMacroTable.java:
##########
@@ -44,6 +44,7 @@
 {
   private static final ExprMacroTable NIL = new ExprMacroTable(Collections.emptyList());
 
+

Review Comment:
   this change is not needed. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -152,13 +153,24 @@ private boolean initializeSegmentMapFn(final IntSet readableInputs)
       segmentMapFn = Function.identity();
       return true;
     } else {
-      final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
-      if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+      if (query.getDataSource() instanceof InputNumberDataSource) {

Review Comment:
   It seems earlier a `segmentMapFn` gets created even when `InputNumberDataSource` is a child datasource. but not now? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] imply-cheddar commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
imply-cheddar commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997604077


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -146,19 +147,31 @@ protected SegmentReference mapSegment(final Segment segment)
 
   private boolean initializeSegmentMapFn(final IntSet readableInputs)
   {
+    final AtomicLong cpuAccumulator = new AtomicLong();
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
       segmentMapFn = Function.identity();
       return true;
     } else {
-      final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
-      if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+      if (query.getDataSource() instanceof InputNumberDataSource) {
+        final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
+        if (retVal) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
+          // segment map function.  It would be a lot better if the InputNumberDataSource actually
+          // had a way to get that injected into it on its own, but the relationship between these objects
+          // was figured out during a refactor and using a setter here seemed like the least-bad way to
+          // make progress on the refactor without breaking functionality.  Hopefully, some future
+          // developer will move this away from a setter.
+          inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
+          segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
+        }
+        return retVal;
+      } else {
+        segmentMapFn = Function.identity();

Review Comment:
   Is there a test for that case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997611130


##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java:
##########
@@ -77,6 +78,7 @@
   private final String sql;
   private final DruidOperatorTable operatorTable;
   private final ExprMacroTable macroTable;
+  private final JoinableFactoryWrapper jfw;

Review Comment:
   My bad a new round is coming with all these changes (have them locally)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r998393165


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -146,18 +147,30 @@ protected SegmentReference mapSegment(final Segment segment)
 
   private boolean initializeSegmentMapFn(final IntSet readableInputs)
   {
+    final AtomicLong cpuAccumulator = new AtomicLong();
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
       segmentMapFn = Function.identity();
       return true;
     } else {
       final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
+      DataSource inlineChannelDataSource = broadcastJoinHelper.inlineChannelData(query.getDataSource());
       if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+        if (inlineChannelDataSource instanceof InputNumberDataSource) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
+          // segment map function.  It would be a lot better if the InputNumberDataSource actually
+          // had a way to get that injected into it on its own, but the relationship between these objects
+          // was figured out during a refactor and using a setter here seemed like the least-bad way to
+          // make progress on the refactor without breaking functionality.  Hopefully, some future
+          // developer will move this away from a setter.
+          inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
+          segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
+        } else {
+          segmentMapFn = inlineChannelDataSource.createSegmentMapFunction(query, cpuAccumulator);

Review Comment:
   This example passes with the new code changes. The problem discovered was :
   1. There was a lack of a proper test case for this to be discovered. I am adding the test suggested here 
   2. In the previous case, the identity function in the last else part was incorrect as it would not create the correct segment function for this example. This has been now updated by calling `createSegmentMapFunction` on the correct data source.
   3. All unit tests for MSQ, CalciteJoins etc. now complete successfully



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r1003506965


##########
core/src/main/java/org/apache/druid/guice/GuiceInjectableValues.java:
##########
@@ -50,7 +59,21 @@ public Object findInjectableValue(
     // Currently we should only be dealing with `Key` instances, and anything more advanced should be handled with
     // great care
     if (valueId instanceof Key) {
-      return injector.getInstance((Key) valueId);
+      try {
+        return injector.getInstance((Key) valueId);
+      }
+      catch (ConfigurationException ce) {
+        // check if nullable annotation is present for this
+        if (nullables.get().contains((Key) valueId)) {
+          return null;
+        } else if (forProperty.getAnnotation(Nullable.class) != null) {
+          HashSet<Key> encounteredNullables = nullables.get();
+          encounteredNullables.add((Key) valueId);

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] imply-cheddar commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
imply-cheddar commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997622663


##########
sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java:
##########
@@ -141,7 +141,7 @@ public DataSource withUpdatedDataSource(DataSource newSource)
   @Override
   public byte[] getCacheKey()
   {
-    return null;
+    return new byte[0];

Review Comment:
   Basically, yes.  I had forgotten about that method.  In some future, we should refactor to completely eliminate the method...  But let's not take that on yet.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997753038


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -146,18 +147,30 @@ protected SegmentReference mapSegment(final Segment segment)
 
   private boolean initializeSegmentMapFn(final IntSet readableInputs)
   {
+    final AtomicLong cpuAccumulator = new AtomicLong();
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
       segmentMapFn = Function.identity();
       return true;
     } else {
       final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
+      DataSource inlineChannelDataSource = broadcastJoinHelper.inlineChannelData(query.getDataSource());
       if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+        if (inlineChannelDataSource instanceof InputNumberDataSource) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
+          // segment map function.  It would be a lot better if the InputNumberDataSource actually
+          // had a way to get that injected into it on its own, but the relationship between these objects
+          // was figured out during a refactor and using a setter here seemed like the least-bad way to
+          // make progress on the refactor without breaking functionality.  Hopefully, some future
+          // developer will move this away from a setter.
+          inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
+          segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
+        } else {
+          segmentMapFn = inlineChannelDataSource.createSegmentMapFunction(query, cpuAccumulator);

Review Comment:
   @abhishekagarwal87 I have updated this part. Should address your earlier concern



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on PR #13085:
URL: https://github.com/apache/druid/pull/13085#issuecomment-1287387013

   @clintropolis @imply-cheddar @abhishekagarwal87 this is ready for review. Can you please take a look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997610253


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -146,19 +147,31 @@ protected SegmentReference mapSegment(final Segment segment)
 
   private boolean initializeSegmentMapFn(final IntSet readableInputs)
   {
+    final AtomicLong cpuAccumulator = new AtomicLong();
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
       segmentMapFn = Function.identity();
       return true;
     } else {
-      final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
-      if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+      if (query.getDataSource() instanceof InputNumberDataSource) {
+        final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
+        if (retVal) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
+          // segment map function.  It would be a lot better if the InputNumberDataSource actually
+          // had a way to get that injected into it on its own, but the relationship between these objects
+          // was figured out during a refactor and using a setter here seemed like the least-bad way to
+          // make progress on the refactor without breaking functionality.  Hopefully, some future
+          // developer will move this away from a setter.
+          inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
+          segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
+        }
+        return retVal;
+      } else {
+        segmentMapFn = Function.identity();

Review Comment:
   Yes there is a `MSQSelectTest.testJoin` which after the code I proposed here works as expected 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997610253


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -146,19 +147,31 @@ protected SegmentReference mapSegment(final Segment segment)
 
   private boolean initializeSegmentMapFn(final IntSet readableInputs)
   {
+    final AtomicLong cpuAccumulator = new AtomicLong();
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
       segmentMapFn = Function.identity();
       return true;
     } else {
-      final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
-      if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+      if (query.getDataSource() instanceof InputNumberDataSource) {
+        final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
+        if (retVal) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
+          // segment map function.  It would be a lot better if the InputNumberDataSource actually
+          // had a way to get that injected into it on its own, but the relationship between these objects
+          // was figured out during a refactor and using a setter here seemed like the least-bad way to
+          // make progress on the refactor without breaking functionality.  Hopefully, some future
+          // developer will move this away from a setter.
+          inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
+          segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
+        }
+        return retVal;
+      } else {
+        segmentMapFn = Function.identity();

Review Comment:
   Yes there is a `MSQSelectTest.testJoin` which works now and also works after the code I proposed here works as expected 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] imply-cheddar commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
imply-cheddar commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997616065


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -146,19 +147,31 @@ protected SegmentReference mapSegment(final Segment segment)
 
   private boolean initializeSegmentMapFn(final IntSet readableInputs)
   {
+    final AtomicLong cpuAccumulator = new AtomicLong();
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
       segmentMapFn = Function.identity();
       return true;
     } else {
-      final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
-      if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+      if (query.getDataSource() instanceof InputNumberDataSource) {
+        final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
+        if (retVal) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
+          // segment map function.  It would be a lot better if the InputNumberDataSource actually
+          // had a way to get that injected into it on its own, but the relationship between these objects
+          // was figured out during a refactor and using a setter here seemed like the least-bad way to
+          // make progress on the refactor without breaking functionality.  Hopefully, some future
+          // developer will move this away from a setter.
+          inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
+          segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
+        }
+        return retVal;
+      } else {
+        segmentMapFn = Function.identity();

Review Comment:
   If it works before the code change, then why do we need the code change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r993824681


##########
processing/src/main/java/org/apache/druid/query/Queries.java:
##########
@@ -191,11 +191,19 @@ public static <T> Query<T> withBaseDataSource(final Query<T> query, final DataSo
   {
     final Query<T> retVal;
 
-    if (query.getDataSource() instanceof QueryDataSource) {
-      final Query<?> subQuery = ((QueryDataSource) query.getDataSource()).getQuery();
+    /*
+     * Currently, this method is implemented in terms of a static walk doing a bunch of instanceof checks.
+     * We should likely look into moving this functionality into the DataSource object itself so that they
+     * can walk and create new objects on their own.  This will be necessary as we expand the set of DataSources
+     * that do actual work, as each of them will need to show up in this if/then waterfall.
+     */

Review Comment:
   @imply-cheddar  this was moved to the `DataSource` , no more `instanceof` checks here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] imply-cheddar commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
imply-cheddar commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r995464088


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java:
##########
@@ -81,6 +92,29 @@ public boolean isConcrete()
     return false;
   }
 
+  @Override
+  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
+      Query query,
+      AtomicLong cpuTime
+  )
+  {
+    final DataSource dataSourceWithInlinedChannelData = broadcastJoinHelper.inlineChannelData(query.getDataSource());

Review Comment:
   Probably nice to do a null check on the `broadcastJoinHelper` here and produce an error message that's meaningful to the developer.



##########
core/src/main/java/org/apache/druid/guice/GuiceInjectableValues.java:
##########
@@ -50,7 +53,15 @@ public Object findInjectableValue(
     // Currently we should only be dealing with `Key` instances, and anything more advanced should be handled with
     // great care
     if (valueId instanceof Key) {
-      return injector.getInstance((Key) valueId);
+      try {
+        return injector.getInstance((Key) valueId);
+      }
+      catch (ConfigurationException ce) {
+        //check if annotation is nullable
+        if (forProperty.getAnnotation(Nullable.class).annotationType().isAnnotation()) {

Review Comment:
   Are you sure that `forProperty.getAnnotation()` never returns null?  If it does, I fear that this code is going to convert an actually useful `ConfigurationException` into a completely uninterpretable `NullPointerException`



##########
benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java:
##########
@@ -88,8 +86,8 @@
  */
 @State(Scope.Benchmark)
 @Fork(value = 1)
-@Warmup(iterations = 5)
-@Measurement(iterations = 15)
+@Warmup(iterations = 3)
+@Measurement(iterations = 10)

Review Comment:
   Why the changes in this file?



##########
processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java:
##########
@@ -77,144 +55,13 @@ public JoinableFactoryWrapper(final JoinableFactory joinableFactory)
     this.joinableFactory = Preconditions.checkNotNull(joinableFactory, "joinableFactory");
   }
 
-  public JoinableFactory getJoinableFactory()
-  {
-    return joinableFactory;
-  }
-
-  /**
-   * Creates a Function that maps base segments to {@link HashJoinSegment} if needed (i.e. if the number of join
-   * clauses is > 0). If mapping is not needed, this method will return {@link Function#identity()}.
-   *
-   * @param baseFilter         Filter to apply before the join takes place
-   * @param clauses            Pre-joinable clauses
-   * @param cpuTimeAccumulator An accumulator that we will add CPU nanos to; this is part of the function to encourage
-   *                           callers to remember to track metrics on CPU time required for creation of Joinables
-   * @param query              The query that will be run on the mapped segments. Usually this should be
-   *                           {@code analysis.getBaseQuery().orElse(query)}, where "analysis" is a
-   *                           {@link DataSourceAnalysis} and "query" is the original
-   *                           query from the end user.
-   */
-  public Function<SegmentReference, SegmentReference> createSegmentMapFn(
-      @Nullable final Filter baseFilter,
-      final List<PreJoinableClause> clauses,
-      final AtomicLong cpuTimeAccumulator,
-      final Query<?> query
-  )
-  {
-    // compute column correlations here and RHS correlated values
-    return JvmUtils.safeAccumulateThreadCpuTime(
-        cpuTimeAccumulator,
-        () -> {
-          if (clauses.isEmpty()) {
-            return Function.identity();
-          } else {
-            final JoinableClauses joinableClauses = JoinableClauses.createClauses(clauses, joinableFactory);
-            final JoinFilterRewriteConfig filterRewriteConfig = JoinFilterRewriteConfig.forQuery(query);
-
-            // Pick off any join clauses that can be converted into filters.
-            final Set<String> requiredColumns = query.getRequiredColumns();
-            final Filter baseFilterToUse;
-            final List<JoinableClause> clausesToUse;
-
-            if (requiredColumns != null && filterRewriteConfig.isEnableRewriteJoinToFilter()) {
-              final Pair<List<Filter>, List<JoinableClause>> conversionResult = convertJoinsToFilters(
-                  joinableClauses.getJoinableClauses(),
-                  requiredColumns,
-                  Ints.checkedCast(Math.min(filterRewriteConfig.getFilterRewriteMaxSize(), Integer.MAX_VALUE))
-              );
-
-              baseFilterToUse =
-                  Filters.maybeAnd(
-                      Lists.newArrayList(
-                          Iterables.concat(
-                              Collections.singleton(baseFilter),
-                              conversionResult.lhs
-                          )
-                      )
-                  ).orElse(null);
-              clausesToUse = conversionResult.rhs;
-            } else {
-              baseFilterToUse = baseFilter;
-              clausesToUse = joinableClauses.getJoinableClauses();
-            }
-
-            // Analyze remaining join clauses to see if filters on them can be pushed down.
-            final JoinFilterPreAnalysis joinFilterPreAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
-                new JoinFilterPreAnalysisKey(
-                    filterRewriteConfig,
-                    clausesToUse,
-                    query.getVirtualColumns(),
-                    Filters.maybeAnd(Arrays.asList(baseFilterToUse, Filters.toFilter(query.getFilter())))
-                           .orElse(null)
-                )
-            );
-
-            return baseSegment ->
-                new HashJoinSegment(
-                    baseSegment,
-                    baseFilterToUse,
-                    GuavaUtils.firstNonNull(clausesToUse, ImmutableList.of()),
-                    joinFilterPreAnalysis
-                );
-          }
-        }
-    );
-  }
-
-  /**
-   * Compute a cache key prefix for a join data source. This includes the data sources that participate in the RHS of a
-   * join as well as any query specific constructs associated with join data source such as base table filter. This key prefix
-   * can be used in segment level cache or result level cache. The function can return following wrapped in an
-   * Optional
-   * - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
-   * join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
-   * - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
-   * in the JOIN is not cacheable.
-   *
-   * @param dataSourceAnalysis for the join datasource
-   *
-   * @return the optional cache key to be used as part of query cache key
-   *
-   * @throws {@link IAE} if this operation is called on a non-join data source
-   */
-  public Optional<byte[]> computeJoinDataSourceCacheKey(
-      final DataSourceAnalysis dataSourceAnalysis
-  )
-  {
-    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
-    if (clauses.isEmpty()) {
-      throw new IAE("No join clauses to build the cache key for data source [%s]", dataSourceAnalysis.getDataSource());
-    }
-
-    final CacheKeyBuilder keyBuilder;
-    keyBuilder = new CacheKeyBuilder(JOIN_OPERATION);
-    if (dataSourceAnalysis.getJoinBaseTableFilter().isPresent()) {
-      keyBuilder.appendCacheable(dataSourceAnalysis.getJoinBaseTableFilter().get());
-    }
-    for (PreJoinableClause clause : clauses) {
-      Optional<byte[]> bytes = joinableFactory.computeJoinCacheKey(clause.getDataSource(), clause.getCondition());
-      if (!bytes.isPresent()) {
-        // Encountered a data source which didn't support cache yet
-        log.debug("skipping caching for join since [%s] does not support caching", clause.getDataSource());
-        return Optional.empty();
-      }
-      keyBuilder.appendByteArray(bytes.get());
-      keyBuilder.appendString(clause.getCondition().getOriginalExpression());
-      keyBuilder.appendString(clause.getPrefix());
-      keyBuilder.appendString(clause.getJoinType().name());
-    }
-    return Optional.of(keyBuilder.build());
-  }
-
-
   /**
    * Converts any join clauses to filters that can be converted, and returns the rest as-is.
-   *
+   * <p>
    * See {@link #convertJoinToFilter} for details on the logic.
    */
   @VisibleForTesting

Review Comment:
   Don't really need this annotation on a public method :)



##########
services/src/main/java/org/apache/druid/cli/CliRouter.java:
##########
@@ -112,7 +114,8 @@ protected List<? extends Module> getModules()
           binder.bind(new TypeLiteral<List<TieredBrokerSelectorStrategy>>() {})
                 .toProvider(TieredBrokerSelectorStrategiesProvider.class)
                 .in(LazySingleton.class);
-
+          
+          binder.bind(JoinableFactory.class).to(NoopJoinableFactory.class).in(LazySingleton.class);

Review Comment:
   This shouldn't be necessary anymore, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997391822


##########
sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java:
##########
@@ -141,7 +141,7 @@ public DataSource withUpdatedDataSource(DataSource newSource)
   @Override
   public byte[] getCacheKey()
   {
-    return null;
+    return new byte[0];

Review Comment:
   So where the `isCacheable` for the data source is false, the `getCacheKey()` should return null for those data sources. Is my understanding correct ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] imply-cheddar commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
imply-cheddar commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997618452


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -146,19 +147,31 @@ protected SegmentReference mapSegment(final Segment segment)
 
   private boolean initializeSegmentMapFn(final IntSet readableInputs)
   {
+    final AtomicLong cpuAccumulator = new AtomicLong();
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
       segmentMapFn = Function.identity();
       return true;
     } else {
-      final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
-      if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+      if (query.getDataSource() instanceof InputNumberDataSource) {
+        final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
+        if (retVal) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
+          // segment map function.  It would be a lot better if the InputNumberDataSource actually
+          // had a way to get that injected into it on its own, but the relationship between these objects
+          // was figured out during a refactor and using a setter here seemed like the least-bad way to
+          // make progress on the refactor without breaking functionality.  Hopefully, some future
+          // developer will move this away from a setter.
+          inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
+          segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
+        }
+        return retVal;
+      } else {
+        segmentMapFn = Function.identity();

Review Comment:
   What I'm trying to say is, if there's not a test that can exercise the corner that is being mentioned, then the corner doesn't exist.  If the corner does exist, there should be a test that exercises it which is broken.  So, we are in 1 of 3 states:
   
   1) There should be a test that's breaking, but there isn't yet.  In that case, let's create the test and then talk about solutions.
   2) There is a test that exercises the corner, but it's passing.  In this case, the corner doesn't actually exist and we do not need further code changes.
   3) There is no test that generates the highlighted state because something upstream from this code is actually preventing that state from ever existing.  So, it is impossible to actually get to the bad state.  In this case, the corner doesn't actually exist, no code changes required.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on PR #13085:
URL: https://github.com/apache/druid/pull/13085#issuecomment-1287891627

   The conflicts have been resolved. Some additional refactoring introduced due to change in underlying test framework in #12965 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] abhishekagarwal87 commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997907481


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -146,18 +147,30 @@ protected SegmentReference mapSegment(final Segment segment)
 
   private boolean initializeSegmentMapFn(final IntSet readableInputs)
   {
+    final AtomicLong cpuAccumulator = new AtomicLong();
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
       segmentMapFn = Function.identity();
       return true;
     } else {
       final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
+      DataSource inlineChannelDataSource = broadcastJoinHelper.inlineChannelData(query.getDataSource());
       if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+        if (inlineChannelDataSource instanceof InputNumberDataSource) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
+          // segment map function.  It would be a lot better if the InputNumberDataSource actually
+          // had a way to get that injected into it on its own, but the relationship between these objects
+          // was figured out during a refactor and using a setter here seemed like the least-bad way to
+          // make progress on the refactor without breaking functionality.  Hopefully, some future
+          // developer will move this away from a setter.
+          inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
+          segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
+        } else {
+          segmentMapFn = inlineChannelDataSource.createSegmentMapFunction(query, cpuAccumulator);

Review Comment:
   This query would fail in MSQ
   ```
   SELECT t1.dim2, AVG(t1.m2) FROM "
               + "foo "
               + "INNER JOIN (SELECT * FROM foo LIMIT 10) AS t1 "
               + "ON t1.m1 = foo.m1 "
               + "GROUP BY t1.dim2
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997610253


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -146,19 +147,31 @@ protected SegmentReference mapSegment(final Segment segment)
 
   private boolean initializeSegmentMapFn(final IntSet readableInputs)
   {
+    final AtomicLong cpuAccumulator = new AtomicLong();
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
       segmentMapFn = Function.identity();
       return true;
     } else {
-      final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
-      if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+      if (query.getDataSource() instanceof InputNumberDataSource) {
+        final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
+        if (retVal) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
+          // segment map function.  It would be a lot better if the InputNumberDataSource actually
+          // had a way to get that injected into it on its own, but the relationship between these objects
+          // was figured out during a refactor and using a setter here seemed like the least-bad way to
+          // make progress on the refactor without breaking functionality.  Hopefully, some future
+          // developer will move this away from a setter.
+          inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
+          segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
+        }
+        return retVal;
+      } else {
+        segmentMapFn = Function.identity();

Review Comment:
   Yes there is a `MSQSelectTest.testJoin` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r998526989


##########
server/src/main/java/org/apache/druid/server/coordination/ServerManager.java:
##########
@@ -195,18 +194,10 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<Seg
     } else {
       return new ReportTimelineMissingSegmentQueryRunner<>(Lists.newArrayList(specs));
     }
+    Function<SegmentReference, SegmentReference> segmentMapFn = analysis.getDataSource().createSegmentMapFunction(query, cpuTimeAccumulator);

Review Comment:
   I get confused between use of `analysis.getDataSource()` and `query.getDataSource()` especially when analysis is created from `DataSourceAnalysis.fromDataSource(query.getDataSource())`. This is the case here. I have updated it to use query



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997159469


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -146,19 +147,31 @@ protected SegmentReference mapSegment(final Segment segment)
 
   private boolean initializeSegmentMapFn(final IntSet readableInputs)
   {
+    final AtomicLong cpuAccumulator = new AtomicLong();
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
       segmentMapFn = Function.identity();
       return true;
     } else {
-      final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
-      if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+      if (query.getDataSource() instanceof InputNumberDataSource) {
+        final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
+        if (retVal) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
+          // segment map function.  It would be a lot better if the InputNumberDataSource actually
+          // had a way to get that injected into it on its own, but the relationship between these objects
+          // was figured out during a refactor and using a setter here seemed like the least-bad way to
+          // make progress on the refactor without breaking functionality.  Hopefully, some future
+          // developer will move this away from a setter.
+          inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
+          segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
+        }
+        return retVal;
+      } else {
+        segmentMapFn = Function.identity();

Review Comment:
   To continue with Abhishek's point if the parent datasource is a `JoinDataSource` with child data sources being `InputNumberDataSource`, this would basically set the map function to an identity function which is not right. One way is to first construct the data source using the broadcastJoinHelper and call createSegmentMapFunction of that. This would also remove the `instanceof` check we are doing. I'll discuss this with @imply-cheddar today. I believe the else part can be simplified as
   ```
   else {
         final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
         if (retVal) {
           final DataSource dataSourceWithInlinedChannelData = broadcastJoinHelper.inlineChannelData(query.getDataSource());
           segmentMapFn = dataSourceWithInlinedChannelData.createSegmentMapFunction(query, cpuAccumulator);
         }
         return true;
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on PR #13085:
URL: https://github.com/apache/druid/pull/13085#issuecomment-1259765174

   Travis is failing from a branch coverage from the changes to `BaseLeafFrameProcessor`. While I am working on that I am setting this PR up for review now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on PR #13085:
URL: https://github.com/apache/druid/pull/13085#issuecomment-1259951335

   Guice issues in Integration tests. Taking a look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] imply-cheddar commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
imply-cheddar commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r1005109301


##########
server/src/main/java/org/apache/druid/server/coordination/ServerManager.java:
##########
@@ -195,18 +194,10 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<Seg
     } else {
       return new ReportTimelineMissingSegmentQueryRunner<>(Lists.newArrayList(specs));
     }
+    Function<SegmentReference, SegmentReference> segmentMapFn = analysis.getDataSource().createSegmentMapFunction(query, cpuTimeAccumulator);

Review Comment:
   Yes it is super easy to get confused by that...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on PR #13085:
URL: https://github.com/apache/druid/pull/13085#issuecomment-1282763628

   @imply-cheddar , @clintropolis I have addressed the comments and also resolved the conflicts. The thing remaining to address it use of a set in `GuiceInjectableValues` so the nullable check can be avoided


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997159469


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -146,19 +147,31 @@ protected SegmentReference mapSegment(final Segment segment)
 
   private boolean initializeSegmentMapFn(final IntSet readableInputs)
   {
+    final AtomicLong cpuAccumulator = new AtomicLong();
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
       segmentMapFn = Function.identity();
       return true;
     } else {
-      final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
-      if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+      if (query.getDataSource() instanceof InputNumberDataSource) {
+        final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
+        if (retVal) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
+          // segment map function.  It would be a lot better if the InputNumberDataSource actually
+          // had a way to get that injected into it on its own, but the relationship between these objects
+          // was figured out during a refactor and using a setter here seemed like the least-bad way to
+          // make progress on the refactor without breaking functionality.  Hopefully, some future
+          // developer will move this away from a setter.
+          inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
+          segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
+        }
+        return retVal;
+      } else {
+        segmentMapFn = Function.identity();

Review Comment:
   To continue with Abhishek's point if the parent datasource is a `JoinDataSource` with child data sources being `InputNumberDataSource`, this would basically set the map function to an identity function which is not right. One way is to first construct the data source using the broadcastJoinHelper and call createSegmentMapFunction of that. I'll discuss this with @imply-cheddar today. I believe the else part can be simplified as
   ```
   else {
         final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
         if (retVal) {
           final DataSource dataSourceWithInlinedChannelData = broadcastJoinHelper.inlineChannelData(query.getDataSource());
           segmentMapFn = dataSourceWithInlinedChannelData.createSegmentMapFunction(query, cpuAccumulator);
         }
         else {
           segmentMapFn = Function.identity();
         }
         return true;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997159469


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -146,19 +147,31 @@ protected SegmentReference mapSegment(final Segment segment)
 
   private boolean initializeSegmentMapFn(final IntSet readableInputs)
   {
+    final AtomicLong cpuAccumulator = new AtomicLong();
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
       segmentMapFn = Function.identity();
       return true;
     } else {
-      final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
-      if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+      if (query.getDataSource() instanceof InputNumberDataSource) {
+        final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
+        if (retVal) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
+          // segment map function.  It would be a lot better if the InputNumberDataSource actually
+          // had a way to get that injected into it on its own, but the relationship between these objects
+          // was figured out during a refactor and using a setter here seemed like the least-bad way to
+          // make progress on the refactor without breaking functionality.  Hopefully, some future
+          // developer will move this away from a setter.
+          inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
+          segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
+        }
+        return retVal;
+      } else {
+        segmentMapFn = Function.identity();

Review Comment:
   To continue with Abhishek's point if the parent datasource is a `JoinDataSource` with child data sources being `InputNumberDataSource`, this would basically set the map function to an identity function which is not right. One way is to first construct the data source using the broadcastJoinHelper and call createSegmentMapFunction of that. This would also remove the instanceof check we are doing. I'll discuss this with @imply-cheddar today. I believe the else part can be simplified as
   ```
   else {
         final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
         if (retVal) {
           final DataSource dataSourceWithInlinedChannelData = broadcastJoinHelper.inlineChannelData(query.getDataSource());
           segmentMapFn = dataSourceWithInlinedChannelData.createSegmentMapFunction(query, cpuAccumulator);
         }
         else {
           segmentMapFn = Function.identity();
         }
         return true;
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
clintropolis commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997561473


##########
processing/src/main/java/org/apache/druid/query/DataSource.java:
##########
@@ -83,8 +89,40 @@
    * Returns true if this datasource represents concrete data that can be scanned via a
    * {@link org.apache.druid.segment.Segment} adapter of some kind. True for e.g. 'table' but not for 'query' or 'join'.
    *
-   * @see org.apache.druid.query.planning.DataSourceAnalysis#isConcreteBased() which uses this
-   * @see org.apache.druid.query.planning.DataSourceAnalysis#isConcreteTableBased() which uses this
+   * @see DataSourceAnalysis#isConcreteBased() which uses this
+   * @see DataSourceAnalysis#isConcreteTableBased() which uses this
    */
   boolean isConcrete();
+
+  /**
+   * Returns a segment function on to how to segment should be modified.
+   *
+   * @param query the input query
+   * @param cpuTimeAcc the cpu time accumulator
+   * @return the segment function
+   */
+  Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc);
+
+  /**
+   * Returns an updated datasource based on the specified new source.
+   *
+   * @param newSource the new datasource to be used to update an existing query
+   * @return the updated datasource to be used
+   */
+  DataSource withUpdatedDataSource(DataSource newSource);
+
+  /**
+   * Compute a cache key prefix for a data source. This includes the data sources that participate in the RHS of a
+   * join as well as any query specific constructs associated with join data source such as base table filter. This key prefix
+   * can be used in segment level cache or result level cache. The function can return following
+   * - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
+   * join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
+   * - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
+   * in the JOIN is not cacheable.
+   *
+   * @return the cache key to be used as part of query cache key
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   */
+  byte[] getCacheKey();

Review Comment:
   >throws {@link IAE} if this operation is called on a non-join data source
   the javadocs are maybe not true exactly? it looks like most implementations are currently returning empty bytes.
   
   Im also a bit unsure how great this naming is given that it should only be used for joins afaik, and not used at all for cache keys otherwise. it seems sort of seems like it would be important for all query types, but calling it something specific to joins also seems short sighted... Not sure what exactly to do here.. maybe this needs a wider discussion about what kinds of datasources need to implement this and what do not which would help clear this up



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] imply-cheddar commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
imply-cheddar commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r996620329


##########
sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java:
##########
@@ -141,7 +141,7 @@ public DataSource withUpdatedDataSource(DataSource newSource)
   @Override
   public byte[] getCacheKey()
   {
-    return null;
+    return new byte[0];

Review Comment:
   This should either not be cacheable or it needs to build up an identifier for the external data source...  `null` seems correct for now.



##########
processing/src/main/java/org/apache/druid/query/QueryDataSource.java:
##########
@@ -109,7 +109,7 @@ public DataSource withUpdatedDataSource(DataSource newSource)
   @Override
   public byte[] getCacheKey()
   {
-    return null;
+    return new byte[0];

Review Comment:
   So, the query data source itself is not likely cacheable.  The sub-query that is run from inside the data source is perhaps cacheable, but not the datasource.  Well, unless it's a result level cache, that could work.  But, I'm like 80% that `null` is actually correct here.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java:
##########
@@ -112,7 +119,7 @@ public DataSource withUpdatedDataSource(DataSource newSource)
   @Override
   public byte[] getCacheKey()
   {
-    return null;
+    return new byte[0];

Review Comment:
   This seems uncacheable to me.



##########
processing/src/main/java/org/apache/druid/query/InlineDataSource.java:
##########
@@ -252,7 +252,7 @@ public DataSource withUpdatedDataSource(DataSource newSource)
   @Override
   public byte[] getCacheKey()
   {
-    return null;
+    return new byte[0];

Review Comment:
   Are you sure that this was cached previously?  It seems like an `InlineDataSource` would either have to include all of the data in the cache key or cause caching to never happen...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] imply-cheddar commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
imply-cheddar commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997779323


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -146,18 +147,30 @@ protected SegmentReference mapSegment(final Segment segment)
 
   private boolean initializeSegmentMapFn(final IntSet readableInputs)
   {
+    final AtomicLong cpuAccumulator = new AtomicLong();
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
       segmentMapFn = Function.identity();
       return true;
     } else {
       final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
+      DataSource inlineChannelDataSource = broadcastJoinHelper.inlineChannelData(query.getDataSource());
       if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+        if (inlineChannelDataSource instanceof InputNumberDataSource) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
+          // segment map function.  It would be a lot better if the InputNumberDataSource actually
+          // had a way to get that injected into it on its own, but the relationship between these objects
+          // was figured out during a refactor and using a setter here seemed like the least-bad way to
+          // make progress on the refactor without breaking functionality.  Hopefully, some future
+          // developer will move this away from a setter.
+          inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
+          segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
+        } else {
+          segmentMapFn = inlineChannelDataSource.createSegmentMapFunction(query, cpuAccumulator);

Review Comment:
   Did we find a test that was broken with the previous code?  If not, then please revert back to the code that @abhishekagarwal87 initially commented on and keep that code active.  That code is not broken until we can actually reproduce the breakage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] somu-imply commented on a diff in pull request #13085: Refactoring the data source before unnest

Posted by GitBox <gi...@apache.org>.
somu-imply commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r985280438


##########
processing/src/main/java/org/apache/druid/query/InlineDataSource.java:
##########
@@ -102,6 +105,55 @@ public static InlineDataSource fromIterable(
     return new InlineDataSource(rows, signature);
   }
 
+  /**
+   * A very zealous equality checker for "rows" that respects deep equality of arrays, but nevertheless refrains
+   * from materializing things needlessly. Useful for unit tests that want to compare equality of different
+   * InlineDataSource instances.
+   */
+  private static boolean rowsEqual(final Iterable<Object[]> rowsA, final Iterable<Object[]> rowsB)

Review Comment:
   This was my IDE doing stuff. I reformatted the code in the IDE but it does not seem to push public methods in the top of the file



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -152,13 +153,23 @@ private boolean initializeSegmentMapFn(final IntSet readableInputs)
       segmentMapFn = Function.identity();
       return true;
     } else {
-      final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
-      if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+      if (query.getDataSource() instanceof InputNumberDataSource) {
+        final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
+        if (retVal) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberData source was going through the broadcastJoinHelper which
+          // was using the JoinableFactoryWrapper to create segment map function.
+          // After refactoring, the segment map function creation is moved to data source
+          // Hence for InputNumberDataSource we are setting the broadcast join helper for the data source
+          // and moving the segment map function creation there

Review Comment:
   Done



##########
server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java:
##########
@@ -94,12 +93,12 @@ public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final
 
     final AtomicLong cpuAccumulator = new AtomicLong(0L);
 
-    final Function<SegmentReference, SegmentReference> segmentMapFn = joinableFactoryWrapper.createSegmentMapFn(
-        analysis.getJoinBaseTableFilter().map(Filters::toFilter).orElse(null),
-        analysis.getPreJoinableClauses(),
-        cpuAccumulator,
-        analysis.getBaseQuery().orElse(query)
-    );
+    Function<SegmentReference, SegmentReference> segmentMapFn = analysis.getDataSource()
+                                                                        .createSegmentMapFunction(
+                                                                            query,
+                                                                            cpuAccumulator
+                                                                        );
+

Review Comment:
   Done



##########
processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java:
##########
@@ -340,12 +192,60 @@ static JoinClauseToFilterConversion convertJoinToFilter(
     return new JoinClauseToFilterConversion(null, false);
   }
 
+  public JoinableFactory getJoinableFactory()
+  {
+    return joinableFactory;
+  }
+
+  /**
+   * Compute a cache key prefix for a join data source. This includes the data sources that participate in the RHS of a
+   * join as well as any query specific constructs associated with join data source such as base table filter. This key prefix
+   * can be used in segment level cache or result level cache. The function can return following wrapped in an

Review Comment:
   The IDE did this. I am using the druid intellij code formatter. Let me check how this can be avoided 



##########
processing/src/main/java/org/apache/druid/query/JoinDataSource.java:
##########
@@ -127,12 +155,22 @@ public static JoinDataSource create(
       final String rightPrefix,
       final JoinConditionAnalysis conditionAnalysis,
       final JoinType joinType,
-      final DimFilter leftFilter
+      final DimFilter leftFilter,
+      @Nullable @JacksonInject final JoinableFactoryWrapper joinableFactoryWrapper

Review Comment:
   Excellent point ! Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org