You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/04/06 17:52:57 UTC

[pinot] branch master updated: Fix to query inconsistencies under heavy upsert load (resolves #7958) (#7971)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9178618ff0 Fix to query inconsistencies under heavy upsert load (resolves #7958) (#7971)
9178618ff0 is described below

commit 9178618ff0b1b613c9dae06aade832afabafe859
Author: Valentin Mahrwald <vm...@gmail.com>
AuthorDate: Wed Apr 6 19:52:51 2022 +0200

    Fix to query inconsistencies under heavy upsert load (resolves #7958) (#7971)
    
    Add `FilterPlanNodeTest` to verify the query consistency under heavy upsert load
---
 .../apache/pinot/core/plan/FilterPlanNodeTest.java | 90 ++++++++++++++++++++++
 1 file changed, 90 insertions(+)

diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/FilterPlanNodeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/FilterPlanNodeTest.java
new file mode 100644
index 0000000000..8a44e83075
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/FilterPlanNodeTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.plan;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.core.common.BlockDocIdIterator;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
+import org.apache.pinot.core.operator.filter.BaseFilterOperator;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.spi.Constants;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Verifies that the filter operator built by {@link FilterPlanNode} yields a stable result while the segment's
+ * total doc count and valid doc id bitmap are being modified by another thread.
+ */
+public class FilterPlanNodeTest {
+
+  /**
+   * Runs a filter-less {@link FilterPlanNode} 10,000 times over a mocked segment while a background thread keeps
+   * bumping the doc count and replacing entries in the valid doc id bitmap, asserting that every run sees exactly
+   * 3 docs.
+   *
+   * @throws Exception if the calling thread is interrupted while waiting for the updater thread to finish
+   */
+  @Test
+  public void testConsistentSnapshot()
+      throws Exception {
+    IndexSegment segment = mock(IndexSegment.class);
+    SegmentMetadata meta = mock(SegmentMetadata.class);
+    when(segment.getSegmentMetadata()).thenReturn(meta);
+    ThreadSafeMutableRoaringBitmap bitmap = new ThreadSafeMutableRoaringBitmap();
+    when(segment.getValidDocIds()).thenReturn(bitmap);
+    AtomicInteger numDocs = new AtomicInteger(0);
+    // Use an Answer instead of thenReturn so that each call reads the counter's current value
+    when(meta.getTotalDocs()).then((Answer<Integer>) invocationOnMock -> numDocs.get());
+    QueryContext ctx = mock(QueryContext.class);
+    when(ctx.getFilter()).thenReturn(null);
+
+    numDocs.set(3);
+    bitmap.add(0);
+    bitmap.add(1);
+    bitmap.add(2);
+
+    // Continuously update the last value by moving it one doc id forward
+    // Follow the order of MutableIndexSegmentImpl: first add the row, update the doc count and then change the
+    // validDocId bitmap
+    // The loop also exits on interrupt so that the test can stop the updater as soon as it is done checking
+    Thread updater = new Thread(() -> {
+      for (int i = 3; i < 10_000_000 && !Thread.currentThread().isInterrupted(); i++) {
+        numDocs.incrementAndGet();
+        bitmap.replace(i - 2, i);
+      }
+    });
+    updater.start();
+
+    try {
+      // Result should be invariant - always exactly 3 docs
+      for (int i = 0; i < 10_000; i++) {
+        assertEquals(getNumberOfFilteredDocs(segment, ctx), 3);
+      }
+    } finally {
+      // Always stop and reap the updater: without this a failed assertion would skip the join and leave the
+      // thread spinning in the background for the remainder of its 10M iterations
+      updater.interrupt();
+      updater.join();
+    }
+  }
+
+  /**
+   * Builds and runs a {@link FilterPlanNode} for the given segment and query, then counts the doc ids produced by
+   * the iterator of the operator's first block.
+   *
+   * @param segment segment handed to the plan node
+   * @param ctx query context handed to the plan node
+   * @return number of doc ids returned by the iterator before {@link Constants#EOF}
+   */
+  private int getNumberOfFilteredDocs(IndexSegment segment, QueryContext ctx) {
+    FilterPlanNode node = new FilterPlanNode(segment, ctx);
+    BaseFilterOperator op = node.run();
+    int numDocsFiltered = 0;
+    FilterBlock block = op.nextBlock();
+    FilterBlockDocIdSet blockIds = block.getBlockDocIdSet();
+    BlockDocIdIterator it = blockIds.iterator();
+    while (it.next() != Constants.EOF) {
+      numDocsFiltered++;
+    }
+    return numDocsFiltered;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org