Posted to commits@druid.apache.org by fj...@apache.org on 2019/05/31 00:17:32 UTC

[incubator-druid] branch 0.15.0-incubating updated: fix group-by v2 BufferArrayGrouper for empty multi-value dimension row (#7794) (#7803)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch 0.15.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.15.0-incubating by this push:
     new 78210ba  fix group-by v2 BufferArrayGrouper for empty multi-value dimension row (#7794) (#7803)
78210ba is described below

commit 78210ba2c2cef40207022344d9c80ee52778328d
Author: Clint Wylie <cj...@gmail.com>
AuthorDate: Thu May 30 17:17:22 2019 -0700

    fix group-by v2 BufferArrayGrouper for empty multi-value dimension row (#7794) (#7803)
    
    * fix groupby v2 BufferArrayGrouper
    
    * better test name
    
    * fix sql compatible null handling array grouper bug
    
    * another test
---
 .../groupby/epinephelinae/BufferArrayGrouper.java  |  47 ++++---
 .../epinephelinae/GroupByQueryEngineV2.java        |   4 +-
 .../druid/query/MultiValuedDimensionTest.java      | 138 ++++++++++++++++++++-
 3 files changed, 157 insertions(+), 32 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
index 4bc541f..7994996 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
@@ -233,48 +233,33 @@ public class BufferArrayGrouper implements IntGrouper
 
     return new CloseableIterator<Entry<Integer>>()
     {
-      int cur;
-      boolean findNext = false;
-
-      {
-        cur = findNext();
-      }
+      // initialize to the first used slot
+      private int next = findNext(-1);
 
       @Override
       public boolean hasNext()
       {
-        if (findNext) {
-          cur = findNext();
-          findNext = false;
-        }
-        return cur >= 0;
-      }
-
-      private int findNext()
-      {
-        for (int i = cur + 1; i < cardinalityWithMissingValue; i++) {
-          if (isUsedSlot(i)) {
-            return i;
-          }
-        }
-        return -1;
+        return next >= 0;
       }
 
       @Override
       public Entry<Integer> next()
       {
-        if (cur < 0) {
+        if (next < 0) {
           throw new NoSuchElementException();
         }
 
-        findNext = true;
+        final int current = next;
+        next = findNext(current);
 
         final Object[] values = new Object[aggregators.length];
-        final int recordOffset = cur * recordSize;
+        final int recordOffset = current * recordSize;
         for (int i = 0; i < aggregators.length; i++) {
           values[i] = aggregators[i].get(valBuffer, recordOffset + aggregatorOffsets[i]);
         }
-        return new Entry<>(cur - 1, values);
+        // shift by -1 since values are initially shifted by +1 so they are all positive and
+        // GroupByColumnSelectorStrategy.GROUP_BY_MISSING_VALUE is -1
+        return new Entry<>(current - 1, values);
       }
 
       @Override
@@ -282,6 +267,18 @@ public class BufferArrayGrouper implements IntGrouper
       {
         // do nothing
       }
+
+      private int findNext(int current)
+      {
+        // shift by +1 since we're looking for the next used slot after the current position
+        for (int i = current + 1; i < cardinalityWithMissingValue; i++) {
+          if (isUsedSlot(i)) {
+            return i;
+          }
+        }
+        // no more slots
+        return -1;
+      }
     };
   }
 }
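
Why this fixes the empty-row bug: in the removed code the instance initializer ran findNext() while cur still held its default value of 0, so the scan began at index 1 and slot 0 was never visited. Slot 0 is exactly where rows with an empty multi-value dimension land, because slot indexes are stored shifted by +1 and GroupByColumnSelectorStrategy.GROUP_BY_MISSING_VALUE is -1. Seeding the lookahead with findNext(-1) starts the scan at slot 0; the rest of the rewrite just simplifies the bookkeeping. A self-contained sketch of the same single-lookahead pattern, using a plain boolean[] in place of the grouper's ByteBuffer-backed used-slot bitmap (class and field names here are illustrative, not Druid's):

import java.util.Iterator;
import java.util.NoSuchElementException;

// Illustrative stand-in for BufferArrayGrouper's used-slot scan.
class UsedSlotIterator implements Iterator<Integer>
{
  private final boolean[] used;
  // seeded with findNext(-1) so the scan starts at slot 0, the slot that
  // holds rows whose multi-value dimension is empty
  private int next;

  UsedSlotIterator(boolean[] used)
  {
    this.used = used;
    this.next = findNext(-1);
  }

  @Override
  public boolean hasNext()
  {
    return next >= 0;
  }

  @Override
  public Integer next()
  {
    if (next < 0) {
      throw new NoSuchElementException();
    }
    final int current = next;
    next = findNext(current);
    // undo the +1 storage shift: slot 0 maps back to the missing value, -1
    return current - 1;
  }

  private int findNext(int current)
  {
    for (int i = current + 1; i < used.length; i++) {
      if (used[i]) {
        return i;
      }
    }
    return -1;
  }
}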
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
index 1b58dfe..e3c609e 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
@@ -359,7 +359,7 @@ public class GroupByQueryEngineV2
             delegate.close();
           }
           delegate = initNewDelegate();
-          return true;
+          return delegate.hasNext();
         } else {
           return false;
         }
@@ -681,7 +681,7 @@ public class GroupByQueryEngineV2
               ((DimensionSelector) dim.getSelector()).lookupName(key)
           );
         } else {
-          map.put(dim.getOutputName(), "");
+          map.put(dim.getOutputName(), NullHandling.defaultStringValue());
         }
       }
     }
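
Two independent one-line fixes above. The first is on the hasNext() path: a freshly created delegate can itself be empty (for example, when every row of the next cursor is filtered out), so returning true unconditionally could promise an element that next() cannot deliver; returning delegate.hasNext() reports the actual state. The second replaces a hard-coded "" with NullHandling.defaultStringValue(), so the missing-dimension output follows the configured null handling ("" in default mode, null in SQL-compatible mode). A minimal sketch of the delegation pattern, with hypothetical names standing in for the engine's cursor-backed delegates:

import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical two-level iterator: after swapping in a new delegate,
// report its real emptiness instead of assuming it has rows.
class ChainedIterator<T> implements Iterator<T>
{
  private final Iterator<Iterator<T>> sources;
  private Iterator<T> delegate = Collections.emptyIterator();

  ChainedIterator(Iterator<Iterator<T>> sources)
  {
    this.sources = sources;
  }

  @Override
  public boolean hasNext()
  {
    while (!delegate.hasNext() && sources.hasNext()) {
      delegate = sources.next();   // plays the role of initNewDelegate()
    }
    return delegate.hasNext();     // the fix: ask the new delegate
  }

  @Override
  public T next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return delegate.next();
  }
}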
diff --git a/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
index c4c4a0b..1ee0daf 100644
--- a/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
+++ b/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
@@ -25,9 +25,11 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.io.Files;
 import org.apache.commons.io.FileUtils;
 import org.apache.druid.collections.CloseableStupidPool;
+import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.Row;
 import org.apache.druid.data.input.impl.CSVParseSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JSONParseSpec;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.DateTimes;
@@ -38,11 +40,13 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.dimension.ListFilteredDimensionSpec;
 import org.apache.druid.query.dimension.RegexFilteredDimensionSpec;
+import org.apache.druid.query.filter.InDimFilter;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
 import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper;
+import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
 import org.apache.druid.query.spec.LegacySegmentSpec;
 import org.apache.druid.query.topn.TopNQuery;
 import org.apache.druid.query.topn.TopNQueryBuilder;
@@ -82,13 +86,15 @@ import java.util.Map;
 @RunWith(Parameterized.class)
 public class MultiValuedDimensionTest
 {
-  @Parameterized.Parameters(name = "{0}")
+  @Parameterized.Parameters(name = "groupby: {0} forceHashAggregation: {2} ({1})")
   public static Collection<?> constructorFeeder()
   {
     final List<Object[]> constructors = new ArrayList<>();
     for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
-      constructors.add(new Object[]{config, TmpFileSegmentWriteOutMediumFactory.instance()});
-      constructors.add(new Object[]{config, OffHeapMemorySegmentWriteOutMediumFactory.instance()});
+      constructors.add(new Object[]{config, TmpFileSegmentWriteOutMediumFactory.instance(), false});
+      constructors.add(new Object[]{config, OffHeapMemorySegmentWriteOutMediumFactory.instance(), false});
+      constructors.add(new Object[]{config, TmpFileSegmentWriteOutMediumFactory.instance(), true});
+      constructors.add(new Object[]{config, OffHeapMemorySegmentWriteOutMediumFactory.instance(), true});
     }
     return constructors;
   }
@@ -101,7 +107,13 @@ public class MultiValuedDimensionTest
 
   private File persistedSegmentDir;
 
-  public MultiValuedDimensionTest(final GroupByQueryConfig config, SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
+  private IncrementalIndex incrementalIndexNullSampler;
+  private QueryableIndex queryableIndexNullSampler;
+  private File persistedSegmentDirNullSampler;
+
+  private final ImmutableMap<String, Object> context;
+
+  public MultiValuedDimensionTest(final GroupByQueryConfig config, SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, boolean forceHashAggregation)
   {
     helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
         ImmutableList.of(),
@@ -109,6 +121,10 @@ public class MultiValuedDimensionTest
         null
     );
     this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
+
+    this.context = config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)
+                   ? ImmutableMap.of()
+                   : ImmutableMap.of("forceHashAggregation", forceHashAggregation);
   }
 
   @Before
@@ -147,6 +163,41 @@ public class MultiValuedDimensionTest
               .persist(incrementalIndex, persistedSegmentDir, new IndexSpec(), null);
 
     queryableIndex = TestHelper.getTestIndexIO().loadIndex(persistedSegmentDir);
+
+
+
+    StringInputRowParser parserNullSampler = new StringInputRowParser(
+        new JSONParseSpec(
+            new TimestampSpec("time", "iso", null),
+            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("product", "tags", "othertags")), null, null)
+        ),
+        "UTF-8"
+    );
+
+    incrementalIndexNullSampler = new IncrementalIndex.Builder()
+        .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
+        .setMaxRowCount(5000)
+        .buildOnheap();
+
+    String[] rowsNullSampler = new String[]{
+        "{\"time\":\"2011-01-13T00:00:00.000Z\",\"product\":\"product_1\",\"tags\":[],\"othertags\":[\"u1\", \"u2\"]}",
+        "{\"time\":\"2011-01-12T00:00:00.000Z\",\"product\":\"product_2\",\"othertags\":[\"u3\", \"u4\"]}",
+        "{\"time\":\"2011-01-14T00:00:00.000Z\",\"product\":\"product_3\",\"tags\":[\"\"],\"othertags\":[\"u1\", \"u5\"]}",
+        "{\"time\":\"2011-01-15T00:00:00.000Z\",\"product\":\"product_4\",\"tags\":[\"t1\", \"t2\", \"\"],\"othertags\":[\"u6\", \"u7\"]}",
+        "{\"time\":\"2011-01-16T00:00:00.000Z\",\"product\":\"product_5\",\"tags\":[],\"othertags\":[]}",
+        "{\"time\":\"2011-01-16T00:00:00.000Z\",\"product\":\"product_6\"}",
+        "{\"time\":\"2011-01-16T00:00:00.000Z\",\"product\":\"product_7\",\"othertags\":[]}",
+        "{\"time\":\"2011-01-16T00:00:00.000Z\",\"product\":\"product_8\",\"tags\":[\"\"],\"othertags\":[]}"
+    };
+
+    for (String row : rowsNullSampler) {
+      incrementalIndexNullSampler.add(parserNullSampler.parse(row));
+    }
+    persistedSegmentDirNullSampler = Files.createTempDir();
+    TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory)
+              .persist(incrementalIndexNullSampler, persistedSegmentDirNullSampler, new IndexSpec(), null);
+
+    queryableIndexNullSampler = TestHelper.getTestIndexIO().loadIndex(persistedSegmentDirNullSampler);
   }
 
   @After
@@ -165,6 +216,7 @@ public class MultiValuedDimensionTest
         .setGranularity(Granularities.ALL)
         .setDimensions(new DefaultDimensionSpec("tags", "tags"))
         .setAggregatorSpecs(new CountAggregatorFactory("count"))
+        .setContext(context)
         .build();
 
     Sequence<Row> result = helper.runQueryOnSegmentsObjs(
@@ -200,6 +252,7 @@ public class MultiValuedDimensionTest
         .setDimensions(new DefaultDimensionSpec("tags", "tags"))
         .setAggregatorSpecs(new CountAggregatorFactory("count"))
         .setDimFilter(new SelectorDimFilter("tags", "t3", null))
+        .setContext(context)
         .build();
 
     Sequence<Row> result = helper.runQueryOnSegmentsObjs(
@@ -222,6 +275,79 @@ public class MultiValuedDimensionTest
   }
 
   @Test
+  public void testGroupByWithDimFilterEmptyResults()
+  {
+    GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource("xx")
+        .setQuerySegmentSpec(new LegacySegmentSpec("1970/3000"))
+        .setGranularity(Granularities.ALL)
+        .setDimensions(new DefaultDimensionSpec("tags", "tags"))
+        .setAggregatorSpecs(new CountAggregatorFactory("count"))
+        .setDimFilter(new InDimFilter("product", ImmutableList.of("product_5"), null))
+        .setContext(context)
+        .build();
+
+    Sequence<Row> result = helper.runQueryOnSegmentsObjs(
+        ImmutableList.of(
+            new QueryableIndexSegment(queryableIndexNullSampler, SegmentId.dummy("sid1")),
+            new IncrementalIndexSegment(incrementalIndexNullSampler, SegmentId.dummy("sid2"))
+        ),
+        query
+    );
+
+    List<Row> expectedResults = Collections.singletonList(
+        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", null, "count", 2L)
+    );
+
+    TestHelper.assertExpectedObjects(expectedResults, result.toList(), "filter-empty");
+  }
+
+  @Test
+  public void testGroupByWithDimFilterNullishResults()
+  {
+    GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource("xx")
+        .setQuerySegmentSpec(new LegacySegmentSpec("1970/3000"))
+        .setGranularity(Granularities.ALL)
+        .setDimensions(new DefaultDimensionSpec("tags", "tags"))
+        .setAggregatorSpecs(new CountAggregatorFactory("count"))
+        .setDimFilter(
+            new InDimFilter("product", ImmutableList.of("product_5", "product_6", "product_8"), null)
+        )
+        .setContext(context)
+        .build();
+
+    Sequence<Row> result = helper.runQueryOnSegmentsObjs(
+        ImmutableList.of(
+            new QueryableIndexSegment(queryableIndexNullSampler, SegmentId.dummy("sid1")),
+            new IncrementalIndexSegment(incrementalIndexNullSampler, SegmentId.dummy("sid2"))
+        ),
+        query
+    );
+
+    List<Row> expectedResults;
+    // an empty row e.g. [], or group by 'missing' value, is grouped with the default string value, "" or null
+    // grouping input is filtered to [], null, [""]
+    if (NullHandling.replaceWithDefault()) {
+      // when sql compatible null handling is disabled, the inputs are effectively [], null, [null] and
+      // are all grouped as null
+      expectedResults = Collections.singletonList(
+          GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", null, "count", 6L)
+      );
+    } else {
+      // with sql compatible null handling, null and [] = null, but [""] = ""
+      expectedResults = ImmutableList.of(
+          GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", null, "count", 4L),
+          GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "", "count", 2L)
+      );
+    }
+
+    TestHelper.assertExpectedObjects(expectedResults, result.toList(), "filter-nullish");
+  }
+
+  @Test
   public void testGroupByWithDimFilterAndWithFilteredDimSpec()
   {
     GroupByQuery query = GroupByQuery
@@ -232,6 +358,7 @@ public class MultiValuedDimensionTest
         .setDimensions(new RegexFilteredDimensionSpec(new DefaultDimensionSpec("tags", "tags"), "t3"))
         .setAggregatorSpecs(new CountAggregatorFactory("count"))
         .setDimFilter(new SelectorDimFilter("tags", "t3", null))
+        .setContext(context)
         .build();
 
     Sequence<Row> result = helper.runQueryOnSegmentsObjs(
@@ -264,7 +391,8 @@ public class MultiValuedDimensionTest
         .intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
         .aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
         .threshold(5)
-        .filters(new SelectorDimFilter("tags", "t3", null)).build();
+        .filters(new SelectorDimFilter("tags", "t3", null))
+        .build();
 
     try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) {
       QueryRunnerFactory factory = new TopNQueryRunnerFactory(

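For reference, the behavior the new tests pin down: after the product filter, the surviving rows' tags values are [] (empty array), missing, and [""], and each row appears in two segments. Under default-value null handling all three coerce to null and collapse into a single group of 6; under SQL-compatible null handling, [] and missing group as null (count 4) while [""] groups as "" (count 2), matching the two expected-result branches in testGroupByWithDimFilterNullishResults. A tiny standalone demo of the key value involved, assuming NullHandling.initializeForTests() wires up the configured mode as it does in Druid's own test suite:

import org.apache.druid.common.config.NullHandling;

// defaultStringValue() is "" when replaceWithDefault() is true and null in
// SQL-compatible mode; this is the value the fixed GroupByQueryEngineV2
// emits for a missing dimension.
public class NullKeyDemo
{
  public static void main(String[] args)
  {
    NullHandling.initializeForTests();
    System.out.println("replaceWithDefault = " + NullHandling.replaceWithDefault());
    System.out.println("missing group key  = " + NullHandling.defaultStringValue());
  }
}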
