You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/04/06 17:52:57 UTC
[pinot] branch master updated: Fix to query inconsistencies under heavy upsert load (resolves #7958) (#7971)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9178618ff0 Fix to query inconsistencies under heavy upsert load (resolves #7958) (#7971)
9178618ff0 is described below
commit 9178618ff0b1b613c9dae06aade832afabafe859
Author: Valentin Mahrwald <vm...@gmail.com>
AuthorDate: Wed Apr 6 19:52:51 2022 +0200
Fix to query inconsistencies under heavy upsert load (resolves #7958) (#7971)
Add `FilterPlanNodeTest` to verify the query consistency under heavy upsert load
---
.../apache/pinot/core/plan/FilterPlanNodeTest.java | 90 ++++++++++++++++++++++
1 file changed, 90 insertions(+)
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/FilterPlanNodeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/FilterPlanNodeTest.java
new file mode 100644
index 0000000000..8a44e83075
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/FilterPlanNodeTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.plan;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.core.common.BlockDocIdIterator;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
+import org.apache.pinot.core.operator.filter.BaseFilterOperator;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.spi.Constants;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+
+public class FilterPlanNodeTest {
+
+ @Test
+ public void testConsistentSnapshot()
+ throws Exception {
+ IndexSegment segment = mock(IndexSegment.class);
+ SegmentMetadata meta = mock(SegmentMetadata.class);
+ when(segment.getSegmentMetadata()).thenReturn(meta);
+ ThreadSafeMutableRoaringBitmap bitmap = new ThreadSafeMutableRoaringBitmap();
+ when(segment.getValidDocIds()).thenReturn(bitmap);
+ AtomicInteger numDocs = new AtomicInteger(0);
+ when(meta.getTotalDocs()).then((Answer<Integer>) invocationOnMock -> numDocs.get());
+ QueryContext ctx = mock(QueryContext.class);
+ when(ctx.getFilter()).thenReturn(null);
+
+ numDocs.set(3);
+ bitmap.add(0);
+ bitmap.add(1);
+ bitmap.add(2);
+
+ // Continuously update the last value by moving it one doc id forward
+ // Follow the order of MutableIndexSegmentImpl: first add the row, update the doc count and then change the
+ // validDocId bitmap
+ Thread updater = new Thread(() -> {
+ for (int i = 3; i < 10_000_000; i++) {
+ numDocs.incrementAndGet();
+ bitmap.replace(i - 2, i);
+ }
+ });
+ updater.start();
+
+ // Result should be invariant - always exactly 3 docs
+ for (int i = 0; i < 10_000; i++) {
+ assertEquals(getNumberOfFilteredDocs(segment, ctx), 3);
+ }
+
+ updater.join();
+ }
+
+ private int getNumberOfFilteredDocs(IndexSegment segment, QueryContext ctx) {
+ FilterPlanNode node = new FilterPlanNode(segment, ctx);
+ BaseFilterOperator op = node.run();
+ int numDocsFiltered = 0;
+ FilterBlock block = op.nextBlock();
+ FilterBlockDocIdSet blockIds = block.getBlockDocIdSet();
+ BlockDocIdIterator it = blockIds.iterator();
+ while (it.next() != Constants.EOF) {
+ numDocsFiltered++;
+ }
+ return numDocsFiltered;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org