Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/09/25 01:16:39 UTC

[GitHub] [druid] gianm commented on a change in pull request #10366: Add caching support to join queries

gianm commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r494681260



##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/IndexedTable.java
##########
@@ -87,6 +88,26 @@ default ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, b
     return null;
   }
 
+  /**
+   * Computes a {@code byte[]} key for the table that can be used for computing cache keys for join operations.
+   * See {@link org.apache.druid.segment.join.JoinableFactory#computeJoinCacheKey}.
+   *
+   * @return the byte array to use as the cache key
+   * @throws IAE if caching is not supported
+   */
+  default byte[] computeCacheKey()

Review comment:
       Consider naming this `getCacheKey()` and extending Cacheable. That interface is mainly used by CacheKeyBuilder, which we don't appear to need here, but it's still nice to use it for things that can generate their own cache keys.
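   
   For illustration, a minimal sketch of that suggestion (simplified to show only the cache-key method; the throwing default body is illustrative, not from the PR):
   
       public interface IndexedTable extends Cacheable
       {
         // Cacheable declares byte[] getCacheKey(); implementations that support
         // caching override it, while others keep this throwing default.
         @Override
         default byte[] getCacheKey()
         {
           throw new IAE("Cache key is not supported for table [%s]", getClass().getName());
         }
       }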

##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -770,4 +749,102 @@ private void addSequencesFromServer(
           .flatMerge(seq -> seq, query.getResultOrdering());
     }
   }
+
+  /**
+   * An inner class used solely for computing cache keys. It is a separate class to allow extensive unit testing
+   * of cache key generation.
+   */
+  @VisibleForTesting
+  static class CacheKeyManager<T>
+  {
+    private final Query<T> query;
+    private final CacheStrategy<T, Object, Query<T>> strategy;
+    private final DataSourceAnalysis dataSourceAnalysis;
+    private final Joinables joinables;
+    private final boolean isSegmentLevelCachingEnable;
+
+    CacheKeyManager(
+        final Query<T> query,
+        final CacheStrategy<T, Object, Query<T>> strategy,
+        final boolean useCache,
+        final boolean populateCache,
+        final DataSourceAnalysis dataSourceAnalysis,
+        final Joinables joinables
+    )
+    {
+      this.query = query;
+      this.strategy = strategy;
+      this.dataSourceAnalysis = dataSourceAnalysis;
+      this.joinables = joinables;
+      this.isSegmentLevelCachingEnable = ((populateCache || useCache)
+                                          && !QueryContexts.isBySegment(query));   // explicit bySegment queries are never cached
+    }
+
+    @Nullable
+    byte[] computeSegmentLevelQueryCacheKey()
+    {
+      if (isSegmentLevelCachingEnable) {
+        return computeQueryCacheKeyWithJoin();
+      }
+      return null;
+    }
+
+    /**
+     * Computes the ETag used by {@link org.apache.druid.query.ResultLevelCachingQueryRunner} for
+     * result-level caching. {@code queryCacheKey} can be null if segment-level caching is disabled; the ETag
+     * is still computed, since result-level caching may still be enabled.
+     */
+    @Nullable
+    String computeResultLevelCachingEtag(
+        final Set<SegmentServerSelector> segments,
+        @Nullable byte[] queryCacheKey
+    )
+    {
+      Hasher hasher = Hashing.sha1().newHasher();
+      boolean hasOnlyHistoricalSegments = true;
+      for (SegmentServerSelector p : segments) {
+        if (!p.getServer().pick().getServer().isSegmentReplicationTarget()) {
+          hasOnlyHistoricalSegments = false;
+          break;
+        }
+        hasher.putString(p.getServer().getSegment().getId().toString(), StandardCharsets.UTF_8);
+        // it is important to add the "query interval" as part of the ETag calculation
+        // to have result level cache work correctly for queries with different
+        // intervals covering the same set of segments
+        hasher.putString(p.rhs.getInterval().toString(), StandardCharsets.UTF_8);
+      }
+
+      if (!hasOnlyHistoricalSegments) {
+        return null;
+      }
+
+      // query cache key can be null if segment level caching is disabled
+      final byte[] queryCacheKeyFinal = (queryCacheKey == null) ? computeQueryCacheKeyWithJoin() : queryCacheKey;
+      if (queryCacheKeyFinal == null) {
+        return null;
+      }
+      hasher.putBytes(queryCacheKeyFinal);
+      return StringUtils.encodeBase64String(hasher.hash().asBytes());
+    }
+
+    /**
+     * Computes the query cache key, adding the cache key prefix for join data sources. Returns null if
+     * there is a join but caching is not supported.
+     */
+    @Nullable
+    private byte[] computeQueryCacheKeyWithJoin()
+    {
+      assert strategy != null;

Review comment:
       I think you just copied this from somewhere else, but IMO it would be better as a precondition check (i.e. throw ISE or NPE if it fails) instead of an assertion.
   
   Generally we'll use assertions to note things that are meant to be _impossible_, and precondition checks for things that are possible but are incorrect usage. Since it is possible to create this CacheKeyManager class with a null `strategy` and then call `computeQueryCacheKeyWithJoin`, it'd be better for this to be a precondition check.
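   
   A sketch of that, replacing the assert (Guava's Preconditions is already on Druid's classpath):
   
       // throws an NPE with a useful message even when -ea is not set
       Preconditions.checkNotNull(strategy, "strategy cannot be null");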

##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
##########
@@ -241,6 +244,22 @@ public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, bo
     );
   }
 
+  @Override
+  public Optional<byte[]> computeCacheKey()
+  {
+    SegmentId segmentId = segment.getId();

Review comment:
       I think there actually should be a prefix byte here, because the prefix bytes' purpose is to prevent cache key collisions for two implementations that are different but compute cache keys the same way. So each implementation should have its own prefix byte.
   
   I don't have a strong opinion on where the code should live, but it would be good to share it somehow, if that's not too much trouble.
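   
   A sketch of what that could look like here (the constant name and value are illustrative, not from the PR):
   
       private static final byte CACHE_KEY_PREFIX = 0x01;
   
       @Override
       public Optional<byte[]> computeCacheKey()
       {
         // SegmentId's string form covers datasource, interval, version, and partition
         return Optional.of(
             new CacheKeyBuilder(CACHE_KEY_PREFIX)
                 .appendString(segment.getId().toString())
                 .build()
         );
       }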

##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -293,6 +301,14 @@ private ClusterQueryResult(Sequence<T> sequence, int numQueryServers)
       this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec()
                                          .map(QuerySegmentSpec::getIntervals)
                                          .orElseGet(() -> query.getIntervals());
+      this.cacheKeyManager = new CacheKeyManager<>(

Review comment:
       I don't think the concept of computing cache keys in CachingClusteredClient works properly here, because of the following scenario:
   
   1. There is a broadcast table that we'll be joining against.
   2. The Broker (which runs CCC) updates its broadcast table to a newer version.
   3. The Broker gets a query and uses the newer version in the cache key.
   4. It fans out the query to a data server that hasn't got the newest broadcast table yet.
   5. The data server returns results for the older table, and the Broker caches them.
   6. Now, the Broker's cache is wrong (it refers to older data with a newer key).
   
   The solution that comes to mind is that the data servers should keep both the old and new version around for a bit when they swap them out, and the Broker should send the specific version that it wants them to use, so it can be sure it's getting the right one. But this is out of the scope of this PR. For now I suggest not implementing caching for join datasources on the Broker. (Broker caching is off by default, anyway, so it's not the end of the world.)
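   
   Concretely, "not implementing caching for join datasources on the Broker" could be a guard along these lines in CacheKeyManager (hypothetical, not part of the PR as written):
   
       @Nullable
       byte[] computeSegmentLevelQueryCacheKey()
       {
         // skip Broker-level caching for join datasources until data servers
         // can pin a specific broadcast-table version per query
         if (isSegmentLevelCachingEnable && !dataSourceAnalysis.isJoin()) {
           return computeQueryCacheKeyWithJoin();
         }
         return null;
       }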

##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -35,57 +39,47 @@
 import javax.annotation.Nullable;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
- * Utility methods for working with {@link Joinable} related classes.
+ * A wrapper class over {@link JoinableFactory} for working with {@link Joinable} related classes.

Review comment:
       I think it'd be better to make the change now, since "Joinables" is meant to be a non-constructible holder of utility functions, and so making it constructible is unnecessarily changing its character. As to cluttering the PR, IMO adding a new class is likely to yield a _less_ cluttered PR, because it won't involve changes to the pre-existing utility class.
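   
   A sketch of that alternative, leaving the static `Joinables` utility untouched (the class name is illustrative):
   
       public class JoinableFactoryWrapper
       {
         private final JoinableFactory joinableFactory;
   
         public JoinableFactoryWrapper(final JoinableFactory joinableFactory)
         {
           this.joinableFactory = joinableFactory;
         }
   
         public Optional<byte[]> computeJoinDataSourceCacheKey(final DataSourceAnalysis dataSourceAnalysis)
         {
           // the instance-level cache-key logic from this PR's Joinables would move here unchanged
           return Optional.empty();
         }
       }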

##########
File path: server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
##########
@@ -77,20 +83,15 @@ public CachingQueryRunner(
   {
     Query<T> query = queryPlus.getQuery();
     final CacheStrategy strategy = toolChest.getCacheStrategy(query);
-    final boolean populateCache = CacheUtil.isPopulateSegmentCache(
-        query,
-        strategy,
-        cacheConfig,
-        CacheUtil.ServerType.DATA
-    );
-    final boolean useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.DATA);
+    final boolean populateCache = canPopulateCache(query, strategy);
+    final boolean useCache = canUseCache(query, strategy);
 
     final Cache.NamedKey key;
-    if (strategy != null && (useCache || populateCache)) {

Review comment:
       Is it not possible for `strategy` to be null?
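   
   If it can be, one way to keep the simpler call sites is for the new helpers to treat a null strategy as "not cacheable", so no separate null check is needed (a sketch, assuming the helpers wrap the old CacheUtil calls):
   
       private boolean canUseCache(Query<T> query, @Nullable CacheStrategy strategy)
       {
         // a null strategy means the query type cannot be cached at all
         return strategy != null
                && CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.DATA);
       }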

##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +107,68 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Computes a cache key prefix for data sources that participate in the RHS of a join. This key prefix
+   * can be used in the segment-level cache or the result-level cache. The function returns one of the following:
+   *  - A non-empty byte array, if a join datasource is involved and caching is possible. The result includes
+   *  the join condition expression, the join type, and the cache key returned by the joinable factory for each
+   *  {@link PreJoinableClause}.
+   *  - An empty Optional, if there is a join but caching is not possible. This may happen if one of the
+   *  datasources participating in the join is not cacheable.
+   *
+   * @param dataSourceAnalysis for the join datasource
+   * @return the optional cache key to be used as part of the query cache key
+   * @throws IAE if this operation is called on a non-join data source
+   */
+  public Optional<byte[]> computeJoinDataSourceCacheKey(
+      final DataSourceAnalysis dataSourceAnalysis
+  )
+  {
+    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
+    if (clauses.isEmpty()) {
+      throw new IAE("No join clauses to build the cache key for data source [%s]", dataSourceAnalysis.getDataSource());
+    }
+
+    final CacheKeyBuilder keyBuilder = new CacheKeyBuilder(JOIN_OPERATION);
+    for (PreJoinableClause clause : clauses) {
+      Optional<byte[]> bytes = joinableFactory.computeJoinCacheKey(clause.getDataSource(), clause.getCondition());
+      if (!bytes.isPresent()) {
+        // Encountered a data source that doesn't support caching yet
+        log.debug("skipping caching for join since [%s] does not support caching", clause.getDataSource());
+        return Optional.empty();
+      }
+      keyBuilder.appendByteArray(bytes.get());
+      keyBuilder.appendString(clause.getCondition().getOriginalExpression());
+      keyBuilder.appendString(clause.getJoinType().name());

Review comment:
       The clause's prefix is important too, because it controls the names of the columns.
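   
   Concretely, that would mean also appending the prefix:
   
       keyBuilder.appendString(clause.getPrefix());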




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


