You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@druid.apache.org by ab...@apache.org on 2022/10/28 11:40:55 UTC
[druid] branch 24.0.1 updated: fix nested column thread safety issue (#13267)
This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch 24.0.1
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/24.0.1 by this push:
new 542277ece9 fix nested column thread safety issue (#13267)
542277ece9 is described below
commit 542277ece94e556e7eaeb2b5ac999addad7df766
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Fri Oct 28 04:40:39 2022 -0700
fix nested column thread safety issue (#13267)
---
.../druid/segment/column/TypeStrategies.java | 18 ++
.../druid/segment/NestedDataColumnIndexer.java | 17 +
.../druid/segment/NestedDataColumnMerger.java | 10 +-
.../segment/nested/NestedDataColumnSerializer.java | 6 +
.../nested/NestedDataColumnSupplierTest.java | 353 +++++++++++++++++++++
5 files changed, 396 insertions(+), 8 deletions(-)
NOTE(review): the hunks below add a positional read(ByteBuffer, int) override to the LONG, FLOAT and DOUBLE
NOTE(review): type strategies. They use ByteBuffer's absolute getters (getLong/getFloat/getDouble(offset)), which,
NOTE(review): unlike the relative getters used by read(ByteBuffer), do not advance the buffer's position.
NOTE(review): Presumably this is what makes concurrent reads of a shared buffer safe (see subject) -- confirm.
diff --git a/core/src/main/java/org/apache/druid/segment/column/TypeStrategies.java b/core/src/main/java/org/apache/druid/segment/column/TypeStrategies.java
index 38d72046f1..54a15b1dbd 100644
--- a/core/src/main/java/org/apache/druid/segment/column/TypeStrategies.java
+++ b/core/src/main/java/org/apache/druid/segment/column/TypeStrategies.java
@@ -250,6 +250,12 @@ public class TypeStrategies
return buffer.getLong();
}
+ @Override
+ public Long read(ByteBuffer buffer, int offset)
+ {
+ return buffer.getLong(offset);
+ }
+
@Override
public boolean readRetainsBufferReference()
{
@@ -297,6 +303,12 @@ public class TypeStrategies
return buffer.getFloat();
}
+ @Override
+ public Float read(ByteBuffer buffer, int offset)
+ {
+ return buffer.getFloat(offset);
+ }
+
@Override
public boolean readRetainsBufferReference()
{
@@ -344,6 +356,12 @@ public class TypeStrategies
return buffer.getDouble();
}
+ @Override
+ public Double read(ByteBuffer buffer, int offset)
+ {
+ return buffer.getDouble(offset);
+ }
+
@Override
public boolean readRetainsBufferReference()
{
NOTE(review): adds NestedDataColumnIndexer#mergeFields, which copies every field whose type set is non-empty into
NOTE(review): the caller-supplied sorted map (fields that have only seen nulls are skipped), and #getSortedCollector,
NOTE(review): which delegates to the global dictionary. The loop body is moved verbatim from NestedDataColumnMerger.
diff --git a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexer.java b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexer.java
index dbdb6236fd..09467245ad 100644
--- a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexer.java
+++ b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexer.java
@@ -31,6 +31,7 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.CloseableIndexed;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexRowHolder;
+import org.apache.druid.segment.nested.GlobalDictionarySortedCollector;
import org.apache.druid.segment.nested.GlobalDimensionDictionary;
import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
import org.apache.druid.segment.nested.NestedLiteralTypeInfo;
@@ -38,6 +39,7 @@ import org.apache.druid.segment.nested.StructuredData;
import org.apache.druid.segment.nested.StructuredDataProcessor;
import javax.annotation.Nullable;
+import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -224,6 +226,21 @@ public class NestedDataColumnIndexer implements DimensionIndexer<StructuredData,
throw new UnsupportedOperationException("Not supported");
}
+ public void mergeFields(SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> mergedFields)
+ {
+ for (Map.Entry<String, NestedDataColumnIndexer.LiteralFieldIndexer> entry : fieldIndexers.entrySet()) {
+ // skip adding the field if no types are in the set, meaning only null values have been processed
+ if (!entry.getValue().getTypes().isEmpty()) {
+ mergedFields.put(entry.getKey(), entry.getValue().getTypes());
+ }
+ }
+ }
+
+ public GlobalDictionarySortedCollector getSortedCollector()
+ {
+ return globalDictionary.getSortedCollector();
+ }
+
static class LiteralFieldIndexer
{
private final GlobalDimensionDictionary globalDimensionDictionary;
NOTE(review): the merger now calls indexer.mergeFields(...) and indexer.getSortedCollector() instead of reading
NOTE(review): indexer.fieldIndexers / indexer.globalDictionary directly; the java.util.Map import is dropped accordingly.
diff --git a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java
index d0452c57a3..08c9799eeb 100644
--- a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java
+++ b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java
@@ -44,7 +44,6 @@ import java.io.IOException;
import java.nio.IntBuffer;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -163,13 +162,8 @@ public class NestedDataColumnMerger implements DimensionMergerV9
return null;
}
final NestedDataColumnIndexer indexer = (NestedDataColumnIndexer) dim.getIndexer();
- for (Map.Entry<String, NestedDataColumnIndexer.LiteralFieldIndexer> entry : indexer.fieldIndexers.entrySet()) {
- // skip adding the field if no types are in the set, meaning only null values have been processed
- if (!entry.getValue().getTypes().isEmpty()) {
- mergedFields.put(entry.getKey(), entry.getValue().getTypes());
- }
- }
- return indexer.globalDictionary.getSortedCollector();
+ indexer.mergeFields(mergedFields);
+ return indexer.getSortedCollector();
}
@Nullable
NOTE(review): adds the same kind of positional read(ByteBuffer, int) override, via ByteBuffer#getInt(offset), to an
NOTE(review): Integer type strategy defined in this file (its enclosing class is outside this hunk).
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
index 6d113bf717..dd58d3b866 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
@@ -682,6 +682,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
return buffer.getInt();
}
+ @Override
+ public Integer read(ByteBuffer buffer, int offset)
+ {
+ return buffer.getInt(offset);
+ }
+
@Override
public boolean readRetainsBufferReference()
{
diff --git a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
new file mode 100644
index 0000000000..60285df21a
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.nested;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
+import org.apache.druid.query.DefaultBitmapResultFactory;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.BaseProgressIndicator;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.NestedDataColumnIndexer;
+import org.apache.druid.segment.ObjectColumnSelector;
+import org.apache.druid.segment.SimpleAscendingOffset;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.NullValueIndex;
+import org.apache.druid.segment.column.StringValueSetIndex;
+import org.apache.druid.segment.column.TypeStrategy;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class NestedDataColumnSupplierTest extends InitializedNullHandlingTest
+{
+  private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper();
+
+  @Rule
+  public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+  DefaultBitmapResultFactory resultFactory = new DefaultBitmapResultFactory(new RoaringBitmapFactory());
+
+  List<Map<String, Object>> data = ImmutableList.of(
+      ImmutableMap.of("x", 1L, "y", 1.0, "z", "a"),
+      ImmutableMap.of("y", 3.0, "z", "d"),
+      ImmutableMap.of("x", 5L, "y", 5.0, "z", "b"),
+      ImmutableMap.of("x", 3L, "y", 4.0, "z", "c"),
+      ImmutableMap.of("x", 2L),
+      ImmutableMap.of("x", 4L, "y", 2.0, "z", "e")
+  );
+
+  Closer closer = Closer.create();
+  SmooshedFileMapper fileMapper;
+  ByteBuffer baseBuffer;
+
+  @Before
+  public void setup() throws IOException
+  {
+    final String fileNameBase = "test";
+    TmpFileSegmentWriteOutMediumFactory writeOutMediumFactory = TmpFileSegmentWriteOutMediumFactory.instance();
+    final File tmpFile = tempFolder.newFolder();
+    try (final FileSmoosher smoosher = new FileSmoosher(tmpFile)) {
+      NestedDataColumnSerializer serializer = new NestedDataColumnSerializer(
+          fileNameBase,
+          new IndexSpec(),
+          writeOutMediumFactory.makeSegmentWriteOutMedium(tempFolder.newFolder()),
+          new BaseProgressIndicator(),
+          closer
+      );
+
+      NestedDataColumnIndexer indexer = new NestedDataColumnIndexer();
+      for (Object o : data) {
+        indexer.processRowValsToUnsortedEncodedKeyComponent(o, false);
+      }
+      SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> sortedFields = new TreeMap<>();
+      indexer.mergeFields(sortedFields);
+      GlobalDictionarySortedCollector globalDictionarySortedCollector = indexer.getSortedCollector();
+
+      serializer.open();
+      serializer.serializeFields(sortedFields);
+      serializer.serializeStringDictionary(globalDictionarySortedCollector.getSortedStrings());
+      serializer.serializeLongDictionary(globalDictionarySortedCollector.getSortedLongs());
+      serializer.serializeDoubleDictionary(globalDictionarySortedCollector.getSortedDoubles());
+
+      SettableSelector valueSelector = new SettableSelector();
+      for (Object o : data) {
+        valueSelector.setObject(StructuredData.wrap(o));
+        serializer.serialize(valueSelector);
+      }
+
+      try (SmooshedWriter writer = smoosher.addWithSmooshedWriter(fileNameBase, serializer.getSerializedSize())) {
+        serializer.writeTo(writer, smoosher);
+      }
+    }
+    // map the smooshed files only after the try-with-resources above has closed the smoosher
+    fileMapper = closer.register(SmooshedFileMapper.load(tmpFile));
+    baseBuffer = fileMapper.mapFile(fileNameBase);
+  }
+
+  @After
+  public void teardown() throws IOException
+  {
+    closer.close();
+  }
+
+  @Test
+  public void testBasicFunctionality() throws IOException
+  {
+    NestedDataColumnSupplier supplier = makeSupplier();
+    try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) {
+      smokeTest(column);
+    }
+  }
+
+  @Test
+  public void testConcurrency() throws ExecutionException, InterruptedException
+  {
+    // if this test ever becomes flaky, there may be thread safety issues in reading the column
+    NestedDataColumnSupplier supplier = makeSupplier();
+    final String expectedReason = "none";
+    final AtomicReference<String> failureReason = new AtomicReference<>(expectedReason);
+
+    final int threads = 10;
+    ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threads));
+    Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
+    final CountDownLatch threadsStartLatch = new CountDownLatch(1);
+    for (int i = 0; i < threads; ++i) {
+      futures.add(
+          executorService.submit(() -> {
+            try {
+              threadsStartLatch.await();
+              for (int iter = 0; iter < 5000; iter++) {
+                try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) {
+                  smokeTest(column);
+                }
+              }
+            }
+            catch (Throwable ex) {
+              failureReason.set(String.valueOf(ex));
+            }
+          })
+      );
+    }
+    threadsStartLatch.countDown();
+    try {
+      Futures.allAsList(futures).get();
+    }
+    finally {
+      executorService.shutdownNow();
+    }
+    Assert.assertEquals(expectedReason, failureReason.get());
+  }
+
+  /** Creates a supplier that reads the nested column serialized by {@link #setup()}. */
+  private NestedDataColumnSupplier makeSupplier()
+  {
+    ColumnBuilder bob = new ColumnBuilder();
+    bob.setFileMapper(fileMapper);
+    return new NestedDataColumnSupplier(
+        baseBuffer,
+        bob,
+        () -> 0,
+        NestedDataComplexTypeSerde.OBJECT_MAPPER
+    );
+  }
+
+  private void smokeTest(NestedDataComplexColumn column) throws IOException
+  {
+    SimpleAscendingOffset offset = new SimpleAscendingOffset(data.size());
+    ColumnValueSelector<?> rawSelector = column.makeColumnValueSelector(offset);
+
+    final List<NestedPathPart> xPath = NestedPathFinder.parseJsonPath("$.x");
+    ColumnValueSelector<?> xSelector = column.makeColumnValueSelector(xPath, offset);
+    ColumnIndexSupplier xIndexSupplier = column.getColumnIndexSupplier(xPath);
+    Assert.assertNotNull(xIndexSupplier);
+    StringValueSetIndex xValueIndex = xIndexSupplier.as(StringValueSetIndex.class);
+    NullValueIndex xNulls = xIndexSupplier.as(NullValueIndex.class);
+
+    final List<NestedPathPart> yPath = NestedPathFinder.parseJsonPath("$.y");
+    ColumnValueSelector<?> ySelector = column.makeColumnValueSelector(yPath, offset);
+    ColumnIndexSupplier yIndexSupplier = column.getColumnIndexSupplier(yPath);
+    Assert.assertNotNull(yIndexSupplier);
+    StringValueSetIndex yValueIndex = yIndexSupplier.as(StringValueSetIndex.class);
+    NullValueIndex yNulls = yIndexSupplier.as(NullValueIndex.class);
+
+    final List<NestedPathPart> zPath = NestedPathFinder.parseJsonPath("$.z");
+    ColumnValueSelector<?> zSelector = column.makeColumnValueSelector(zPath, offset);
+    ColumnIndexSupplier zIndexSupplier = column.getColumnIndexSupplier(zPath);
+    Assert.assertNotNull(zIndexSupplier);
+    StringValueSetIndex zValueIndex = zIndexSupplier.as(StringValueSetIndex.class);
+    NullValueIndex zNulls = zIndexSupplier.as(NullValueIndex.class);
+
+    for (int i = 0; i < data.size(); i++) {
+      Map<String, Object> row = data.get(i);
+      Assert.assertEquals(
+          JSON_MAPPER.writeValueAsString(row),
+          JSON_MAPPER.writeValueAsString(StructuredData.unwrap(rawSelector.getObject()))
+      );
+      if (row.containsKey("x")) {
+        Assert.assertEquals(row.get("x"), xSelector.getObject());
+        Assert.assertEquals(row.get("x"), xSelector.getLong());
+        Assert.assertTrue(xValueIndex.forValue(String.valueOf(row.get("x"))).computeBitmapResult(resultFactory).get(i));
+        Assert.assertFalse(xNulls.forNull().computeBitmapResult(resultFactory).get(i));
+      } else {
+        Assert.assertNull(xSelector.getObject());
+        Assert.assertTrue(xSelector.isNull());
+        Assert.assertTrue(xValueIndex.forValue(null).computeBitmapResult(resultFactory).get(i));
+        Assert.assertTrue(xNulls.forNull().computeBitmapResult(resultFactory).get(i));
+      }
+      if (row.containsKey("y")) {
+        Assert.assertEquals(row.get("y"), ySelector.getObject());
+        Assert.assertEquals(row.get("y"), ySelector.getDouble());
+        Assert.assertTrue(yValueIndex.forValue(String.valueOf(row.get("y"))).computeBitmapResult(resultFactory).get(i));
+        Assert.assertFalse(yNulls.forNull().computeBitmapResult(resultFactory).get(i));
+      } else {
+        Assert.assertNull(ySelector.getObject());
+        Assert.assertTrue(ySelector.isNull());
+        Assert.assertTrue(yValueIndex.forValue(null).computeBitmapResult(resultFactory).get(i));
+        Assert.assertTrue(yNulls.forNull().computeBitmapResult(resultFactory).get(i));
+      }
+      if (row.containsKey("z")) {
+        Assert.assertEquals(row.get("z"), zSelector.getObject());
+        Assert.assertTrue(zValueIndex.forValue((String) row.get("z")).computeBitmapResult(resultFactory).get(i));
+        Assert.assertFalse(zNulls.forNull().computeBitmapResult(resultFactory).get(i));
+      } else {
+        Assert.assertNull(zSelector.getObject());
+        Assert.assertTrue(zValueIndex.forValue(null).computeBitmapResult(resultFactory).get(i));
+        Assert.assertTrue(zNulls.forNull().computeBitmapResult(resultFactory).get(i));
+      }
+      offset.increment();
+    }
+  }
+
+  private static class SettableSelector extends ObjectColumnSelector<StructuredData>
+  {
+    private StructuredData data;
+
+    public void setObject(StructuredData o)
+    {
+      this.data = o;
+    }
+
+    @Nullable
+    @Override
+    public StructuredData getObject()
+    {
+      return data;
+    }
+
+    @Override
+    public Class classOfObject()
+    {
+      return StructuredData.class;
+    }
+
+    @Override
+    public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+    {
+      // nothing to inspect
+    }
+  }
+
+  // NOTE(review): not referenced by any test in this class; use it to assert positional-only reads or remove it
+  private static class OnlyPositionalReadsTypeStrategy<T> implements TypeStrategy<T>
+  {
+    private final TypeStrategy<T> delegate;
+
+    private OnlyPositionalReadsTypeStrategy(TypeStrategy<T> delegate)
+    {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public int estimateSizeBytes(T value)
+    {
+      return delegate.estimateSizeBytes(value);
+    }
+
+    @Override
+    public T read(ByteBuffer buffer)
+    {
+      throw new IllegalStateException("non-positional read");
+    }
+
+    @Override
+    public boolean readRetainsBufferReference()
+    {
+      return delegate.readRetainsBufferReference();
+    }
+
+    @Override
+    public int write(ByteBuffer buffer, T value, int maxSizeBytes)
+    {
+      return delegate.write(buffer, value, maxSizeBytes);
+    }
+
+    @Override
+    public T read(ByteBuffer buffer, int offset)
+    {
+      return delegate.read(buffer, offset);
+    }
+
+    @Override
+    public int write(ByteBuffer buffer, int offset, T value, int maxSizeBytes)
+    {
+      return delegate.write(buffer, offset, value, maxSizeBytes);
+    }
+
+    @Override
+    public int compare(T o1, T o2)
+    {
+      return delegate.compare(o1, o2);
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org