You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2021/01/14 02:31:58 UTC
[druid] branch master updated: use actual dataInterval in cache key
(#10714)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4437c6a use actual dataInterval in cache key (#10714)
4437c6a is described below
commit 4437c6af60316e86980d3ed75ca5fa7816f93697
Author: kaijianding <ka...@gmail.com>
AuthorDate: Thu Jan 14 10:31:36 2021 +0800
use actual dataInterval in cache key (#10714)
* use actual dataInterval in cache key
* fix ut fail
* fix segmentMaxTime exclusive
---
.../apache/druid/client/CachingQueryRunner.java | 16 ++-
.../appenderator/SinkQuerySegmentWalker.java | 7 ++
.../druid/server/coordination/ServerManager.java | 7 ++
.../druid/client/CachingQueryRunnerTest.java | 1 +
.../server/coordination/ServerManagerTest.java | 116 ++++++++++++++++++++-
5 files changed, 145 insertions(+), 2 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java b/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
index 1372c71..9bb9f47 100644
--- a/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
+++ b/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
@@ -37,6 +37,7 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.context.ResponseContext;
+import org.joda.time.Interval;
import java.io.IOException;
import java.util.Collections;
@@ -47,6 +48,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
{
private final String cacheId;
private final SegmentDescriptor segmentDescriptor;
+ private final Interval actualDataInterval;
private final Optional<byte[]> cacheKeyPrefix;
private final QueryRunner<T> base;
private final QueryToolChest toolChest;
@@ -59,6 +61,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
String cacheId,
Optional<byte[]> cacheKeyPrefix,
SegmentDescriptor segmentDescriptor,
+ Interval actualDataInterval,
ObjectMapper mapper,
Cache cache,
QueryToolChest toolchest,
@@ -71,6 +74,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
this.base = base;
this.cacheId = cacheId;
this.segmentDescriptor = segmentDescriptor;
+ this.actualDataInterval = actualDataInterval;
this.toolChest = toolchest;
this.cache = cache;
this.mapper = mapper;
@@ -90,7 +94,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
if (useCache || populateCache) {
key = CacheUtil.computeSegmentCacheKey(
cacheId,
- segmentDescriptor,
+ alignToActualDataInterval(segmentDescriptor),
Bytes.concat(cacheKeyPrefix.get(), strategy.computeCacheKey(query))
);
} else {
@@ -172,4 +176,14 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
) && cacheKeyPrefix.isPresent();
}
+ // Cache-key helper: clips the descriptor's interval to the actual data interval when they overlap.
+ private SegmentDescriptor alignToActualDataInterval(SegmentDescriptor in)
+ {
+ final Interval requested = in.getInterval();
+ // A request that does not intersect the actual data interval is passed through unchanged.
+ final boolean intersects = requested.overlaps(actualDataInterval);
+ final Interval keyInterval = intersects ? requested.overlap(actualDataInterval) : requested;
+ return new SegmentDescriptor(keyInterval, in.getVersion(), in.getPartitionNumber());
+ }
+
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 5b2c5cc..2c3d38f 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -29,6 +29,7 @@ import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -57,6 +58,7 @@ import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.realtime.FireHydrant;
@@ -230,10 +232,15 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
// 1) Only use caching if data is immutable
// 2) Hydrants are not the same between replicas, make sure cache is local
if (hydrantDefinitelySwapped && cache.isLocal()) {
+ StorageAdapter storageAdapter = segmentAndCloseable.lhs.asStorageAdapter();
+ long segmentMinTime = storageAdapter.getMinTime().getMillis();
+ long segmentMaxTime = storageAdapter.getMaxTime().getMillis();
+ Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1);
runner = new CachingQueryRunner<>(
makeHydrantCacheIdentifier(hydrant),
cacheKeyPrefix,
descriptor,
+ actualDataInterval,
objectMapper,
cache,
toolChest,
diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
index 44c0d8b..da4fc59 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
@@ -29,6 +29,7 @@ import org.apache.druid.client.cache.CachePopulator;
import org.apache.druid.guice.annotations.Processing;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -57,6 +58,7 @@ import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.server.SegmentManager;
@@ -300,10 +302,15 @@ public class ServerManager implements QuerySegmentWalker
queryMetrics -> queryMetrics.segment(segmentIdString)
);
+ StorageAdapter storageAdapter = segment.asStorageAdapter();
+ long segmentMaxTime = storageAdapter.getMaxTime().getMillis();
+ long segmentMinTime = storageAdapter.getMinTime().getMillis();
+ Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1);
CachingQueryRunner<T> cachingQueryRunner = new CachingQueryRunner<>(
segmentIdString,
cacheKeyPrefix,
segmentDescriptor,
+ actualDataInterval,
objectMapper,
cache,
toolChest,
diff --git a/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java
index a2f7c8a..abbf316 100644
--- a/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java
@@ -413,6 +413,7 @@ public class CachingQueryRunnerTest
CACHE_ID,
Optional.ofNullable(cacheKeyPrefix),
SEGMENT_DESCRIPTOR,
+ SEGMENT_DESCRIPTOR.getInterval(),
objectMapper,
cache,
toolchest,
diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
index bf74ba6..4f00cde 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
@@ -63,16 +63,22 @@ import org.apache.druid.query.context.DefaultResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContext.Key;
import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.search.SearchQuery;
import org.apache.druid.query.search.SearchResultValue;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.loading.SegmentLoadingException;
@@ -85,6 +91,7 @@ import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
@@ -92,6 +99,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -837,7 +845,7 @@ public class ServerManagerTest
@Override
public StorageAdapter asStorageAdapter()
{
- throw new UnsupportedOperationException();
+ return makeFakeStorageAdapter(interval, 0);
}
@Override
@@ -847,6 +855,112 @@ public class ServerManagerTest
closed = true;
}
}
+
+ // Builds a minimal StorageAdapter for tests: only the interval, min/max time and dimension
+ // cardinality carry real values; every other method is a stub returning null or 0.
+ private StorageAdapter makeFakeStorageAdapter(Interval interval, int cardinality)
+ {
+ return new StorageAdapter()
+ {
+ @Override
+ public Interval getInterval()
+ {
+ return interval;
+ }
+
+ @Override
+ public int getDimensionCardinality(String column)
+ {
+ return cardinality;
+ }
+
+ @Override
+ public DateTime getMinTime()
+ {
+ return interval.getStart();
+ }
+
+ // Callers add 1 ms to getMaxTime() to form an exclusive end, so report the last instant inside the interval.
+ @Override
+ public DateTime getMaxTime()
+ {
+ return interval.getEnd().minus(1);
+ }
+
+ // stubs below this line not important for tests
+
+ @Override
+ public Indexed<String> getAvailableDimensions()
+ {
+ return null;
+ }
+
+ @Override
+ public Iterable<String> getAvailableMetrics()
+ {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Comparable getMinValue(String column)
+ {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Comparable getMaxValue(String column)
+ {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public ColumnCapabilities getColumnCapabilities(String column)
+ {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public String getColumnTypeName(String column)
+ {
+ return null;
+ }
+
+ @Override
+ public int getNumRows()
+ {
+ return 0;
+ }
+
+ @Override
+ public DateTime getMaxIngestedEventTime()
+ {
+ return null;
+ }
+
+ @Override
+ public Metadata getMetadata()
+ {
+ return null;
+ }
+
+ @Override
+ public Sequence<Cursor> makeCursors(
+ @Nullable Filter filter,
+ Interval interval,
+ VirtualColumns virtualColumns,
+ Granularity gran,
+ boolean descending,
+ @Nullable QueryMetrics<?> queryMetrics
+ )
+ {
+ return null;
+ }
+ };
+ }
}
private static class BlockingQueryRunner<T> implements QueryRunner<T>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org