You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2022/09/16 08:29:02 UTC
[druid] branch master updated: split up NestedDataColumnSerializer into separate files (#13096)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5ece870634 split up NestedDataColumnSerializer into separate files (#13096)
5ece870634 is described below
commit 5ece87063430e142151af88c7941222991cc5763
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Fri Sep 16 01:28:47 2022 -0700
split up NestedDataColumnSerializer into separate files (#13096)
* split up NestedDataColumnSerializer into separate files
* fix it
---
.../segment/nested/DoubleFieldColumnWriter.java | 98 ++++++
.../GlobalDictionaryEncodedFieldColumnWriter.java | 264 ++++++++++++++
.../segment/nested/LongFieldColumnWriter.java | 99 ++++++
.../segment/nested/NestedDataColumnSerializer.java | 385 +++------------------
.../segment/nested/StringFieldColumnWriter.java | 63 ++++
.../nested/VariantLiteralFieldColumnWriter.java | 66 ++++
6 files changed, 636 insertions(+), 339 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/DoubleFieldColumnWriter.java b/processing/src/main/java/org/apache/druid/segment/nested/DoubleFieldColumnWriter.java
new file mode 100644
index 0000000000..623eb1490f
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/nested/DoubleFieldColumnWriter.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.nested;
+
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.data.ColumnarDoublesSerializer;
+import org.apache.druid.segment.data.CompressionFactory;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * Literal field writer for double typed nested fields of {@link NestedDataColumnSerializer}. Besides the dictionary
+ * encoded id column produced by the base class, this writer maintains a second value column holding the raw double
+ * values ({@link #doublesSerializer}), which is appended to on every {@link #addValue} call.
+ */
+public final class DoubleFieldColumnWriter extends GlobalDictionaryEncodedFieldColumnWriter<Double>
+{
+  /**
+   * Raw double value column, created in {@link #open()}; null rows are stored as 0.0.
+   */
+  private ColumnarDoublesSerializer doublesSerializer;
+
+  protected DoubleFieldColumnWriter(
+      String columnName,
+      String fieldName,
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      IndexSpec indexSpec,
+      GlobalDictionaryIdLookup globalDictionaryIdLookup
+  )
+  {
+    super(columnName, fieldName, segmentWriteOutMedium, indexSpec, globalDictionaryIdLookup);
+  }
+
+  /**
+   * Resolves the global dictionary id of a double value through the shared global dictionary lookup.
+   */
+  @Override
+  int lookupGlobalId(Double value)
+  {
+    final int globalId = globalDictionaryIdLookup.lookupDouble(value);
+    return globalId;
+  }
+
+  /**
+   * Opens the base class intermediate id writer, then creates and opens the double value serializer. The serializer
+   * uses native byte order, the index spec's dimension compression and a ".double_column" suffixed name.
+   */
+  @Override
+  public void open() throws IOException
+  {
+    super.open();
+    final String valueColumnFileBase = StringUtils.format("%s.double_column", fieldName);
+    doublesSerializer = CompressionFactory.getDoubleSerializer(
+        fieldName,
+        segmentWriteOutMedium,
+        valueColumnFileBase,
+        ByteOrder.nativeOrder(),
+        indexSpec.getDimensionCompression()
+    );
+    doublesSerializer.open();
+  }
+
+  /**
+   * Appends the row's value to the double value column; a null value is written as 0.0.
+   */
+  @Override
+  void writeValue(@Nullable Double value) throws IOException
+  {
+    final double toWrite = value != null ? value : 0.0;
+    doublesSerializer.add(toWrite);
+  }
+
+  /**
+   * Writes the long/double length header (long length is always 0 here), then the double value column, then the
+   * dictionary encoded id column.
+   */
+  @Override
+  void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
+  {
+    final int doubleColumnLength = Ints.checkedCast(doublesSerializer.getSerializedSize());
+    writeLongAndDoubleColumnLength(channel, 0, doubleColumnLength);
+    doublesSerializer.writeTo(channel, smoosher);
+    encodedValueSerializer.writeTo(channel, smoosher);
+  }
+
+  /**
+   * Base column size plus the size of the double value column.
+   */
+  @Override
+  long getSerializedColumnSize() throws IOException
+  {
+    final long baseSize = super.getSerializedColumnSize();
+    return baseSize + doublesSerializer.getSerializedSize();
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
new file mode 100644
index 0000000000..ed30c84b79
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.nested;
+
+import com.google.common.primitives.Ints;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntArrays;
+import it.unimi.dsi.fastutil.ints.IntIterator;
+import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.collections.bitmap.MutableBitmap;
+import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.data.FixedIndexedIntWriter;
+import org.apache.druid.segment.data.GenericIndexedWriter;
+import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
+import org.apache.druid.segment.data.VSizeColumnarIntsSerializer;
+import org.apache.druid.segment.serde.DictionaryEncodedColumnPartSerde;
+import org.apache.druid.segment.serde.Serializer;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * Base class for writers of global dictionary encoded nested literal columns for {@link NestedDataColumnSerializer}.
+ * While processing the 'raw' nested data, {@link NestedDataColumnSerializer} calls {@link #addValue(Object)} on all
+ * literal writers. For this type of writer, that entails building a local dictionary that maps into the global
+ * dictionary ({@link #localDictionary}) and writing the unsorted local id to an intermediate integer column,
+ * {@link #intermediateValueWriter}.
+ *
+ * When processing of the 'raw' value column is complete, {@link #writeTo(FileSmoosher)} sorts the local ids and
+ * writes them out to a local sorted dictionary. It then iterates over {@link #intermediateValueWriter}, swapping the
+ * unsorted local ids for the sorted ids and writing them to the id column writer {@link #encodedValueSerializer}
+ * (compressed unless the index spec asks for uncompressed dimensions), building the bitmap indexes along the way.
+ */
+public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
+{
+  private static final Logger log = new Logger(GlobalDictionaryEncodedFieldColumnWriter.class);
+
+  protected final SegmentWriteOutMedium segmentWriteOutMedium;
+  protected final String columnName;
+  protected final String fieldName;
+  protected final IndexSpec indexSpec;
+  protected final GlobalDictionaryIdLookup globalDictionaryIdLookup;
+  protected final LocalDimensionDictionary localDictionary = new LocalDimensionDictionary();
+
+  protected FixedIndexedIntWriter intermediateValueWriter;
+  // maybe someday we allow no bitmap indexes or multi-value columns
+  protected int flags = DictionaryEncodedColumnPartSerde.NO_FLAGS;
+  protected DictionaryEncodedColumnPartSerde.VERSION version = null;
+  protected SingleValueColumnarIntsSerializer encodedValueSerializer;
+
+  protected GlobalDictionaryEncodedFieldColumnWriter(
+      String columnName,
+      String fieldName,
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      IndexSpec indexSpec,
+      GlobalDictionaryIdLookup globalDictionaryIdLookup
+  )
+  {
+    this.columnName = columnName;
+    this.fieldName = fieldName;
+    this.segmentWriteOutMedium = segmentWriteOutMedium;
+    this.indexSpec = indexSpec;
+    this.globalDictionaryIdLookup = globalDictionaryIdLookup;
+  }
+
+  /**
+   * Perform any value conversion needed before the value is looked up in the global dictionary and passed to
+   * {@link #writeValue}. The default implementation is an unchecked cast to {@code T}.
+   */
+  @SuppressWarnings("unchecked")
+  T processValue(Object value)
+  {
+    return (T) value;
+  }
+
+  /**
+   * Hook to allow implementors the chance to do additional operations during {@link #addValue(Object)}, such as
+   * writing an additional value column.
+   */
+  void writeValue(@Nullable T value) throws IOException
+  {
+    // do nothing, if a value column is present this method should be overridden to write the value to the serializer
+  }
+
+  /**
+   * Find a value in {@link #globalDictionaryIdLookup} as is most appropriate to the writer type.
+   */
+  abstract int lookupGlobalId(T value);
+
+  /**
+   * Open the writer so that {@link #addValue(Object)} can be called.
+   */
+  public void open() throws IOException
+  {
+    intermediateValueWriter = new FixedIndexedIntWriter(segmentWriteOutMedium, false);
+    intermediateValueWriter.open();
+  }
+
+  /**
+   * Add a value to the unsorted local dictionary, write its unsorted local id to the intermediate column, and
+   * pass the converted value to {@link #writeValue}.
+   */
+  public void addValue(Object val) throws IOException
+  {
+    final T value = processValue(val);
+    final int globalId = lookupGlobalId(value);
+    final int localId = localDictionary.add(globalId);
+    intermediateValueWriter.write(localId);
+    writeValue(value);
+  }
+
+  /**
+   * How many bytes {@link #writeColumnTo(WritableByteChannel, FileSmoosher)} is expected to write to the segment.
+   * The default accounts for the two int headers written by
+   * {@link #writeLongAndDoubleColumnLength(WritableByteChannel, int, int)} plus the encoded id column.
+   */
+  long getSerializedColumnSize() throws IOException
+  {
+    return Integer.BYTES + Integer.BYTES + encodedValueSerializer.getSerializedSize();
+  }
+
+  /**
+   * Defines how to write the column, including the dictionary id column, along with any additional columns
+   * such as the long or double value column as type appropriate.
+   */
+  abstract void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException;
+
+  /**
+   * Sorts the local dictionary and re-encodes the intermediate unsorted local ids as sorted ids into
+   * {@link #encodedValueSerializer}. It also builds one bitmap per dictionary entry. Everything is written to
+   * the smoosher as a single internal file named by {@link NestedDataColumnSerializer#getFieldFileName}, laid out
+   * as: version byte, flags int, sorted dictionary, column (see {@link #writeColumnTo}), then bitmap indexes.
+   */
+  public void writeTo(FileSmoosher smoosher) throws IOException
+  {
+    // use a child writeout medium so that we can close it when we are finished and don't leave temporary files
+    // hanging out until the entire segment is done; try-with-resources releases it even if any step below fails
+    try (final SegmentWriteOutMedium tmpWriteoutMedium = segmentWriteOutMedium.makeChildWriteOutMedium()) {
+      final FixedIndexedIntWriter sortedDictionaryWriter = new FixedIndexedIntWriter(tmpWriteoutMedium, true);
+      sortedDictionaryWriter.open();
+      final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new GenericIndexedWriter<>(
+          tmpWriteoutMedium,
+          columnName,
+          indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+      );
+      bitmapIndexWriter.open();
+      bitmapIndexWriter.setObjectsNotSorted();
+
+      // invert the global -> unsorted local id mapping, then sort the global ids to form the sorted dictionary
+      final Int2IntOpenHashMap globalToUnsorted = localDictionary.getGlobalIdToLocalId();
+      final int[] unsortedToGlobal = new int[localDictionary.size()];
+      final IntIterator globalIds = globalToUnsorted.keySet().iterator();
+      while (globalIds.hasNext()) {
+        final int globalId = globalIds.nextInt();
+        unsortedToGlobal[globalToUnsorted.get(globalId)] = globalId;
+      }
+      final int[] sortedGlobal = unsortedToGlobal.clone();
+      IntArrays.unstableSort(sortedGlobal);
+
+      final int[] unsortedToSorted = new int[unsortedToGlobal.length];
+      final MutableBitmap[] bitmaps = new MutableBitmap[sortedGlobal.length];
+      for (int index = 0; index < sortedGlobal.length; index++) {
+        final int globalId = sortedGlobal[index];
+        sortedDictionaryWriter.write(globalId);
+        final int unsortedId = globalToUnsorted.get(globalId);
+        unsortedToSorted[unsortedId] = index;
+        bitmaps[index] = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+      }
+
+      // the encoded column stores sorted *local* ids in [0, size - 1], so that is the tight upper bound; the largest
+      // global id would only over-size the id width. Clamped to 0 so an empty dictionary does not fail.
+      openColumnSerializer(tmpWriteoutMedium, Math.max(sortedGlobal.length - 1, 0));
+      final IntIterator rows = intermediateValueWriter.getIterator();
+      int rowCount = 0;
+      while (rows.hasNext()) {
+        final int unsortedLocalId = rows.nextInt();
+        final int sortedLocalId = unsortedToSorted[unsortedLocalId];
+        encodedValueSerializer.addValue(sortedLocalId);
+        bitmaps[sortedLocalId].add(rowCount++);
+      }
+
+      for (MutableBitmap bitmap : bitmaps) {
+        bitmapIndexWriter.write(
+            indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
+        );
+      }
+
+      final Serializer fieldSerializer = new Serializer()
+      {
+        @Override
+        public long getSerializedSize() throws IOException
+        {
+          // version byte + flags int + sorted dictionary + bitmap indexes + column
+          return 1 + Integer.BYTES +
+                 sortedDictionaryWriter.getSerializedSize() +
+                 bitmapIndexWriter.getSerializedSize() +
+                 getSerializedColumnSize();
+        }
+
+        @Override
+        public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
+        {
+          Channels.writeFully(channel, ByteBuffer.wrap(new byte[]{version.asByte()}));
+          // a single channel.write() may consume only part of the buffer, so always write fully
+          Channels.writeFully(channel, ByteBuffer.wrap(Ints.toByteArray(flags)));
+          sortedDictionaryWriter.writeTo(channel, smoosher);
+          writeColumnTo(channel, smoosher);
+          bitmapIndexWriter.writeTo(channel, smoosher);
+        }
+      };
+      final String fieldFileName = NestedDataColumnSerializer.getFieldFileName(columnName, fieldName);
+      final long size = fieldSerializer.getSerializedSize();
+      log.debug("Column [%s] serializing [%s] field of size [%d].", columnName, fieldName, size);
+      try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(fieldFileName, size)) {
+        fieldSerializer.writeTo(smooshChannel, smoosher);
+      }
+    }
+  }
+
+  /**
+   * Creates and opens {@link #encodedValueSerializer} for ids up to {@code maxId}. The serializer is compressed
+   * (version COMPRESSED) unless the index spec's dimension compression is UNCOMPRESSED, in which case a VSize
+   * serializer is used (version UNCOMPRESSED_SINGLE_VALUE).
+   */
+  private void openColumnSerializer(SegmentWriteOutMedium medium, int maxId) throws IOException
+  {
+    if (indexSpec.getDimensionCompression() != CompressionStrategy.UNCOMPRESSED) {
+      this.version = DictionaryEncodedColumnPartSerde.VERSION.COMPRESSED;
+      encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
+          fieldName,
+          medium,
+          columnName,
+          maxId,
+          indexSpec.getDimensionCompression()
+      );
+    } else {
+      encodedValueSerializer = new VSizeColumnarIntsSerializer(medium, maxId);
+      this.version = DictionaryEncodedColumnPartSerde.VERSION.UNCOMPRESSED_SINGLE_VALUE;
+    }
+    encodedValueSerializer.open();
+  }
+
+  /**
+   * Writes two native-order ints to the channel: the serialized length of the long value column, then that of the
+   * double value column. A writer without one of these columns passes 0 for it.
+   */
+  public void writeLongAndDoubleColumnLength(WritableByteChannel channel, int longLength, int doubleLength)
+      throws IOException
+  {
+    final ByteBuffer intBuffer = ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.nativeOrder());
+    intBuffer.putInt(longLength);
+    intBuffer.flip();
+    Channels.writeFully(channel, intBuffer);
+    // clear() resets position/limit for reuse; the byte order set above is retained
+    intBuffer.clear();
+    intBuffer.putInt(doubleLength);
+    intBuffer.flip();
+    Channels.writeFully(channel, intBuffer);
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/LongFieldColumnWriter.java b/processing/src/main/java/org/apache/druid/segment/nested/LongFieldColumnWriter.java
new file mode 100644
index 0000000000..1321a00023
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/nested/LongFieldColumnWriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.nested;
+
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.data.ColumnarLongsSerializer;
+import org.apache.druid.segment.data.CompressionFactory;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * Literal field writer for long typed nested fields of {@link NestedDataColumnSerializer}. Alongside the dictionary
+ * encoded id column produced by the base class, this writer maintains a second value column holding the raw long
+ * values ({@link #longsSerializer}), which is appended to on every {@link #addValue} call.
+ */
+public final class LongFieldColumnWriter extends GlobalDictionaryEncodedFieldColumnWriter<Long>
+{
+  /**
+   * Raw long value column, created in {@link #open()}; null rows are stored as 0.
+   */
+  private ColumnarLongsSerializer longsSerializer;
+
+  protected LongFieldColumnWriter(
+      String columnName,
+      String fieldName,
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      IndexSpec indexSpec,
+      GlobalDictionaryIdLookup globalDictionaryIdLookup
+  )
+  {
+    super(columnName, fieldName, segmentWriteOutMedium, indexSpec, globalDictionaryIdLookup);
+  }
+
+  /**
+   * Resolves the global dictionary id of a long value through the shared global dictionary lookup.
+   */
+  @Override
+  int lookupGlobalId(Long value)
+  {
+    final int globalId = globalDictionaryIdLookup.lookupLong(value);
+    return globalId;
+  }
+
+  /**
+   * Opens the base class intermediate id writer, then creates and opens the long value serializer. The serializer
+   * uses native byte order, the index spec's long encoding and dimension compression, and a ".long_column" suffixed
+   * name.
+   */
+  @Override
+  public void open() throws IOException
+  {
+    super.open();
+    final String valueColumnFileBase = StringUtils.format("%s.long_column", fieldName);
+    longsSerializer = CompressionFactory.getLongSerializer(
+        fieldName,
+        segmentWriteOutMedium,
+        valueColumnFileBase,
+        ByteOrder.nativeOrder(),
+        indexSpec.getLongEncoding(),
+        indexSpec.getDimensionCompression()
+    );
+    longsSerializer.open();
+  }
+
+  /**
+   * Appends the row's value to the long value column; a null value is written as 0.
+   */
+  @Override
+  void writeValue(@Nullable Long value) throws IOException
+  {
+    final long toWrite = value != null ? value : 0L;
+    longsSerializer.add(toWrite);
+  }
+
+  /**
+   * Writes the long/double length header (double length is always 0 here), then the long value column, then the
+   * dictionary encoded id column.
+   */
+  @Override
+  void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
+  {
+    final int longColumnLength = Ints.checkedCast(longsSerializer.getSerializedSize());
+    writeLongAndDoubleColumnLength(channel, longColumnLength, 0);
+    longsSerializer.writeTo(channel, smoosher);
+    encodedValueSerializer.writeTo(channel, smoosher);
+  }
+
+  /**
+   * Base column size plus the size of the long value column.
+   */
+  @Override
+  long getSerializedColumnSize() throws IOException
+  {
+    final long baseSize = super.getSerializedColumnSize();
+    return baseSize + longsSerializer.getSerializedSize();
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
index f5d782d1e0..485abf14c3 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
@@ -21,14 +21,9 @@ package org.apache.druid.segment.nested;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
-import com.google.common.primitives.Ints;
-import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
-import it.unimi.dsi.fastutil.ints.IntArrays;
-import it.unimi.dsi.fastutil.ints.IntIterator;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.io.Channels;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
@@ -46,24 +41,14 @@ import org.apache.druid.segment.column.TypeStrategy;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.ByteBufferWriter;
-import org.apache.druid.segment.data.ColumnarDoublesSerializer;
-import org.apache.druid.segment.data.ColumnarLongsSerializer;
-import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
import org.apache.druid.segment.data.CompressedVariableSizedBlobColumnSerializer;
-import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
-import org.apache.druid.segment.data.FixedIndexedIntWriter;
import org.apache.druid.segment.data.FixedIndexedWriter;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.GenericIndexedWriter;
-import org.apache.druid.segment.data.ObjectStrategy;
-import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
-import org.apache.druid.segment.data.VSizeColumnarIntsSerializer;
-import org.apache.druid.segment.serde.DictionaryEncodedColumnPartSerde;
import org.apache.druid.segment.serde.Serializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
-import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -143,10 +128,15 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
@Override
public void open() throws IOException
{
- fieldsWriter = createGenericIndexedWriter(GenericIndexed.STRING_STRATEGY, segmentWriteOutMedium);
+ fieldsWriter = new GenericIndexedWriter<>(segmentWriteOutMedium, name, GenericIndexed.STRING_STRATEGY);
+ fieldsWriter.open();
+
fieldsInfoWriter = new NestedLiteralTypeInfo.Writer(segmentWriteOutMedium);
fieldsInfoWriter.open();
- dictionaryWriter = createGenericIndexedWriter(GenericIndexed.STRING_STRATEGY, segmentWriteOutMedium);
+
+ dictionaryWriter = new GenericIndexedWriter<>(segmentWriteOutMedium, name, GenericIndexed.STRING_STRATEGY);
+ dictionaryWriter.open();
+
longDictionaryWriter = new FixedIndexedWriter<>(
segmentWriteOutMedium,
ColumnType.LONG.getStrategy(),
@@ -155,6 +145,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
true
);
longDictionaryWriter.open();
+
doubleDictionaryWriter = new FixedIndexedWriter<>(
segmentWriteOutMedium,
ColumnType.DOUBLE.getStrategy(),
@@ -163,17 +154,20 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
true
);
doubleDictionaryWriter.open();
+
rawWriter = new CompressedVariableSizedBlobColumnSerializer(
getInternalFileName(name, RAW_FILE_NAME),
segmentWriteOutMedium,
indexSpec.getJsonCompression() != null ? indexSpec.getJsonCompression() : CompressionStrategy.LZ4
);
rawWriter.open();
+
nullBitmapWriter = new ByteBufferWriter<>(
segmentWriteOutMedium,
indexSpec.getBitmapSerdeFactory().getObjectStrategy()
);
nullBitmapWriter.open();
+
nullRowsBitmap = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
}
@@ -182,23 +176,48 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
this.fields = fields;
this.fieldWriters = Maps.newHashMapWithExpectedSize(fields.size());
for (Map.Entry<String, NestedLiteralTypeInfo.MutableTypeSet> field : fields.entrySet()) {
- fieldsWriter.write(field.getKey());
+ final String fieldName = field.getKey();
+ fieldsWriter.write(fieldName);
fieldsInfoWriter.write(field.getValue());
- final GlobalDictionaryEncodedFieldColumnWriter writer;
- ColumnType type = field.getValue().getSingleType();
+ final GlobalDictionaryEncodedFieldColumnWriter<?> writer;
+ final ColumnType type = field.getValue().getSingleType();
if (type != null) {
if (Types.is(type, ValueType.STRING)) {
- writer = new StringFieldColumnWriter();
+ writer = new StringFieldColumnWriter(
+ name,
+ fieldName,
+ segmentWriteOutMedium,
+ indexSpec,
+ globalDictionaryIdLookup
+ );
} else if (Types.is(type, ValueType.LONG)) {
- writer = new LongFieldColumnWriter();
+ writer = new LongFieldColumnWriter(
+ name,
+ fieldName,
+ segmentWriteOutMedium,
+ indexSpec,
+ globalDictionaryIdLookup
+ );
} else {
- writer = new DoubleFieldColumnWriter();
+ writer = new DoubleFieldColumnWriter(
+ name,
+ fieldName,
+ segmentWriteOutMedium,
+ indexSpec,
+ globalDictionaryIdLookup
+ );
}
} else {
- writer = new VariantLiteralFieldColumnWriter();
+ writer = new VariantLiteralFieldColumnWriter(
+ name,
+ fieldName,
+ segmentWriteOutMedium,
+ indexSpec,
+ globalDictionaryIdLookup
+ );
}
- writer.open(field.getKey());
- fieldWriters.put(field.getKey(), writer);
+ writer.open();
+ fieldWriters.put(fieldName, writer);
}
}
@@ -330,7 +349,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
for (Map.Entry<String, NestedLiteralTypeInfo.MutableTypeSet> field : fields.entrySet()) {
// remove writer so that it can be collected when we are done with it
GlobalDictionaryEncodedFieldColumnWriter<?> writer = fieldWriters.remove(field.getKey());
- writer.writeTo(field.getKey(), smoosher);
+ writer.writeTo(smoosher);
}
log.info("Column [%s] serialized successfully with [%d] nested columns.", name, fields.size());
}
@@ -343,16 +362,6 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
}
}
- private <T> GenericIndexedWriter<T> createGenericIndexedWriter(
- ObjectStrategy<T> objectStrategy,
- SegmentWriteOutMedium writeOutMedium
- ) throws IOException
- {
- GenericIndexedWriter<T> writer = new GenericIndexedWriter<>(writeOutMedium, name, objectStrategy);
- writer.open();
- return writer;
- }
-
public static String getFieldFileName(String fileNameBase, String field)
{
return StringUtils.format("%s_%s", fileNameBase, field);
@@ -363,308 +372,6 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
return StringUtils.format("%s.%s", fileNameBase, field);
}
- abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
- {
- protected final LocalDimensionDictionary localDictionary = new LocalDimensionDictionary();
-
- protected FixedIndexedIntWriter intermediateValueWriter;
- // maybe someday we allow no bitmap indexes or multi-value columns
- protected int flags = DictionaryEncodedColumnPartSerde.NO_FLAGS;
- protected DictionaryEncodedColumnPartSerde.VERSION version = null;
- protected SingleValueColumnarIntsSerializer encodedValueSerializer;
-
- T processValue(Object value)
- {
- return (T) value;
- }
-
- void writeValue(@Nullable T value) throws IOException
- {
- // do nothing, if a value column is present this method should be overridden to write the value to the serializer
- }
-
- abstract int lookupGlobalId(T value);
-
- abstract void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException;
-
- void openColumnSerializer(String field, SegmentWriteOutMedium medium, int maxId) throws IOException
- {
- if (indexSpec.getDimensionCompression() != CompressionStrategy.UNCOMPRESSED) {
- this.version = DictionaryEncodedColumnPartSerde.VERSION.COMPRESSED;
- encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
- field,
- medium,
- name,
- maxId,
- indexSpec.getDimensionCompression()
- );
- } else {
- encodedValueSerializer = new VSizeColumnarIntsSerializer(medium, maxId);
- this.version = DictionaryEncodedColumnPartSerde.VERSION.UNCOMPRESSED_SINGLE_VALUE;
- }
- encodedValueSerializer.open();
- }
-
- long getSerializedColumnSize() throws IOException
- {
- return Integer.BYTES + Integer.BYTES + encodedValueSerializer.getSerializedSize();
- }
-
- public void open(String field) throws IOException
- {
- intermediateValueWriter = new FixedIndexedIntWriter(segmentWriteOutMedium, false);
- intermediateValueWriter.open();
- }
-
- public void writeLongAndDoubleColumnLength(WritableByteChannel channel, int longLength, int doubleLength)
- throws IOException
- {
- ByteBuffer intBuffer = ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.nativeOrder());
- intBuffer.position(0);
- intBuffer.putInt(longLength);
- intBuffer.flip();
- Channels.writeFully(channel, intBuffer);
- intBuffer.position(0);
- intBuffer.limit(intBuffer.capacity());
- intBuffer.putInt(doubleLength);
- intBuffer.flip();
- Channels.writeFully(channel, intBuffer);
- }
-
- public void addValue(Object val) throws IOException
- {
- final T value = processValue(val);
- final int globalId = lookupGlobalId(value);
- final int localId = localDictionary.add(globalId);
- intermediateValueWriter.write(localId);
- writeValue(value);
- }
-
- public void writeTo(String field, FileSmoosher smoosher) throws IOException
- {
- // use a child writeout medium so that we don't leave
- final SegmentWriteOutMedium tmpWriteoutMedium = segmentWriteOutMedium.makeChildWriteOutMedium();
- final FixedIndexedIntWriter sortedDictionaryWriter = new FixedIndexedIntWriter(tmpWriteoutMedium, true);
- sortedDictionaryWriter.open();
- final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = createGenericIndexedWriter(
- indexSpec.getBitmapSerdeFactory().getObjectStrategy(),
- tmpWriteoutMedium
- );
- bitmapIndexWriter.setObjectsNotSorted();
- final Int2IntOpenHashMap globalToUnsorted = localDictionary.getGlobalIdToLocalId();
- final int[] unsortedToGlobal = new int[localDictionary.size()];
- for (int key : globalToUnsorted.keySet()) {
- unsortedToGlobal[globalToUnsorted.get(key)] = key;
- }
- final int[] sortedGlobal = new int[unsortedToGlobal.length];
- System.arraycopy(unsortedToGlobal, 0, sortedGlobal, 0, unsortedToGlobal.length);
- IntArrays.unstableSort(sortedGlobal);
-
- final int[] unsortedToSorted = new int[unsortedToGlobal.length];
- final MutableBitmap[] bitmaps = new MutableBitmap[sortedGlobal.length];
- for (int index = 0; index < sortedGlobal.length; index++) {
- final int globalId = sortedGlobal[index];
- sortedDictionaryWriter.write(globalId);
- final int unsortedId = globalToUnsorted.get(globalId);
- unsortedToSorted[unsortedId] = index;
- bitmaps[index] = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
- }
-
- openColumnSerializer(field, tmpWriteoutMedium, sortedGlobal[sortedGlobal.length - 1]);
- final IntIterator rows = intermediateValueWriter.getIterator();
- int rowCount = 0;
- while (rows.hasNext()) {
- final int unsortedLocalId = rows.nextInt();
- final int sortedLocalId = unsortedToSorted[unsortedLocalId];
- encodedValueSerializer.addValue(sortedLocalId);
- bitmaps[sortedLocalId].add(rowCount++);
- }
-
- for (MutableBitmap bitmap : bitmaps) {
- bitmapIndexWriter.write(
- indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
- );
- }
-
- final Serializer fieldSerializer = new Serializer()
- {
- @Override
- public long getSerializedSize() throws IOException
- {
- return 1 + Integer.BYTES +
- sortedDictionaryWriter.getSerializedSize() +
- bitmapIndexWriter.getSerializedSize() +
- getSerializedColumnSize();
- }
-
- @Override
- public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
- {
- Channels.writeFully(channel, ByteBuffer.wrap(new byte[]{version.asByte()}));
- channel.write(ByteBuffer.wrap(Ints.toByteArray(flags)));
- sortedDictionaryWriter.writeTo(channel, smoosher);
- writeColumnTo(channel, smoosher);
- bitmapIndexWriter.writeTo(channel, smoosher);
- }
- };
- final String fieldName = getFieldFileName(name, field);
- final long size = fieldSerializer.getSerializedSize();
- log.debug("Column [%s] serializing [%s] field of size [%d].", name, field, size);
- try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(fieldName, size)) {
- fieldSerializer.writeTo(smooshChannel, smoosher);
- }
- finally {
- tmpWriteoutMedium.close();
- }
- }
- }
-
- private final class StringFieldColumnWriter extends GlobalDictionaryEncodedFieldColumnWriter<String>
- {
- @Override
- String processValue(Object value)
- {
- return String.valueOf(value);
- }
-
- @Override
- int lookupGlobalId(String value)
- {
- return globalDictionaryIdLookup.lookupString(value);
- }
-
- @Override
- void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
- {
- writeLongAndDoubleColumnLength(channel, 0, 0);
- encodedValueSerializer.writeTo(channel, smoosher);
- }
- }
-
- private final class LongFieldColumnWriter extends GlobalDictionaryEncodedFieldColumnWriter<Long>
- {
- private ColumnarLongsSerializer longsSerializer;
-
- @Override
- int lookupGlobalId(Long value)
- {
- return globalDictionaryIdLookup.lookupLong(value);
- }
-
- @Override
- public void open(String field) throws IOException
- {
- super.open(field);
- longsSerializer = CompressionFactory.getLongSerializer(
- field,
- segmentWriteOutMedium,
- StringUtils.format("%s.long_column", name),
- ByteOrder.nativeOrder(),
- indexSpec.getLongEncoding(),
- indexSpec.getDimensionCompression()
- );
- longsSerializer.open();
- }
-
- @Override
- void writeValue(@Nullable Long value) throws IOException
- {
- if (value == null) {
- longsSerializer.add(0L);
- } else {
- longsSerializer.add(value);
- }
- }
-
- @Override
- void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
- {
- writeLongAndDoubleColumnLength(channel, Ints.checkedCast(longsSerializer.getSerializedSize()), 0);
- longsSerializer.writeTo(channel, smoosher);
- encodedValueSerializer.writeTo(channel, smoosher);
- }
-
- @Override
- long getSerializedColumnSize() throws IOException
- {
- return super.getSerializedColumnSize() + longsSerializer.getSerializedSize();
- }
- }
-
- private final class DoubleFieldColumnWriter extends GlobalDictionaryEncodedFieldColumnWriter<Double>
- {
- private ColumnarDoublesSerializer doublesSerializer;
-
- @Override
- int lookupGlobalId(Double value)
- {
- return globalDictionaryIdLookup.lookupDouble(value);
- }
-
- @Override
- public void open(String field) throws IOException
- {
- super.open(field);
- doublesSerializer = CompressionFactory.getDoubleSerializer(
- field,
- segmentWriteOutMedium,
- StringUtils.format("%s.double_column", name),
- ByteOrder.nativeOrder(),
- indexSpec.getDimensionCompression()
- );
- doublesSerializer.open();
- }
-
- @Override
- void writeValue(@Nullable Double value) throws IOException
- {
- if (value == null) {
- doublesSerializer.add(0.0);
- } else {
- doublesSerializer.add(value);
- }
- }
-
- @Override
- void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
- {
- writeLongAndDoubleColumnLength(channel, 0, Ints.checkedCast(doublesSerializer.getSerializedSize()));
- doublesSerializer.writeTo(channel, smoosher);
- encodedValueSerializer.writeTo(channel, smoosher);
- }
-
- @Override
- long getSerializedColumnSize() throws IOException
- {
- return super.getSerializedColumnSize() + doublesSerializer.getSerializedSize();
- }
- }
-
- private final class VariantLiteralFieldColumnWriter extends GlobalDictionaryEncodedFieldColumnWriter<Object>
- {
- @Override
- int lookupGlobalId(Object value)
- {
- if (value == null) {
- return 0;
- }
- if (value instanceof Long) {
- return globalDictionaryIdLookup.lookupLong((Long) value);
- } else if (value instanceof Double) {
- return globalDictionaryIdLookup.lookupDouble((Double) value);
- } else {
- return globalDictionaryIdLookup.lookupString(String.valueOf(value));
- }
- }
-
- @Override
- void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
- {
- writeLongAndDoubleColumnLength(channel, 0, 0);
- encodedValueSerializer.writeTo(channel, smoosher);
- }
- }
-
private static final class IntTypeStrategy implements TypeStrategy<Integer>
{
@Override
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/StringFieldColumnWriter.java b/processing/src/main/java/org/apache/druid/segment/nested/StringFieldColumnWriter.java
new file mode 100644
index 0000000000..637aa1fb69
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/nested/StringFieldColumnWriter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.nested;
+
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * Literal field writer for string type nested columns of {@link NestedDataColumnSerializer}. Raw field values are
+ * converted to {@link String} and looked up in the global string dictionary. Null values stay null instead of
+ * being turned into the literal string "null".
+ */
+public final class StringFieldColumnWriter extends GlobalDictionaryEncodedFieldColumnWriter<String>
+{
+  public StringFieldColumnWriter(
+      String columnName,
+      String fieldName,
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      IndexSpec indexSpec,
+      GlobalDictionaryIdLookup globalDictionaryIdLookup
+  )
+  {
+    super(columnName, fieldName, segmentWriteOutMedium, indexSpec, globalDictionaryIdLookup);
+  }
+
+  /**
+   * Converts a raw field value to its string form.
+   *
+   * @param value raw field value, possibly null
+   * @return {@code String.valueOf(value)}, or null when {@code value} is null. {@code String.valueOf(Object)} on
+   *         its own would turn null into the string "null", which would then be indistinguishable from a real
+   *         "null" string present in the data.
+   */
+  @Override
+  String processValue(Object value)
+  {
+    if (value == null) {
+      return null;
+    }
+    return String.valueOf(value);
+  }
+
+  /**
+   * Maps a string value to its id in the global string dictionary.
+   *
+   * @param value processed value, possibly null
+   * @return the global dictionary id; null maps to id 0
+   */
+  @Override
+  int lookupGlobalId(String value)
+  {
+    if (value == null) {
+      // same convention as VariantLiteralFieldColumnWriter, which maps null to global id 0
+      return 0;
+    }
+    return globalDictionaryIdLookup.lookupString(value);
+  }
+
+  /**
+   * Writes the value column. A string field has no long or double value column, so both lengths are written as
+   * zero, followed by the dictionary-encoded row values.
+   */
+  @Override
+  void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
+  {
+    writeLongAndDoubleColumnLength(channel, 0, 0);
+    encodedValueSerializer.writeTo(channel, smoosher);
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/VariantLiteralFieldColumnWriter.java b/processing/src/main/java/org/apache/druid/segment/nested/VariantLiteralFieldColumnWriter.java
new file mode 100644
index 0000000000..7440c12004
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/nested/VariantLiteralFieldColumnWriter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.nested;
+
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * Literal field writer for nested fields of {@link NestedDataColumnSerializer} whose values are of mixed types.
+ * Long and Double values are looked up in the global long and double dictionaries. Any other non-null value is
+ * stringified and looked up in the global string dictionary.
+ */
+public final class VariantLiteralFieldColumnWriter extends GlobalDictionaryEncodedFieldColumnWriter<Object>
+{
+  public VariantLiteralFieldColumnWriter(
+      String columnName,
+      String fieldName,
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      IndexSpec indexSpec,
+      GlobalDictionaryIdLookup globalDictionaryIdLookup
+  )
+  {
+    super(columnName, fieldName, segmentWriteOutMedium, indexSpec, globalDictionaryIdLookup);
+  }
+
+  /**
+   * Picks the global dictionary that matches the runtime type of {@code value} and returns the id found there.
+   * Null maps to id 0.
+   */
+  @Override
+  int lookupGlobalId(Object value)
+  {
+    if (value instanceof Long) {
+      return globalDictionaryIdLookup.lookupLong((Long) value);
+    }
+    if (value instanceof Double) {
+      return globalDictionaryIdLookup.lookupDouble((Double) value);
+    }
+    // null is never an instance of Long or Double, so it lands here and is given id 0
+    // (presumably the reserved null entry of the global dictionary -- confirm)
+    return value == null ? 0 : globalDictionaryIdLookup.lookupString(String.valueOf(value));
+  }
+
+  /**
+   * Writes the value column. A variant field carries no long or double value column, so both lengths are zero,
+   * followed by the dictionary-encoded row values.
+   */
+  @Override
+  void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
+  {
+    writeLongAndDoubleColumnLength(channel, 0, 0);
+    encodedValueSerializer.writeTo(channel, smoosher);
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org