Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/09/08 18:14:54 UTC

[GitHub] [druid] abhishekagarwal87 opened a new pull request #10366: Proposed changes for making joins cacheable

abhishekagarwal87 opened a new pull request #10366:
URL: https://github.com/apache/druid/pull/10366


   This PR adds caching support to queries with joins. Support is limited for now: caching is enabled only when the right-hand data source in the join is a `GlobalTableDataSource`. It can be extended to more data sources in follow-up PRs.
   
   
   ### Description
   
   The cache key is computed independently of `Joinable` objects. `ServerManager`, `ResultLevelCachingQueryRunner`, and `SinkQuerySegmentWalker` use `Joinables.computeDataSourceCacheKey`, which loops through all `PreJoinableClause` objects and
     - builds the cache key for the data source participating in each clause
       - calls `JoinableFactory.computeJoinCacheKey`, which returns absent if caching is not supported; `BroadcastTableJoinableFactory` returns a key computed from the segment of the broadcast table
     - adds the `JoinConditionAnalysis` to the above cache key (a sketch of this loop follows below)
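
   A minimal sketch of that loop, covering only the join branch (`JoinCacheKeys` and the `JOIN_OPERATION` prefix byte are hypothetical names; the exact signatures in the PR may differ):

```java
import java.util.Optional;

import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.segment.join.JoinableFactory;

public class JoinCacheKeys
{
  private static final byte JOIN_OPERATION = 0x2C; // hypothetical prefix byte

  public static Optional<byte[]> computeDataSourceCacheKey(
      DataSourceAnalysis analysis,
      JoinableFactory joinableFactory
  )
  {
    final CacheKeyBuilder keyBuilder = new CacheKeyBuilder(JOIN_OPERATION);
    for (PreJoinableClause clause : analysis.getPreJoinableClauses()) {
      // An absent per-clause key means this data source does not support caching yet.
      Optional<byte[]> clauseKey = joinableFactory.computeJoinCacheKey(clause.getDataSource());
      if (!clauseKey.isPresent()) {
        return Optional.empty();
      }
      keyBuilder.appendByteArray(clauseKey.get());
      // Mix in the join condition so different ON clauses produce different keys.
      keyBuilder.appendString(clause.getCondition().getOriginalExpression());
    }
    return Optional.of(keyBuilder.build());
  }
}
```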
   
   <hr>
   
   This PR has:
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   
   <!-- Check the items by putting "x" in the brackets for the done things. Not all of these items apply to every PR. Remove the items which are not done or not relevant to the PR. None of the items from the checklist above are strictly necessary, but it would be very helpful if you at least self-review the PR. -->
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `ServerManager`
    * `SinkQuerySegmentWalker`
    * `ResultLevelCachingQueryRunner`
    * `Joinables`
   




[GitHub] [druid] clintropolis commented on a change in pull request #10366: Proposed changes for making joins cacheable

clintropolis commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r487467291



##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +126,47 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources, to be used in segment-level and result-level caches. The
+   * data source can either be a base data source (clauses is empty) or the RHS of a join (clauses is
+   * non-empty). In both cases a non-null cache key is returned. However, the cache key is null if there is a
+   * join and some of the right-hand data sources participating in the join do not support caching yet.
+   *
+   * @param dataSourceAnalysis analysis of the data source in the query
+   * @param joinableFactory factory used to compute cache keys for the joined data sources
+   * @return the optional cache key prefix
+   */
+  public static Optional<byte[]> computeDataSourceCacheKey(
+      final DataSourceAnalysis dataSourceAnalysis,
+      final JoinableFactory joinableFactory
+  )
+  {
+    final CacheKeyBuilder keyBuilder;
+    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
+    if (clauses.isEmpty()) {
+      keyBuilder = new CacheKeyBuilder(REGULAR_OPERATION);
+    } else {
+      keyBuilder = new CacheKeyBuilder(JOIN_OPERATION);
+      for (PreJoinableClause clause : clauses) {
+        if (!clause.getCondition().canHashJoin()) {

Review comment:
       Why does this matter? Shouldn't it just be on the joinable factory to decide if it can cache or not and we should supply the `PreJoinableClause` or the `JoinConditionAnalysis` from `PreJoinableClause.getCondition` to it?

##########
File path: server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
##########
@@ -293,21 +302,28 @@ public ServerManager(
         queryMetrics -> queryMetrics.segment(segmentIdString)
     );
 
-    CachingQueryRunner<T> cachingQueryRunner = new CachingQueryRunner<>(
-        segmentIdString,
-        segmentDescriptor,
-        objectMapper,
-        cache,
-        toolChest,
-        metricsEmittingQueryRunnerInner,
-        cachePopulator,
-        cacheConfig
-    );
+    QueryRunner<T> queryRunner;

Review comment:
       `CachingQueryRunner` already has the machinery to decide whether or not to use the caches; should we just pass the optional in and use it as another input to the existing `useCache`/`populateCache` equation, so we don't need these if statements in `ServerManager` and `SinkQuerySegmentWalker`? (A sketch of that folding follows below.)
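
A hedged sketch of that folding (the class and field names here are hypothetical, not the PR's code): an absent join cache-key prefix simply switches both flags off, so callers need no branching.

```java
import java.util.Optional;

// Hypothetical sketch: fold the join cache-key prefix into the existing
// useCache/populateCache computation instead of branching in the callers.
class CacheFlags
{
  final boolean useCache;
  final boolean populateCache;

  CacheFlags(boolean useCacheConfig, boolean populateCacheConfig, Optional<byte[]> joinCacheKeyPrefix)
  {
    // An absent prefix means the (join) query is not cacheable at all.
    this.useCache = useCacheConfig && joinCacheKeyPrefix.isPresent();
    this.populateCache = populateCacheConfig && joinCacheKeyPrefix.isPresent();
  }
}
```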

##########
File path: processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java
##########
@@ -80,4 +80,21 @@ public boolean isDirectlyJoinable(DataSource dataSource)
     }
     return maybeJoinable;
   }
+
+  @Override
+  public Optional<byte[]> computeJoinCacheKey(DataSource dataSource)

Review comment:
       Since multiple joinable factories can support the same class of datasource, I guess this means that this method needs to mimic the same selection logic as `build` to make sure that a joinable factory only computes a cache key for a table that it would build a joinable for in the query? I think we should probably supply the `PreJoinableClause` or the `JoinConditionAnalysis` to this method so that it can have the `JoinConditionAnalysis` that `build` has access to. Then these factories can make better decisions about whether they can compute a key for the query or not. (A sketch of that selection logic follows below.)
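
A sketch of that suggestion, assuming `MapJoinableFactory` keeps a multimap of factories keyed by datasource class (the two-argument `computeJoinCacheKey` signature is the one this comment proposes):

```java
import java.util.Optional;
import java.util.Set;

import com.google.common.collect.SetMultimap;
import org.apache.druid.query.DataSource;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinableFactory;

class SelectiveCacheKeyFactory
{
  private final SetMultimap<Class<? extends DataSource>, JoinableFactory> joinableFactories;

  SelectiveCacheKeyFactory(SetMultimap<Class<? extends DataSource>, JoinableFactory> joinableFactories)
  {
    this.joinableFactories = joinableFactories;
  }

  Optional<byte[]> computeJoinCacheKey(DataSource dataSource, JoinConditionAnalysis condition)
  {
    // Mirror build(): consult only the factories registered for this datasource class,
    // so a factory never computes a key for a table it would not build a joinable for.
    Set<JoinableFactory> factories = joinableFactories.get(dataSource.getClass());
    for (JoinableFactory factory : factories) {
      Optional<byte[]> key = factory.computeJoinCacheKey(dataSource, condition);
      if (key.isPresent()) {
        return key;
      }
    }
    return Optional.empty();
  }
}
```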

##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
##########
@@ -241,6 +244,22 @@ public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, bo
     );
   }
 
+  @Override
+  public Optional<byte[]> computeCacheKey()
+  {
+    SegmentId segmentId = segment.getId();

Review comment:
       Could this just be a method on `SegmentId`?

##########
File path: server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
##########
@@ -86,7 +92,15 @@ public ResultLevelCachingQueryRunner(
     if (useResultCache || populateResultCache) {
 
       final String cacheKeyStr = StringUtils.fromUtf8(strategy.computeResultLevelCacheKey(query));
-      final byte[] cachedResultSet = fetchResultsFromResultLevelCache(cacheKeyStr);
+      DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
+      byte[] dataSourceCacheKey = Joinables.computeDataSourceCacheKey(analysis, joinableFactory).orElse(null);
+      if (null == dataSourceCacheKey) {
+        return baseRunner.run(
+            queryPlus,
+            responseContext
+        );
+      }
+      final byte[] cachedResultSet = fetchResultsFromResultLevelCache(cacheKeyStr, dataSourceCacheKey);

Review comment:
       nit: this seems a little strange that the cache key is turned into the namespace and another key is used as the bytes, rather than making some sort of composite, but maybe it is ok?

##########
File path: server/src/main/java/org/apache/druid/segment/join/BroadcastTableJoinableFactory.java
##########
@@ -76,4 +76,27 @@ public boolean isDirectlyJoinable(DataSource dataSource)
     }
     return Optional.empty();
   }
+
+  @Override
+  public Optional<byte[]> computeJoinCacheKey(DataSource dataSource)

Review comment:
       Seems like this and `build` could share a method that gets the `ReferenceCountingIndexedTable` if possible; `build` can then make the joinable from it, and this method can compute the key from it (sketched below).
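
A hedged sketch of that sharing (`findIndexedTable` is a hypothetical helper whose body, the segment-manager lookup `build` already performs, is elided; `computeCacheKey()` here is the `Optional`-returning variant shown in the `BroadcastSegmentIndexedTable` hunk above):

```java
import java.util.Optional;

import org.apache.druid.query.DataSource;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.Joinable;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;

class SharedLookupSketch
{
  Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
  {
    // build() turns the shared lookup result into a joinable...
    return findIndexedTable(dataSource).map(IndexedTableJoinable::new);
  }

  Optional<byte[]> computeJoinCacheKey(DataSource dataSource)
  {
    // ...while the cache-key path only asks the table for its key.
    return findIndexedTable(dataSource).flatMap(IndexedTable::computeCacheKey);
  }

  private Optional<IndexedTable> findIndexedTable(DataSource dataSource)
  {
    // Hypothetical: the same ReferenceCountingIndexedTable lookup that build() performs today.
    throw new UnsupportedOperationException("lookup elided in this sketch");
  }
}
```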

##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +126,47 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources, to be used in segment-level and result-level caches. The
+   * data source can either be a base data source (clauses is empty) or the RHS of a join (clauses is
+   * non-empty). In both cases a non-null cache key is returned. However, the cache key is null if there is a
+   * join and some of the right-hand data sources participating in the join do not support caching yet.
+   *
+   * @param dataSourceAnalysis analysis of the data source in the query
+   * @param joinableFactory factory used to compute cache keys for the joined data sources
+   * @return the optional cache key prefix
+   */
+  public static Optional<byte[]> computeDataSourceCacheKey(
+      final DataSourceAnalysis dataSourceAnalysis,
+      final JoinableFactory joinableFactory
+  )
+  {
+    final CacheKeyBuilder keyBuilder;
+    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
+    if (clauses.isEmpty()) {
+      keyBuilder = new CacheKeyBuilder(REGULAR_OPERATION);

Review comment:
       I guess this would bust cache on upgrade since all non-join cache keys would have this new byte prefix? Could we just not add a prefix if the query isn't a join query?






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r492023432



##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +112,74 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that participate in the RHS of a join. This key prefix
+   * can be used in the segment-level cache or the result-level cache. The function can return the following,
+   * wrapped in an Optional:
+   *  - Non-empty byte array - a join data source is involved and caching is possible. The result includes the
+   *  join condition expression, the join type, and the cache key returned by the joinable factory for each
+   *  {@link PreJoinableClause}
+   *  - NULL - there is a join but caching is not possible. This may happen if one of the data sources
+   *  participating in the JOIN is not cacheable.
+   *
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   * @param dataSourceAnalysis
+   * @return
+   */
+  public Optional<byte[]> computeJoinDataSourceCacheKey(
+      final DataSourceAnalysis dataSourceAnalysis
+  )
+  {
+
+    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
+    if (clauses.isEmpty()) {
+      throw new IAE("No join clauses to build the cache key");

Review comment:
       While I am going to add `dataSourceAnalysis.getDataSource()` to the exception string, I just hope it doesn't blow up the message. On a quick look, it looks alright though. 






[GitHub] [druid] suneet-s commented on pull request #10366: Add caching support to join queries

suneet-s commented on pull request #10366:
URL: https://github.com/apache/druid/pull/10366#issuecomment-702261402


   Thanks for adding this ability!
   
   I haven't read the code changes yet, but can you describe how you are thinking about feature flagging this behavior? Since it looks like a big change - is this behavior enabled / disabled by default? What should an operator do to disable / enable it?
   
   Do we plan on using the existing toggles for query caching?




[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r494839186



##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -293,6 +301,14 @@ private ClusterQueryResult(Sequence<T> sequence, int numQueryServers)
       this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec()
                                          .map(QuerySegmentSpec::getIntervals)
                                          .orElseGet(() -> query.getIntervals());
+      this.cacheKeyManager = new CacheKeyManager<>(

Review comment:
       That is a good corner case. I didn't think of this. I don't see an easy flag to selectively disable result-level caching for joins, so I am just going to hard-code this in the new `CacheKeyManager`. Sounds good?






[GitHub] [druid] gianm commented on a change in pull request #10366: Add caching support to join queries

gianm commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r495144556



##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +107,68 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that participate in the RHS of a join. This key prefix
+   * can be used in the segment-level cache or the result-level cache. The function can return the following,
+   * wrapped in an Optional:
+   *  - Non-empty byte array - a join data source is involved and caching is possible. The result includes the
+   *  join condition expression, the join type, and the cache key returned by the joinable factory for each
+   *  {@link PreJoinableClause}
+   *  - NULL - there is a join but caching is not possible. This may happen if one of the data sources
+   *  participating in the JOIN is not cacheable.
+   *
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   * @param dataSourceAnalysis for the join datasource
+   * @return the optional cache key to be used as part of query cache key
+   */
+  public Optional<byte[]> computeJoinDataSourceCacheKey(
+      final DataSourceAnalysis dataSourceAnalysis
+  )
+  {
+    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
+    if (clauses.isEmpty()) {
+      throw new IAE("No join clauses to build the cache key for data source [%s]", dataSourceAnalysis.getDataSource());
+    }
+
+    final CacheKeyBuilder keyBuilder;
+    keyBuilder = new CacheKeyBuilder(JOIN_OPERATION);
+    for (PreJoinableClause clause : clauses) {
+      Optional<byte[]> bytes = joinableFactory.computeJoinCacheKey(clause.getDataSource(), clause.getCondition());
+      if (!bytes.isPresent()) {
+        // Encountered a data source which didn't support cache yet
+        log.debug("skipping caching for join since [%s] does not support caching", clause.getDataSource());
+        return Optional.empty();
+      }
+      keyBuilder.appendByteArray(bytes.get());
+      keyBuilder.appendString(clause.getCondition().getOriginalExpression());
+      keyBuilder.appendString(clause.getJoinType().name());

Review comment:
       It makes a difference because the prefixed fields might be inputs to aggregators, the group by clause, etc. If the prefix changes then those things would behave differently.






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Proposed changes for making joins cacheable

abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r487698508



##########
File path: processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java
##########
@@ -80,4 +80,21 @@ public boolean isDirectlyJoinable(DataSource dataSource)
     }
     return maybeJoinable;
   }
+
+  @Override
+  public Optional<byte[]> computeJoinCacheKey(DataSource dataSource)

Review comment:
       Right now, as I see it, that additional info comes from the data source itself. The joinable factory decides based on the `dataSource` whether it can build the joinable. Can you share any hypothetical example wherein one factory decides to build a Joinable but another one does not on the basis of `JoinConditionAnalysis`?






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Proposed changes for making joins cacheable

abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r487699662



##########
File path: server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
##########
@@ -86,7 +92,15 @@ public ResultLevelCachingQueryRunner(
     if (useResultCache || populateResultCache) {
 
       final String cacheKeyStr = StringUtils.fromUtf8(strategy.computeResultLevelCacheKey(query));
-      final byte[] cachedResultSet = fetchResultsFromResultLevelCache(cacheKeyStr);
+      DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
+      byte[] dataSourceCacheKey = Joinables.computeDataSourceCacheKey(analysis, joinableFactory).orElse(null);
+      if (null == dataSourceCacheKey) {
+        return baseRunner.run(
+            queryPlus,
+            responseContext
+        );
+      }
+      final byte[] cachedResultSet = fetchResultsFromResultLevelCache(cacheKeyStr, dataSourceCacheKey);

Review comment:
       yeah. this is not well thought out right now. I will revisit this. 






[GitHub] [druid] abhishekagarwal87 edited a comment on pull request #10366: Add caching support to join queries

abhishekagarwal87 edited a comment on pull request #10366:
URL: https://github.com/apache/druid/pull/10366#issuecomment-702305535


   We have per-query toggles and those should suffice IMO. Caching is disabled on the broker anyway. The `useCache` and `populateCache` parameters can be set to false in the query context to disable caching for an individual query (see the sketch below).
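
   For illustration, a hedged sketch of flipping those per-query toggles through the query context (`withJoinCachingDisabled` is a hypothetical helper; `Query#withOverriddenContext` is the standard context-override hook):

```java
import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.Query;

class QueryCacheToggles
{
  // Returns a copy of the query with segment-cache reads and writes disabled.
  static <T> Query<T> withJoinCachingDisabled(Query<T> query)
  {
    return query.withOverriddenContext(
        ImmutableMap.of(
            "useCache", false,      // don't read cached per-segment results
            "populateCache", false  // don't write per-segment results to the cache
        )
    );
  }
}
```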




[GitHub] [druid] jihoonson merged pull request #10366: Add caching support to join queries

jihoonson merged pull request #10366:
URL: https://github.com/apache/druid/pull/10366


   




[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r491897453



##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -35,57 +39,47 @@
 import javax.annotation.Nullable;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
- * Utility methods for working with {@link Joinable} related classes.
+ * A wrapper class over {@link JoinableFactory} for working with {@link Joinable} related classes.

Review comment:
       I want at least some of the non-trivial methods to be non-static; unit testing becomes easier that way since I can pass in mocks etc. I can also create another class, move the non-static methods there, and leave the rest of the methods as they are. Does that work?






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r494807466



##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/IndexedTable.java
##########
@@ -87,6 +88,26 @@ default ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, b
     return null;
   }
 
+  /**
+   * Computes a {@code byte[]} key for the table that can be used for computing cache keys for join operations.
+   * see {@link org.apache.druid.segment.join.JoinableFactory#computeJoinCacheKey}
+   *
+   * @return the byte array for cache key
+   * @throws {@link IAE} if caching is not supported
+   */
+  default byte[] computeCacheKey()

Review comment:
       I think part of the reason for not doing so was that not all IndexedTables are Cacheable. That is why I have two methods, `isCacheable` and `computeCacheKey` (sketched below). I could get rid of `isCacheable` and return null from `computeCacheKey` when caching is unsupported, but that didn't look very robust to me.
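
A sketch of the two-method shape described above (the interface name and defaults are assumptions):

```java
import org.apache.druid.java.util.common.IAE;

interface CacheableIndexedTable
{
  // Callers check this before asking for a key.
  default boolean isCacheable()
  {
    return false;
  }

  // Throwing (rather than returning null) keeps unsupported caching loud.
  default byte[] computeCacheKey()
  {
    throw new IAE("Caching is not supported by this table");
  }
}
```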






[GitHub] [druid] jihoonson commented on a change in pull request #10366: Add caching support to join queries

jihoonson commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r502082167



##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -289,14 +289,17 @@ private ClusterQueryResult(Sequence<T> sequence, int numQueryServers)
       this.query = queryPlus.getQuery();
       this.toolChest = warehouse.getToolChest(query);
       this.strategy = toolChest.getCacheStrategy(query);
+      this.dataSourceAnalysis = DataSourceAnalysis.forDataSource(query.getDataSource());
 
-      this.useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER);
-      this.populateCache = CacheUtil.isPopulateSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER);
+      // Broker join caching is disabled - https://github.com/apache/druid/issues/10444
+      this.useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER)
+                      && !dataSourceAnalysis.isJoin();

Review comment:
       I think this check should not be here, but inside `CacheUtil.isUseSegmentCache()`. The reason we pass the `ServerType` to `isUseSegmentCache()` is that we want to make different decisions based on the `ServerType`. We are disabling the cache for joins on brokers, so the check should live in there (a sketch follows below).
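
A minimal sketch of that placement, as a predicate that `CacheUtil.isUseSegmentCache()` and `isPopulateSegmentCache()` could both consult first (`isBroker` stands in for the `ServerType.BROKER` comparison):

```java
import org.apache.druid.query.Query;
import org.apache.druid.query.planning.DataSourceAnalysis;

final class BrokerJoinCacheRule
{
  private BrokerJoinCacheRule() {}

  static boolean allowsSegmentCache(Query<?> query, boolean isBroker)
  {
    // Broker join caching is disabled: https://github.com/apache/druid/issues/10444
    return !(isBroker && DataSourceAnalysis.forDataSource(query.getDataSource()).isJoin());
  }
}
```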

##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -289,14 +289,17 @@ private ClusterQueryResult(Sequence<T> sequence, int numQueryServers)
       this.query = queryPlus.getQuery();
       this.toolChest = warehouse.getToolChest(query);
       this.strategy = toolChest.getCacheStrategy(query);
+      this.dataSourceAnalysis = DataSourceAnalysis.forDataSource(query.getDataSource());
 
-      this.useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER);
-      this.populateCache = CacheUtil.isPopulateSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER);
+      // Broker join caching is disabled - https://github.com/apache/druid/issues/10444
+      this.useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER)
+                      && !dataSourceAnalysis.isJoin();
+      this.populateCache = CacheUtil.isPopulateSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER)

Review comment:
       Same comment.






[GitHub] [druid] jihoonson commented on a change in pull request #10366: Add caching support to join queries

jihoonson commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r493091856



##########
File path: processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java
##########
@@ -113,6 +118,12 @@ public void testBuildExceptionWhenTwoJoinableFactoryForSameDataSource()
     EasyMock.expect(noopJoinableFactory.build(noopDataSource, condition)).andReturn(Optional.of(mockJoinable));
     EasyMock.expect(anotherNoopJoinableFactory.build(noopDataSource, condition)).andReturn(Optional.of(mockJoinable));
     EasyMock.replay(noopJoinableFactory, anotherNoopJoinableFactory);
+    expectedException.expect(ISE.class);
+    expectedException.expectMessage(String.format(

Review comment:
       Please use `StringUtils.format()` instead here and in other places.

##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -796,4 +749,102 @@ private void addSequencesFromServer(
           .flatMerge(seq -> seq, query.getResultOrdering());
     }
   }
+
+  /**
+   * An inner class that is used solely for computing cache keys. It's a separate class to allow extensive unit testing
+   * of cache key generation.
+   */
+  @VisibleForTesting
+  static class CacheKeyManager<T>

Review comment:
       This is nice :+1: 






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r492028177



##########
File path: processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java
##########
@@ -83,6 +84,36 @@ public void testBuildDataSourceIsRegisteredShouldReturnJoinableFromFactory()
     EasyMock.replay(noopJoinableFactory);
     Optional<Joinable> joinable = target.build(noopDataSource, condition);
     Assert.assertEquals(mockJoinable, joinable.get());
+
+  }
+
+  @Test
+  public void testComputeJoinCacheKey()
+  {
+    Optional<byte[]> expected = Optional.of(new byte[]{1, 2, 3});
+    EasyMock.expect(noopJoinableFactory.computeJoinCacheKey(noopDataSource)).andReturn(expected);
+    EasyMock.replay(noopJoinableFactory);
+    Optional<byte[]> actual = target.computeJoinCacheKey(noopDataSource);
+    Assert.assertEquals(expected, actual);

Review comment:
       Replaced with `Assert.assertSame`






[GitHub] [druid] gianm commented on a change in pull request #10366: Add caching support to join queries

gianm commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r494681260



##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/IndexedTable.java
##########
@@ -87,6 +88,26 @@ default ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, b
     return null;
   }
 
+  /**
+   * Computes a {@code byte[]} key for the table that can be used for computing cache keys for join operations.
+   * see {@link org.apache.druid.segment.join.JoinableFactory#computeJoinCacheKey}
+   *
+   * @return the byte array for cache key
+   * @throws {@link IAE} if caching is not supported
+   */
+  default byte[] computeCacheKey()

Review comment:
       Consider naming this `getCacheKey()` and extending Cacheable. That interface is mainly used by CacheKeyBuilder, which it doesn't look like we actually need here, but it's still nice to use that interface for things that can generate their own cache keys.

##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -770,4 +749,102 @@ private void addSequencesFromServer(
           .flatMerge(seq -> seq, query.getResultOrdering());
     }
   }
+
+  /**
+   * An inner class that is used solely for computing cache keys. It's a separate class to allow extensive unit testing
+   * of cache key generation.
+   */
+  @VisibleForTesting
+  static class CacheKeyManager<T>
+  {
+    private final Query<T> query;
+    private final CacheStrategy<T, Object, Query<T>> strategy;
+    private final DataSourceAnalysis dataSourceAnalysis;
+    private final Joinables joinables;
+    private final boolean isSegmentLevelCachingEnable;
+
+    CacheKeyManager(
+        final Query<T> query,
+        final CacheStrategy<T, Object, Query<T>> strategy,
+        final boolean useCache,
+        final boolean populateCache,
+        final DataSourceAnalysis dataSourceAnalysis,
+        final Joinables joinables
+    )
+    {
+
+      this.query = query;
+      this.strategy = strategy;
+      this.dataSourceAnalysis = dataSourceAnalysis;
+      this.joinables = joinables;
+      this.isSegmentLevelCachingEnable = ((populateCache || useCache)
+                                          && !QueryContexts.isBySegment(query));   // explicit bySegment queries are never cached
+
+    }
+
+    @Nullable
+    byte[] computeSegmentLevelQueryCacheKey()
+    {
+      if (isSegmentLevelCachingEnable) {
+        return computeQueryCacheKeyWithJoin();
+      }
+      return null;
+    }
+
+    /**
+     * It computes the ETAG which is used by {@link org.apache.druid.query.ResultLevelCachingQueryRunner} for
+     * result level caches. queryCacheKey can be null if segment level cache is not being used. However, ETAG
+     * is still computed since result level cache may still be on.
+     */
+    @Nullable
+    String computeResultLevelCachingEtag(
+        final Set<SegmentServerSelector> segments,
+        @Nullable byte[] queryCacheKey
+    )
+    {
+      Hasher hasher = Hashing.sha1().newHasher();
+      boolean hasOnlyHistoricalSegments = true;
+      for (SegmentServerSelector p : segments) {
+        if (!p.getServer().pick().getServer().isSegmentReplicationTarget()) {
+          hasOnlyHistoricalSegments = false;
+          break;
+        }
+        hasher.putString(p.getServer().getSegment().getId().toString(), StandardCharsets.UTF_8);
+        // it is important to add the "query interval" as part ETag calculation
+        // to have result level cache work correctly for queries with different
+        // intervals covering the same set of segments
+        hasher.putString(p.rhs.getInterval().toString(), StandardCharsets.UTF_8);
+      }
+
+      if (!hasOnlyHistoricalSegments) {
+        return null;
+      }
+
+      // query cache key can be null if segment level caching is disabled
+      final byte[] queryCacheKeyFinal = (queryCacheKey == null) ? computeQueryCacheKeyWithJoin() : queryCacheKey;
+      if (queryCacheKeyFinal == null) {
+        return null;
+      }
+      hasher.putBytes(queryCacheKeyFinal);
+      String currEtag = StringUtils.encodeBase64String(hasher.hash().asBytes());
+      return currEtag;
+    }
+
+    /**
+     * Adds the cache key prefix for join data sources. Return null if its a join but caching is not supported
+     */
+    @Nullable
+    private byte[] computeQueryCacheKeyWithJoin()
+    {
+      assert strategy != null;  // implies strategy != null

Review comment:
       I think you just copied this from somewhere else, but IMO it would be better as a precondition check (i.e. throw ISE or NPE if it fails) instead of an assertion.
   
   Generally we'll use assertions to note things that are meant to be _impossible_, and precondition checks for things that are possible but are incorrect usage. Since it is possible to create this CacheKeyManager class with a null `strategy` and then call `computeQueryCacheKeyWithJoin`, it'd be better for this to be a precondition check.
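
A sketch of the precondition style being suggested (the field and placeholder body are stand-ins for `CacheKeyManager`'s real members):

```java
import javax.annotation.Nullable;

import com.google.common.base.Preconditions;

class PreconditionSketch
{
  @Nullable
  private final Object strategy;

  PreconditionSketch(@Nullable Object strategy)
  {
    this.strategy = strategy;
  }

  byte[] computeQueryCacheKeyWithJoin()
  {
    // Unlike `assert`, this fails loudly on misuse even without the -ea JVM flag.
    Preconditions.checkNotNull(strategy, "strategy cannot be null");
    return new byte[0]; // placeholder for the real key computation
  }
}
```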

##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
##########
@@ -241,6 +244,22 @@ public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, bo
     );
   }
 
+  @Override
+  public Optional<byte[]> computeCacheKey()
+  {
+    SegmentId segmentId = segment.getId();

Review comment:
       I think there actually should be a prefix byte here, because the prefix bytes' purpose is to prevent cache key collisions for two implementations that are different but compute cache keys the same way. So each implementation should have its own prefix byte.
   
   I don't have a strong opinion on where the code should live, but it would be good to share it somehow, if that's not too much trouble.
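
A sketch of the per-implementation prefix byte (`CACHE_PREFIX` and the class are hypothetical; each `IndexedTable` implementation would own a distinct byte):

```java
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.timeline.SegmentId;

class PrefixedTableKey
{
  // Distinct per implementation, so identical key material cannot collide across classes.
  private static final byte CACHE_PREFIX = 0x01;

  static byte[] computeCacheKey(SegmentId segmentId)
  {
    return new CacheKeyBuilder(CACHE_PREFIX)
        .appendString(segmentId.toString()) // unique per datasource/interval/version
        .build();
  }
}
```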

##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -293,6 +301,14 @@ private ClusterQueryResult(Sequence<T> sequence, int numQueryServers)
       this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec()
                                          .map(QuerySegmentSpec::getIntervals)
                                          .orElseGet(() -> query.getIntervals());
+      this.cacheKeyManager = new CacheKeyManager<>(

Review comment:
       I don't think the concept of computing cache keys in CachingClusteredClient works properly here, because of the following scenario:
   
   1. There is a broadcast table that we'll be joining against.
   2. The Broker (which runs CCC) updates its broadcast table to a newer version.
   3. The Broker gets a query and uses the newer version in the cache key.
   4. It fans out the query to a data server that hasn't got the newest broadcast table yet.
   5. The data server returns results for the older table, and the Broker caches them.
   6. Now, the Broker's cache is wrong (it refers to older data with a newer key).
   
   The solution that comes to mind is that the data servers should keep both the old and new version around for a bit when they swap them out, and the Broker should send the specific version that it wants them to use, so it can be sure it's getting the right one. But this is out of the scope of this PR. For now I suggest not implementing caching for join datasources on the Broker. (Broker caching is off by default, anyway, so it's not the end of the world.)

##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -35,57 +39,47 @@
 import javax.annotation.Nullable;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
- * Utility methods for working with {@link Joinable} related classes.
+ * A wrapper class over {@link JoinableFactory} for working with {@link Joinable} related classes.

Review comment:
       I think it'd be better to make the change now, since "Joinables" is meant to be a non-constructible holder of utility functions, and so making it constructible is unnecessarily changing its character. As to cluttering the PR, IMO adding a new class is likely to yield a _less_ cluttered PR, because it won't involve changes to the pre-existing utility class.

##########
File path: server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
##########
@@ -77,20 +83,15 @@ public CachingQueryRunner(
   {
     Query<T> query = queryPlus.getQuery();
     final CacheStrategy strategy = toolChest.getCacheStrategy(query);
-    final boolean populateCache = CacheUtil.isPopulateSegmentCache(
-        query,
-        strategy,
-        cacheConfig,
-        CacheUtil.ServerType.DATA
-    );
-    final boolean useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.DATA);
+    final boolean populateCache = canPopulateCache(query, strategy);
+    final boolean useCache = canUseCache(query, strategy);
 
     final Cache.NamedKey key;
-    if (strategy != null && (useCache || populateCache)) {

Review comment:
       Is it not possible for `strategy` to be null?

##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +107,68 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that participate in the RHS of a join. This key prefix
+   * can be used in the segment-level cache or the result-level cache. The function can return the following,
+   * wrapped in an Optional:
+   *  - Non-empty byte array - a join data source is involved and caching is possible. The result includes the
+   *  join condition expression, the join type, and the cache key returned by the joinable factory for each
+   *  {@link PreJoinableClause}
+   *  - NULL - there is a join but caching is not possible. This may happen if one of the data sources
+   *  participating in the JOIN is not cacheable.
+   *
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   * @param dataSourceAnalysis for the join datasource
+   * @return the optional cache key to be used as part of query cache key
+   */
+  public Optional<byte[]> computeJoinDataSourceCacheKey(
+      final DataSourceAnalysis dataSourceAnalysis
+  )
+  {
+    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
+    if (clauses.isEmpty()) {
+      throw new IAE("No join clauses to build the cache key for data source [%s]", dataSourceAnalysis.getDataSource());
+    }
+
+    final CacheKeyBuilder keyBuilder;
+    keyBuilder = new CacheKeyBuilder(JOIN_OPERATION);
+    for (PreJoinableClause clause : clauses) {
+      Optional<byte[]> bytes = joinableFactory.computeJoinCacheKey(clause.getDataSource(), clause.getCondition());
+      if (!bytes.isPresent()) {
+        // Encountered a data source which didn't support cache yet
+        log.debug("skipping caching for join since [%s] does not support caching", clause.getDataSource());
+        return Optional.empty();
+      }
+      keyBuilder.appendByteArray(bytes.get());
+      keyBuilder.appendString(clause.getCondition().getOriginalExpression());
+      keyBuilder.appendString(clause.getJoinType().name());

Review comment:
       The clause's prefix is important too, because it controls the names of the columns.
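
For illustration, the per-clause key material with the prefix folded in (a hedged fragment; `appendClause` is a hypothetical helper):

```java
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.planning.PreJoinableClause;

final class ClauseKeyMaterial
{
  private ClauseKeyMaterial() {}

  static void appendClause(CacheKeyBuilder keyBuilder, PreJoinableClause clause, byte[] factoryKey)
  {
    keyBuilder.appendByteArray(factoryKey);
    keyBuilder.appendString(clause.getCondition().getOriginalExpression());
    keyBuilder.appendString(clause.getJoinType().name());
    // The prefix renames the joined columns, which changes query results,
    // so it must participate in the cache key as well.
    keyBuilder.appendString(clause.getPrefix());
  }
}
```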






[GitHub] [druid] abhishekagarwal87 commented on pull request #10366: Add caching support to join queries

abhishekagarwal87 commented on pull request #10366:
URL: https://github.com/apache/druid/pull/10366#issuecomment-702305535


   We have per-query toggles and those should suffice IMO. The caching is anyway disabled on broker. `useCache` and `populateCache` parameters can be set to false in the query-context. 




[GitHub] [druid] jihoonson commented on a change in pull request #10366: Add caching support to join queries

jihoonson commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r493799693



##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -35,57 +39,47 @@
 import javax.annotation.Nullable;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
- * Utility methods for working with {@link Joinable} related classes.
+ * A wrapper class over {@link JoinableFactory} for working with {@link Joinable} related classes.

Review comment:
       I'm OK with doing it as a follow-up.

##########
File path: server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
##########
@@ -197,13 +197,16 @@ public ServerManager(
     }
 
     // segmentMapFn maps each base Segment into a joined Segment if necessary.
-    final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
+    final Function<SegmentReference, SegmentReference> segmentMapFn = joinables.createSegmentMapFn(
         analysis.getPreJoinableClauses(),
-        joinableFactory,
         cpuTimeAccumulator,
         analysis.getBaseQuery().orElse(query)
     );
 
+    final Optional<byte[]> cacheKeyPrefix = analysis.isJoin()

Review comment:
       I see. It sounds reasonable to me although the cache key computation should be cheap. I think we can introduce another class responsible for computing cache keys. `ServerManager` can create it and pass it to `CachingQueryRunner`. The new class will take `DatasourceAnalysis` and `Joinables` in its constructor, and have an interface providing a computed cache key for a given segment. I don't think this refactoring should be necessarily done in this PR, but it seems good to have. Are you interested in this refactoring as well?
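
A hedged sketch of the helper described here (the class and method names are hypothetical; `JoinableFactoryWrapper` is the wrapper this PR introduces around `JoinableFactory`):

```java
import java.util.Optional;

import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.join.JoinableFactoryWrapper;

// Built once per query by ServerManager and handed to each CachingQueryRunner.
class SegmentCacheKeyCompute
{
  private final Optional<byte[]> cacheKeyPrefix;

  SegmentCacheKeyCompute(DataSourceAnalysis analysis, JoinableFactoryWrapper joinables)
  {
    this.cacheKeyPrefix = analysis.isJoin()
        ? joinables.computeJoinDataSourceCacheKey(analysis)
        : Optional.of(new byte[0]); // non-join queries keep their existing key shape
  }

  /** Empty means this query must bypass the segment cache entirely. */
  Optional<byte[]> cacheKeyPrefix()
  {
    return cacheKeyPrefix;
  }
}
```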






[GitHub] [druid] jihoonson commented on a change in pull request #10366: Add caching support to join queries

jihoonson commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r495137818



##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
##########
@@ -241,6 +244,22 @@ public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, bo
     );
   }
 
+  @Override
+  public Optional<byte[]> computeCacheKey()
+  {
+    SegmentId segmentId = segment.getId();

Review comment:
       I'm not sure how there could be cache key collisions between `IndexedTable`s. Even though there could be multiple `IndexedTable` implementations, the way they are used in the callers (`IndexedTableJoinMatcher`, `IndexedTableColumnValueSelector`, etc.) should be the same across those implementations, which will result in the same query result. There will also not be cache key collisions between a broadcast segment (`IndexedTable`) and a regular segment, as their cache key computation is different (the datasource name is included only in the former). But the current way doesn't look very well-structured.
   
       Another approach I've been thinking about is making `ReferenceCountedObject` (or `SegmentReference`) cacheable, as they represent a segment (either physical or virtual). That way, the cache key for segments would be computed in each `ReferenceCountedObject` implementation, and even `CachingQueryRunner` wouldn't have to know how to compute the cache key for segments. The interface to compute a cache key should accept a `SegmentDescriptor`. In this case, the prefix would indicate how the segment will be processed (in a hash join or an aggregation without a join, for now). This approach seems most reasonable to me, but I didn't mention it before since you had already gone a different way. (A sketch of this interface follows below.)
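
A sketch of that alternative (the interface name is hypothetical):

```java
import java.util.Optional;

import org.apache.druid.query.SegmentDescriptor;

// Each segment reference, physical or virtual, describes itself for caching,
// so CachingQueryRunner needs no knowledge of join internals.
interface CacheableSegmentReference
{
  /**
   * @return cache key bytes for the given descriptor, or empty if this
   *         segment cannot participate in caching
   */
  Optional<byte[]> computeCacheKey(SegmentDescriptor descriptor);
}
```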






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r495085871



##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +107,68 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that participate in the RHS of a join. This key prefix
+   * can be used in the segment-level cache or the result-level cache. The function can return the following,
+   * wrapped in an Optional:
+   *  - Non-empty byte array - a join data source is involved and caching is possible. The result includes the
+   *  join condition expression, the join type, and the cache key returned by the joinable factory for each
+   *  {@link PreJoinableClause}
+   *  - NULL - there is a join but caching is not possible. This may happen if one of the data sources
+   *  participating in the JOIN is not cacheable.
+   *
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   * @param dataSourceAnalysis for the join datasource
+   * @return the optional cache key to be used as part of query cache key
+   */
+  public Optional<byte[]> computeJoinDataSourceCacheKey(
+      final DataSourceAnalysis dataSourceAnalysis
+  )
+  {
+    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
+    if (clauses.isEmpty()) {
+      throw new IAE("No join clauses to build the cache key for data source [%s]", dataSourceAnalysis.getDataSource());
+    }
+
+    final CacheKeyBuilder keyBuilder;
+    keyBuilder = new CacheKeyBuilder(JOIN_OPERATION);
+    for (PreJoinableClause clause : clauses) {
+      Optional<byte[]> bytes = joinableFactory.computeJoinCacheKey(clause.getDataSource(), clause.getCondition());
+      if (!bytes.isPresent()) {
+        // Encountered a data source which didn't support cache yet
+        log.debug("skipping caching for join since [%s] does not support caching", clause.getDataSource());
+        return Optional.empty();
+      }
+      keyBuilder.appendByteArray(bytes.get());
+      keyBuilder.appendString(clause.getCondition().getOriginalExpression());
+      keyBuilder.appendString(clause.getJoinType().name());

Review comment:
       Functionally it makes no difference, right? Do the cached results store column names with the prefix?






[GitHub] [druid] abhishekagarwal87 commented on pull request #10366: Add caching support to join queries

abhishekagarwal87 commented on pull request #10366:
URL: https://github.com/apache/druid/pull/10366#issuecomment-696768403


   > Hi @abhishekagarwal87, thanks for the PR. The overall design to compute cache key for hash join looks reasonable to me. I left some comments on details.
   > 
   > You mentioned that unit tests should be added for `CachingQueryRunner` in the PR description. Are you planning to do it in this PR? I think it would be nice to do together in a same PR. Also, please add some for `CachingClusteredClient` as well.
   
   Hi @jihoonson - I have added these tests. I had to refactor `CachingClusteredClient` somewhat to unit test the cache-key-related pieces. Same goes for `CachingQueryRunner`.




[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r492907168



##########
File path: processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java
##########
@@ -43,8 +43,18 @@
    *
    * @param dataSource the datasource to join on
    * @param condition  the condition to join on
-   *
    * @return a Joinable if this datasource + condition combo is joinable; empty if not
    */
   Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition);
+
+  /**
+   * Compute the cache key for a data source participating in join operation. This is done separately from {{@link #build(DataSource, JoinConditionAnalysis)}}
+   * which can be an expensive operation and can potentially be avoided if cached results can be used.
+   *
+   * @param dataSource the datasource to join on
+   */
+  default Optional<byte[]> computeJoinCacheKey(DataSource dataSource)

Review comment:
       Hmm, I do believe this condition should be outside. When a query involves multiple joins and one of the conditions cannot be hash-joined, we can skip building the other expensive Joinable objects entirely. That being said, I understand that we do not want to change the interfaces right now. Though when new join algorithms come into being, I believe these interfaces will have to change anyway, since you have to somehow pass which join algorithm you intend to use.
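
   For illustration, a minimal sketch of that early exit (the helper and its placement are assumptions, not the PR's actual code; `PreJoinableClause` and `canHashJoin()` are the APIs already used in the diff above):

       // Sketch: if any clause cannot be hash-joined, bail out before
       // building the remaining (potentially expensive) Joinable objects.
       static boolean allClausesCanHashJoin(List<PreJoinableClause> clauses)
       {
         for (PreJoinableClause clause : clauses) {
           if (!clause.getCondition().canHashJoin()) {
             return false;   // callers skip building the other Joinables
           }
         }
         return true;
       }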






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r500141913



##########
File path: server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
##########
@@ -117,7 +117,7 @@ public ServerManager(
 
     this.cacheConfig = cacheConfig;
     this.segmentManager = segmentManager;
-    this.joinableFactory = joinableFactory;
+    this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory);

Review comment:
       My primary reason for not using static methods was easier mocking and unit testing. I think injecting `JoinableFactoryWrapper` directly makes sense. Another option is to merge `MapJoinableFactory` and `JoinableFactoryWrapper` and cut down on a class. Both of these classes essentially provide utility functions on top of `JoinableFactory`.
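
   For example, the wiring could look roughly like this (a hypothetical Guice provider, not code from the PR):

       // Hypothetical module method: bind the wrapper once so consumers
       // inject JoinableFactoryWrapper instead of re-wrapping JoinableFactory.
       @Provides
       @LazySingleton
       public JoinableFactoryWrapper getJoinableFactoryWrapper(JoinableFactory joinableFactory)
       {
         return new JoinableFactoryWrapper(joinableFactory);
       }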






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r495729980



##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -293,6 +301,14 @@ private ClusterQueryResult(Sequence<T> sequence, int numQueryServers)
       this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec()
                                          .map(QuerySegmentSpec::getIntervals)
                                          .orElseGet(() -> query.getIntervals());
+      this.cacheKeyManager = new CacheKeyManager<>(

Review comment:
       I suppose, in that case, we also keep both versions on a data server for a while. Otherwise, the table will not be in a queryable state until all the data servers see the new version.






[GitHub] [druid] gianm commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
gianm commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r495030839



##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/IndexedTable.java
##########
@@ -87,6 +88,26 @@ default ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, b
     return null;
   }
 
+  /**
+   * Computes a {@code byte[]} key for the table that can be used for computing cache keys for join operations.
+   * see {@link org.apache.druid.segment.join.JoinableFactory#computeJoinCacheKey}
+   *
+   * @return the byte array for cache key
+   * @throws {@link IAE} if caching is not supported
+   */
+  default byte[] computeCacheKey()

Review comment:
       Sounds good. It's probably not a good idea to make `getCacheKey()` unreliable in the Cacheable interface, so it makes sense not to use that interface here.
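
   For context, the default body being discussed would look roughly like this (a sketch; the exact message is a guess based on the Javadoc above):

       // Sketch of the IndexedTable default: tables that do not support
       // caching throw instead of returning an unreliable key.
       default byte[] computeCacheKey()
       {
         throw new IAE("Caching is not supported by table of class [%s]", getClass().getName());
       }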






[GitHub] [druid] gianm commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
gianm commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r495030144



##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -293,6 +301,14 @@ private ClusterQueryResult(Sequence<T> sequence, int numQueryServers)
       this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec()
                                          .map(QuerySegmentSpec::getIntervals)
                                          .orElseGet(() -> query.getIntervals());
+      this.cacheKeyManager = new CacheKeyManager<>(

Review comment:
       I think this is an issue for both kinds of caching (segment and result-level) on the Broker, because they both use keys that might not match the inputs that are used by the data server.






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r493834959



##########
File path: server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
##########
@@ -197,13 +197,16 @@ public ServerManager(
     }
 
     // segmentMapFn maps each base Segment into a joined Segment if necessary.
-    final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
+    final Function<SegmentReference, SegmentReference> segmentMapFn = joinables.createSegmentMapFn(
         analysis.getPreJoinableClauses(),
-        joinableFactory,
         cpuTimeAccumulator,
         analysis.getBaseQuery().orElse(query)
     );
 
+    final Optional<byte[]> cacheKeyPrefix = analysis.isJoin()

Review comment:
       I think this piece of code is too small to be refactored into its own class. Maybe in the future, if the cache key computation becomes more complex.






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r493249560



##########
File path: server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
##########
@@ -197,13 +197,16 @@ public ServerManager(
     }
 
     // segmentMapFn maps each base Segment into a joined Segment if necessary.
-    final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
+    final Function<SegmentReference, SegmentReference> segmentMapFn = joinables.createSegmentMapFn(
         analysis.getPreJoinableClauses(),
-        joinableFactory,
         cpuTimeAccumulator,
         analysis.getBaseQuery().orElse(query)
     );
 
+    final Optional<byte[]> cacheKeyPrefix = analysis.isJoin()

Review comment:
       I remember now that what you are describing is how I started, but I changed it later so that the cache key is not computed for every segment participating in the query. That's why I compute the cacheKeyPrefix once in `getQueryRunnerForSegments` and then pass it on to each `SpecificSegmentQueryRunner`.
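
   A rough sketch of that flow (variable names are from the diff above; `buildRunnerForSegment`, `specs`, and `runners` are hypothetical stand-ins for the real surrounding code):

       // Computed once per query in getQueryRunnerForSegments...
       final Optional<byte[]> cacheKeyPrefix = analysis.isJoin()
           ? joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis)
           : Optional.of(new byte[0]);

       // ...then reused by every per-segment runner instead of being
       // recomputed for each segment.
       for (SegmentDescriptor descriptor : specs) {
         runners.add(buildRunnerForSegment(query, descriptor, cacheKeyPrefix));
       }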






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r491917112



##########
File path: server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
##########
@@ -197,13 +197,16 @@ public ServerManager(
     }
 
     // segmentMapFn maps each base Segment into a joined Segment if necessary.
-    final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
+    final Function<SegmentReference, SegmentReference> segmentMapFn = joinables.createSegmentMapFn(
         analysis.getPreJoinableClauses(),
-        joinableFactory,
         cpuTimeAccumulator,
         analysis.getBaseQuery().orElse(query)
     );
 
+    final Optional<byte[]> cacheKeyPrefix = analysis.isJoin()

Review comment:
       I am not expecting cache key computation to be a costly operation. Do you think it should still be passed via a supplier?






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Proposed changes for making joins cacheable

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r487699118



##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
##########
@@ -241,6 +244,22 @@ public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, bo
     );
   }
 
+  @Override
+  public Optional<byte[]> computeCacheKey()
+  {
+    SegmentId segmentId = segment.getId();

Review comment:
       Are you suggesting moving this logic to the `SegmentId` class?






[GitHub] [druid] jihoonson commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r493174340



##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
##########
@@ -241,6 +244,22 @@ public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, bo
     );
   }
 
+  @Override
+  public Optional<byte[]> computeCacheKey()
+  {
+    SegmentId segmentId = segment.getId();

Review comment:
       > `CacheKeyBuilder` requires a prefix byte key in its constructor and hence I was not using it everywhere. I could add some fixed byte key though not sure how much does that help.
   
   This makes me wonder whether a broadcast segment needs a prefix key; I'm not sure it would be useful yet. I still find using `CacheKeyBuilder` better since it's less error-prone than computing the cache key manually, but I also agree with your point. I will leave this up to you.
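
   For comparison, a minimal sketch of the `CacheKeyBuilder` approach (`JOIN_OPERATION` is the prefix byte already used in this PR; the appended values are illustrative):

       // CacheKeyBuilder handles length-prefixing and separators itself,
       // which is what makes it less error-prone than manual concatenation.
       byte[] key = new CacheKeyBuilder(JOIN_OPERATION)
           .appendByteArray(segmentCacheKey)
           .appendString(condition.getOriginalExpression())
           .build();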

##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -35,57 +39,47 @@
 import javax.annotation.Nullable;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
- * Utility methods for working with {@link Joinable} related classes.
+ * A wrapper class over {@link JoinableFactory} for working with {@link Joinable} related classes.

Review comment:
       It works for me.

##########
File path: server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
##########
@@ -197,13 +197,16 @@ public ServerManager(
     }
 
     // segmentMapFn maps each base Segment into a joined Segment if necessary.
-    final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
+    final Function<SegmentReference, SegmentReference> segmentMapFn = joinables.createSegmentMapFn(
         analysis.getPreJoinableClauses(),
-        joinableFactory,
         cpuTimeAccumulator,
         analysis.getBaseQuery().orElse(query)
     );
 
+    final Optional<byte[]> cacheKeyPrefix = analysis.isJoin()

Review comment:
       Hmm, what I want is to have cache key computation in one place, since that makes it easier to follow how the key is computed. I think my previous comment was not clear enough about that. How about passing `DataSourceAnalysis` and `Joinables` to `CachingQueryRunner` so that cache key computation can be done solely there?

##########
File path: processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java
##########
@@ -43,8 +43,18 @@
    *
    * @param dataSource the datasource to join on
    * @param condition  the condition to join on
-   *
    * @return a Joinable if this datasource + condition combo is joinable; empty if not
    */
   Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition);
+
+  /**
+   * Compute the cache key for a data source participating in join operation. This is done separately from {{@link #build(DataSource, JoinConditionAnalysis)}}
+   * which can be an expensive operation and can potentially be avoided if cached results can be used.
+   *
+   * @param dataSource the datasource to join on
+   */
+  default Optional<byte[]> computeJoinCacheKey(DataSource dataSource)

Review comment:
       Good point. I do agree that it could be better to check the condition outside so that we can stop creating potentially expensive joinables early. However, that doesn't mean this interface should not match `build()`. So, I would suggest either adding a Javadoc note explaining why the interfaces don't match, or modifying this interface to match, since the mismatch is confusing to a reader who doesn't know why two methods that seem like they should line up are different.
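
   For reference, the revised interface mirrors `build()`, matching the call site shown earlier in this thread:

       // Matched to build(): same (DataSource, JoinConditionAnalysis) inputs.
       default Optional<byte[]> computeJoinCacheKey(DataSource dataSource, JoinConditionAnalysis condition)
       {
         return Optional.empty();
       }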






[GitHub] [druid] jihoonson commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r500516172



##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -770,4 +749,103 @@ private void addSequencesFromServer(
           .flatMerge(seq -> seq, query.getResultOrdering());
     }
   }
+
+  /**
+   * An inner class that is used solely for computing cache keys. It's a separate class to allow extensive unit testing
+   * of cache key generation.
+   */
+  @VisibleForTesting
+  static class CacheKeyManager<T>
+  {
+    private final Query<T> query;
+    private final CacheStrategy<T, Object, Query<T>> strategy;
+    private final DataSourceAnalysis dataSourceAnalysis;
+    private final JoinableFactoryWrapper joinableFactoryWrapper;
+    private final boolean isSegmentLevelCachingEnable;
+
+    CacheKeyManager(
+        final Query<T> query,
+        final CacheStrategy<T, Object, Query<T>> strategy,
+        final boolean useCache,
+        final boolean populateCache,
+        final DataSourceAnalysis dataSourceAnalysis,
+        final JoinableFactoryWrapper joinableFactoryWrapper
+    )
+    {
+
+      this.query = query;
+      this.strategy = strategy;
+      this.dataSourceAnalysis = dataSourceAnalysis;
+      this.joinableFactoryWrapper = joinableFactoryWrapper;
+      this.isSegmentLevelCachingEnable = ((populateCache || useCache)
+                                          && !QueryContexts.isBySegment(query));   // explicit bySegment queries are never cached
+
+    }
+
+    @Nullable
+    byte[] computeSegmentLevelQueryCacheKey()
+    {
+      if (isSegmentLevelCachingEnable) {
+        return computeQueryCacheKeyWithJoin();
+      }
+      return null;
+    }
+
+    /**
+     * It computes the ETAG which is used by {@link org.apache.druid.query.ResultLevelCachingQueryRunner} for
+     * result level caches. queryCacheKey can be null if segment level cache is not being used. However, ETAG
+     * is still computed since result level cache may still be on.
+     */
+    @Nullable
+    String computeResultLevelCachingEtag(
+        final Set<SegmentServerSelector> segments,
+        @Nullable byte[] queryCacheKey
+    )
+    {
+      Hasher hasher = Hashing.sha1().newHasher();
+      boolean hasOnlyHistoricalSegments = true;
+      for (SegmentServerSelector p : segments) {
+        if (!p.getServer().pick().getServer().isSegmentReplicationTarget()) {
+          hasOnlyHistoricalSegments = false;
+          break;
+        }
+        hasher.putString(p.getServer().getSegment().getId().toString(), StandardCharsets.UTF_8);
+        // it is important to add the "query interval" as part ETag calculation
+        // to have result level cache work correctly for queries with different
+        // intervals covering the same set of segments
+        hasher.putString(p.rhs.getInterval().toString(), StandardCharsets.UTF_8);
+      }
+
+      if (!hasOnlyHistoricalSegments) {
+        return null;
+      }
+
+      // query cache key can be null if segment level caching is disabled
+      final byte[] queryCacheKeyFinal = (queryCacheKey == null) ? computeQueryCacheKeyWithJoin() : queryCacheKey;
+      if (queryCacheKeyFinal == null) {
+        return null;
+      }
+      hasher.putBytes(queryCacheKeyFinal);
+      String currEtag = StringUtils.encodeBase64String(hasher.hash().asBytes());
+      return currEtag;
+    }
+
+    /**
+     * Adds the cache key prefix for join data sources. Returns null if it's a join but caching is not supported
+     */
+    @Nullable
+    private byte[] computeQueryCacheKeyWithJoin()
+    {
+      Preconditions.checkNotNull(strategy, "strategy cannot be null");
+      if (dataSourceAnalysis.isJoin()) {
+        return null; // Broker join caching disabled - https://github.com/apache/druid/issues/10444
+       /* byte[] joinDataSourceCacheKey = joinableFactoryWrapper.computeJoinDataSourceCacheKey(dataSourceAnalysis).orElse(null);
+        if (null == joinDataSourceCacheKey) {
+          return null;    // A join operation that does not support caching
+        }
+        return Bytes.concat(joinDataSourceCacheKey, strategy.computeCacheKey(query));*/

Review comment:
       Please remove these lines. We can add them back later when we enable it.






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Proposed changes for making joins cacheable

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r487695326



##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +126,47 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that is to be used in segment level and result level caches. The
+   * data source can either be base (clauses is empty) or RHS of a join (clauses is non-empty). In both of the cases,
+   * a non-null cache is returned. However, the cache key is null if there is a join and some of the right data sources
+   * participating in the join do not support caching yet
+   *
+   * @param dataSourceAnalysis
+   * @param joinableFactory
+   * @return
+   */
+  public static Optional<byte[]> computeDataSourceCacheKey(
+      final DataSourceAnalysis dataSourceAnalysis,
+      final JoinableFactory joinableFactory
+  )
+  {
+    final CacheKeyBuilder keyBuilder;
+    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
+    if (clauses.isEmpty()) {
+      keyBuilder = new CacheKeyBuilder(REGULAR_OPERATION);
+    } else {
+      keyBuilder = new CacheKeyBuilder(JOIN_OPERATION);
+      for (PreJoinableClause clause : clauses) {
+        if (!clause.getCondition().canHashJoin()) {

Review comment:
       I was thinking of keeping it here so that I could just pass the data source into `computeJoinCacheKey`. The engine for hash join works outside the boundaries of joinable factories, so it felt simpler to keep it here and let joinable factories worry only about the data source. The other cache info from the condition (original expression etc.) is derived here in this class as well.






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r495737809



##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
##########
@@ -241,6 +244,22 @@ public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, bo
     );
   }
 
+  @Override
+  public Optional<byte[]> computeCacheKey()
+  {
+    SegmentId segmentId = segment.getId();

Review comment:
       `ReferenceCountedObject` has a wide variety of implementations. I am not sure how that can be made `cacheable` when many of these implementations have nothing to do with query caching.
   
   How is making `SegmentReference` cacheable different from the very first alternative, where we could add a `computeCacheKey` method to the `Segment` class?
   
   > In this case, the prefix will indicate how the segment will be processed (in a hash join or an aggregation without join for now)
   
   By the way, a prefix for the join operation is already being used. Where do you see this prefix being passed from?






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r495728970



##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +107,68 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that participate in the RHS of a join. This key prefix
+   * can be used in segment level cache or result level cache. The function can return following wrapped in an
+   * Optional
+   *  - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
+   *  join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
+   *  - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
+   *  in the JOIN is not cacheable.
+   *
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   * @param dataSourceAnalysis for the join datasource
+   * @return the optional cache key to be used as part of query cache key
+   */
+  public Optional<byte[]> computeJoinDataSourceCacheKey(
+      final DataSourceAnalysis dataSourceAnalysis
+  )
+  {
+    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
+    if (clauses.isEmpty()) {
+      throw new IAE("No join clauses to build the cache key for data source [%s]", dataSourceAnalysis.getDataSource());
+    }
+
+    final CacheKeyBuilder keyBuilder;
+    keyBuilder = new CacheKeyBuilder(JOIN_OPERATION);
+    for (PreJoinableClause clause : clauses) {
+      Optional<byte[]> bytes = joinableFactory.computeJoinCacheKey(clause.getDataSource(), clause.getCondition());
+      if (!bytes.isPresent()) {
+        // Encountered a data source which didn't support cache yet
+        log.debug("skipping caching for join since [%s] does not support caching", clause.getDataSource());
+        return Optional.empty();
+      }
+      keyBuilder.appendByteArray(bytes.get());
+      keyBuilder.appendString(clause.getCondition().getOriginalExpression());
+      keyBuilder.appendString(clause.getJoinType().name());

Review comment:
       Added the prefix.






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r491897608



##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +112,74 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that participate in the RHS of a join. This key prefix
+   * can be used in segment level cache or result level cache. The function can return following wrapped in an
+   * Optional
+   *  - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
+   *  join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
+   *  - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
+   *  in the JOIN is not cacheable.
+   *
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   * @param dataSourceAnalysis
+   * @return

Review comment:
       Ack






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r493306273



##########
File path: processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java
##########
@@ -43,8 +43,18 @@
    *
    * @param dataSource the datasource to join on
    * @param condition  the condition to join on
-   *
    * @return a Joinable if this datasource + condition combo is joinable; empty if not
    */
   Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition);
+
+  /**
+   * Compute the cache key for a data source participating in join operation. This is done separately from {{@link #build(DataSource, JoinConditionAnalysis)}}
+   * which can be an expensive operation and can potentially be avoided if cached results can be used.
+   *
+   * @param dataSource the datasource to join on
+   */
+  default Optional<byte[]> computeJoinCacheKey(DataSource dataSource)

Review comment:
       Thank you for your suggestions. I agree that the functions should have parity. I changed the code accordingly.






[GitHub] [druid] gianm commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
gianm commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r495031854



##########
File path: server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
##########
@@ -77,20 +83,15 @@ public CachingQueryRunner(
   {
     Query<T> query = queryPlus.getQuery();
     final CacheStrategy strategy = toolChest.getCacheStrategy(query);
-    final boolean populateCache = CacheUtil.isPopulateSegmentCache(
-        query,
-        strategy,
-        cacheConfig,
-        CacheUtil.ServerType.DATA
-    );
-    final boolean useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.DATA);
+    final boolean populateCache = canPopulateCache(query, strategy);
+    final boolean useCache = canUseCache(query, strategy);
 
     final Cache.NamedKey key;
-    if (strategy != null && (useCache || populateCache)) {

Review comment:
       Got it. I don't think the extra check is necessary, given what you just said. Maybe just a comment explaining that `strategy` can't be null if either of the cache flags is true.
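
   A sketch of what that comment could look like (the flags and `strategy` come from the diff above; `computeKey` is a hypothetical helper used for brevity):

       // strategy is guaranteed non-null when either flag is true, because
       // canUseCache()/canPopulateCache() both return false for a null
       // strategy, so no explicit null check is needed here.
       final Cache.NamedKey key;
       if (useCache || populateCache) {
         key = computeKey(strategy);
       } else {
         key = null;
       }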






[GitHub] [druid] jihoonson commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r491217786



##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -35,57 +39,47 @@
 import javax.annotation.Nullable;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
- * Utility methods for working with {@link Joinable} related classes.
+ * A wrapper class over {@link JoinableFactory} for working with {@link Joinable} related classes.

Review comment:
       Using a plural name is a convention for utility classes, like `Collections`. Maybe we should rename it, since this class will not be a simple utility class anymore. But does this class need to be a wrapper class? It doesn't seem so. Also, static and non-static util methods are mixed here, which seems confusing. Can we keep this class as a non-instantiable utility class?
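
   For reference, the usual shape of a non-instantiable utility class (the method body is paraphrased from the existing `isPrefixedBy`):

       public final class Joinables
       {
         private Joinables()
         {
           // no instances; static utility methods only
         }

         public static boolean isPrefixedBy(final String columnName, final String prefix)
         {
           return columnName.length() > prefix.length() && columnName.startsWith(prefix);
         }
       }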

##########
File path: processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java
##########
@@ -43,8 +43,18 @@
    *
    * @param dataSource the datasource to join on
    * @param condition  the condition to join on
-   *
    * @return a Joinable if this datasource + condition combo is joinable; empty if not
    */
   Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition);
+
+  /**
+   * Compute the cache key for a data source participating in join operation. This is done separately from {{@link #build(DataSource, JoinConditionAnalysis)}}
+   * which can be an expensive operation and can potentially be avoided if cached results can be used.
+   *
+   * @param dataSource the datasource to join on
+   */
+  default Optional<byte[]> computeJoinCacheKey(DataSource dataSource)

Review comment:
       Per the contract of `JoinableFactory`, `build()` can return `Optional.empty()` even when the datasource is `directlyJoinable`. I think `computeJoinCacheKey()` should match `build()`; it should return a computed cache key only when `build()` returns a `Joinable`.
   
   As you mentioned at https://github.com/apache/druid/pull/10366#discussion_r487695326, perhaps `canHashJoin()` could be checked on the caller side. I don't see why not, for now, except that you would also have to change the `build()` method and its callers to do the same. However, I'm not sure that's better; that is, I'm not sure whether this interface, or the one you might change it to, makes sense for other join algorithms we will add in the future. I would suggest keeping this interface until we come up with a good one.

##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
##########
@@ -224,8 +228,9 @@ public SinkQuerySegmentWalker(
                       // 1) Only use caching if data is immutable
                       // 2) Hydrants are not the same between replicas, make sure cache is local
                       if (hydrantDefinitelySwapped && cache.isLocal()) {
-                        runner = new CachingQueryRunner<>(
+                        runner = new CachingQueryRunner<T>(

Review comment:
       Type argument is not required.

##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +112,74 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that participate in the RHS of a join. This key prefix
+   * can be used in segment level cache or result level cache. The function can return following wrapped in an
+   * Optional
+   *  - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
+   *  join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
+   *  - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
+   *  in the JOIN is not cacheable.
+   *
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   * @param dataSourceAnalysis
+   * @return

Review comment:
       Please remove `param` and `return` tags if they don't need descriptions.

##########
File path: server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
##########
@@ -197,13 +197,16 @@ public ServerManager(
     }
 
     // segmentMapFn maps each base Segment into a joined Segment if necessary.
-    final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
+    final Function<SegmentReference, SegmentReference> segmentMapFn = joinables.createSegmentMapFn(
         analysis.getPreJoinableClauses(),
-        joinableFactory,
         cpuTimeAccumulator,
         analysis.getBaseQuery().orElse(query)
     );
 
+    final Optional<byte[]> cacheKeyPrefix = analysis.isJoin()

Review comment:
       How about supplying a `Supplier<Optional<byte[]>>` to the `CachingQueryRunner`? Then, computing `cacheKeyPrefix` can be done only when `useCache` or `populateCache` is true, which seems more legit.
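
   A minimal sketch of that idea using Guava's `Suppliers.memoize` (variable names are taken from the surrounding diff):

       // Defer (and cache) the prefix computation until a CachingQueryRunner
       // actually needs it, i.e. when useCache or populateCache is true.
       final Supplier<Optional<byte[]>> cacheKeyPrefix = Suppliers.memoize(
           () -> analysis.isJoin()
               ? joinables.computeJoinDataSourceCacheKey(analysis)
               : Optional.of(new byte[0])
       );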

##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +112,74 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that participate in the RHS of a join. This key prefix
+   * can be used in segment level cache or result level cache. The function can return following wrapped in an
+   * Optional
+   *  - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
+   *  join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
+   *  - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
+   *  in the JOIN is not cacheable.
+   *
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   * @param dataSourceAnalysis
+   * @return
+   */
+  public Optional<byte[]> computeJoinDataSourceCacheKey(
+      final DataSourceAnalysis dataSourceAnalysis
+  )
+  {
+
+    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
+    if (clauses.isEmpty()) {
+      throw new IAE("No join clauses to build the cache key");

Review comment:
       Please include useful information in the error message, such as the datasource.

##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +112,74 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that participate in the RHS of a join. This key prefix
+   * can be used in segment level cache or result level cache. The function can return following wrapped in an
+   * Optional
+   *  - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
+   *  join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
+   *  - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
+   *  in the JOIN is not cacheable.
+   *
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   * @param dataSourceAnalysis
+   * @return
+   */
+  public Optional<byte[]> computeJoinDataSourceCacheKey(
+      final DataSourceAnalysis dataSourceAnalysis
+  )
+  {
+
+    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
+    if (clauses.isEmpty()) {
+      throw new IAE("No join clauses to build the cache key");
+    }
+
+    final CacheKeyBuilder keyBuilder;
+    keyBuilder = new CacheKeyBuilder(JOIN_OPERATION);
+    for (PreJoinableClause clause : clauses) {
+      if (!clause.getCondition().canHashJoin()) {
+        log.debug("skipping caching for join since [%s] does not support hash-join", clause.getCondition());
+        return Optional.empty();
+      }
+      Optional<byte[]> bytes = joinableFactory.computeJoinCacheKey(clause.getDataSource());
+      if (!bytes.isPresent()) {
+        // Encountered a data source which didn't support cache yet
+        log.debug("skipping caching for join since [%s] does not support caching", clause.getDataSource());
+        return Optional.empty();
+      }
+      keyBuilder.appendByteArray(bytes.get());
+      keyBuilder.appendString(clause.getPrefix());    //TODO - prefix shouldn't be required IMO

Review comment:
       Yeah, it doesn't seem necessary for now.

##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexedTable.java
##########
@@ -52,6 +54,8 @@
   private final List<Function<RowType, Object>> columnFunctions;
   private final Set<String> keyColumns;
   private final String version;
+  @Nullable
+  private final byte[] cacheKey;

Review comment:
       Would you please add a comment on this variable? Since `cacheKey` is not set anywhere, people would not understand when it can be null, or even how it could be set, until they read this PR.

##########
File path: processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java
##########
@@ -190,6 +196,176 @@ public boolean isDirectlyJoinable(DataSource dataSource)
     Assert.assertNotSame(Function.identity(), segmentMapFn);
   }
 
+  @Test(expected = IAE.class)

Review comment:
       Please use `expectedException` and verify the error message as well.
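
   A sketch of the suggested JUnit 4 pattern (the expected message is taken from the error string in the diff):

       @Rule
       public ExpectedException expectedException = ExpectedException.none();

       @Test
       public void test_computeJoinDataSourceCacheKey_noClauses()
       {
         expectedException.expect(IAE.class);
         expectedException.expectMessage("No join clauses to build the cache key");
         // set up the mocked DataSourceAnalysis as above, then:
         joinables.computeJoinDataSourceCacheKey(analysis);
       }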

##########
File path: processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java
##########
@@ -83,6 +84,36 @@ public void testBuildDataSourceIsRegisteredShouldReturnJoinableFromFactory()
     EasyMock.replay(noopJoinableFactory);
     Optional<Joinable> joinable = target.build(noopDataSource, condition);
     Assert.assertEquals(mockJoinable, joinable.get());
+
+  }
+
+  @Test
+  public void testComputeJoinCacheKey()
+  {
+    Optional<byte[]> expected = Optional.of(new byte[]{1, 2, 3});
+    EasyMock.expect(noopJoinableFactory.computeJoinCacheKey(noopDataSource)).andReturn(expected);
+    EasyMock.replay(noopJoinableFactory);
+    Optional<byte[]> actual = target.computeJoinCacheKey(noopDataSource);
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test(expected = ISE.class)

Review comment:
       Please use `expectedException` and verify the error message as well.

##########
File path: processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java
##########
@@ -190,6 +196,176 @@ public boolean isDirectlyJoinable(DataSource dataSource)
     Assert.assertNotSame(Function.identity(), segmentMapFn);
   }
 
+  @Test(expected = IAE.class)
+  public void test_computeJoinDataSourceCacheKey_noClauses()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.emptyList()).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    joinables.computeJoinDataSourceCacheKey(analysis);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_noHashJoin()
+  {
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_2", "x != \"h.x\"", "h.");
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Arrays.asList(clause_1, clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    Optional<byte[]> cacheKey = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertFalse(cacheKey.isPresent());
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_cachingUnsupported()
+  {
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    DataSource dataSource = new LookupDataSource("lookup");
+    PreJoinableClause clause_2 = makePreJoinableClause(dataSource, "x == \"h.x\"", "h.", JoinType.LEFT);
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Arrays.asList(clause_1, clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    Optional<byte[]> cacheKey = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertFalse(cacheKey.isPresent());
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_usableClauses()
+  {
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_2", "x == \"h.x\"", "h.");
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Arrays.asList(clause_1, clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    Optional<byte[]> cacheKey = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertTrue(cacheKey.isPresent());
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithExpression()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "y == \"j.y\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithJoinType()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.", JoinType.LEFT);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.", JoinType.INNER);
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithPrefix()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"h.x\"", "h.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithJoinable()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_2", "x == \"j.x\"", "j.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_sameKeyForSameJoin()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);

Review comment:
       Should this be `Assert.assertArrayEquals(cacheKey_1.get(), cacheKey_2.get());`?

##########
File path: processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java
##########
@@ -83,6 +84,36 @@ public void testBuildDataSourceIsRegisteredShouldReturnJoinableFromFactory()
     EasyMock.replay(noopJoinableFactory);
     Optional<Joinable> joinable = target.build(noopDataSource, condition);
     Assert.assertEquals(mockJoinable, joinable.get());
+
+  }
+
+  @Test
+  public void testComputeJoinCacheKey()
+  {
+    Optional<byte[]> expected = Optional.of(new byte[]{1, 2, 3});
+    EasyMock.expect(noopJoinableFactory.computeJoinCacheKey(noopDataSource)).andReturn(expected);
+    EasyMock.replay(noopJoinableFactory);
+    Optional<byte[]> actual = target.computeJoinCacheKey(noopDataSource);
+    Assert.assertEquals(expected, actual);

Review comment:
       Should use `Assert.assertArrayEquals()`.
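
   That is, compare the array contents rather than the `Optional` wrappers:

       Assert.assertArrayEquals(expected.get(), actual.get());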

##########
File path: processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java
##########
@@ -190,6 +196,176 @@ public boolean isDirectlyJoinable(DataSource dataSource)
     Assert.assertNotSame(Function.identity(), segmentMapFn);
   }
 
+  @Test(expected = IAE.class)
+  public void test_computeJoinDataSourceCacheKey_noClauses()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.emptyList()).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    joinables.computeJoinDataSourceCacheKey(analysis);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_noHashJoin()
+  {
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");

Review comment:
       We don't use underscore in variable names. Please rename all variables containing underscores.

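   e.g. `clause_1` -> `clause1`, `cacheKey_1` -> `cacheKey1`.
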
##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +112,74 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that participate in the RHS of a join. This key prefix
+   * can be used in segment level cache or result level cache. The function can return following wrapped in an
+   * Optional
+   *  - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
+   *  join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
+   *  - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
+   *  in the JOIN is not cacheable.
+   *
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   * @param dataSourceAnalysis
+   * @return

Review comment:
       I found a couple more places with empty `@param` or `@return` tags. Please either add a description or remove them.

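   For illustration, a filled-in version might read (wording is just a sketch based on the description above):

       /**
        * @param dataSourceAnalysis analysis of the join data source whose clauses are combined into the key
        * @return cache key bytes wrapped in an Optional, or empty if any participating data source is not cacheable
        */
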
##########
File path: processing/src/test/java/org/apache/druid/segment/join/table/RowBasedIndexedTableTest.java
##########
@@ -179,4 +179,13 @@ public void testVersion()
     Assert.assertEquals(JoinTestHelper.INDEXED_TABLE_VERSION, countriesTable.version());
     Assert.assertEquals(JoinTestHelper.INDEXED_TABLE_VERSION, regionsTable.version());
   }
+
+  @Test
+  public void testIsCacheable() throws IOException
+  {
+    Assert.assertFalse(countriesTable.isCacheable());
+    RowBasedIndexedTable<Map<String, Object>> countriesTableWithCacheKey = JoinTestHelper.createCountriesIndexedTableWithCacheKey();
+    Assert.assertTrue(countriesTableWithCacheKey.isCacheable());
+    Assert.assertEquals(countriesTableWithCacheKey.computeCacheKey(), JoinTestHelper.INDEXED_TABLE_CACHE_KEY);

Review comment:
       Please use `Assert.assertArrayEquals()`. Also the expected result should be the first argument.

##########
File path: server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java
##########
@@ -340,4 +356,15 @@ private DataSegment createSegment(IncrementalIndex data, String interval, String
         ImmutableMap.of("type", "local", "path", segmentDir.getAbsolutePath())
     );
   }
+
+  private void assertSegmentIdEquals(SegmentId id, byte[] bytes)
+  {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    long start = byteBuffer.getLong();
+    long end = byteBuffer.getLong();
+    String version = StringUtils.fromUtf8(byteBuffer, StringUtils.estimatedBinaryLengthAsUTF8(id.getVersion()));
+    String dataSource = StringUtils.fromUtf8(byteBuffer, StringUtils.estimatedBinaryLengthAsUTF8(id.getDataSource()));
+    int partition = byteBuffer.getInt();
+    Assert.assertEquals(id, SegmentId.of(dataSource, new Interval(start, end, DateTimeZone.UTC), version, partition));

Review comment:
       Please use `Intervals.utc()` instead.

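   i.e., roughly:

       Assert.assertEquals(id, SegmentId.of(dataSource, Intervals.utc(start, end), version, partition));
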
##########
File path: processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java
##########
@@ -190,6 +196,176 @@ public boolean isDirectlyJoinable(DataSource dataSource)
     Assert.assertNotSame(Function.identity(), segmentMapFn);
   }
 
+  @Test(expected = IAE.class)
+  public void test_computeJoinDataSourceCacheKey_noClauses()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.emptyList()).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    joinables.computeJoinDataSourceCacheKey(analysis);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_noHashJoin()
+  {
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_2", "x != \"h.x\"", "h.");
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Arrays.asList(clause_1, clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    Optional<byte[]> cacheKey = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertFalse(cacheKey.isPresent());
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_cachingUnsupported()
+  {
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    DataSource dataSource = new LookupDataSource("lookup");
+    PreJoinableClause clause_2 = makePreJoinableClause(dataSource, "x == \"h.x\"", "h.", JoinType.LEFT);
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Arrays.asList(clause_1, clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    Optional<byte[]> cacheKey = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertFalse(cacheKey.isPresent());
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_usableClauses()
+  {
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_2", "x == \"h.x\"", "h.");
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Arrays.asList(clause_1, clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    Optional<byte[]> cacheKey = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertTrue(cacheKey.isPresent());
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithExpression()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "y == \"j.y\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithJoinType()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.", JoinType.LEFT);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.", JoinType.INNER);
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithPrefix()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"h.x\"", "h.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithJoinable()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_2", "x == \"j.x\"", "j.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_sameKeyForSameJoin()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);

Review comment:
       Similar comment for other tests you added. Should they use `Assert.assertFalse(Arrays.equals(cacheKey_1.get(), cacheKey_2.get()));`?

##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
##########
@@ -241,6 +244,22 @@ public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, bo
     );
   }
 
+  @Override
+  public Optional<byte[]> computeCacheKey()
+  {
+    SegmentId segmentId = segment.getId();

Review comment:
       I think this logic should live here rather than in `SegmentId`, since a `SegmentId` represents a whole segment, while in the future we may read a broadcast segment only partially, based on an interval filter.
   
   @abhishekagarwal87 regarding using `CacheUtil`: if you just want to add a new method similar to `computeSegmentCacheKey()`, I don't think you have to, since the logic is different anyway. I would rather suggest using `CacheKeyBuilder`.

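   For instance, a minimal sketch of the `CacheKeyBuilder` approach (the `CACHE_PREFIX` constant is hypothetical):

       private static final byte CACHE_PREFIX = 0x01;  // hypothetical prefix byte for this implementation

       @Override
       public Optional<byte[]> computeCacheKey()
       {
         // SegmentId.toString() already encodes dataSource, interval, version and partition
         return Optional.of(
             new CacheKeyBuilder(CACHE_PREFIX)
                 .appendString(segment.getId().toString())
                 .build()
         );
       }
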
##########
File path: server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
##########
@@ -293,8 +300,9 @@ public ServerManager(
         queryMetrics -> queryMetrics.segment(segmentIdString)
     );
 
-    CachingQueryRunner<T> cachingQueryRunner = new CachingQueryRunner<>(
+    QueryRunner<T> queryRunner = new CachingQueryRunner<>(

Review comment:
       Why is this change needed?






[GitHub] [druid] jihoonson commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r493091856



##########
File path: processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java
##########
@@ -113,6 +118,12 @@ public void testBuildExceptionWhenTwoJoinableFactoryForSameDataSource()
     EasyMock.expect(noopJoinableFactory.build(noopDataSource, condition)).andReturn(Optional.of(mockJoinable));
     EasyMock.expect(anotherNoopJoinableFactory.build(noopDataSource, condition)).andReturn(Optional.of(mockJoinable));
     EasyMock.replay(noopJoinableFactory, anotherNoopJoinableFactory);
+    expectedException.expect(ISE.class);
+    expectedException.expectMessage(String.format(

Review comment:
       Please use `StringUtils.format()` instead here and in other places.

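   (`StringUtils.format()` formats with `Locale.ENGLISH`, so messages don't vary with the default locale; e.g. `StringUtils.format("Multiple joinable factories found for table[%s]", dataSource)`, where the message text is only illustrative.)
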
##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -796,4 +749,102 @@ private void addSequencesFromServer(
           .flatMerge(seq -> seq, query.getResultOrdering());
     }
   }
+
+  /**
+   * An inner class that is used solely for computing cache keys. Its a separate class to allow extensive unit testing
+   * of cache key generation.
+   */
+  @VisibleForTesting
+  static class CacheKeyManager<T>

Review comment:
       This is nice :+1: 

##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
##########
@@ -241,6 +244,22 @@ public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, bo
     );
   }
 
+  @Override
+  public Optional<byte[]> computeCacheKey()
+  {
+    SegmentId segmentId = segment.getId();

Review comment:
       > `CacheKeyBuilder` requires a prefix byte key in its constructor and hence I was not using it everywhere. I could add some fixed byte key though not sure how much does that help.
   
   This makes me wonder whether the broadcast segment needs a prefix key; I'm not sure if it would be useful yet. I still find using `CacheKeyBuilder` better, as it's less error-prone than computing the cache key manually, but I also agree with your point. I will leave this up to you.

##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -35,57 +39,47 @@
 import javax.annotation.Nullable;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
- * Utility methods for working with {@link Joinable} related classes.
+ * A wrapper class over {@link JoinableFactory} for working with {@link Joinable} related classes.

Review comment:
       It works for me.

##########
File path: server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
##########
@@ -197,13 +197,16 @@ public ServerManager(
     }
 
     // segmentMapFn maps each base Segment into a joined Segment if necessary.
-    final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
+    final Function<SegmentReference, SegmentReference> segmentMapFn = joinables.createSegmentMapFn(
         analysis.getPreJoinableClauses(),
-        joinableFactory,
         cpuTimeAccumulator,
         analysis.getBaseQuery().orElse(query)
     );
 
+    final Optional<byte[]> cacheKeyPrefix = analysis.isJoin()

Review comment:
       Hmm, what I want is to have cache key computation in one place, since that makes it easier to follow how the key is computed; I don't think my previous comment made that clear. How about passing `DataSourceAnalysis` and `Joinables` to `CachingQueryRunner`, so that cache key computation can be done solely there?

##########
File path: processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java
##########
@@ -43,8 +43,18 @@
    *
    * @param dataSource the datasource to join on
    * @param condition  the condition to join on
-   *
    * @return a Joinable if this datasource + condition combo is joinable; empty if not
    */
   Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition);
+
+  /**
+   * Compute the cache key for a data source participating in join operation. This is done separately from {{@link #build(DataSource, JoinConditionAnalysis)}}
+   * which can be an expensive operation and can potentially be avoided if cached results can be used.
+   *
+   * @param dataSource the datasource to join on
+   */
+  default Optional<byte[]> computeJoinCacheKey(DataSource dataSource)

Review comment:
       Good point. I do agree that it could be better to check the condition outside, so that we can stop creating potentially expensive joinables early. However, that doesn't mean this interface should not match `build()`. So, I would suggest either adding a Javadoc description of why these interfaces don't match, or modifying this interface to match, since the mismatch is confusing to anyone reading this code without knowing why two seemingly parallel methods differ.






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Proposed changes for making joins cacheable

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r487699990



##########
File path: server/src/main/java/org/apache/druid/segment/join/BroadcastTableJoinableFactory.java
##########
@@ -76,4 +76,27 @@ public boolean isDirectlyJoinable(DataSource dataSource)
     }
     return Optional.empty();
   }
+
+  @Override
+  public Optional<byte[]> computeJoinCacheKey(DataSource dataSource)

Review comment:
       Yes, good point.






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r493241076



##########
File path: server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
##########
@@ -86,7 +92,15 @@ public ResultLevelCachingQueryRunner(
     if (useResultCache || populateResultCache) {
 
       final String cacheKeyStr = StringUtils.fromUtf8(strategy.computeResultLevelCacheKey(query));
-      final byte[] cachedResultSet = fetchResultsFromResultLevelCache(cacheKeyStr);
+      DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
+      byte[] dataSourceCacheKey = Joinables.computeDataSourceCacheKey(analysis, joinableFactory).orElse(null);
+      if (null == dataSourceCacheKey) {
+        return baseRunner.run(
+            queryPlus,
+            responseContext
+        );
+      }
+      final byte[] cachedResultSet = fetchResultsFromResultLevelCache(cacheKeyStr, dataSourceCacheKey);

Review comment:
       This change has been removed now. 






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r494826651



##########
File path: server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
##########
@@ -77,20 +83,15 @@ public CachingQueryRunner(
   {
     Query<T> query = queryPlus.getQuery();
     final CacheStrategy strategy = toolChest.getCacheStrategy(query);
-    final boolean populateCache = CacheUtil.isPopulateSegmentCache(
-        query,
-        strategy,
-        cacheConfig,
-        CacheUtil.ServerType.DATA
-    );
-    final boolean useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.DATA);
+    final boolean populateCache = canPopulateCache(query, strategy);
+    final boolean useCache = canUseCache(query, strategy);
 
     final Cache.NamedKey key;
-    if (strategy != null && (useCache || populateCache)) {

Review comment:
       If it's null, then both `useCache` and `populateCache` will be set to false. If that invariant didn't hold, an NPE could occur later in the code. I can still add another null check in `canUseCache` and `canPopulateCache`.

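   For reference, a sketch of how the helper could make that invariant explicit (only `CacheUtil.isUseSegmentCache` is taken from the diff above; the rest is illustrative):

       private boolean canUseCache(Query<T> query, @Nullable CacheStrategy strategy)
       {
         // strategy == null means the query type is not cacheable at all
         return strategy != null
                && CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.DATA);
       }
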





[GitHub] [druid] jihoonson commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r502082167



##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -289,14 +289,17 @@ private ClusterQueryResult(Sequence<T> sequence, int numQueryServers)
       this.query = queryPlus.getQuery();
       this.toolChest = warehouse.getToolChest(query);
       this.strategy = toolChest.getCacheStrategy(query);
+      this.dataSourceAnalysis = DataSourceAnalysis.forDataSource(query.getDataSource());
 
-      this.useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER);
-      this.populateCache = CacheUtil.isPopulateSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER);
+      // Broker join caching is disabled - https://github.com/apache/druid/issues/10444
+      this.useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER)
+                      && !dataSourceAnalysis.isJoin();

Review comment:
       I think this check should not be here, but inside `CacheUtil.isUseSegmentCache()`. The reason we pass the `ServerType` into `isUseSegmentCache()` is that we want to make different decisions based on the `ServerType`; since we are disabling the cache for joins on brokers, that decision belongs in there.

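   Roughly (a sketch; the delegate at the end stands in for whatever checks `isUseSegmentCache()` already performs, and its name is hypothetical):

       public static boolean isUseSegmentCache(
           Query<?> query,
           @Nullable CacheStrategy<?, ?, Query<?>> strategy,
           CacheConfig cacheConfig,
           ServerType serverType
       )
       {
         // Broker join caching is disabled - https://github.com/apache/druid/issues/10444
         if (serverType == ServerType.BROKER
             && DataSourceAnalysis.forDataSource(query.getDataSource()).isJoin()) {
           return false;
         }
         return isUseSegmentCacheInternal(query, strategy, cacheConfig, serverType);  // existing checks; name hypothetical
       }
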
##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -289,14 +289,17 @@ private ClusterQueryResult(Sequence<T> sequence, int numQueryServers)
       this.query = queryPlus.getQuery();
       this.toolChest = warehouse.getToolChest(query);
       this.strategy = toolChest.getCacheStrategy(query);
+      this.dataSourceAnalysis = DataSourceAnalysis.forDataSource(query.getDataSource());
 
-      this.useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER);
-      this.populateCache = CacheUtil.isPopulateSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER);
+      // Broker join caching is disabled - https://github.com/apache/druid/issues/10444
+      this.useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER)
+                      && !dataSourceAnalysis.isJoin();
+      this.populateCache = CacheUtil.isPopulateSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER)

Review comment:
       Same comment.






[GitHub] [druid] clintropolis commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r493188636



##########
File path: processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java
##########
@@ -43,8 +43,18 @@
    *
    * @param dataSource the datasource to join on
    * @param condition  the condition to join on
-   *
    * @return a Joinable if this datasource + condition combo is joinable; empty if not
    */
   Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition);
+
+  /**
+   * Compute the cache key for a data source participating in join operation. This is done separately from {{@link #build(DataSource, JoinConditionAnalysis)}}
+   * which can be an expensive operation and can potentially be avoided if cached results can be used.
+   *
+   * @param dataSource the datasource to join on
+   */
+  default Optional<byte[]> computeJoinCacheKey(DataSource dataSource)

Review comment:
       +1 for having a consistent interface with `build`, since they need to make the same decisions. I'm not sure this PR is the correct venue to refactor `build`, so I would be in favor of adding the `JoinConditionAnalysis` argument.

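   i.e., something like (the default body is a sketch):

       default Optional<byte[]> computeJoinCacheKey(DataSource dataSource, JoinConditionAnalysis condition)
       {
         // empty = this factory does not (yet) support caching for this datasource/condition combo
         return Optional.empty();
       }
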





[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r494843760



##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -770,4 +749,102 @@ private void addSequencesFromServer(
           .flatMerge(seq -> seq, query.getResultOrdering());
     }
   }
+
+  /**
+   * An inner class that is used solely for computing cache keys. Its a separate class to allow extensive unit testing
+   * of cache key generation.
+   */
+  @VisibleForTesting
+  static class CacheKeyManager<T>
+  {
+    private final Query<T> query;
+    private final CacheStrategy<T, Object, Query<T>> strategy;
+    private final DataSourceAnalysis dataSourceAnalysis;
+    private final Joinables joinables;
+    private final boolean isSegmentLevelCachingEnable;
+
+    CacheKeyManager(
+        final Query<T> query,
+        final CacheStrategy<T, Object, Query<T>> strategy,
+        final boolean useCache,
+        final boolean populateCache,
+        final DataSourceAnalysis dataSourceAnalysis,
+        final Joinables joinables
+    )
+    {
+
+      this.query = query;
+      this.strategy = strategy;
+      this.dataSourceAnalysis = dataSourceAnalysis;
+      this.joinables = joinables;
+      this.isSegmentLevelCachingEnable = ((populateCache || useCache)
+                                          && !QueryContexts.isBySegment(query));   // explicit bySegment queries are never cached
+
+    }
+
+    @Nullable
+    byte[] computeSegmentLevelQueryCacheKey()
+    {
+      if (isSegmentLevelCachingEnable) {
+        return computeQueryCacheKeyWithJoin();
+      }
+      return null;
+    }
+
+    /**
+     * It computes the ETAG which is used by {@link org.apache.druid.query.ResultLevelCachingQueryRunner} for
+     * result level caches. queryCacheKey can be null if segment level cache is not being used. However, ETAG
+     * is still computed since result level cache may still be on.
+     */
+    @Nullable
+    String computeResultLevelCachingEtag(
+        final Set<SegmentServerSelector> segments,
+        @Nullable byte[] queryCacheKey
+    )
+    {
+      Hasher hasher = Hashing.sha1().newHasher();
+      boolean hasOnlyHistoricalSegments = true;
+      for (SegmentServerSelector p : segments) {
+        if (!p.getServer().pick().getServer().isSegmentReplicationTarget()) {
+          hasOnlyHistoricalSegments = false;
+          break;
+        }
+        hasher.putString(p.getServer().getSegment().getId().toString(), StandardCharsets.UTF_8);
+        // it is important to add the "query interval" as part ETag calculation
+        // to have result level cache work correctly for queries with different
+        // intervals covering the same set of segments
+        hasher.putString(p.rhs.getInterval().toString(), StandardCharsets.UTF_8);
+      }
+
+      if (!hasOnlyHistoricalSegments) {
+        return null;
+      }
+
+      // query cache key can be null if segment level caching is disabled
+      final byte[] queryCacheKeyFinal = (queryCacheKey == null) ? computeQueryCacheKeyWithJoin() : queryCacheKey;
+      if (queryCacheKeyFinal == null) {
+        return null;
+      }
+      hasher.putBytes(queryCacheKeyFinal);
+      String currEtag = StringUtils.encodeBase64String(hasher.hash().asBytes());
+      return currEtag;
+    }
+
+    /**
+     * Adds the cache key prefix for join data sources. Return null if its a join but caching is not supported
+     */
+    @Nullable
+    private byte[] computeQueryCacheKeyWithJoin()
+    {
+      assert strategy != null;  // implies strategy != null

Review comment:
       I think it was someone suppressing the IntelliJ warning about `strategy` being null. I replaced it with `Preconditions.checkNotNull`.

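   i.e.:

       Preconditions.checkNotNull(strategy, "strategy cannot be null");
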





[GitHub] [druid] gianm commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
gianm commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r495031854



##########
File path: server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
##########
@@ -77,20 +83,15 @@ public CachingQueryRunner(
   {
     Query<T> query = queryPlus.getQuery();
     final CacheStrategy strategy = toolChest.getCacheStrategy(query);
-    final boolean populateCache = CacheUtil.isPopulateSegmentCache(
-        query,
-        strategy,
-        cacheConfig,
-        CacheUtil.ServerType.DATA
-    );
-    final boolean useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.DATA);
+    final boolean populateCache = canPopulateCache(query, strategy);
+    final boolean useCache = canUseCache(query, strategy);
 
     final Cache.NamedKey key;
-    if (strategy != null && (useCache || populateCache)) {

Review comment:
       Got it. I don't think the extra check is necessary, given what you just said. Maybe just a comment.






[GitHub] [druid] gianm commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
gianm commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r495145350



##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -293,6 +301,14 @@ private ClusterQueryResult(Sequence<T> sequence, int numQueryServers)
       this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec()
                                          .map(QuerySegmentSpec::getIntervals)
                                          .orElseGet(() -> query.getIntervals());
+      this.cacheKeyManager = new CacheKeyManager<>(

Review comment:
       I think @jihoonson's suggestion is best, because that would not only make caching correct, it would also ensure that the same broadcast table version is used across all data servers involved in a query. That'd make the query results more internally consistent (today, potentially, a different version of the table could be used on different data servers).






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r493240695



##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +126,47 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that is to be used in segment level and result level caches. The
+   * data source can either be base (clauses is empty) or RHS of a join (clauses is non-empty). In both of the cases,
+   * a non-null cache is returned. However, the cache key is null if there is a join and some of the right data sources
+   * participating in the join do not support caching yet
+   *
+   * @param dataSourceAnalysis
+   * @param joinableFactory
+   * @return
+   */
+  public static Optional<byte[]> computeDataSourceCacheKey(
+      final DataSourceAnalysis dataSourceAnalysis,
+      final JoinableFactory joinableFactory
+  )
+  {
+    final CacheKeyBuilder keyBuilder;
+    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
+    if (clauses.isEmpty()) {
+      keyBuilder = new CacheKeyBuilder(REGULAR_OPERATION);

Review comment:
       Modified it such that the existing cache will continue to be usable. 






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r491913529



##########
File path: server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
##########
@@ -293,8 +300,9 @@ public ServerManager(
         queryMetrics -> queryMetrics.segment(segmentIdString)
     );
 
-    CachingQueryRunner<T> cachingQueryRunner = new CachingQueryRunner<>(
+    QueryRunner<T> queryRunner = new CachingQueryRunner<>(

Review comment:
       Will revert. Must have been some temporary refactoring.






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r494838045



##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
##########
@@ -241,6 +244,22 @@ public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, bo
     );
   }
 
+  @Override
+  public Optional<byte[]> computeCacheKey()
+  {
+    SegmentId segmentId = segment.getId();

Review comment:
       Thinking out loud here. For the cache key of queries, I understand that a prefix can help when e.g. a `groupBy` and a `timeseries` query have the same filters, aggregators, etc. For `IndexedTable`, a similar scenario could occur if we had two different implementations of `IndexedTable` that can be invoked on the same segment. When can that occur? Maybe if the cache is external and the historical/broker is upgraded to a new implementation of `IndexedTable` applicable to that segment?
   I don't mind adding the key prefix. Just trying to understand it better.
   






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r492907168



##########
File path: processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java
##########
@@ -43,8 +43,18 @@
    *
    * @param dataSource the datasource to join on
    * @param condition  the condition to join on
-   *
    * @return a Joinable if this datasource + condition combo is joinable; empty if not
    */
   Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition);
+
+  /**
+   * Compute the cache key for a data source participating in join operation. This is done separately from {{@link #build(DataSource, JoinConditionAnalysis)}}
+   * which can be an expensive operation and can potentially be avoided if cached results can be used.
+   *
+   * @param dataSource the datasource to join on
+   */
+  default Optional<byte[]> computeJoinCacheKey(DataSource dataSource)

Review comment:
       Hmm, I do believe this condition should be outside. In some cases, when one of the conditions cannot be hash-joined, we can skip building the other expensive `Joinable` objects altogether when a query involves multiple joins. That being said, I understand that we do not want to change the interfaces right now. Though when new join algorithms come into being, I believe these interfaces will have to change anyway, since you have to somehow pass which join algorithm you intend to use.






[GitHub] [druid] gianm commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
gianm commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r494681260



##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/IndexedTable.java
##########
@@ -87,6 +88,26 @@ default ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, b
     return null;
   }
 
+  /**
+   * Computes a {@code byte[]} key for the table that can be used for computing cache keys for join operations.
+   * see {@link org.apache.druid.segment.join.JoinableFactory#computeJoinCacheKey}
+   *
+   * @return the byte array for cache key
+   * @throws {@link IAE} if caching is not supported
+   */
+  default byte[] computeCacheKey()

Review comment:
       Consider naming this `getCacheKey()` and extending Cacheable. That interface is mainly used by CacheKeyBuilder, which it doesn't look like we actually need here, but it's still nice to use that interface for things that can generate their own cache keys.

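   For illustration, the shape that suggestion implies (a sketch; `version()` is already part of `IndexedTable`, and the default body mirrors the IAE-throwing behavior of `computeCacheKey()` in this PR):

       public interface IndexedTable extends Cacheable
       {
         String version();  // existing method, repeated here so the sketch is self-contained

         @Override
         default byte[] getCacheKey()
         {
           // tables opt in to caching by overriding this
           throw new IAE("Cache key is not supported for table[%s]", version());
         }
       }
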
##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -770,4 +749,102 @@ private void addSequencesFromServer(
           .flatMerge(seq -> seq, query.getResultOrdering());
     }
   }
+
+  /**
+   * An inner class that is used solely for computing cache keys. Its a separate class to allow extensive unit testing
+   * of cache key generation.
+   */
+  @VisibleForTesting
+  static class CacheKeyManager<T>
+  {
+    private final Query<T> query;
+    private final CacheStrategy<T, Object, Query<T>> strategy;
+    private final DataSourceAnalysis dataSourceAnalysis;
+    private final Joinables joinables;
+    private final boolean isSegmentLevelCachingEnable;
+
+    CacheKeyManager(
+        final Query<T> query,
+        final CacheStrategy<T, Object, Query<T>> strategy,
+        final boolean useCache,
+        final boolean populateCache,
+        final DataSourceAnalysis dataSourceAnalysis,
+        final Joinables joinables
+    )
+    {
+
+      this.query = query;
+      this.strategy = strategy;
+      this.dataSourceAnalysis = dataSourceAnalysis;
+      this.joinables = joinables;
+      this.isSegmentLevelCachingEnable = ((populateCache || useCache)
+                                          && !QueryContexts.isBySegment(query));   // explicit bySegment queries are never cached
+
+    }
+
+    @Nullable
+    byte[] computeSegmentLevelQueryCacheKey()
+    {
+      if (isSegmentLevelCachingEnable) {
+        return computeQueryCacheKeyWithJoin();
+      }
+      return null;
+    }
+
+    /**
+     * It computes the ETAG which is used by {@link org.apache.druid.query.ResultLevelCachingQueryRunner} for
+     * result level caches. queryCacheKey can be null if segment level cache is not being used. However, ETAG
+     * is still computed since result level cache may still be on.
+     */
+    @Nullable
+    String computeResultLevelCachingEtag(
+        final Set<SegmentServerSelector> segments,
+        @Nullable byte[] queryCacheKey
+    )
+    {
+      Hasher hasher = Hashing.sha1().newHasher();
+      boolean hasOnlyHistoricalSegments = true;
+      for (SegmentServerSelector p : segments) {
+        if (!p.getServer().pick().getServer().isSegmentReplicationTarget()) {
+          hasOnlyHistoricalSegments = false;
+          break;
+        }
+        hasher.putString(p.getServer().getSegment().getId().toString(), StandardCharsets.UTF_8);
+        // it is important to add the "query interval" as part ETag calculation
+        // to have result level cache work correctly for queries with different
+        // intervals covering the same set of segments
+        hasher.putString(p.rhs.getInterval().toString(), StandardCharsets.UTF_8);
+      }
+
+      if (!hasOnlyHistoricalSegments) {
+        return null;
+      }
+
+      // query cache key can be null if segment level caching is disabled
+      final byte[] queryCacheKeyFinal = (queryCacheKey == null) ? computeQueryCacheKeyWithJoin() : queryCacheKey;
+      if (queryCacheKeyFinal == null) {
+        return null;
+      }
+      hasher.putBytes(queryCacheKeyFinal);
+      String currEtag = StringUtils.encodeBase64String(hasher.hash().asBytes());
+      return currEtag;
+    }
+
+    /**
+     * Adds the cache key prefix for join data sources. Return null if its a join but caching is not supported
+     */
+    @Nullable
+    private byte[] computeQueryCacheKeyWithJoin()
+    {
+      assert strategy != null;  // implies strategy != null

Review comment:
       I think you just copied this from somewhere else, but IMO it would be better as a precondition check (i.e. throw ISE or NPE if it fails) instead of an assertion.
   
   Generally we'll use assertions to note things that are meant to be _impossible_, and precondition checks for things that are possible but are incorrect usage. Since it is possible to create this CacheKeyManager class with a null `strategy` and then call `computeQueryCacheKeyWithJoin`, it'd be better for this to be a precondition check.

##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
##########
@@ -241,6 +244,22 @@ public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, bo
     );
   }
 
+  @Override
+  public Optional<byte[]> computeCacheKey()
+  {
+    SegmentId segmentId = segment.getId();

Review comment:
       I think there actually should be a prefix byte here, because the prefix bytes' purpose is to prevent cache key collisions for two implementations that are different but compute cache keys the same way. So each implementation should have its own prefix byte.
   
   I don't have a strong opinion on where the code should live, but it would be good to share it somehow, if that's not too much trouble.

##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -293,6 +301,14 @@ private ClusterQueryResult(Sequence<T> sequence, int numQueryServers)
       this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec()
                                          .map(QuerySegmentSpec::getIntervals)
                                          .orElseGet(() -> query.getIntervals());
+      this.cacheKeyManager = new CacheKeyManager<>(

Review comment:
       I don't think the concept of computing cache keys in CachingClusteredClient works properly here, because of the following scenario:
   
   1. There is a broadcast table that we'll be joining against.
   2. The Broker (which runs CCC) updates its broadcast table to a newer version.
   3. The Broker gets a query and uses the newer version in the cache key.
   4. It fans out the query to a data server that hasn't got the newest broadcast table yet.
   5. The data server returns results for the older table, and the Broker caches them.
   6. Now, the Broker's cache is wrong (it refers to older data with a newer key).
   
   The solution that comes to mind is that the data servers should keep both the old and new version around for a bit when they swap them out, and the Broker should send the specific version that it wants them to use, so it can be sure it's getting the right one. But this is out of the scope of this PR. For now I suggest not implementing caching for join datasources on the Broker. (Broker caching is off by default, anyway, so it's not the end of the world.)

##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -35,57 +39,47 @@
 import javax.annotation.Nullable;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
- * Utility methods for working with {@link Joinable} related classes.
+ * A wrapper class over {@link JoinableFactory} for working with {@link Joinable} related classes.

Review comment:
       I think it'd be better to make the change now, since "Joinables" is meant to be a non-constructible holder of utility functions, and so making it constructible is unnecessarily changing its character. As to cluttering the PR, IMO adding a new class is likely to yield a _less_ cluttered PR, because it won't involve changes to the pre-existing utility class.

##########
File path: server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
##########
@@ -77,20 +83,15 @@ public CachingQueryRunner(
   {
     Query<T> query = queryPlus.getQuery();
     final CacheStrategy strategy = toolChest.getCacheStrategy(query);
-    final boolean populateCache = CacheUtil.isPopulateSegmentCache(
-        query,
-        strategy,
-        cacheConfig,
-        CacheUtil.ServerType.DATA
-    );
-    final boolean useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.DATA);
+    final boolean populateCache = canPopulateCache(query, strategy);
+    final boolean useCache = canUseCache(query, strategy);
 
     final Cache.NamedKey key;
-    if (strategy != null && (useCache || populateCache)) {

Review comment:
       Is it not possible for `strategy` to be null?

##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +107,68 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that participate in the RHS of a join. This key prefix
+   * can be used in segment level cache or result level cache. The function can return following wrapped in an
+   * Optional
+   *  - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
+   *  join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
+   *  - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
+   *  in the JOIN is not cacheable.
+   *
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   * @param dataSourceAnalysis for the join datasource
+   * @return the optional cache key to be used as part of query cache key
+   */
+  public Optional<byte[]> computeJoinDataSourceCacheKey(
+      final DataSourceAnalysis dataSourceAnalysis
+  )
+  {
+    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
+    if (clauses.isEmpty()) {
+      throw new IAE("No join clauses to build the cache key for data source [%s]", dataSourceAnalysis.getDataSource());
+    }
+
+    final CacheKeyBuilder keyBuilder;
+    keyBuilder = new CacheKeyBuilder(JOIN_OPERATION);
+    for (PreJoinableClause clause : clauses) {
+      Optional<byte[]> bytes = joinableFactory.computeJoinCacheKey(clause.getDataSource(), clause.getCondition());
+      if (!bytes.isPresent()) {
+        // Encountered a data source which didn't support cache yet
+        log.debug("skipping caching for join since [%s] does not support caching", clause.getDataSource());
+        return Optional.empty();
+      }
+      keyBuilder.appendByteArray(bytes.get());
+      keyBuilder.appendString(clause.getCondition().getOriginalExpression());
+      keyBuilder.appendString(clause.getJoinType().name());

Review comment:
       The clause's prefix is important too, because it controls the names of the columns.
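       Concretely, that would be one more append inside the loop above; a sketch of the missing line, using the same clause accessors the other appends already rely on:
   
   ```java
   // Sketch: joins differing only in output column prefix ("j." vs "h.") must not
   // share cache entries, so the prefix belongs in the key next to condition and type.
   keyBuilder.appendString(clause.getPrefix());
   ```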






[GitHub] [druid] abhishekagarwal87 commented on pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on pull request #10366:
URL: https://github.com/apache/druid/pull/10366#issuecomment-697917813


   I will run some manual tests before this is merged. 




[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Proposed changes for making joins cacheable

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r487835902



##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
##########
@@ -241,6 +244,22 @@ public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, bo
     );
   }
 
+  @Override
+  public Optional<byte[]> computeCacheKey()
+  {
+    SegmentId segmentId = segment.getId();

Review comment:
       I wanted to use `CacheUtil` here but that is in the `server` module and not accessible here. Maybe I can move that class to `core`?






[GitHub] [druid] abhishekagarwal87 commented on pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on pull request #10366:
URL: https://github.com/apache/druid/pull/10366#issuecomment-696768403


   > Hi @abhishekagarwal87, thanks for the PR. The overall design to compute cache key for hash join looks reasonable to me. I left some comments on details.
   > 
   > You mentioned that unit tests should be added for `CachingQueryRunner` in the PR description. Are you planning to do it in this PR? I think it would be nice to do it together in the same PR. Also, please add some for `CachingClusteredClient` as well.
   
   Hi @jihoonson - I have added these tests. I had to refactor `CachingClusteredClient` somewhat to unit-test the cache-key-related pieces. The same goes for `CachingQueryRunner`.






[GitHub] [druid] gianm commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
gianm commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r496468752



##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
##########
@@ -241,6 +244,22 @@ public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, bo
     );
   }
 
+  @Override
+  public Optional<byte[]> computeCacheKey()
+  {
+    SegmentId segmentId = segment.getId();

Review comment:
    > For IndexedTable, a similar scenario can occur if we have two different implementations of IndexedTable that can be invoked on the same segment. When can that occur? Maybe if the cache is external and the historical/broker is upgraded to a new implementation of IndexedTable applicable to that segment?
   
   I didn't have a specific scenario in mind; I was just thinking that it's generally a good practice to have a namespace prefix like this when defining a cache key for an interface implementation. Even if there aren't others today, it's possible there will be in the future, and it's easy for new implementers to forget to update the cache keys of the existing implementations. Better to do it ahead of time.
   
   






[GitHub] [druid] clintropolis commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r500129877



##########
File path: server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
##########
@@ -117,7 +117,7 @@ public ServerManager(
 
     this.cacheConfig = cacheConfig;
     this.segmentManager = segmentManager;
-    this.joinableFactory = joinableFactory;
+    this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory);

Review comment:
       `JoinableFactoryWrapper` still doesn't seem quite right. These walkers all take a `JoinableFactory` just to make a `JoinableFactoryWrapper` in the constructor. `JoinableFactoryWrapper` doesn't seem to have any state of its own, so why not just make it in the joinable module and inject it directly? Or maybe it didn't really need to change from static methods?
   
       Regardless, it doesn't need to change in this PR; we can refactor this in the future, since it feels like there is also another refactor lurking that I haven't quite figured out: wrapping `CacheConfig`, `CachePopulator`, `Cache` (and maybe `CacheKeyManager` for brokers) into some tidy package that handles caching stuff for walkers, with implementations that do the right thing for where they are running.
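       To make that idea concrete, a rough shape of the bundle might be the following sketch. All names here are invented, not something this PR adds:
   
   ```java
   // Hypothetical sketch of the "tidy package" idea: one injectable bundle of the
   // caching collaborators, instead of each walker wiring them up individually.
   import com.google.inject.Inject;
   import org.apache.druid.client.cache.Cache;
   import org.apache.druid.client.cache.CacheConfig;
   import org.apache.druid.client.cache.CachePopulator;
   
   public class CachingContext
   {
     private final Cache cache;
     private final CacheConfig cacheConfig;
     private final CachePopulator cachePopulator;
   
     @Inject
     public CachingContext(Cache cache, CacheConfig cacheConfig, CachePopulator cachePopulator)
     {
       this.cache = cache;
       this.cacheConfig = cacheConfig;
       this.cachePopulator = cachePopulator;
     }
   
     public Cache getCache()
     {
       return cache;
     }
   
     public CacheConfig getCacheConfig()
     {
       return cacheConfig;
     }
   
     public CachePopulator getCachePopulator()
     {
       return cachePopulator;
     }
   }
   ```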








[GitHub] [druid] lgtm-com[bot] commented on pull request #10366: Proposed changes for making joins cacheable

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #10366:
URL: https://github.com/apache/druid/pull/10366#issuecomment-692828782


   This pull request **introduces 2 alerts** when merging 0fbe8d836a3885672779de32c7a3beffc5486100 into e465f057170052537c98e779373d441a449ce780 - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-a95a31e8bf3b98986588d50d5eba05ab23250991)
   
   **new alerts:**
   
   * 2 for Useless null check




[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r494807466



##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/IndexedTable.java
##########
@@ -87,6 +88,26 @@ default ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, b
     return null;
   }
 
+  /**
+   * Computes a {@code byte[]} key for the table that can be used for computing cache keys for join operations.
+   * see {@link org.apache.druid.segment.join.JoinableFactory#computeJoinCacheKey}
+   *
+   * @return the byte array for cache key
+   * @throws {@link IAE} if caching is not supported
+   */
+  default byte[] computeCacheKey()

Review comment:
       I think part of the reason for not doing so was that not all IndexedTables are cacheable. That is why I have two methods, `isCacheable` and `computeCacheKey`. I could get rid of `isCacheable` and return null from `computeCacheKey` when caching is unsupported, but that didn't look very robust to me.
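       The `Optional`-returning shape that the later diffs in this thread show avoids both problems; a sketch of how the interface default could read:
   
   ```java
   // Sketch: absent means "caching unsupported", so there is no separate
   // isCacheable() method and no null return for callers to forget to check.
   default Optional<byte[]> computeCacheKey()
   {
     return Optional.empty();
   }
   ```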

##########
File path: server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
##########
@@ -77,20 +83,15 @@ public CachingQueryRunner(
   {
     Query<T> query = queryPlus.getQuery();
     final CacheStrategy strategy = toolChest.getCacheStrategy(query);
-    final boolean populateCache = CacheUtil.isPopulateSegmentCache(
-        query,
-        strategy,
-        cacheConfig,
-        CacheUtil.ServerType.DATA
-    );
-    final boolean useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.DATA);
+    final boolean populateCache = canPopulateCache(query, strategy);
+    final boolean useCache = canUseCache(query, strategy);
 
     final Cache.NamedKey key;
-    if (strategy != null && (useCache || populateCache)) {

Review comment:
       If it's null, then both `useCache` and `populateCache` will be set to false; if that doesn't hold, an NPE can occur later in the code. I can still add another null check in `canUseCache` and `canPopulateCache`.
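       That is, roughly the following sketch, mirroring the `CacheUtil` call that the diff above removed:
   
   ```java
   // Sketch: folding the null check into the helper means callers cannot forget it.
   private boolean canUseCache(Query<T> query, @Nullable CacheStrategy strategy)
   {
     return strategy != null
            && CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.DATA);
   }
   ```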

##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
##########
@@ -241,6 +244,22 @@ public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, bo
     );
   }
 
+  @Override
+  public Optional<byte[]> computeCacheKey()
+  {
+    SegmentId segmentId = segment.getId();

Review comment:
       Thinking out loud here. For the cache key of queries, I understand that a prefix can help when e.g. a `groupBy` and a `TimeSeries` query have the same filters, aggregators, etc. For `IndexedTable`, a similar scenario can occur if we have two different implementations of `IndexedTable` that can be invoked on the same segment. When can that occur? Maybe if the cache is external and the historical/broker is upgraded to a new implementation of `IndexedTable` applicable to that segment?
   I don't mind adding the key prefix. Just trying to understand it better. 
   

##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -293,6 +301,14 @@ private ClusterQueryResult(Sequence<T> sequence, int numQueryServers)
       this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec()
                                          .map(QuerySegmentSpec::getIntervals)
                                          .orElseGet(() -> query.getIntervals());
+      this.cacheKeyManager = new CacheKeyManager<>(

Review comment:
       That is a good corner case; I didn't think of this. I don't see an easy flag to selectively disable result-level caching for joins, so I am just going to hard-code this in the new `CacheKeyManager`. Sounds good?
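       Concretely, I mean something like this sketch inside the new class (simplified, not the final code; `dataSourceAnalysis.isJoin()` is the accessor used elsewhere in this PR):
   
   ```java
   // Sketch of the hard-coding idea: on the Broker, bail out of key computation
   // for joins instead of consulting a config flag.
   if (dataSourceAnalysis.isJoin()) {
     return null;  // no caching for join queries on the Broker, for now
   }
   ```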

##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -770,4 +749,102 @@ private void addSequencesFromServer(
           .flatMerge(seq -> seq, query.getResultOrdering());
     }
   }
+
+  /**
+   * An inner class that is used solely for computing cache keys. It's a separate class to allow extensive unit testing
+   * of cache key generation.
+   */
+  @VisibleForTesting
+  static class CacheKeyManager<T>
+  {
+    private final Query<T> query;
+    private final CacheStrategy<T, Object, Query<T>> strategy;
+    private final DataSourceAnalysis dataSourceAnalysis;
+    private final Joinables joinables;
+    private final boolean isSegmentLevelCachingEnable;
+
+    CacheKeyManager(
+        final Query<T> query,
+        final CacheStrategy<T, Object, Query<T>> strategy,
+        final boolean useCache,
+        final boolean populateCache,
+        final DataSourceAnalysis dataSourceAnalysis,
+        final Joinables joinables
+    )
+    {
+
+      this.query = query;
+      this.strategy = strategy;
+      this.dataSourceAnalysis = dataSourceAnalysis;
+      this.joinables = joinables;
+      this.isSegmentLevelCachingEnable = ((populateCache || useCache)
+                                          && !QueryContexts.isBySegment(query));   // explicit bySegment queries are never cached
+
+    }
+
+    @Nullable
+    byte[] computeSegmentLevelQueryCacheKey()
+    {
+      if (isSegmentLevelCachingEnable) {
+        return computeQueryCacheKeyWithJoin();
+      }
+      return null;
+    }
+
+    /**
+     * It computes the ETAG which is used by {@link org.apache.druid.query.ResultLevelCachingQueryRunner} for
+     * result level caches. queryCacheKey can be null if segment level cache is not being used. However, ETAG
+     * is still computed since result level cache may still be on.
+     */
+    @Nullable
+    String computeResultLevelCachingEtag(
+        final Set<SegmentServerSelector> segments,
+        @Nullable byte[] queryCacheKey
+    )
+    {
+      Hasher hasher = Hashing.sha1().newHasher();
+      boolean hasOnlyHistoricalSegments = true;
+      for (SegmentServerSelector p : segments) {
+        if (!p.getServer().pick().getServer().isSegmentReplicationTarget()) {
+          hasOnlyHistoricalSegments = false;
+          break;
+        }
+        hasher.putString(p.getServer().getSegment().getId().toString(), StandardCharsets.UTF_8);
+        // it is important to add the "query interval" as part ETag calculation
+        // to have result level cache work correctly for queries with different
+        // intervals covering the same set of segments
+        hasher.putString(p.rhs.getInterval().toString(), StandardCharsets.UTF_8);
+      }
+
+      if (!hasOnlyHistoricalSegments) {
+        return null;
+      }
+
+      // query cache key can be null if segment level caching is disabled
+      final byte[] queryCacheKeyFinal = (queryCacheKey == null) ? computeQueryCacheKeyWithJoin() : queryCacheKey;
+      if (queryCacheKeyFinal == null) {
+        return null;
+      }
+      hasher.putBytes(queryCacheKeyFinal);
+      String currEtag = StringUtils.encodeBase64String(hasher.hash().asBytes());
+      return currEtag;
+    }
+
+    /**
+     * Adds the cache key prefix for join data sources. Returns null if it's a join but caching is not supported.
+     */
+    @Nullable
+    private byte[] computeQueryCacheKeyWithJoin()
+    {
+      assert strategy != null;  // implies strategy != null

Review comment:
       I think it was there to suppress the IntelliJ warning about strategy being null. I replaced it with `Preconditions.checkNotNull`.






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Proposed changes for making joins cacheable

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r487818427



##########
File path: server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
##########
@@ -293,21 +302,28 @@ public ServerManager(
         queryMetrics -> queryMetrics.segment(segmentIdString)
     );
 
-    CachingQueryRunner<T> cachingQueryRunner = new CachingQueryRunner<>(
-        segmentIdString,
-        segmentDescriptor,
-        objectMapper,
-        cache,
-        toolChest,
-        metricsEmittingQueryRunnerInner,
-        cachePopulator,
-        cacheConfig
-    );
+    QueryRunner<T> queryRunner;

Review comment:
       I was thinking that the fewer places that worry about the different meanings of the `cacheKeyPrefix` content, the better. So for CachingQueryRunner, `cacheKeyPrefix` is just a plain dumb byte array.
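       With that, the composition inside the runner can stay trivial; a rough sketch of the idea (not the exact code in this PR), using `java.nio.ByteBuffer`, with `cacheKeyPrefix`, `strategy`, and `query` as in the runner:
   
   ```java
   // Sketch: the runner prepends the opaque prefix to the strategy's per-query key
   // without interpreting it.
   final byte[] queryKey = strategy.computeCacheKey(query);
   final byte[] fullKey = ByteBuffer.allocate(cacheKeyPrefix.length + queryKey.length)
                                    .put(cacheKeyPrefix)
                                    .put(queryKey)
                                    .array();
   ```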






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Proposed changes for making joins cacheable

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r487837336



##########
File path: server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
##########
@@ -86,7 +92,15 @@ public ResultLevelCachingQueryRunner(
     if (useResultCache || populateResultCache) {
 
       final String cacheKeyStr = StringUtils.fromUtf8(strategy.computeResultLevelCacheKey(query));
-      final byte[] cachedResultSet = fetchResultsFromResultLevelCache(cacheKeyStr);
+      DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
+      byte[] dataSourceCacheKey = Joinables.computeDataSourceCacheKey(analysis, joinableFactory).orElse(null);
+      if (null == dataSourceCacheKey) {
+        return baseRunner.run(
+            queryPlus,
+            responseContext
+        );
+      }
+      final byte[] cachedResultSet = fetchResultsFromResultLevelCache(cacheKeyStr, dataSourceCacheKey);

Review comment:
       Earlier, `cacheKeyStr` was being used as the namespace as well as the key, which was just unnecessary duplication. Internally, the namespace and key are composed together into another, bigger key. By the way, this too would cause a one-time cache bust on an upgrade.
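       For reference, the composition amounts to roughly this sketch, assuming `Cache.NamedKey` pairs a namespace string with key bytes, as its use elsewhere in this PR suggests:
   
   ```java
   // Sketch: the strategy-derived string is the namespace and the join data source
   // key is the key bytes; the Cache implementation folds both into one lookup key.
   final Cache.NamedKey namedKey = new Cache.NamedKey(cacheKeyStr, dataSourceCacheKey);
   final byte[] cachedResultSet = cache.get(namedKey);
   ```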






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r493303987



##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -35,57 +39,47 @@
 import javax.annotation.Nullable;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
- * Utility methods for working with {@link Joinable} related classes.
+ * A wrapper class over {@link JoinableFactory} for working with {@link Joinable} related classes.

Review comment:
       Can I do that as a follow-up PR? It's a harmless refactoring but will just clutter this PR. Doing it separately will ensure that no unintended change goes in.






[GitHub] [druid] jihoonson commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r495122956



##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -293,6 +301,14 @@ private ClusterQueryResult(Sequence<T> sequence, int numQueryServers)
       this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec()
                                          .map(QuerySegmentSpec::getIntervals)
                                          .orElseGet(() -> query.getIntervals());
+      this.cacheKeyManager = new CacheKeyManager<>(

Review comment:
       Hmm, good catch. I think the real fix would probably be including the version of the broadcast segments in the query that is sent to data servers.






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r496842746



##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
##########
@@ -241,6 +244,22 @@ public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, bo
     );
   }
 
+  @Override
+  public Optional<byte[]> computeCacheKey()
+  {
+    SegmentId segmentId = segment.getId();

Review comment:
       I have added the prefix here






[GitHub] [druid] suneet-s commented on pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
suneet-s commented on pull request #10366:
URL: https://github.com/apache/druid/pull/10366#issuecomment-702339457


   > We have per-query toggles and those should suffice IMO. The caching is anyway disabled on broker. `useCache` and `populateCache` parameters can be set to false in the query-context to disable caching on the individual query.
   
   Sounds good to me! Thanks




[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r495137286



##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -293,6 +301,14 @@ private ClusterQueryResult(Sequence<T> sequence, int numQueryServers)
       this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec()
                                          .map(QuerySegmentSpec::getIntervals)
                                          .orElseGet(() -> query.getIntervals());
+      this.cacheKeyManager = new CacheKeyManager<>(

Review comment:
       Wouldn't it be better for data servers to send the segment version to the Broker, which it can then either use in the cache key or decide not to cache at all? Though doing this might be trickier than passing extra information in the query.






[GitHub] [druid] gianm commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
gianm commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r496099481



##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -293,6 +301,14 @@ private ClusterQueryResult(Sequence<T> sequence, int numQueryServers)
       this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec()
                                          .map(QuerySegmentSpec::getIntervals)
                                          .orElseGet(() -> query.getIntervals());
+      this.cacheKeyManager = new CacheKeyManager<>(

Review comment:
       Yeah, we'd need to do that. The Coordinator would need to participate in the logic somehow (it would need to _not_ issue any drops for the old segments until the new ones are fully loaded).






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Proposed changes for making joins cacheable

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r487819771



##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +126,47 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that is to be used in segment level and result level caches. The
+   * data source can either be base (clauses is empty) or RHS of a join (clauses is non-empty). In both of the cases,
+   * a non-null cache is returned. However, the cache key is null if there is a join and some of the right data sources
+   * participating in the join do not support caching yet
+   *
+   * @param dataSourceAnalysis
+   * @param joinableFactory
+   * @return
+   */
+  public static Optional<byte[]> computeDataSourceCacheKey(
+      final DataSourceAnalysis dataSourceAnalysis,
+      final JoinableFactory joinableFactory
+  )
+  {
+    final CacheKeyBuilder keyBuilder;
+    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
+    if (clauses.isEmpty()) {
+      keyBuilder = new CacheKeyBuilder(REGULAR_OPERATION);

Review comment:
       You are right. I was thinking that it would only be a one-time cache bust, and that having a distinct prefix would eliminate the possibility of a cache key collision.






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10366: Add caching support to join queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r491897453



##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -35,57 +39,47 @@
 import javax.annotation.Nullable;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
- * Utility methods for working with {@link Joinable} related classes.
+ * A wrapper class over {@link JoinableFactory} for working with {@link Joinable} related classes.

Review comment:
       I want to at least make non-static the methods which are somewhat non-trivial. Unit testing becomes easier with that, as I can pass in the mocks etc. I can also create another class, move the non-static methods there, and leave the rest of the methods as they are. Does that work?

##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +112,74 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that participate in the RHS of a join. This key prefix
+   * can be used in segment level cache or result level cache. The function can return the following wrapped in an
+   * Optional:
+   *  - Non-empty byte array - If there is a join datasource involved and caching is possible. The result includes the
+   *  join condition expression, the join type and the cache key returned by the joinable factory for each {@link PreJoinableClause}
+   *  - NULL - There is a join but caching is not possible. It may happen if one of the participating datasources
+   *  in the JOIN is not cacheable.
+   *
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   * @param dataSourceAnalysis
+   * @return

Review comment:
       Ack

##########
File path: server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
##########
@@ -293,8 +300,9 @@ public ServerManager(
         queryMetrics -> queryMetrics.segment(segmentIdString)
     );
 
-    CachingQueryRunner<T> cachingQueryRunner = new CachingQueryRunner<>(
+    QueryRunner<T> queryRunner = new CachingQueryRunner<>(

Review comment:
       Will revert; must have been some temporary refactoring.

##########
File path: server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
##########
@@ -197,13 +197,16 @@ public ServerManager(
     }
 
     // segmentMapFn maps each base Segment into a joined Segment if necessary.
-    final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
+    final Function<SegmentReference, SegmentReference> segmentMapFn = joinables.createSegmentMapFn(
         analysis.getPreJoinableClauses(),
-        joinableFactory,
         cpuTimeAccumulator,
         analysis.getBaseQuery().orElse(query)
     );
 
+    final Optional<byte[]> cacheKeyPrefix = analysis.isJoin()

Review comment:
       I am not expecting cache key computation to be a costly operation. Do you think it should still be passed via a supplier?

##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
##########
@@ -241,6 +244,22 @@ public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, bo
     );
   }
 
+  @Override
+  public Optional<byte[]> computeCacheKey()
+  {
+    SegmentId segmentId = segment.getId();

Review comment:
       `CacheKeyBuilder` requires a prefix byte key in its constructor and hence I was not using it everywhere. I could add some fixed byte key, though I am not sure how much that helps.
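       For example, with an invented constant, a fixed leading byte would look like this; the builder calls match those used in `Joinables.computeJoinDataSourceCacheKey` in this PR:
   
   ```java
   // Sketch with a hypothetical namespace byte.
   private static final byte CACHE_PREFIX = 0x01;  // invented value
   
   @Override
   public Optional<byte[]> computeCacheKey()
   {
     return Optional.of(
         new CacheKeyBuilder(CACHE_PREFIX)
             .appendString(segment.getId().toString())
             .build()
     );
   }
   ```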

##########
File path: processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java
##########
@@ -190,6 +196,176 @@ public boolean isDirectlyJoinable(DataSource dataSource)
     Assert.assertNotSame(Function.identity(), segmentMapFn);
   }
 
+  @Test(expected = IAE.class)
+  public void test_computeJoinDataSourceCacheKey_noClauses()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.emptyList()).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    joinables.computeJoinDataSourceCacheKey(analysis);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_noHashJoin()
+  {
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_2", "x != \"h.x\"", "h.");
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Arrays.asList(clause_1, clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    Optional<byte[]> cacheKey = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertFalse(cacheKey.isPresent());
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_cachingUnsupported()
+  {
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    DataSource dataSource = new LookupDataSource("lookup");
+    PreJoinableClause clause_2 = makePreJoinableClause(dataSource, "x == \"h.x\"", "h.", JoinType.LEFT);
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Arrays.asList(clause_1, clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    Optional<byte[]> cacheKey = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertFalse(cacheKey.isPresent());
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_usableClauses()
+  {
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_2", "x == \"h.x\"", "h.");
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Arrays.asList(clause_1, clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    Optional<byte[]> cacheKey = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertTrue(cacheKey.isPresent());
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithExpression()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "y == \"j.y\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithJoinType()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.", JoinType.LEFT);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.", JoinType.INNER);
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithPrefix()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"h.x\"", "h.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithJoinable()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_2", "x == \"j.x\"", "j.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_sameKeyForSameJoin()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);

Review comment:
       oops. Thanks for catching this. 

##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +112,74 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that participate in the RHS of a join. This key prefix
+   * can be used in segment level cache or result level cache. The function can return the following wrapped in an
+   * Optional:
+   *  - Non-empty byte array - If there is a join datasource involved and caching is possible. The result includes the
+   *  join condition expression, the join type and the cache key returned by the joinable factory for each {@link PreJoinableClause}
+   *  - NULL - There is a join but caching is not possible. It may happen if one of the participating datasources
+   *  in the JOIN is not cacheable.
+   *
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   * @param dataSourceAnalysis
+   * @return
+   */
+  public Optional<byte[]> computeJoinDataSourceCacheKey(
+      final DataSourceAnalysis dataSourceAnalysis
+  )
+  {
+
+    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
+    if (clauses.isEmpty()) {
+      throw new IAE("No join clauses to build the cache key");

Review comment:
       While I am going to add `dataSourceAnalysis.getDataSource()` to the exception string, I just hope it doesn't blow up the message. On a quick look, it looks alright though. 

##########
File path: processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java
##########
@@ -83,6 +84,36 @@ public void testBuildDataSourceIsRegisteredShouldReturnJoinableFromFactory()
     EasyMock.replay(noopJoinableFactory);
     Optional<Joinable> joinable = target.build(noopDataSource, condition);
     Assert.assertEquals(mockJoinable, joinable.get());
+
+  }
+
+  @Test
+  public void testComputeJoinCacheKey()
+  {
+    Optional<byte[]> expected = Optional.of(new byte[]{1, 2, 3});
+    EasyMock.expect(noopJoinableFactory.computeJoinCacheKey(noopDataSource)).andReturn(expected);
+    EasyMock.replay(noopJoinableFactory);
+    Optional<byte[]> actual = target.computeJoinCacheKey(noopDataSource);
+    Assert.assertEquals(expected, actual);

Review comment:
       Replaced with `Assert.assertSame`



