You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2019/05/12 23:59:19 UTC
[incubator-druid] branch 0.15.0-incubating updated: Fix
resultLevelCache for timeseries with grandTotal (#7624) (#7637)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch 0.15.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.15.0-incubating by this push:
new e2f15a3 Fix resultLevelCache for timeseries with grandTotal (#7624) (#7637)
e2f15a3 is described below
commit e2f15a39f38da1f2d959f34f648b179b37ffc442
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Sun May 12 16:59:12 2019 -0700
Fix resultLevelCache for timeseries with grandTotal (#7624) (#7637)
* Fix resultLevelCache for timeseries with grandTotal
* Address comment
* fix test
---
.../main/java/org/apache/druid/query/Result.java | 26 +++--
.../timeseries/TimeseriesQueryQueryToolChest.java | 19 +++-
.../java/org/apache/druid/query/ResultTest.java | 38 ++++++++
.../TimeseriesQueryQueryToolChestTest.java | 17 ++++
.../druid/query/ResultLevelCachingQueryRunner.java | 105 +++++++++++----------
5 files changed, 140 insertions(+), 65 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/query/Result.java b/processing/src/main/java/org/apache/druid/query/Result.java
index c1ec3a3..9ec75ad 100644
--- a/processing/src/main/java/org/apache/druid/query/Result.java
+++ b/processing/src/main/java/org/apache/druid/query/Result.java
@@ -24,6 +24,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.guice.annotations.PublicApi;
import org.joda.time.DateTime;
+import javax.annotation.Nullable;
+import java.util.Comparator;
+import java.util.Objects;
import java.util.function.Function;
/**
@@ -33,11 +36,12 @@ public class Result<T> implements Comparable<Result<T>>
{
public static String MISSING_SEGMENTS_KEY = "missingSegments";
+ @Nullable
private final DateTime timestamp;
private final T value;
@JsonCreator
- public Result(@JsonProperty("timestamp") DateTime timestamp, @JsonProperty("result") T value)
+ public Result(@JsonProperty("timestamp") @Nullable DateTime timestamp, @JsonProperty("result") T value)
{
this.timestamp = timestamp;
this.value = value;
@@ -51,10 +55,12 @@ public class Result<T> implements Comparable<Result<T>>
@Override
public int compareTo(Result<T> tResult)
{
- return timestamp.compareTo(tResult.timestamp);
+ // timestamp is null for grandTotal which should come last.
+ return Comparator.nullsLast(DateTime::compareTo).compare(this.timestamp, tResult.timestamp);
}
@JsonProperty
+ @Nullable
public DateTime getTimestamp()
{
return timestamp;
@@ -78,22 +84,22 @@ public class Result<T> implements Comparable<Result<T>>
Result result = (Result) o;
- if (timestamp != null ? !(timestamp.isEqual(result.timestamp) && timestamp.getZone().getOffset(timestamp) == result.timestamp.getZone().getOffset(result.timestamp)) : result.timestamp != null) {
- return false;
- }
- if (value != null ? !value.equals(result.value) : result.value != null) {
+ if (timestamp != null && result.timestamp != null) {
+ if (!timestamp.isEqual(result.timestamp)
+ && timestamp.getZone().getOffset(timestamp) == result.timestamp.getZone().getOffset(result.timestamp)) {
+ return false;
+ }
+ } else if (timestamp == null ^ result.timestamp == null) {
return false;
}
- return true;
+ return Objects.equals(value, result.value);
}
@Override
public int hashCode()
{
- int result = timestamp != null ? timestamp.hashCode() : 0;
- result = 31 * result + (value != null ? value.hashCode() : 0);
- return result;
+ return Objects.hash(timestamp, value);
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index d625c31..0ae9a70 100644
--- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -22,6 +22,7 @@ package org.apache.druid.query.timeseries;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -52,7 +53,6 @@ import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.groupby.RowBasedColumnSelectorFactory;
import org.joda.time.DateTime;
-import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -303,7 +303,12 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
TimeseriesResultValue results = input.getValue();
final List<Object> retVal = Lists.newArrayListWithCapacity(1 + aggs.size());
- retVal.add(input.getTimestamp().getMillis());
+ // Timestamp can be null if grandTotal is true.
+ if (isResultLevelCache) {
+ retVal.add(input.getTimestamp() == null ? null : input.getTimestamp().getMillis());
+ } else {
+ retVal.add(Preconditions.checkNotNull(input.getTimestamp(), "timestamp of input[%s]", input).getMillis());
+ }
for (AggregatorFactory agg : aggs) {
retVal.add(results.getMetric(agg.getName()));
}
@@ -324,7 +329,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
private final Granularity granularity = query.getGranularity();
@Override
- public Result<TimeseriesResultValue> apply(@Nullable Object input)
+ public Result<TimeseriesResultValue> apply(Object input)
{
List<Object> results = (List<Object>) input;
final Map<String, Object> retVal = Maps.newLinkedHashMap();
@@ -332,7 +337,13 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
Iterator<AggregatorFactory> aggsIter = aggs.iterator();
Iterator<Object> resultIter = results.iterator();
- DateTime timestamp = granularity.toDateTime(((Number) resultIter.next()).longValue());
+ final Number timestampNumber = (Number) resultIter.next();
+ final DateTime timestamp;
+ if (isResultLevelCache) {
+ timestamp = timestampNumber == null ? null : granularity.toDateTime(timestampNumber.longValue());
+ } else {
+ timestamp = granularity.toDateTime(Preconditions.checkNotNull(timestampNumber, "timestamp").longValue());
+ }
CacheStrategy.fetchAggregatorsFromCache(
aggsIter,
diff --git a/processing/src/test/java/org/apache/druid/query/ResultTest.java b/processing/src/test/java/org/apache/druid/query/ResultTest.java
new file mode 100644
index 0000000..7fe6815
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/ResultTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import org.apache.druid.java.util.common.DateTimes;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ResultTest
+{
+ @Test
+ public void testCompareNullTimestamp()
+ {
+ final Result<Object> nullTimestamp = new Result<>(null, null);
+ final Result<Object> nullTimestamp2 = new Result<>(null, null);
+ final Result<Object> nonNullTimestamp = new Result<>(DateTimes.nowUtc(), null);
+
+ Assert.assertEquals(0, nullTimestamp.compareTo(nullTimestamp2));
+ Assert.assertEquals(1, nullTimestamp.compareTo(nonNullTimestamp));
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java
index 304ac9b..89fdbb7 100644
--- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java
@@ -133,6 +133,23 @@ public class TimeseriesQueryQueryToolChestTest
Result<TimeseriesResultValue> fromResultLevelCacheRes = strategy.pullFromCache(true).apply(fromResultLevelCacheValue);
Assert.assertEquals(result2, fromResultLevelCacheRes);
+
+ final Result<TimeseriesResultValue> result3 = new Result<>(
+ // null timestamp similar to grandTotal
+ null,
+ new TimeseriesResultValue(
+ ImmutableMap.of("metric1", 2, "metric0", 3, "complexMetric", "val1", "post", 10)
+ )
+ );
+
+ preparedResultLevelCacheValue = strategy.prepareForCache(true).apply(result3);
+ fromResultLevelCacheValue = objectMapper.readValue(
+ objectMapper.writeValueAsBytes(preparedResultLevelCacheValue),
+ strategy.getCacheObjectClazz()
+ );
+
+ fromResultLevelCacheRes = strategy.pullFromCache(true).apply(fromResultLevelCacheValue);
+ Assert.assertEquals(result3, fromResultLevelCacheRes);
}
@Test
diff --git a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
index 6a303b8..6a9a640 100644
--- a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
+++ b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
@@ -108,43 +108,46 @@ public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
}
final Function<T, Object> cacheFn = strategy.prepareForCache(true);
- return Sequences.wrap(Sequences.map(
- resultFromClient,
- new Function<T, T>()
+ return Sequences.wrap(
+ Sequences.map(
+ resultFromClient,
+ new Function<T, T>()
+ {
+ @Override
+ public T apply(T input)
+ {
+ if (resultLevelCachePopulator.isShouldPopulate()) {
+ resultLevelCachePopulator.cacheResultEntry(input, cacheFn);
+ }
+ return input;
+ }
+ }
+ ),
+ new SequenceWrapper()
{
@Override
- public T apply(T input)
+ public void after(boolean isDone, Throwable thrown)
{
- if (resultLevelCachePopulator.isShouldPopulate()) {
- resultLevelCachePopulator.cacheResultEntry(resultLevelCachePopulator, input, cacheFn);
+ Preconditions.checkNotNull(
+ resultLevelCachePopulator,
+ "ResultLevelCachePopulator cannot be null during cache population"
+ );
+ if (thrown != null) {
+ log.error(
+ thrown,
+ "Error while preparing for result level caching for query %s with error %s ",
+ query.getId(),
+ thrown.getMessage()
+ );
+ } else if (resultLevelCachePopulator.isShouldPopulate()) {
+ // The resultset identifier and its length is cached along with the resultset
+ resultLevelCachePopulator.populateResults();
+ log.debug("Cache population complete for query %s", query.getId());
}
- return input;
+ resultLevelCachePopulator.stopPopulating();
}
}
- ), new SequenceWrapper()
- {
- @Override
- public void after(boolean isDone, Throwable thrown)
- {
- Preconditions.checkNotNull(
- resultLevelCachePopulator,
- "ResultLevelCachePopulator cannot be null during cache population"
- );
- if (thrown != null) {
- log.error(
- thrown,
- "Error while preparing for result level caching for query %s with error %s ",
- query.getId(),
- thrown.getMessage()
- );
- } else if (resultLevelCachePopulator.isShouldPopulate()) {
- // The resultset identifier and its length is cached along with the resultset
- resultLevelCachePopulator.populateResults();
- log.debug("Cache population complete for query %s", query.getId());
- }
- resultLevelCachePopulator.cacheObjectStream = null;
- }
- });
+ );
}
} else {
return baseRunner.run(
@@ -234,20 +237,14 @@ public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
}
}
- public class ResultLevelCachePopulator
+ private class ResultLevelCachePopulator
{
private final Cache cache;
private final ObjectMapper mapper;
private final Cache.NamedKey key;
private final CacheConfig cacheConfig;
- private ByteArrayOutputStream cacheObjectStream = new ByteArrayOutputStream();
-
- public boolean isShouldPopulate()
- {
- return shouldPopulate;
- }
-
- private boolean shouldPopulate;
+ @Nullable
+ private ByteArrayOutputStream cacheObjectStream;
private ResultLevelCachePopulator(
Cache cache,
@@ -261,29 +258,35 @@ public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
this.mapper = mapper;
this.key = key;
this.cacheConfig = cacheConfig;
- this.shouldPopulate = shouldPopulate;
+ this.cacheObjectStream = shouldPopulate ? new ByteArrayOutputStream() : null;
+ }
+
+ boolean isShouldPopulate()
+ {
+ return cacheObjectStream != null;
+ }
+
+ void stopPopulating()
+ {
+ cacheObjectStream = null;
}
private void cacheResultEntry(
- ResultLevelCachePopulator resultLevelCachePopulator,
T resultEntry,
Function<T, Object> cacheFn
)
{
-
+ Preconditions.checkNotNull(cacheObjectStream, "cacheObjectStream");
int cacheLimit = cacheConfig.getResultLevelCacheLimit();
- try (JsonGenerator gen = mapper.getFactory().createGenerator(resultLevelCachePopulator.cacheObjectStream)) {
+ try (JsonGenerator gen = mapper.getFactory().createGenerator(cacheObjectStream)) {
gen.writeObject(cacheFn.apply(resultEntry));
- if (cacheLimit > 0 && resultLevelCachePopulator.cacheObjectStream.size() > cacheLimit) {
- shouldPopulate = false;
- resultLevelCachePopulator.cacheObjectStream = null;
- return;
+ if (cacheLimit > 0 && cacheObjectStream.size() > cacheLimit) {
+ stopPopulating();
}
}
catch (IOException ex) {
log.error(ex, "Failed to retrieve entry to be cached. Result Level caching will not be performed!");
- shouldPopulate = false;
- resultLevelCachePopulator.cacheObjectStream = null;
+ stopPopulating();
}
}
@@ -292,7 +295,7 @@ public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
ResultLevelCacheUtil.populate(
cache,
key,
- cacheObjectStream.toByteArray()
+ Preconditions.checkNotNull(cacheObjectStream, "cacheObjectStream").toByteArray()
);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org