You are viewing a plain-text version of this content. (The link to the canonical version, originally labeled "here", was not preserved in this plain-text copy.)
Posted to commits@druid.apache.org by fj...@apache.org on 2019/05/21 18:31:43 UTC

[incubator-druid] branch 0.15.0-incubating updated: fix issue where result level cache was recomputing post aggs that were already cached, causing issues with finalizing aggregators (#7708) (#7711)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch 0.15.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.15.0-incubating by this push:
     new 1909dda  fix issue where result level cache was recomputing post aggs that were already cached, causing issues with finalizing aggregators (#7708) (#7711)
1909dda is described below

commit 1909dda5274d7e0316b5d6c0326ee68b1ca4f936
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Tue May 21 11:31:38 2019 -0700

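    fix issue where pulling TopN results from the result-level cache recomputed post-aggregators whose values were already cached; because cached aggregator values are already finalized, recomputing finalizing post-aggregators over them broke (#7708) (#7711)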
---
 .../druid/query/topn/TopNQueryQueryToolChest.java  |  10 +-
 .../query/topn/TopNQueryQueryToolChestTest.java    | 131 +++++++++++++++++++++
 2 files changed, 136 insertions(+), 5 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
index d87a178..53af0908 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
@@ -294,8 +294,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
       private final List<AggregatorFactory> aggs = Lists.newArrayList(query.getAggregatorSpecs());
       private final List<PostAggregator> postAggs = AggregatorUtil.pruneDependentPostAgg(
           query.getPostAggregatorSpecs(),
-          query.getTopNMetricSpec()
-               .getMetricName(query.getDimensionSpec())
+          query.getTopNMetricSpec().getMetricName(query.getDimensionSpec())
       );
 
       @Override
@@ -419,14 +418,15 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
                   }
               );
 
-              for (PostAggregator postAgg : postAggs) {
-                vals.put(postAgg.getName(), postAgg.compute(vals));
-              }
               if (isResultLevelCache) {
                 Iterator<PostAggregator> postItr = query.getPostAggregatorSpecs().iterator();
                 while (postItr.hasNext() && resultIter.hasNext()) {
                   vals.put(postItr.next().getName(), resultIter.next());
                 }
+              } else {
+                for (PostAggregator postAgg : postAggs) {
+                  vals.put(postAgg.getName(), postAgg.compute(vals));
+                }
               }
               retVal.add(vals);
             }
diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java
index 191cc55..f9080e7 100644
--- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java
+++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.collections.CloseableStupidPool;
 import org.apache.druid.collections.SerializablePair;
+import org.apache.druid.hll.HyperLogLogCollector;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -40,6 +41,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.aggregation.SerializablePairLongString;
+import org.apache.druid.query.aggregation.cardinality.CardinalityAggregator;
+import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;
 import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
 import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
@@ -47,6 +50,7 @@ import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
 import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
 import org.apache.druid.query.aggregation.post.ConstantPostAggregator;
 import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
+import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.segment.IncrementalIndexSegment;
@@ -80,6 +84,15 @@ public class TopNQueryQueryToolChestTest
   }
 
   @Test
+  public void testCacheStrategyOrderByPostAggs() throws Exception
+  {
+    doTestCacheStrategyOrderByPost(ValueType.STRING, "val1");
+    doTestCacheStrategyOrderByPost(ValueType.FLOAT, 2.1f);
+    doTestCacheStrategyOrderByPost(ValueType.DOUBLE, 2.1d);
+    doTestCacheStrategyOrderByPost(ValueType.LONG, 2L);
+  }
+
+  @Test
   public void testComputeCacheKeyWithDifferentPostAgg()
   {
     final TopNQuery query1 = new TopNQuery(
@@ -306,6 +319,28 @@ public class TopNQueryQueryToolChestTest
     }
   }
 
+  private HyperLogLogCollector getIntermediateHllCollector(final ValueType valueType, final Object dimValue)
+  {
+    HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
+    switch (valueType) {
+      case LONG:
+        collector.add(CardinalityAggregator.hashFn.hashLong((Long) dimValue).asBytes());
+        break;
+      case DOUBLE:
+        collector.add(CardinalityAggregator.hashFn.hashLong(Double.doubleToLongBits((Double) dimValue)).asBytes());
+        break;
+      case FLOAT:
+        collector.add(CardinalityAggregator.hashFn.hashInt(Float.floatToIntBits((Float) dimValue)).asBytes());
+        break;
+      case STRING:
+        collector.add(CardinalityAggregator.hashFn.hashUnencodedChars((String) dimValue).asBytes());
+        break;
+      default:
+        throw new IllegalArgumentException("bad valueType: " + valueType);
+    }
+    return collector;
+  }
+
   private void doTestCacheStrategy(final ValueType valueType, final Object dimValue) throws IOException
   {
     CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> strategy =
@@ -419,6 +454,102 @@ public class TopNQueryQueryToolChestTest
     Assert.assertEquals(typeAdjustedResult2, fromResultCacheResult);
   }
 
+  private void doTestCacheStrategyOrderByPost(final ValueType valueType, final Object dimValue) throws IOException
+  {
+    CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> strategy =
+        new TopNQueryQueryToolChest(null, null).getCacheStrategy(
+            new TopNQuery(
+                new TableDataSource("dummy"),
+                VirtualColumns.EMPTY,
+                new DefaultDimensionSpec("test", "test", valueType),
+                new NumericTopNMetricSpec("post"),
+                3,
+                new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))),
+                null,
+                Granularities.ALL,
+                ImmutableList.of(
+                    new HyperUniquesAggregatorFactory("metric1", "test", false, false),
+                    new CountAggregatorFactory("metric2")
+                ),
+                ImmutableList.of(
+                    new ArithmeticPostAggregator(
+                        "post",
+                        "+",
+                        ImmutableList.of(
+                            new FinalizingFieldAccessPostAggregator(
+                                "metric1",
+                                "metric1"
+                            ),
+                            new FieldAccessPostAggregator(
+                                "metric2",
+                                "metric2"
+                            )
+                        )
+                    )
+                ),
+                null
+            )
+        );
+
+    HyperLogLogCollector collector = getIntermediateHllCollector(valueType, dimValue);
+
+    final Result<TopNResultValue> result1 = new Result<>(
+        // test timestamps that result in integer size millis
+        DateTimes.utc(123L),
+        new TopNResultValue(
+            Collections.singletonList(
+                ImmutableMap.of(
+                    "test", dimValue,
+                    "metric1", collector,
+                    "metric2", 2,
+                    "post", collector.estimateCardinality() + 2
+                )
+            )
+        )
+    );
+
+    Object preparedValue = strategy.prepareForSegmentLevelCache().apply(
+        result1
+    );
+
+    ObjectMapper objectMapper = TestHelper.makeJsonMapper();
+    Object fromCacheValue = objectMapper.readValue(
+        objectMapper.writeValueAsBytes(preparedValue),
+        strategy.getCacheObjectClazz()
+    );
+
+    Result<TopNResultValue> fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue);
+
+    Assert.assertEquals(result1, fromCacheResult);
+
+    final Result<TopNResultValue> resultLevelCacheResult = new Result<>(
+        // test timestamps that result in integer size millis
+        DateTimes.utc(123L),
+        new TopNResultValue(
+            Collections.singletonList(
+                ImmutableMap.of(
+                    "test", dimValue,
+                    "metric1", collector.estimateCardinality(),
+                    "metric2", 2,
+                    "post", collector.estimateCardinality() + 2
+                )
+            )
+        )
+    );
+
+    Object preparedResultCacheValue = strategy.prepareForCache(true).apply(
+        resultLevelCacheResult
+    );
+
+    Object fromResultCacheValue = objectMapper.readValue(
+        objectMapper.writeValueAsBytes(preparedResultCacheValue),
+        strategy.getCacheObjectClazz()
+    );
+
+    Result<TopNResultValue> fromResultCacheResult = strategy.pullFromCache(true).apply(fromResultCacheValue);
+    Assert.assertEquals(resultLevelCacheResult, fromResultCacheResult);
+  }
+
   static class MockQueryRunner implements QueryRunner<Result<TopNResultValue>>
   {
     private final QueryRunner<Result<TopNResultValue>> runner;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org