Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/09/19 23:37:29 UTC

[GitHub] [druid] jihoonson commented on a change in pull request #10366: Add caching support to join queries

jihoonson commented on a change in pull request #10366:
URL: https://github.com/apache/druid/pull/10366#discussion_r491217786



##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -35,57 +39,47 @@
 import javax.annotation.Nullable;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
- * Utility methods for working with {@link Joinable} related classes.
+ * A wrapper class over {@link JoinableFactory} for working with {@link Joinable} related classes.

Review comment:
       Using a plural name is a convention for utility classes like `Collections`. Since this class will no longer be a simple utility class, maybe we should rename it. But does this class need to be a wrapper class? It doesn't seem so. Also, static and non-static util methods are mixed here, which seems confusing. Can we keep this class as a non-instantiable utility class?

##########
File path: processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java
##########
@@ -43,8 +43,18 @@
    *
    * @param dataSource the datasource to join on
    * @param condition  the condition to join on
-   *
    * @return a Joinable if this datasource + condition combo is joinable; empty if not
    */
   Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition);
+
+  /**
+   * Compute the cache key for a data source participating in join operation. This is done separately from {{@link #build(DataSource, JoinConditionAnalysis)}}
+   * which can be an expensive operation and can potentially be avoided if cached results can be used.
+   *
+   * @param dataSource the datasource to join on
+   */
+  default Optional<byte[]> computeJoinCacheKey(DataSource dataSource)

Review comment:
       Per the contract of `JoinableFactory`, `build()` can return an `Optional.empty()` even when the datasource is `directlyJoinable`. I think `computeJoinCacheKey()` should match `build()`; it should return a computed cache key only when `build()` returns a `Joinable`.
   
   As you mentioned at https://github.com/apache/druid/pull/10366#discussion_r487695326, perhaps `canHashJoin()` could be checked on the caller side. I don't see why not for now, except that you should also change the `build()` method and its callers to do the same. However, I'm not sure that's better. That is, I'm not sure whether this interface, or the interface you may change it to, makes sense for other join algorithms we will add in the future. I would suggest keeping this interface until we come up with a good one.

##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
##########
@@ -224,8 +228,9 @@ public SinkQuerySegmentWalker(
                       // 1) Only use caching if data is immutable
                       // 2) Hydrants are not the same between replicas, make sure cache is local
                       if (hydrantDefinitelySwapped && cache.isLocal()) {
-                        runner = new CachingQueryRunner<>(
+                        runner = new CachingQueryRunner<T>(

Review comment:
       Type argument is not required.

##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +112,74 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that participate in the RHS of a join. This key prefix
+   * can be used in segment level cache or result level cache. The function can return following wrapped in an
+   * Optional
+   *  - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
+   *  join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
+   *  - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
+   *  in the JOIN is not cacheable.
+   *
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   * @param dataSourceAnalysis
+   * @return

Review comment:
       Please remove `param` and `return` tags if they don't need descriptions.

##########
File path: server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
##########
@@ -197,13 +197,16 @@ public ServerManager(
     }
 
     // segmentMapFn maps each base Segment into a joined Segment if necessary.
-    final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
+    final Function<SegmentReference, SegmentReference> segmentMapFn = joinables.createSegmentMapFn(
         analysis.getPreJoinableClauses(),
-        joinableFactory,
         cpuTimeAccumulator,
         analysis.getBaseQuery().orElse(query)
     );
 
+    final Optional<byte[]> cacheKeyPrefix = analysis.isJoin()

Review comment:
       How about supplying a `Supplier<Optional<byte[]>>` to the `CachingQueryRunner`? Then `cacheKeyPrefix` is computed only when `useCache` or `populateCache` is true, which seems more appropriate.
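   
   A minimal sketch of that idea, using only the JDK. The names `LazyCacheKeyPrefixSketch`, `computeCacheKeyPrefix()`, and `run()` are hypothetical stand-ins and not the actual `CachingQueryRunner` API; the sketch only shows that a `Supplier` defers the key computation until `useCache` or `populateCache` is true.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class LazyCacheKeyPrefixSketch
{
  // Counts how often the (potentially expensive) key computation actually runs.
  static final AtomicInteger COMPUTATIONS = new AtomicInteger();

  // Hypothetical stand-in for computing the join cache key prefix.
  static Optional<byte[]> computeCacheKeyPrefix()
  {
    COMPUTATIONS.incrementAndGet();
    return Optional.of(new byte[]{1, 2, 3});
  }

  // Hypothetical stand-in for the runner: it asks the supplier for the prefix
  // only when caching is enabled, so the computation is skipped otherwise.
  static Optional<byte[]> run(
      boolean useCache,
      boolean populateCache,
      Supplier<Optional<byte[]>> cacheKeyPrefix
  )
  {
    if (useCache || populateCache) {
      return cacheKeyPrefix.get();
    }
    return Optional.empty();
  }

  public static void main(String[] args)
  {
    run(false, false, LazyCacheKeyPrefixSketch::computeCacheKeyPrefix);
    System.out.println("computations with caching disabled: " + COMPUTATIONS.get());

    run(true, false, LazyCacheKeyPrefixSketch::computeCacheKeyPrefix);
    System.out.println("computations with useCache enabled: " + COMPUTATIONS.get());
  }
}
```

   If the prefix could be requested more than once per query, the supplier might additionally be memoized (for example with Guava's `Suppliers.memoize()`); whether that is needed here is an assumption.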

##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +112,74 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that participate in the RHS of a join. This key prefix
+   * can be used in segment level cache or result level cache. The function can return following wrapped in an
+   * Optional
+   *  - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
+   *  join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
+   *  - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
+   *  in the JOIN is not cacheable.
+   *
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   * @param dataSourceAnalysis
+   * @return
+   */
+  public Optional<byte[]> computeJoinDataSourceCacheKey(
+      final DataSourceAnalysis dataSourceAnalysis
+  )
+  {
+
+    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
+    if (clauses.isEmpty()) {
+      throw new IAE("No join clauses to build the cache key");

Review comment:
       Please include useful information in the error message, such as the datasource.

##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +112,74 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that participate in the RHS of a join. This key prefix
+   * can be used in segment level cache or result level cache. The function can return following wrapped in an
+   * Optional
+   *  - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
+   *  join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
+   *  - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
+   *  in the JOIN is not cacheable.
+   *
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   * @param dataSourceAnalysis
+   * @return
+   */
+  public Optional<byte[]> computeJoinDataSourceCacheKey(
+      final DataSourceAnalysis dataSourceAnalysis
+  )
+  {
+
+    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
+    if (clauses.isEmpty()) {
+      throw new IAE("No join clauses to build the cache key");
+    }
+
+    final CacheKeyBuilder keyBuilder;
+    keyBuilder = new CacheKeyBuilder(JOIN_OPERATION);
+    for (PreJoinableClause clause : clauses) {
+      if (!clause.getCondition().canHashJoin()) {
+        log.debug("skipping caching for join since [%s] does not support hash-join", clause.getCondition());
+        return Optional.empty();
+      }
+      Optional<byte[]> bytes = joinableFactory.computeJoinCacheKey(clause.getDataSource());
+      if (!bytes.isPresent()) {
+        // Encountered a data source which didn't support cache yet
+        log.debug("skipping caching for join since [%s] does not support caching", clause.getDataSource());
+        return Optional.empty();
+      }
+      keyBuilder.appendByteArray(bytes.get());
+      keyBuilder.appendString(clause.getPrefix());    //TODO - prefix shouldn't be required IMO

Review comment:
       Yeah, it doesn't seem necessary for now.

##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexedTable.java
##########
@@ -52,6 +54,8 @@
   private final List<Function<RowType, Object>> columnFunctions;
   private final Set<String> keyColumns;
   private final String version;
+  @Nullable
+  private final byte[] cacheKey;

Review comment:
       Would you please add a comment on this variable? Since `cacheKey` is not set anywhere, readers will not understand when it can be null, or even how it could be set, until they read this PR.

##########
File path: processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java
##########
@@ -190,6 +196,176 @@ public boolean isDirectlyJoinable(DataSource dataSource)
     Assert.assertNotSame(Function.identity(), segmentMapFn);
   }
 
+  @Test(expected = IAE.class)

Review comment:
       Please use `expectedException` and verify the error message as well.

##########
File path: processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java
##########
@@ -83,6 +84,36 @@ public void testBuildDataSourceIsRegisteredShouldReturnJoinableFromFactory()
     EasyMock.replay(noopJoinableFactory);
     Optional<Joinable> joinable = target.build(noopDataSource, condition);
     Assert.assertEquals(mockJoinable, joinable.get());
+
+  }
+
+  @Test
+  public void testComputeJoinCacheKey()
+  {
+    Optional<byte[]> expected = Optional.of(new byte[]{1, 2, 3});
+    EasyMock.expect(noopJoinableFactory.computeJoinCacheKey(noopDataSource)).andReturn(expected);
+    EasyMock.replay(noopJoinableFactory);
+    Optional<byte[]> actual = target.computeJoinCacheKey(noopDataSource);
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test(expected = ISE.class)

Review comment:
       Please use `expectedException` and verify the error message as well.

##########
File path: processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java
##########
@@ -190,6 +196,176 @@ public boolean isDirectlyJoinable(DataSource dataSource)
     Assert.assertNotSame(Function.identity(), segmentMapFn);
   }
 
+  @Test(expected = IAE.class)
+  public void test_computeJoinDataSourceCacheKey_noClauses()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.emptyList()).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    joinables.computeJoinDataSourceCacheKey(analysis);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_noHashJoin()
+  {
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_2", "x != \"h.x\"", "h.");
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Arrays.asList(clause_1, clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    Optional<byte[]> cacheKey = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertFalse(cacheKey.isPresent());
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_cachingUnsupported()
+  {
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    DataSource dataSource = new LookupDataSource("lookup");
+    PreJoinableClause clause_2 = makePreJoinableClause(dataSource, "x == \"h.x\"", "h.", JoinType.LEFT);
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Arrays.asList(clause_1, clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    Optional<byte[]> cacheKey = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertFalse(cacheKey.isPresent());
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_usableClauses()
+  {
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_2", "x == \"h.x\"", "h.");
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Arrays.asList(clause_1, clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    Optional<byte[]> cacheKey = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertTrue(cacheKey.isPresent());
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithExpression()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "y == \"j.y\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithJoinType()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.", JoinType.LEFT);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.", JoinType.INNER);
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithPrefix()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"h.x\"", "h.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithJoinable()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_2", "x == \"j.x\"", "j.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_sameKeyForSameJoin()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);

Review comment:
       Should this be `Assert.assertArrayEquals(cacheKey_1.get(), cacheKey_2.get());`?
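   
   For context, a small JDK-only sketch of why the distinction matters. `Optional.equals()` delegates to the wrapped value's `equals()`, and arrays inherit the identity-based `equals()` of `Object`, so two `Optional<byte[]>` values holding identical bytes in different array instances are not equal. An `assertNotEquals` on such Optionals would therefore pass regardless of the contents. The class and method names below are illustrative only.

```java
import java.util.Arrays;
import java.util.Optional;

public class ByteArrayEqualitySketch
{
  // Compares two arrays the way Assert.assertEquals would when they are wrapped in Optionals:
  // Optional.equals() falls back to byte[].equals(), which is reference equality.
  static boolean optionalEquals(byte[] a, byte[] b)
  {
    return Optional.of(a).equals(Optional.of(b));
  }

  // Compares the array contents element by element, which is what a cache key test needs.
  static boolean contentEquals(byte[] a, byte[] b)
  {
    return Arrays.equals(a, b);
  }

  public static void main(String[] args)
  {
    byte[] key1 = {1, 2, 3};
    byte[] key2 = {1, 2, 3};

    System.out.println("Optional.equals on distinct arrays: " + optionalEquals(key1, key2));
    System.out.println("Arrays.equals on distinct arrays:   " + contentEquals(key1, key2));
  }
}
```

   The first comparison is false and the second is true, which is why content-based assertions such as `Assert.assertArrayEquals()` are the safer choice for `byte[]` cache keys.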

##########
File path: processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java
##########
@@ -83,6 +84,36 @@ public void testBuildDataSourceIsRegisteredShouldReturnJoinableFromFactory()
     EasyMock.replay(noopJoinableFactory);
     Optional<Joinable> joinable = target.build(noopDataSource, condition);
     Assert.assertEquals(mockJoinable, joinable.get());
+
+  }
+
+  @Test
+  public void testComputeJoinCacheKey()
+  {
+    Optional<byte[]> expected = Optional.of(new byte[]{1, 2, 3});
+    EasyMock.expect(noopJoinableFactory.computeJoinCacheKey(noopDataSource)).andReturn(expected);
+    EasyMock.replay(noopJoinableFactory);
+    Optional<byte[]> actual = target.computeJoinCacheKey(noopDataSource);
+    Assert.assertEquals(expected, actual);

Review comment:
       Should use `Assert.assertArrayEquals()`.

##########
File path: processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java
##########
@@ -190,6 +196,176 @@ public boolean isDirectlyJoinable(DataSource dataSource)
     Assert.assertNotSame(Function.identity(), segmentMapFn);
   }
 
+  @Test(expected = IAE.class)
+  public void test_computeJoinDataSourceCacheKey_noClauses()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.emptyList()).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    joinables.computeJoinDataSourceCacheKey(analysis);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_noHashJoin()
+  {
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");

Review comment:
       We don't use underscores in variable names. Please rename all variables containing underscores.

##########
File path: processing/src/main/java/org/apache/druid/segment/join/Joinables.java
##########
@@ -118,6 +112,74 @@ public static boolean isPrefixedBy(final String columnName, final String prefix)
     );
   }
 
+  /**
+   * Compute a cache key prefix for data sources that participate in the RHS of a join. This key prefix
+   * can be used in segment level cache or result level cache. The function can return following wrapped in an
+   * Optional
+   *  - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
+   *  join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
+   *  - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
+   *  in the JOIN is not cacheable.
+   *
+   * @throws {@link IAE} if this operation is called on a non-join data source
+   * @param dataSourceAnalysis
+   * @return

Review comment:
       I found a couple more places with empty `param` or `return` tags. Please either add a description or remove them.

##########
File path: processing/src/test/java/org/apache/druid/segment/join/table/RowBasedIndexedTableTest.java
##########
@@ -179,4 +179,13 @@ public void testVersion()
     Assert.assertEquals(JoinTestHelper.INDEXED_TABLE_VERSION, countriesTable.version());
     Assert.assertEquals(JoinTestHelper.INDEXED_TABLE_VERSION, regionsTable.version());
   }
+
+  @Test
+  public void testIsCacheable() throws IOException
+  {
+    Assert.assertFalse(countriesTable.isCacheable());
+    RowBasedIndexedTable<Map<String, Object>> countriesTableWithCacheKey = JoinTestHelper.createCountriesIndexedTableWithCacheKey();
+    Assert.assertTrue(countriesTableWithCacheKey.isCacheable());
+    Assert.assertEquals(countriesTableWithCacheKey.computeCacheKey(), JoinTestHelper.INDEXED_TABLE_CACHE_KEY);

Review comment:
       Please use `Assert.assertArrayEquals()`. Also, the expected result should be the first argument.

##########
File path: server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java
##########
@@ -340,4 +356,15 @@ private DataSegment createSegment(IncrementalIndex data, String interval, String
         ImmutableMap.of("type", "local", "path", segmentDir.getAbsolutePath())
     );
   }
+
+  private void assertSegmentIdEquals(SegmentId id, byte[] bytes)
+  {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    long start = byteBuffer.getLong();
+    long end = byteBuffer.getLong();
+    String version = StringUtils.fromUtf8(byteBuffer, StringUtils.estimatedBinaryLengthAsUTF8(id.getVersion()));
+    String dataSource = StringUtils.fromUtf8(byteBuffer, StringUtils.estimatedBinaryLengthAsUTF8(id.getDataSource()));
+    int partition = byteBuffer.getInt();
+    Assert.assertEquals(id, SegmentId.of(dataSource, new Interval(start, end, DateTimeZone.UTC), version, partition));

Review comment:
       Please use `Intervals.utc()` instead.

##########
File path: processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java
##########
@@ -190,6 +196,176 @@ public boolean isDirectlyJoinable(DataSource dataSource)
     Assert.assertNotSame(Function.identity(), segmentMapFn);
   }
 
+  @Test(expected = IAE.class)
+  public void test_computeJoinDataSourceCacheKey_noClauses()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.emptyList()).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    joinables.computeJoinDataSourceCacheKey(analysis);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_noHashJoin()
+  {
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_2", "x != \"h.x\"", "h.");
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Arrays.asList(clause_1, clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    Optional<byte[]> cacheKey = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertFalse(cacheKey.isPresent());
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_cachingUnsupported()
+  {
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    DataSource dataSource = new LookupDataSource("lookup");
+    PreJoinableClause clause_2 = makePreJoinableClause(dataSource, "x == \"h.x\"", "h.", JoinType.LEFT);
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Arrays.asList(clause_1, clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    Optional<byte[]> cacheKey = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertFalse(cacheKey.isPresent());
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_usableClauses()
+  {
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_2", "x == \"h.x\"", "h.");
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Arrays.asList(clause_1, clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+    Optional<byte[]> cacheKey = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertTrue(cacheKey.isPresent());
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithExpression()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "y == \"j.y\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithJoinType()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.", JoinType.LEFT);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.", JoinType.INNER);
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithPrefix()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"h.x\"", "h.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_keyChangesWithJoinable()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_2", "x == \"j.x\"", "j.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);
+  }
+
+  @Test
+  public void test_computeJoinDataSourceCacheKey_sameKeyForSameJoin()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    Joinables joinables = new Joinables(new JoinableFactoryWithCacheKey());
+
+    PreJoinableClause clause_1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_1)).anyTimes();
+    EasyMock.replay(analysis);
+
+    Optional<byte[]> cacheKey_1 = joinables.computeJoinDataSourceCacheKey(analysis);
+    Assert.assertTrue(cacheKey_1.isPresent());
+    Assert.assertNotEquals(0, cacheKey_1.get().length);
+
+    PreJoinableClause clause_2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.");
+    EasyMock.reset(analysis);
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause_2)).anyTimes();
+    EasyMock.replay(analysis);
+    Optional<byte[]> cacheKey_2 = joinables.computeJoinDataSourceCacheKey(analysis);
+
+    Assert.assertNotEquals(cacheKey_1, cacheKey_2);

Review comment:
       The same comment applies to the other tests you added. Should they use `Assert.assertFalse(Arrays.equals(cacheKey_1.get(), cacheKey_2.get()));`?

##########
File path: processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
##########
@@ -241,6 +244,22 @@ public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, bo
     );
   }
 
+  @Override
+  public Optional<byte[]> computeCacheKey()
+  {
+    SegmentId segmentId = segment.getId();

Review comment:
       I think this logic should be here instead of in `SegmentId` since a segmentId represents a whole segment while we can partially read a broadcast segment based on an interval filter in the future.
   
   @abhishekagarwal87 regarding using `CacheUtil`, if you just want to add a new method similar to `computeSegmentCacheKey()`, I think you don't have to since the logic is different anyway. I would rather suggest using `CacheKeyBuilder`.

##########
File path: server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
##########
@@ -293,8 +300,9 @@ public ServerManager(
         queryMetrics -> queryMetrics.segment(segmentIdString)
     );
 
-    CachingQueryRunner<T> cachingQueryRunner = new CachingQueryRunner<>(
+    QueryRunner<T> queryRunner = new CachingQueryRunner<>(

Review comment:
       Why is this change needed?



