You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2019/05/12 23:59:19 UTC

[incubator-druid] branch 0.15.0-incubating updated: Fix resultLevelCache for timeseries with grandTotal (#7624) (#7637)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.15.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.15.0-incubating by this push:
     new e2f15a3  Fix resultLevelCache for timeseries with grandTotal (#7624) (#7637)
e2f15a3 is described below

commit e2f15a39f38da1f2d959f34f648b179b37ffc442
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Sun May 12 16:59:12 2019 -0700

    Fix resultLevelCache for timeseries with grandTotal (#7624) (#7637)
    
    * Fix resultLevelCache for timeseries with grandTotal
    
    * Address comment
    
    * fix test
---
 .../main/java/org/apache/druid/query/Result.java   |  26 +++--
 .../timeseries/TimeseriesQueryQueryToolChest.java  |  19 +++-
 .../java/org/apache/druid/query/ResultTest.java    |  38 ++++++++
 .../TimeseriesQueryQueryToolChestTest.java         |  17 ++++
 .../druid/query/ResultLevelCachingQueryRunner.java | 105 +++++++++++----------
 5 files changed, 140 insertions(+), 65 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/Result.java b/processing/src/main/java/org/apache/druid/query/Result.java
index c1ec3a3..9ec75ad 100644
--- a/processing/src/main/java/org/apache/druid/query/Result.java
+++ b/processing/src/main/java/org/apache/druid/query/Result.java
@@ -24,6 +24,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.guice.annotations.PublicApi;
 import org.joda.time.DateTime;
 
+import javax.annotation.Nullable;
+import java.util.Comparator;
+import java.util.Objects;
 import java.util.function.Function;
 
 /**
@@ -33,11 +36,12 @@ public class Result<T> implements Comparable<Result<T>>
 {
   public static String MISSING_SEGMENTS_KEY = "missingSegments";
 
+  @Nullable
   private final DateTime timestamp;
   private final T value;
 
   @JsonCreator
-  public Result(@JsonProperty("timestamp") DateTime timestamp, @JsonProperty("result") T value)
+  public Result(@JsonProperty("timestamp") @Nullable DateTime timestamp, @JsonProperty("result") T value)
   {
     this.timestamp = timestamp;
     this.value = value;
@@ -51,10 +55,12 @@ public class Result<T> implements Comparable<Result<T>>
   @Override
   public int compareTo(Result<T> tResult)
   {
-    return timestamp.compareTo(tResult.timestamp);
+    // timestamp is null for grandTotal which should come last.
+    return Comparator.nullsLast(DateTime::compareTo).compare(this.timestamp, tResult.timestamp);
   }
 
   @JsonProperty
+  @Nullable
   public DateTime getTimestamp()
   {
     return timestamp;
@@ -78,22 +84,22 @@ public class Result<T> implements Comparable<Result<T>>
 
     Result result = (Result) o;
 
-    if (timestamp != null ? !(timestamp.isEqual(result.timestamp) && timestamp.getZone().getOffset(timestamp) == result.timestamp.getZone().getOffset(result.timestamp)) : result.timestamp != null) {
-      return false;
-    }
-    if (value != null ? !value.equals(result.value) : result.value != null) {
+    if (timestamp != null && result.timestamp != null) {
+      if (!timestamp.isEqual(result.timestamp)
+          && timestamp.getZone().getOffset(timestamp) == result.timestamp.getZone().getOffset(result.timestamp)) {
+        return false;
+      }
+    } else if (timestamp == null ^ result.timestamp == null) {
       return false;
     }
 
-    return true;
+    return Objects.equals(value, result.value);
   }
 
   @Override
   public int hashCode()
   {
-    int result = timestamp != null ? timestamp.hashCode() : 0;
-    result = 31 * result + (value != null ? value.hashCode() : 0);
-    return result;
+    return Objects.hash(timestamp, value);
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index d625c31..0ae9a70 100644
--- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -22,6 +22,7 @@ package org.apache.druid.query.timeseries;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -52,7 +53,6 @@ import org.apache.druid.query.cache.CacheKeyBuilder;
 import org.apache.druid.query.groupby.RowBasedColumnSelectorFactory;
 import org.joda.time.DateTime;
 
-import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -303,7 +303,12 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
           TimeseriesResultValue results = input.getValue();
           final List<Object> retVal = Lists.newArrayListWithCapacity(1 + aggs.size());
 
-          retVal.add(input.getTimestamp().getMillis());
+          // Timestamp can be null if grandTotal is true.
+          if (isResultLevelCache) {
+            retVal.add(input.getTimestamp() == null ? null : input.getTimestamp().getMillis());
+          } else {
+            retVal.add(Preconditions.checkNotNull(input.getTimestamp(), "timestamp of input[%s]", input).getMillis());
+          }
           for (AggregatorFactory agg : aggs) {
             retVal.add(results.getMetric(agg.getName()));
           }
@@ -324,7 +329,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
           private final Granularity granularity = query.getGranularity();
 
           @Override
-          public Result<TimeseriesResultValue> apply(@Nullable Object input)
+          public Result<TimeseriesResultValue> apply(Object input)
           {
             List<Object> results = (List<Object>) input;
             final Map<String, Object> retVal = Maps.newLinkedHashMap();
@@ -332,7 +337,13 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
             Iterator<AggregatorFactory> aggsIter = aggs.iterator();
             Iterator<Object> resultIter = results.iterator();
 
-            DateTime timestamp = granularity.toDateTime(((Number) resultIter.next()).longValue());
+            final Number timestampNumber = (Number) resultIter.next();
+            final DateTime timestamp;
+            if (isResultLevelCache) {
+              timestamp = timestampNumber == null ? null : granularity.toDateTime(timestampNumber.longValue());
+            } else {
+              timestamp = granularity.toDateTime(Preconditions.checkNotNull(timestampNumber, "timestamp").longValue());
+            }
 
             CacheStrategy.fetchAggregatorsFromCache(
                 aggsIter,
diff --git a/processing/src/test/java/org/apache/druid/query/ResultTest.java b/processing/src/test/java/org/apache/druid/query/ResultTest.java
new file mode 100644
index 0000000..7fe6815
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/ResultTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import org.apache.druid.java.util.common.DateTimes;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ResultTest
+{
+  @Test
+  public void testCompareNullTimestamp()
+  {
+    final Result<Object> nullTimestamp = new Result<>(null, null);
+    final Result<Object> nullTimestamp2 = new Result<>(null, null);
+    final Result<Object> nonNullTimestamp = new Result<>(DateTimes.nowUtc(), null);
+
+    Assert.assertEquals(0, nullTimestamp.compareTo(nullTimestamp2));
+    Assert.assertEquals(1, nullTimestamp.compareTo(nonNullTimestamp));
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java
index 304ac9b..89fdbb7 100644
--- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java
@@ -133,6 +133,23 @@ public class TimeseriesQueryQueryToolChestTest
 
     Result<TimeseriesResultValue> fromResultLevelCacheRes = strategy.pullFromCache(true).apply(fromResultLevelCacheValue);
     Assert.assertEquals(result2, fromResultLevelCacheRes);
+
+    final Result<TimeseriesResultValue> result3 = new Result<>(
+        // null timestamp similar to grandTotal
+        null,
+        new TimeseriesResultValue(
+            ImmutableMap.of("metric1", 2, "metric0", 3, "complexMetric", "val1", "post", 10)
+        )
+    );
+
+    preparedResultLevelCacheValue = strategy.prepareForCache(true).apply(result3);
+    fromResultLevelCacheValue = objectMapper.readValue(
+        objectMapper.writeValueAsBytes(preparedResultLevelCacheValue),
+        strategy.getCacheObjectClazz()
+    );
+
+    fromResultLevelCacheRes = strategy.pullFromCache(true).apply(fromResultLevelCacheValue);
+    Assert.assertEquals(result3, fromResultLevelCacheRes);
   }
 
   @Test
diff --git a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
index 6a303b8..6a9a640 100644
--- a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
+++ b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
@@ -108,43 +108,46 @@ public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
         }
         final Function<T, Object> cacheFn = strategy.prepareForCache(true);
 
-        return Sequences.wrap(Sequences.map(
-            resultFromClient,
-            new Function<T, T>()
+        return Sequences.wrap(
+            Sequences.map(
+                resultFromClient,
+                new Function<T, T>()
+                {
+                  @Override
+                  public T apply(T input)
+                  {
+                    if (resultLevelCachePopulator.isShouldPopulate()) {
+                      resultLevelCachePopulator.cacheResultEntry(input, cacheFn);
+                    }
+                    return input;
+                  }
+                }
+            ),
+            new SequenceWrapper()
             {
               @Override
-              public T apply(T input)
+              public void after(boolean isDone, Throwable thrown)
               {
-                if (resultLevelCachePopulator.isShouldPopulate()) {
-                  resultLevelCachePopulator.cacheResultEntry(resultLevelCachePopulator, input, cacheFn);
+                Preconditions.checkNotNull(
+                    resultLevelCachePopulator,
+                    "ResultLevelCachePopulator cannot be null during cache population"
+                );
+                if (thrown != null) {
+                  log.error(
+                      thrown,
+                      "Error while preparing for result level caching for query %s with error %s ",
+                      query.getId(),
+                      thrown.getMessage()
+                  );
+                } else if (resultLevelCachePopulator.isShouldPopulate()) {
+                  // The resultset identifier and its length is cached along with the resultset
+                  resultLevelCachePopulator.populateResults();
+                  log.debug("Cache population complete for query %s", query.getId());
                 }
-                return input;
+                resultLevelCachePopulator.stopPopulating();
               }
             }
-        ), new SequenceWrapper()
-        {
-          @Override
-          public void after(boolean isDone, Throwable thrown)
-          {
-            Preconditions.checkNotNull(
-                resultLevelCachePopulator,
-                "ResultLevelCachePopulator cannot be null during cache population"
-            );
-            if (thrown != null) {
-              log.error(
-                  thrown,
-                  "Error while preparing for result level caching for query %s with error %s ",
-                  query.getId(),
-                  thrown.getMessage()
-              );
-            } else if (resultLevelCachePopulator.isShouldPopulate()) {
-              // The resultset identifier and its length is cached along with the resultset
-              resultLevelCachePopulator.populateResults();
-              log.debug("Cache population complete for query %s", query.getId());
-            }
-            resultLevelCachePopulator.cacheObjectStream = null;
-          }
-        });
+        );
       }
     } else {
       return baseRunner.run(
@@ -234,20 +237,14 @@ public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
     }
   }
 
-  public class ResultLevelCachePopulator
+  private class ResultLevelCachePopulator
   {
     private final Cache cache;
     private final ObjectMapper mapper;
     private final Cache.NamedKey key;
     private final CacheConfig cacheConfig;
-    private ByteArrayOutputStream cacheObjectStream = new ByteArrayOutputStream();
-
-    public boolean isShouldPopulate()
-    {
-      return shouldPopulate;
-    }
-
-    private boolean shouldPopulate;
+    @Nullable
+    private ByteArrayOutputStream cacheObjectStream;
 
     private ResultLevelCachePopulator(
         Cache cache,
@@ -261,29 +258,35 @@ public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
       this.mapper = mapper;
       this.key = key;
       this.cacheConfig = cacheConfig;
-      this.shouldPopulate = shouldPopulate;
+      this.cacheObjectStream = shouldPopulate ? new ByteArrayOutputStream() : null;
+    }
+
+    boolean isShouldPopulate()
+    {
+      return cacheObjectStream != null;
+    }
+
+    void stopPopulating()
+    {
+      cacheObjectStream = null;
     }
 
     private void cacheResultEntry(
-        ResultLevelCachePopulator resultLevelCachePopulator,
         T resultEntry,
         Function<T, Object> cacheFn
     )
     {
-
+      Preconditions.checkNotNull(cacheObjectStream, "cacheObjectStream");
       int cacheLimit = cacheConfig.getResultLevelCacheLimit();
-      try (JsonGenerator gen = mapper.getFactory().createGenerator(resultLevelCachePopulator.cacheObjectStream)) {
+      try (JsonGenerator gen = mapper.getFactory().createGenerator(cacheObjectStream)) {
         gen.writeObject(cacheFn.apply(resultEntry));
-        if (cacheLimit > 0 && resultLevelCachePopulator.cacheObjectStream.size() > cacheLimit) {
-          shouldPopulate = false;
-          resultLevelCachePopulator.cacheObjectStream = null;
-          return;
+        if (cacheLimit > 0 && cacheObjectStream.size() > cacheLimit) {
+          stopPopulating();
         }
       }
       catch (IOException ex) {
         log.error(ex, "Failed to retrieve entry to be cached. Result Level caching will not be performed!");
-        shouldPopulate = false;
-        resultLevelCachePopulator.cacheObjectStream = null;
+        stopPopulating();
       }
     }
 
@@ -292,7 +295,7 @@ public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
       ResultLevelCacheUtil.populate(
           cache,
           key,
-          cacheObjectStream.toByteArray()
+          Preconditions.checkNotNull(cacheObjectStream, "cacheObjectStream").toByteArray()
       );
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org