Posted to commits@druid.apache.org by gi...@apache.org on 2021/01/14 02:31:58 UTC

[druid] branch master updated: use actual dataInterval in cache key (#10714)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4437c6a  use actual dataInterval in cache key (#10714)
4437c6a is described below

commit 4437c6af60316e86980d3ed75ca5fa7816f93697
Author: kaijianding <ka...@gmail.com>
AuthorDate: Thu Jan 14 10:31:36 2021 +0800

    use actual dataInterval in cache key (#10714)
    
    * use actual dataInterval in cache key
    
    * fix ut fail
    
    * fix segmentMaxTime exclusive
---
 .../apache/druid/client/CachingQueryRunner.java    |  16 ++-
 .../appenderator/SinkQuerySegmentWalker.java       |   7 ++
 .../druid/server/coordination/ServerManager.java   |   7 ++
 .../druid/client/CachingQueryRunnerTest.java       |   1 +
 .../server/coordination/ServerManagerTest.java     | 116 ++++++++++++++++++++-
 5 files changed, 145 insertions(+), 2 deletions(-)
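
In short: CachingQueryRunner now takes an extra actualDataInterval argument, SinkQuerySegmentWalker and ServerManager derive that interval from the StorageAdapter's min and max row timestamps, and the segment cache key is computed from the SegmentDescriptor's interval clipped to it rather than from the descriptor interval as-is. Below is a minimal sketch of the clipping using plain Joda-Time; the class name and dates are made up for illustration and are not part of the patch.

    import org.joda.time.Interval;

    public class AlignToActualDataIntervalSketch
    {
      public static void main(String[] args)
      {
        // Interval the query was routed to (the SegmentDescriptor's interval).
        Interval descriptorInterval = Interval.parse("2021-01-01T00:00:00Z/2021-02-01T00:00:00Z");

        // Interval actually covered by rows in the segment: [minTime, maxTime + 1 ms).
        Interval actualDataInterval = Interval.parse("2021-01-05T00:00:00Z/2021-01-10T00:00:00.001Z");

        // Same shape as CachingQueryRunner#alignToActualDataInterval in the diff:
        // use the overlap when there is one, otherwise keep the original interval.
        Interval aligned = descriptorInterval.overlaps(actualDataInterval)
                           ? descriptorInterval.overlap(actualDataInterval)
                           : descriptorInterval;

        // Prints the five days that actually contain data, not the whole month.
        System.out.println(aligned);
      }
    }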

diff --git a/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java b/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
index 1372c71..9bb9f47 100644
--- a/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
+++ b/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
@@ -37,6 +37,7 @@ import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.context.ResponseContext;
+import org.joda.time.Interval;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -47,6 +48,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
 {
   private final String cacheId;
   private final SegmentDescriptor segmentDescriptor;
+  private final Interval actualDataInterval;
   private final Optional<byte[]> cacheKeyPrefix;
   private final QueryRunner<T> base;
   private final QueryToolChest toolChest;
@@ -59,6 +61,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
       String cacheId,
       Optional<byte[]> cacheKeyPrefix,
       SegmentDescriptor segmentDescriptor,
+      Interval actualDataInterval,
       ObjectMapper mapper,
       Cache cache,
       QueryToolChest toolchest,
@@ -71,6 +74,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
     this.base = base;
     this.cacheId = cacheId;
     this.segmentDescriptor = segmentDescriptor;
+    this.actualDataInterval = actualDataInterval;
     this.toolChest = toolchest;
     this.cache = cache;
     this.mapper = mapper;
@@ -90,7 +94,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
     if (useCache || populateCache) {
       key = CacheUtil.computeSegmentCacheKey(
           cacheId,
-          segmentDescriptor,
+          alignToActualDataInterval(segmentDescriptor),
           Bytes.concat(cacheKeyPrefix.get(), strategy.computeCacheKey(query))
       );
     } else {
@@ -172,4 +176,14 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
     ) && cacheKeyPrefix.isPresent();
   }
 
+  private SegmentDescriptor alignToActualDataInterval(SegmentDescriptor in)
+  {
+    Interval interval = in.getInterval();
+    return new SegmentDescriptor(
+        interval.overlaps(actualDataInterval) ? interval.overlap(actualDataInterval) : interval,
+        in.getVersion(),
+        in.getPartitionNumber()
+    );
+  }
+
 }
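
A note on alignToActualDataInterval above: Joda-Time's Interval.overlap() returns null when the two intervals do not overlap (abutting intervals count as not overlapping), so the overlaps() guard is what keeps the descriptor interval unchanged in that case instead of building a SegmentDescriptor around a null interval. A tiny illustration with made-up dates:

    import org.joda.time.Interval;

    public class NoOverlapFallbackSketch
    {
      public static void main(String[] args)
      {
        Interval descriptorInterval = Interval.parse("2021-01-01T00:00:00Z/2021-01-02T00:00:00Z");
        Interval actualDataInterval = Interval.parse("2021-03-01T00:00:00Z/2021-03-02T00:00:00Z");

        // overlap() alone would return null here; the overlaps() check falls
        // back to the original descriptor interval instead.
        Interval aligned = descriptorInterval.overlaps(actualDataInterval)
                           ? descriptorInterval.overlap(actualDataInterval)
                           : descriptorInterval;

        System.out.println(aligned); // still 2021-01-01/2021-01-02
      }
    }
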
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 5b2c5cc..2c3d38f 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -29,6 +29,7 @@ import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
 import org.apache.druid.client.cache.ForegroundCachePopulator;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
@@ -57,6 +58,7 @@ import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
 import org.apache.druid.query.spec.SpecificSegmentSpec;
 import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.segment.realtime.FireHydrant;
@@ -230,10 +232,15 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
                       // 1) Only use caching if data is immutable
                       // 2) Hydrants are not the same between replicas, make sure cache is local
                       if (hydrantDefinitelySwapped && cache.isLocal()) {
+                        StorageAdapter storageAdapter = segmentAndCloseable.lhs.asStorageAdapter();
+                        long segmentMinTime = storageAdapter.getMinTime().getMillis();
+                        long segmentMaxTime = storageAdapter.getMaxTime().getMillis();
+                        Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1);
                         runner = new CachingQueryRunner<>(
                             makeHydrantCacheIdentifier(hydrant),
                             cacheKeyPrefix,
                             descriptor,
+                            actualDataInterval,
                             objectMapper,
                             cache,
                             toolChest,
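
One detail in the hunk above, matching the "fix segmentMaxTime exclusive" line of the commit message: Joda-Time interval ends are exclusive and StorageAdapter.getMaxTime() is the timestamp of the last row, so the end is bumped by one millisecond to keep that row inside the interval. A small sketch with illustrative timestamps (the class name and times are made up):

    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;
    import org.joda.time.Interval;

    public class ExclusiveEndSketch
    {
      public static void main(String[] args)
      {
        long segmentMinTime = new DateTime("2021-01-14T00:00:00Z").getMillis();
        long segmentMaxTime = new DateTime("2021-01-14T10:31:36Z").getMillis();

        // Interval ends are exclusive, so [min, max) would drop the last row.
        Interval withoutBump = new Interval(segmentMinTime, segmentMaxTime, DateTimeZone.UTC);
        Interval withBump = new Interval(segmentMinTime, segmentMaxTime + 1, DateTimeZone.UTC);

        System.out.println(withoutBump.contains(segmentMaxTime)); // false
        System.out.println(withBump.contains(segmentMaxTime));    // true
      }
    }
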
diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
index 44c0d8b..da4fc59 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
@@ -29,6 +29,7 @@ import org.apache.druid.client.cache.CachePopulator;
 import org.apache.druid.guice.annotations.Processing;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.FunctionalIterable;
 import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -57,6 +58,7 @@ import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
 import org.apache.druid.query.spec.SpecificSegmentSpec;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.server.SegmentManager;
@@ -300,10 +302,15 @@ public class ServerManager implements QuerySegmentWalker
         queryMetrics -> queryMetrics.segment(segmentIdString)
     );
 
+    StorageAdapter storageAdapter = segment.asStorageAdapter();
+    long segmentMaxTime = storageAdapter.getMaxTime().getMillis();
+    long segmentMinTime = storageAdapter.getMinTime().getMillis();
+    Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1);
     CachingQueryRunner<T> cachingQueryRunner = new CachingQueryRunner<>(
         segmentIdString,
         cacheKeyPrefix,
         segmentDescriptor,
+        actualDataInterval,
         objectMapper,
         cache,
         toolChest,
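
ServerManager applies the same derivation to historical segments. For a segment whose data spans its whole descriptor interval (max row timestamp one millisecond before the interval end), the clipped interval works out to the descriptor interval itself, so cache keys for such segments should come out unchanged. A hedged sketch of that arithmetic, using an illustrative one-day interval:

    import org.joda.time.DateTimeZone;
    import org.joda.time.Interval;

    public class FullyPopulatedSegmentSketch
    {
      public static void main(String[] args)
      {
        Interval descriptorInterval = Interval.parse("2021-01-01T00:00:00Z/2021-01-02T00:00:00Z");

        // Hypothetical min/max row timestamps for data filling the whole day.
        long segmentMinTime = descriptorInterval.getStartMillis();
        long segmentMaxTime = descriptorInterval.getEndMillis() - 1;

        // Mirrors the ServerManager construction: [minTime, maxTime + 1 ms) in UTC.
        Interval actualDataInterval = new Interval(segmentMinTime, segmentMaxTime + 1, DateTimeZone.UTC);
        Interval aligned = descriptorInterval.overlap(actualDataInterval);

        // Start and end millis match the descriptor interval exactly.
        System.out.println(aligned.getStartMillis() == descriptorInterval.getStartMillis()
                           && aligned.getEndMillis() == descriptorInterval.getEndMillis());
      }
    }
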
diff --git a/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java
index a2f7c8a..abbf316 100644
--- a/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java
@@ -413,6 +413,7 @@ public class CachingQueryRunnerTest
         CACHE_ID,
         Optional.ofNullable(cacheKeyPrefix),
         SEGMENT_DESCRIPTOR,
+        SEGMENT_DESCRIPTOR.getInterval(),
         objectMapper,
         cache,
         toolchest,
diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
index bf74ba6..4f00cde 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
@@ -63,16 +63,22 @@ import org.apache.druid.query.context.DefaultResponseContext;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.query.context.ResponseContext.Key;
 import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.query.search.SearchQuery;
 import org.apache.druid.query.search.SearchResultValue;
 import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.Metadata;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.join.NoopJoinableFactory;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.loading.SegmentLoadingException;
@@ -85,6 +91,7 @@ import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.apache.druid.timeline.partition.PartitionChunk;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Before;
@@ -92,6 +99,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -837,7 +845,7 @@ public class ServerManagerTest
     @Override
     public StorageAdapter asStorageAdapter()
     {
-      throw new UnsupportedOperationException();
+      return makeFakeStorageAdapter(interval, 0);
     }
 
     @Override
@@ -847,6 +855,112 @@ public class ServerManagerTest
         closed = true;
       }
     }
+
+    private StorageAdapter makeFakeStorageAdapter(Interval interval, int cardinality)
+    {
+      StorageAdapter adapter = new StorageAdapter()
+      {
+        @Override
+        public Interval getInterval()
+        {
+          return interval;
+        }
+
+        @Override
+        public int getDimensionCardinality(String column)
+        {
+          return cardinality;
+        }
+
+        @Override
+        public DateTime getMinTime()
+        {
+          return interval.getStart();
+        }
+
+
+        @Override
+        public DateTime getMaxTime()
+        {
+          return interval.getEnd();
+        }
+
+        // stubs below this line not important for tests
+
+        @Override
+        public Indexed<String> getAvailableDimensions()
+        {
+          return null;
+        }
+
+        @Override
+        public Iterable<String> getAvailableMetrics()
+        {
+          return null;
+        }
+
+        @Nullable
+        @Override
+        public Comparable getMinValue(String column)
+        {
+          return null;
+        }
+
+        @Nullable
+        @Override
+        public Comparable getMaxValue(String column)
+        {
+          return null;
+        }
+
+        @Nullable
+        @Override
+        public ColumnCapabilities getColumnCapabilities(String column)
+        {
+          return null;
+        }
+
+        @Nullable
+        @Override
+        public String getColumnTypeName(String column)
+        {
+          return null;
+        }
+
+        @Override
+        public int getNumRows()
+        {
+          return 0;
+        }
+
+        @Override
+        public DateTime getMaxIngestedEventTime()
+        {
+          return null;
+        }
+
+        @Override
+        public Metadata getMetadata()
+        {
+          return null;
+        }
+
+        @Override
+        public Sequence<Cursor> makeCursors(
+            @Nullable Filter filter,
+            Interval interval,
+            VirtualColumns virtualColumns,
+            Granularity gran,
+            boolean descending,
+            @Nullable QueryMetrics<?> queryMetrics
+        )
+        {
+          return null;
+        }
+      };
+
+      return adapter;
+    }
   }
 
   private static class BlockingQueryRunner<T> implements QueryRunner<T>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org