Posted to commits@druid.apache.org by cw...@apache.org on 2022/09/16 08:29:02 UTC

[druid] branch master updated: split up NestedDataColumnSerializer into separate files (#13096)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ece870634 split up NestedDataColumnSerializer into separate files (#13096)
5ece870634 is described below

commit 5ece87063430e142151af88c7941222991cc5763
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Fri Sep 16 01:28:47 2022 -0700

    split up NestedDataColumnSerializer into separate files (#13096)
    
    * split up NestedDataColumnSerializer into separate files
    
    * fix it
---
 .../segment/nested/DoubleFieldColumnWriter.java    |  98 ++++++
 .../GlobalDictionaryEncodedFieldColumnWriter.java  | 264 ++++++++++++++
 .../segment/nested/LongFieldColumnWriter.java      |  99 ++++++
 .../segment/nested/NestedDataColumnSerializer.java | 385 +++------------------
 .../segment/nested/StringFieldColumnWriter.java    |  63 ++++
 .../nested/VariantLiteralFieldColumnWriter.java    |  66 ++++
 6 files changed, 636 insertions(+), 339 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/segment/nested/DoubleFieldColumnWriter.java b/processing/src/main/java/org/apache/druid/segment/nested/DoubleFieldColumnWriter.java
new file mode 100644
index 0000000000..623eb1490f
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/nested/DoubleFieldColumnWriter.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.nested;
+
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.data.ColumnarDoublesSerializer;
+import org.apache.druid.segment.data.CompressionFactory;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * Literal field writer for double type nested columns of {@link NestedDataColumnSerializer}. In addition to the normal
+ * dictionary encoded column, this writer also writes an additional double value column with {@link #doublesSerializer},
+ * which is written to during {@link #addValue}.
+ */
+public final class DoubleFieldColumnWriter extends GlobalDictionaryEncodedFieldColumnWriter<Double>
+{
+  private ColumnarDoublesSerializer doublesSerializer;
+
+  protected DoubleFieldColumnWriter(
+      String columnName,
+      String fieldName,
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      IndexSpec indexSpec,
+      GlobalDictionaryIdLookup globalDictionaryIdLookup
+  )
+  {
+    super(columnName, fieldName, segmentWriteOutMedium, indexSpec, globalDictionaryIdLookup);
+  }
+
+  @Override
+  int lookupGlobalId(Double value)
+  {
+    return globalDictionaryIdLookup.lookupDouble(value);
+  }
+
+  @Override
+  public void open() throws IOException
+  {
+    super.open();
+    doublesSerializer = CompressionFactory.getDoubleSerializer(
+        fieldName,
+        segmentWriteOutMedium,
+        StringUtils.format("%s.double_column", fieldName),
+        ByteOrder.nativeOrder(),
+        indexSpec.getDimensionCompression()
+    );
+    doublesSerializer.open();
+  }
+
+  @Override
+  void writeValue(@Nullable Double value) throws IOException
+  {
+    if (value == null) {
+      doublesSerializer.add(0.0);
+    } else {
+      doublesSerializer.add(value);
+    }
+  }
+
+  @Override
+  void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
+  {
+    writeLongAndDoubleColumnLength(channel, 0, Ints.checkedCast(doublesSerializer.getSerializedSize()));
+    doublesSerializer.writeTo(channel, smoosher);
+    encodedValueSerializer.writeTo(channel, smoosher);
+  }
+
+  @Override
+  long getSerializedColumnSize() throws IOException
+  {
+    return super.getSerializedColumnSize() + doublesSerializer.getSerializedSize();
+  }
+}
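
For orientation: writeColumnTo above lays out each field's column as an eight-byte length header (written by writeLongAndDoubleColumnLength, defined later in this diff), followed by the double value column and the encoded dictionary id column. Below is a minimal, self-contained sketch of just that header layout; the class and method names here are hypothetical, not part of the commit.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.nio.channels.WritableByteChannel;

    public final class LengthHeaderSketch
    {
      // Writes the same eight bytes that writeLongAndDoubleColumnLength produces:
      // two native-order ints, the long column length then the double column length.
      static void writeHeader(WritableByteChannel channel, int longLength, int doubleLength)
          throws IOException
      {
        final ByteBuffer header = ByteBuffer.allocate(2 * Integer.BYTES).order(ByteOrder.nativeOrder());
        header.putInt(longLength);   // 0 for double-typed fields
        header.putInt(doubleLength); // serialized size of the doubles, checked-cast to int
        header.flip();
        while (header.hasRemaining()) {
          channel.write(header);
        }
      }
    }

Writing both lengths unconditionally means a reader can locate the long and double sections of any field without knowing its type up front; string-typed fields simply write two zeros.
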
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
new file mode 100644
index 0000000000..ed30c84b79
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.nested;
+
+import com.google.common.primitives.Ints;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntArrays;
+import it.unimi.dsi.fastutil.ints.IntIterator;
+import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.collections.bitmap.MutableBitmap;
+import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.data.FixedIndexedIntWriter;
+import org.apache.druid.segment.data.GenericIndexedWriter;
+import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
+import org.apache.druid.segment.data.VSizeColumnarIntsSerializer;
+import org.apache.druid.segment.serde.DictionaryEncodedColumnPartSerde;
+import org.apache.druid.segment.serde.Serializer;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * Base class for writers of global dictionary encoded nested literal columns for {@link NestedDataColumnSerializer}.
+ * While processing the 'raw' nested data, {@link NestedDataColumnSerializer} will call {@link #addValue(Object)} on
+ * all literal writers, which for this type of writer entails building a local dictionary that maps into the global
+ * dictionary ({@link #localDictionary}) and writing the unsorted local id to an intermediate integer column,
+ * {@link #intermediateValueWriter}.
+ *
+ * When processing of the 'raw' value column is complete, the {@link #writeTo(FileSmoosher)} method will sort the
+ * local ids and write them out as a sorted local dictionary, then iterate over {@link #intermediateValueWriter},
+ * swapping the unsorted local ids for the sorted ids and writing them to the compressed id column writer
+ * {@link #encodedValueSerializer}, building the bitmap indexes along the way.
+ */
+public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
+{
+  private static final Logger log = new Logger(GlobalDictionaryEncodedFieldColumnWriter.class);
+
+  protected final SegmentWriteOutMedium segmentWriteOutMedium;
+  protected final String columnName;
+  protected final String fieldName;
+  protected final IndexSpec indexSpec;
+  protected final GlobalDictionaryIdLookup globalDictionaryIdLookup;
+  protected final LocalDimensionDictionary localDictionary = new LocalDimensionDictionary();
+
+  protected FixedIndexedIntWriter intermediateValueWriter;
+  // maybe someday we allow no bitmap indexes or multi-value columns
+  protected int flags = DictionaryEncodedColumnPartSerde.NO_FLAGS;
+  protected DictionaryEncodedColumnPartSerde.VERSION version = null;
+  protected SingleValueColumnarIntsSerializer encodedValueSerializer;
+
+  protected GlobalDictionaryEncodedFieldColumnWriter(
+      String columnName,
+      String fieldName,
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      IndexSpec indexSpec,
+      GlobalDictionaryIdLookup globalDictionaryIdLookup
+  )
+  {
+    this.columnName = columnName;
+    this.fieldName = fieldName;
+    this.segmentWriteOutMedium = segmentWriteOutMedium;
+    this.indexSpec = indexSpec;
+    this.globalDictionaryIdLookup = globalDictionaryIdLookup;
+  }
+
+  /**
+   * Perform any value conversion needed before storing the value in the column
+   */
+  T processValue(Object value)
+  {
+    return (T) value;
+  }
+
+  /**
+   * Hook to allow implementors the chance to do additional operations during {@link #addValue(Object)}, such as
+   * writing an additional value column
+   */
+  void writeValue(@Nullable T value) throws IOException
+  {
+    // do nothing; if a value column is present, this method should be overridden to write the value to the serializer
+  }
+
+  /**
+   * Find a value in {@link #globalDictionaryIdLookup} in whatever way is most appropriate to the writer type
+   */
+  abstract int lookupGlobalId(T value);
+
+  /**
+   * Open the writer so that {@link #addValue(Object)} can be called
+   */
+  public void open() throws IOException
+  {
+    intermediateValueWriter = new FixedIndexedIntWriter(segmentWriteOutMedium, false);
+    intermediateValueWriter.open();
+  }
+
+  /**
+   * Add a value to the unsorted local dictionary and write to an intermediate column
+   */
+  public void addValue(Object val) throws IOException
+  {
+    final T value = processValue(val);
+    final int globalId = lookupGlobalId(value);
+    final int localId = localDictionary.add(globalId);
+    intermediateValueWriter.write(localId);
+    writeValue(value);
+  }
+
+
+  /**
+   * How many bytes {@link #writeColumnTo(WritableByteChannel, FileSmoosher)} is expected to write to the segment.
+   */
+  long getSerializedColumnSize() throws IOException
+  {
+    return Integer.BYTES + Integer.BYTES + encodedValueSerializer.getSerializedSize();
+  }
+
+  /**
+   * Defines how to write the column, including the dictionary id column, along with any additional columns
+   * such as the long or double value column, as appropriate for the field type.
+   */
+  abstract void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException;
+
+  public void writeTo(FileSmoosher smoosher) throws IOException
+  {
+    // use a child writeout medium so that we can close it when we are finished and don't leave temporary files
+    // hanging around until the entire segment is done
+    final SegmentWriteOutMedium tmpWriteoutMedium = segmentWriteOutMedium.makeChildWriteOutMedium();
+    final FixedIndexedIntWriter sortedDictionaryWriter = new FixedIndexedIntWriter(tmpWriteoutMedium, true);
+    sortedDictionaryWriter.open();
+    GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new GenericIndexedWriter<>(
+        tmpWriteoutMedium,
+        columnName,
+        indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+    );
+    bitmapIndexWriter.open();
+    bitmapIndexWriter.setObjectsNotSorted();
+    final Int2IntOpenHashMap globalToUnsorted = localDictionary.getGlobalIdToLocalId();
+    final int[] unsortedToGlobal = new int[localDictionary.size()];
+    for (int key : globalToUnsorted.keySet()) {
+      unsortedToGlobal[globalToUnsorted.get(key)] = key;
+    }
+    final int[] sortedGlobal = new int[unsortedToGlobal.length];
+    System.arraycopy(unsortedToGlobal, 0, sortedGlobal, 0, unsortedToGlobal.length);
+    IntArrays.unstableSort(sortedGlobal);
+
+    final int[] unsortedToSorted = new int[unsortedToGlobal.length];
+    final MutableBitmap[] bitmaps = new MutableBitmap[sortedGlobal.length];
+    for (int index = 0; index < sortedGlobal.length; index++) {
+      final int globalId = sortedGlobal[index];
+      sortedDictionaryWriter.write(globalId);
+      final int unsortedId = globalToUnsorted.get(globalId);
+      unsortedToSorted[unsortedId] = index;
+      bitmaps[index] = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+    }
+
+    openColumnSerializer(tmpWriteoutMedium, sortedGlobal[sortedGlobal.length - 1]);
+    final IntIterator rows = intermediateValueWriter.getIterator();
+    int rowCount = 0;
+    while (rows.hasNext()) {
+      final int unsortedLocalId = rows.nextInt();
+      final int sortedLocalId = unsortedToSorted[unsortedLocalId];
+      encodedValueSerializer.addValue(sortedLocalId);
+      bitmaps[sortedLocalId].add(rowCount++);
+    }
+
+    for (MutableBitmap bitmap : bitmaps) {
+      bitmapIndexWriter.write(
+          indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
+      );
+    }
+
+    final Serializer fieldSerializer = new Serializer()
+    {
+      @Override
+      public long getSerializedSize() throws IOException
+      {
+        return 1 + Integer.BYTES +
+               sortedDictionaryWriter.getSerializedSize() +
+               bitmapIndexWriter.getSerializedSize() +
+               getSerializedColumnSize();
+      }
+
+      @Override
+      public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
+      {
+        Channels.writeFully(channel, ByteBuffer.wrap(new byte[]{version.asByte()}));
+        channel.write(ByteBuffer.wrap(Ints.toByteArray(flags)));
+        sortedDictionaryWriter.writeTo(channel, smoosher);
+        writeColumnTo(channel, smoosher);
+        bitmapIndexWriter.writeTo(channel, smoosher);
+      }
+    };
+    final String fieldFileName = NestedDataColumnSerializer.getFieldFileName(columnName, fieldName);
+    final long size = fieldSerializer.getSerializedSize();
+    log.debug("Column [%s] serializing [%s] field of size [%d].", columnName, fieldName, size);
+    try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(fieldFileName, size)) {
+      fieldSerializer.writeTo(smooshChannel, smoosher);
+    }
+    finally {
+      tmpWriteoutMedium.close();
+    }
+  }
+
+  private void openColumnSerializer(SegmentWriteOutMedium medium, int maxId) throws IOException
+  {
+    if (indexSpec.getDimensionCompression() != CompressionStrategy.UNCOMPRESSED) {
+      this.version = DictionaryEncodedColumnPartSerde.VERSION.COMPRESSED;
+      encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
+          fieldName,
+          medium,
+          columnName,
+          maxId,
+          indexSpec.getDimensionCompression()
+      );
+    } else {
+      encodedValueSerializer = new VSizeColumnarIntsSerializer(medium, maxId);
+      this.version = DictionaryEncodedColumnPartSerde.VERSION.UNCOMPRESSED_SINGLE_VALUE;
+    }
+    encodedValueSerializer.open();
+  }
+
+  public void writeLongAndDoubleColumnLength(WritableByteChannel channel, int longLength, int doubleLength)
+      throws IOException
+  {
+    ByteBuffer intBuffer = ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.nativeOrder());
+    intBuffer.position(0);
+    intBuffer.putInt(longLength);
+    intBuffer.flip();
+    Channels.writeFully(channel, intBuffer);
+    intBuffer.position(0);
+    intBuffer.limit(intBuffer.capacity());
+    intBuffer.putInt(doubleLength);
+    intBuffer.flip();
+    Channels.writeFully(channel, intBuffer);
+  }
+}
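
The heart of the extracted base class is the id remapping performed in writeTo(FileSmoosher). The following self-contained sketch restates that algorithm, substituting java.util.Map and BitSet for fastutil's Int2IntOpenHashMap and Druid's MutableBitmap; the class and method names are hypothetical.

    import java.util.Arrays;
    import java.util.BitSet;
    import java.util.Map;

    public final class LocalIdRemapSketch
    {
      // Given globalId -> unsorted localId and the intermediate row stream of unsorted
      // local ids, returns the sorted local ids that encodedValueSerializer receives.
      // bitmapsOut must have one slot per dictionary entry; each BitSet collects the
      // rows that contain the corresponding value (standing in for MutableBitmap).
      static int[] remap(Map<Integer, Integer> globalToUnsorted, int[] rows, BitSet[] bitmapsOut)
      {
        // invert globalId -> unsorted localId into unsorted localId -> globalId
        final int[] unsortedToGlobal = new int[globalToUnsorted.size()];
        for (Map.Entry<Integer, Integer> e : globalToUnsorted.entrySet()) {
          unsortedToGlobal[e.getValue()] = e.getKey();
        }
        // sorting the global ids yields the sorted local dictionary order
        final int[] sortedGlobal = unsortedToGlobal.clone();
        Arrays.sort(sortedGlobal);
        // unsorted localId -> sorted localId
        final int[] unsortedToSorted = new int[unsortedToGlobal.length];
        for (int sortedId = 0; sortedId < sortedGlobal.length; sortedId++) {
          unsortedToSorted[globalToUnsorted.get(sortedGlobal[sortedId])] = sortedId;
          bitmapsOut[sortedId] = new BitSet();
        }
        // rewrite the row stream with sorted ids, building the bitmap indexes as we go
        final int[] remapped = new int[rows.length];
        for (int row = 0; row < rows.length; row++) {
          final int sortedId = unsortedToSorted[rows[row]];
          remapped[row] = sortedId;
          bitmapsOut[sortedId].set(row);
        }
        return remapped;
      }
    }

Because the global dictionary is itself sorted, sorting the referenced global ids gives the field's local dictionary a sorted order too, which is what makes the per-field bitmap indexes and dictionary binary search possible.
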
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/LongFieldColumnWriter.java b/processing/src/main/java/org/apache/druid/segment/nested/LongFieldColumnWriter.java
new file mode 100644
index 0000000000..1321a00023
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/nested/LongFieldColumnWriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.nested;
+
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.data.ColumnarLongsSerializer;
+import org.apache.druid.segment.data.CompressionFactory;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * Literal field writer for long type nested columns of {@link NestedDataColumnSerializer}. In addition to the normal
+ * dictionary encoded column, this writer also writes an additional long value column with {@link #longsSerializer},
+ * which is written to during {@link #addValue}.
+ */
+public final class LongFieldColumnWriter extends GlobalDictionaryEncodedFieldColumnWriter<Long>
+{
+  private ColumnarLongsSerializer longsSerializer;
+
+  protected LongFieldColumnWriter(
+      String columnName,
+      String fieldName,
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      IndexSpec indexSpec,
+      GlobalDictionaryIdLookup globalDictionaryIdLookup
+  )
+  {
+    super(columnName, fieldName, segmentWriteOutMedium, indexSpec, globalDictionaryIdLookup);
+  }
+
+  @Override
+  int lookupGlobalId(Long value)
+  {
+    return globalDictionaryIdLookup.lookupLong(value);
+  }
+
+  @Override
+  public void open() throws IOException
+  {
+    super.open();
+    longsSerializer = CompressionFactory.getLongSerializer(
+        fieldName,
+        segmentWriteOutMedium,
+        StringUtils.format("%s.long_column", fieldName),
+        ByteOrder.nativeOrder(),
+        indexSpec.getLongEncoding(),
+        indexSpec.getDimensionCompression()
+    );
+    longsSerializer.open();
+  }
+
+  @Override
+  void writeValue(@Nullable Long value) throws IOException
+  {
+    if (value == null) {
+      longsSerializer.add(0L);
+    } else {
+      longsSerializer.add(value);
+    }
+  }
+
+  @Override
+  void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
+  {
+    writeLongAndDoubleColumnLength(channel, Ints.checkedCast(longsSerializer.getSerializedSize()), 0);
+    longsSerializer.writeTo(channel, smoosher);
+    encodedValueSerializer.writeTo(channel, smoosher);
+  }
+
+  @Override
+  long getSerializedColumnSize() throws IOException
+  {
+    return super.getSerializedColumnSize() + longsSerializer.getSerializedSize();
+  }
+}
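
One subtlety in writeValue above: null rows still write a default 0L, so the long value column stays row-aligned with the dictionary id column; nullness is carried by the dictionary encoding rather than by the stored default. A trivial illustration of that padding (the class name is hypothetical):

    public final class NullPaddingSketch
    {
      public static void main(String[] args)
      {
        final Long[] values = {10L, null, 25L};
        for (Long value : values) {
          // mirrors LongFieldColumnWriter.writeValue: a null keeps its row slot as 0L
          final long stored = (value == null) ? 0L : value;
          System.out.println(stored); // prints 10, then 0, then 25
        }
      }
    }

DoubleFieldColumnWriter.writeValue applies the same pattern with 0.0.
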
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
index f5d782d1e0..485abf14c3 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
@@ -21,14 +21,9 @@ package org.apache.druid.segment.nested;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
-import com.google.common.primitives.Ints;
-import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
-import it.unimi.dsi.fastutil.ints.IntArrays;
-import it.unimi.dsi.fastutil.ints.IntIterator;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.collections.bitmap.MutableBitmap;
 import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.io.Channels;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
@@ -46,24 +41,14 @@ import org.apache.druid.segment.column.TypeStrategy;
 import org.apache.druid.segment.column.Types;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.ByteBufferWriter;
-import org.apache.druid.segment.data.ColumnarDoublesSerializer;
-import org.apache.druid.segment.data.ColumnarLongsSerializer;
-import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
 import org.apache.druid.segment.data.CompressedVariableSizedBlobColumnSerializer;
-import org.apache.druid.segment.data.CompressionFactory;
 import org.apache.druid.segment.data.CompressionStrategy;
-import org.apache.druid.segment.data.FixedIndexedIntWriter;
 import org.apache.druid.segment.data.FixedIndexedWriter;
 import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.data.GenericIndexedWriter;
-import org.apache.druid.segment.data.ObjectStrategy;
-import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
-import org.apache.druid.segment.data.VSizeColumnarIntsSerializer;
-import org.apache.druid.segment.serde.DictionaryEncodedColumnPartSerde;
 import org.apache.druid.segment.serde.Serializer;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
-import javax.annotation.Nullable;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -143,10 +128,15 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
   @Override
   public void open() throws IOException
   {
-    fieldsWriter = createGenericIndexedWriter(GenericIndexed.STRING_STRATEGY, segmentWriteOutMedium);
+    fieldsWriter = new GenericIndexedWriter<>(segmentWriteOutMedium, name, GenericIndexed.STRING_STRATEGY);
+    fieldsWriter.open();
+
     fieldsInfoWriter = new NestedLiteralTypeInfo.Writer(segmentWriteOutMedium);
     fieldsInfoWriter.open();
-    dictionaryWriter = createGenericIndexedWriter(GenericIndexed.STRING_STRATEGY, segmentWriteOutMedium);
+
+    dictionaryWriter = new GenericIndexedWriter<>(segmentWriteOutMedium, name, GenericIndexed.STRING_STRATEGY);
+    dictionaryWriter.open();
+
     longDictionaryWriter = new FixedIndexedWriter<>(
         segmentWriteOutMedium,
         ColumnType.LONG.getStrategy(),
@@ -155,6 +145,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
         true
     );
     longDictionaryWriter.open();
+
     doubleDictionaryWriter = new FixedIndexedWriter<>(
         segmentWriteOutMedium,
         ColumnType.DOUBLE.getStrategy(),
@@ -163,17 +154,20 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
         true
     );
     doubleDictionaryWriter.open();
+
     rawWriter = new CompressedVariableSizedBlobColumnSerializer(
         getInternalFileName(name, RAW_FILE_NAME),
         segmentWriteOutMedium,
         indexSpec.getJsonCompression() != null ? indexSpec.getJsonCompression() : CompressionStrategy.LZ4
     );
     rawWriter.open();
+
     nullBitmapWriter = new ByteBufferWriter<>(
         segmentWriteOutMedium,
         indexSpec.getBitmapSerdeFactory().getObjectStrategy()
     );
     nullBitmapWriter.open();
+
     nullRowsBitmap = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
   }
 
@@ -182,23 +176,48 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
     this.fields = fields;
     this.fieldWriters = Maps.newHashMapWithExpectedSize(fields.size());
     for (Map.Entry<String, NestedLiteralTypeInfo.MutableTypeSet> field : fields.entrySet()) {
-      fieldsWriter.write(field.getKey());
+      final String fieldName = field.getKey();
+      fieldsWriter.write(fieldName);
       fieldsInfoWriter.write(field.getValue());
-      final GlobalDictionaryEncodedFieldColumnWriter writer;
-      ColumnType type = field.getValue().getSingleType();
+      final GlobalDictionaryEncodedFieldColumnWriter<?> writer;
+      final ColumnType type = field.getValue().getSingleType();
       if (type != null) {
         if (Types.is(type, ValueType.STRING)) {
-          writer = new StringFieldColumnWriter();
+          writer = new StringFieldColumnWriter(
+              name,
+              fieldName,
+              segmentWriteOutMedium,
+              indexSpec,
+              globalDictionaryIdLookup
+          );
         } else if (Types.is(type, ValueType.LONG)) {
-          writer = new LongFieldColumnWriter();
+          writer = new LongFieldColumnWriter(
+              name,
+              fieldName,
+              segmentWriteOutMedium,
+              indexSpec,
+              globalDictionaryIdLookup
+          );
         } else {
-          writer = new DoubleFieldColumnWriter();
+          writer = new DoubleFieldColumnWriter(
+              name,
+              fieldName,
+              segmentWriteOutMedium,
+              indexSpec,
+              globalDictionaryIdLookup
+          );
         }
       } else {
-        writer = new VariantLiteralFieldColumnWriter();
+        writer = new VariantLiteralFieldColumnWriter(
+            name,
+            fieldName,
+            segmentWriteOutMedium,
+            indexSpec,
+            globalDictionaryIdLookup
+        );
       }
-      writer.open(field.getKey());
-      fieldWriters.put(field.getKey(), writer);
+      writer.open();
+      fieldWriters.put(fieldName, writer);
     }
   }
 
@@ -330,7 +349,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
     for (Map.Entry<String, NestedLiteralTypeInfo.MutableTypeSet> field : fields.entrySet()) {
       // remove writer so that it can be collected when we are done with it
       GlobalDictionaryEncodedFieldColumnWriter<?> writer = fieldWriters.remove(field.getKey());
-      writer.writeTo(field.getKey(), smoosher);
+      writer.writeTo(smoosher);
     }
     log.info("Column [%s] serialized successfully with [%d] nested columns.", name, fields.size());
   }
@@ -343,16 +362,6 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
     }
   }
 
-  private <T> GenericIndexedWriter<T> createGenericIndexedWriter(
-      ObjectStrategy<T> objectStrategy,
-      SegmentWriteOutMedium writeOutMedium
-  ) throws IOException
-  {
-    GenericIndexedWriter<T> writer = new GenericIndexedWriter<>(writeOutMedium, name, objectStrategy);
-    writer.open();
-    return writer;
-  }
-
   public static String getFieldFileName(String fileNameBase, String field)
   {
     return StringUtils.format("%s_%s", fileNameBase, field);
@@ -363,308 +372,6 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
     return StringUtils.format("%s.%s", fileNameBase, field);
   }
 
-  abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
-  {
-    protected final LocalDimensionDictionary localDictionary = new LocalDimensionDictionary();
-
-    protected FixedIndexedIntWriter intermediateValueWriter;
-    // maybe someday we allow no bitmap indexes or multi-value columns
-    protected int flags = DictionaryEncodedColumnPartSerde.NO_FLAGS;
-    protected DictionaryEncodedColumnPartSerde.VERSION version = null;
-    protected SingleValueColumnarIntsSerializer encodedValueSerializer;
-
-    T processValue(Object value)
-    {
-      return (T) value;
-    }
-
-    void writeValue(@Nullable T value) throws IOException
-    {
-      // do nothing, if a value column is present this method should be overridden to write the value to the serializer
-    }
-
-    abstract int lookupGlobalId(T value);
-
-    abstract void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException;
-
-    void openColumnSerializer(String field, SegmentWriteOutMedium medium, int maxId) throws IOException
-    {
-      if (indexSpec.getDimensionCompression() != CompressionStrategy.UNCOMPRESSED) {
-        this.version = DictionaryEncodedColumnPartSerde.VERSION.COMPRESSED;
-        encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
-            field,
-            medium,
-            name,
-            maxId,
-            indexSpec.getDimensionCompression()
-        );
-      } else {
-        encodedValueSerializer = new VSizeColumnarIntsSerializer(medium, maxId);
-        this.version = DictionaryEncodedColumnPartSerde.VERSION.UNCOMPRESSED_SINGLE_VALUE;
-      }
-      encodedValueSerializer.open();
-    }
-
-    long getSerializedColumnSize() throws IOException
-    {
-      return Integer.BYTES + Integer.BYTES + encodedValueSerializer.getSerializedSize();
-    }
-
-    public void open(String field) throws IOException
-    {
-      intermediateValueWriter = new FixedIndexedIntWriter(segmentWriteOutMedium, false);
-      intermediateValueWriter.open();
-    }
-
-    public void writeLongAndDoubleColumnLength(WritableByteChannel channel, int longLength, int doubleLength)
-        throws IOException
-    {
-      ByteBuffer intBuffer = ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.nativeOrder());
-      intBuffer.position(0);
-      intBuffer.putInt(longLength);
-      intBuffer.flip();
-      Channels.writeFully(channel, intBuffer);
-      intBuffer.position(0);
-      intBuffer.limit(intBuffer.capacity());
-      intBuffer.putInt(doubleLength);
-      intBuffer.flip();
-      Channels.writeFully(channel, intBuffer);
-    }
-
-    public void addValue(Object val) throws IOException
-    {
-      final T value = processValue(val);
-      final int globalId = lookupGlobalId(value);
-      final int localId = localDictionary.add(globalId);
-      intermediateValueWriter.write(localId);
-      writeValue(value);
-    }
-
-    public void writeTo(String field, FileSmoosher smoosher) throws IOException
-    {
-      // use a child writeout medium so that we don't leave
-      final SegmentWriteOutMedium tmpWriteoutMedium = segmentWriteOutMedium.makeChildWriteOutMedium();
-      final FixedIndexedIntWriter sortedDictionaryWriter = new FixedIndexedIntWriter(tmpWriteoutMedium, true);
-      sortedDictionaryWriter.open();
-      final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = createGenericIndexedWriter(
-          indexSpec.getBitmapSerdeFactory().getObjectStrategy(),
-          tmpWriteoutMedium
-      );
-      bitmapIndexWriter.setObjectsNotSorted();
-      final Int2IntOpenHashMap globalToUnsorted = localDictionary.getGlobalIdToLocalId();
-      final int[] unsortedToGlobal = new int[localDictionary.size()];
-      for (int key : globalToUnsorted.keySet()) {
-        unsortedToGlobal[globalToUnsorted.get(key)] = key;
-      }
-      final int[] sortedGlobal = new int[unsortedToGlobal.length];
-      System.arraycopy(unsortedToGlobal, 0, sortedGlobal, 0, unsortedToGlobal.length);
-      IntArrays.unstableSort(sortedGlobal);
-
-      final int[] unsortedToSorted = new int[unsortedToGlobal.length];
-      final MutableBitmap[] bitmaps = new MutableBitmap[sortedGlobal.length];
-      for (int index = 0; index < sortedGlobal.length; index++) {
-        final int globalId = sortedGlobal[index];
-        sortedDictionaryWriter.write(globalId);
-        final int unsortedId = globalToUnsorted.get(globalId);
-        unsortedToSorted[unsortedId] = index;
-        bitmaps[index] = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
-      }
-
-      openColumnSerializer(field, tmpWriteoutMedium, sortedGlobal[sortedGlobal.length - 1]);
-      final IntIterator rows = intermediateValueWriter.getIterator();
-      int rowCount = 0;
-      while (rows.hasNext()) {
-        final int unsortedLocalId = rows.nextInt();
-        final int sortedLocalId = unsortedToSorted[unsortedLocalId];
-        encodedValueSerializer.addValue(sortedLocalId);
-        bitmaps[sortedLocalId].add(rowCount++);
-      }
-
-      for (MutableBitmap bitmap : bitmaps) {
-        bitmapIndexWriter.write(
-            indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
-        );
-      }
-
-      final Serializer fieldSerializer = new Serializer()
-      {
-        @Override
-        public long getSerializedSize() throws IOException
-        {
-          return 1 + Integer.BYTES +
-                 sortedDictionaryWriter.getSerializedSize() +
-                 bitmapIndexWriter.getSerializedSize() +
-                 getSerializedColumnSize();
-        }
-
-        @Override
-        public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
-        {
-          Channels.writeFully(channel, ByteBuffer.wrap(new byte[]{version.asByte()}));
-          channel.write(ByteBuffer.wrap(Ints.toByteArray(flags)));
-          sortedDictionaryWriter.writeTo(channel, smoosher);
-          writeColumnTo(channel, smoosher);
-          bitmapIndexWriter.writeTo(channel, smoosher);
-        }
-      };
-      final String fieldName = getFieldFileName(name, field);
-      final long size = fieldSerializer.getSerializedSize();
-      log.debug("Column [%s] serializing [%s] field of size [%d].", name, field, size);
-      try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(fieldName, size)) {
-        fieldSerializer.writeTo(smooshChannel, smoosher);
-      }
-      finally {
-        tmpWriteoutMedium.close();
-      }
-    }
-  }
-
-  private final class StringFieldColumnWriter extends GlobalDictionaryEncodedFieldColumnWriter<String>
-  {
-    @Override
-    String processValue(Object value)
-    {
-      return String.valueOf(value);
-    }
-
-    @Override
-    int lookupGlobalId(String value)
-    {
-      return globalDictionaryIdLookup.lookupString(value);
-    }
-
-    @Override
-    void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
-    {
-      writeLongAndDoubleColumnLength(channel, 0, 0);
-      encodedValueSerializer.writeTo(channel, smoosher);
-    }
-  }
-
-  private final class LongFieldColumnWriter extends GlobalDictionaryEncodedFieldColumnWriter<Long>
-  {
-    private ColumnarLongsSerializer longsSerializer;
-
-    @Override
-    int lookupGlobalId(Long value)
-    {
-      return globalDictionaryIdLookup.lookupLong(value);
-    }
-
-    @Override
-    public void open(String field) throws IOException
-    {
-      super.open(field);
-      longsSerializer = CompressionFactory.getLongSerializer(
-          field,
-          segmentWriteOutMedium,
-          StringUtils.format("%s.long_column", name),
-          ByteOrder.nativeOrder(),
-          indexSpec.getLongEncoding(),
-          indexSpec.getDimensionCompression()
-      );
-      longsSerializer.open();
-    }
-
-    @Override
-    void writeValue(@Nullable Long value) throws IOException
-    {
-      if (value == null) {
-        longsSerializer.add(0L);
-      } else {
-        longsSerializer.add(value);
-      }
-    }
-
-    @Override
-    void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
-    {
-      writeLongAndDoubleColumnLength(channel, Ints.checkedCast(longsSerializer.getSerializedSize()), 0);
-      longsSerializer.writeTo(channel, smoosher);
-      encodedValueSerializer.writeTo(channel, smoosher);
-    }
-
-    @Override
-    long getSerializedColumnSize() throws IOException
-    {
-      return super.getSerializedColumnSize() + longsSerializer.getSerializedSize();
-    }
-  }
-
-  private final class DoubleFieldColumnWriter extends GlobalDictionaryEncodedFieldColumnWriter<Double>
-  {
-    private ColumnarDoublesSerializer doublesSerializer;
-
-    @Override
-    int lookupGlobalId(Double value)
-    {
-      return globalDictionaryIdLookup.lookupDouble(value);
-    }
-
-    @Override
-    public void open(String field) throws IOException
-    {
-      super.open(field);
-      doublesSerializer = CompressionFactory.getDoubleSerializer(
-          field,
-          segmentWriteOutMedium,
-          StringUtils.format("%s.double_column", name),
-          ByteOrder.nativeOrder(),
-          indexSpec.getDimensionCompression()
-      );
-      doublesSerializer.open();
-    }
-
-    @Override
-    void writeValue(@Nullable Double value) throws IOException
-    {
-      if (value == null) {
-        doublesSerializer.add(0.0);
-      } else {
-        doublesSerializer.add(value);
-      }
-    }
-
-    @Override
-    void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
-    {
-      writeLongAndDoubleColumnLength(channel, 0, Ints.checkedCast(doublesSerializer.getSerializedSize()));
-      doublesSerializer.writeTo(channel, smoosher);
-      encodedValueSerializer.writeTo(channel, smoosher);
-    }
-
-    @Override
-    long getSerializedColumnSize() throws IOException
-    {
-      return super.getSerializedColumnSize() + doublesSerializer.getSerializedSize();
-    }
-  }
-
-  private final class VariantLiteralFieldColumnWriter extends GlobalDictionaryEncodedFieldColumnWriter<Object>
-  {
-    @Override
-    int lookupGlobalId(Object value)
-    {
-      if (value == null) {
-        return 0;
-      }
-      if (value instanceof Long) {
-        return globalDictionaryIdLookup.lookupLong((Long) value);
-      } else if (value instanceof Double) {
-        return globalDictionaryIdLookup.lookupDouble((Double) value);
-      } else {
-        return globalDictionaryIdLookup.lookupString(String.valueOf(value));
-      }
-    }
-
-    @Override
-    void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
-    {
-      writeLongAndDoubleColumnLength(channel, 0, 0);
-      encodedValueSerializer.writeTo(channel, smoosher);
-    }
-  }
-
   private static final class IntTypeStrategy implements TypeStrategy<Integer>
   {
     @Override
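
Beyond the writer selection shown above, this diff also changes the writer lifecycle: the field name moves out of open(String) and writeTo(String, FileSmoosher) and into the constructor. A minimal sketch of the resulting call sequence, with a hypothetical FieldWriter interface standing in for GlobalDictionaryEncodedFieldColumnWriter (the real writeTo also takes a FileSmoosher, omitted here to keep the sketch self-contained):

    import java.io.IOException;

    public final class FieldWriterLifecycleSketch
    {
      interface FieldWriter
      {
        void open() throws IOException;      // was open(fieldName) before this change
        void addValue(Object value) throws IOException;
        void writeTo() throws IOException;   // was writeTo(fieldName, smoosher) before
      }

      static void serializeField(FieldWriter writer, Object[] rows) throws IOException
      {
        writer.open();
        for (Object row : rows) {
          writer.addValue(row); // builds the local dictionary and intermediate id column
        }
        writer.writeTo();       // sorts ids, builds bitmaps, and smooshes the column
      }
    }
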
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/StringFieldColumnWriter.java b/processing/src/main/java/org/apache/druid/segment/nested/StringFieldColumnWriter.java
new file mode 100644
index 0000000000..637aa1fb69
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/nested/StringFieldColumnWriter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.nested;
+
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * Literal field writer for string type nested columns of {@link NestedDataColumnSerializer}
+ */
+public final class StringFieldColumnWriter extends GlobalDictionaryEncodedFieldColumnWriter<String>
+{
+  public StringFieldColumnWriter(
+      String columnName,
+      String fieldName,
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      IndexSpec indexSpec,
+      GlobalDictionaryIdLookup globalDictionaryIdLookup
+  )
+  {
+    super(columnName, fieldName, segmentWriteOutMedium, indexSpec, globalDictionaryIdLookup);
+  }
+
+  @Override
+  String processValue(Object value)
+  {
+    return String.valueOf(value);
+  }
+
+  @Override
+  int lookupGlobalId(String value)
+  {
+    return globalDictionaryIdLookup.lookupString(value);
+  }
+
+  @Override
+  void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
+  {
+    writeLongAndDoubleColumnLength(channel, 0, 0);
+    encodedValueSerializer.writeTo(channel, smoosher);
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/VariantLiteralFieldColumnWriter.java b/processing/src/main/java/org/apache/druid/segment/nested/VariantLiteralFieldColumnWriter.java
new file mode 100644
index 0000000000..7440c12004
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/nested/VariantLiteralFieldColumnWriter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.nested;
+
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * Literal field writer for mixed type nested columns of {@link NestedDataColumnSerializer}
+ */
+public final class VariantLiteralFieldColumnWriter extends GlobalDictionaryEncodedFieldColumnWriter<Object>
+{
+  public VariantLiteralFieldColumnWriter(
+      String columnName,
+      String fieldName,
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      IndexSpec indexSpec,
+      GlobalDictionaryIdLookup globalDictionaryIdLookup
+  )
+  {
+    super(columnName, fieldName, segmentWriteOutMedium, indexSpec, globalDictionaryIdLookup);
+  }
+
+  @Override
+  int lookupGlobalId(Object value)
+  {
+    if (value == null) {
+      return 0;
+    }
+    if (value instanceof Long) {
+      return globalDictionaryIdLookup.lookupLong((Long) value);
+    } else if (value instanceof Double) {
+      return globalDictionaryIdLookup.lookupDouble((Double) value);
+    } else {
+      return globalDictionaryIdLookup.lookupString(String.valueOf(value));
+    }
+  }
+
+  @Override
+  void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
+  {
+    writeLongAndDoubleColumnLength(channel, 0, 0);
+    encodedValueSerializer.writeTo(channel, smoosher);
+  }
+}
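
For completeness, the variant writer's type dispatch can be exercised outside Druid with a stand-in lookup. A condensed sketch follows; the IdLookup interface is hypothetical, standing in for GlobalDictionaryIdLookup.

    public final class VariantLookupSketch
    {
      // Hypothetical stand-in for GlobalDictionaryIdLookup, just enough to compile.
      interface IdLookup
      {
        int lookupString(String value);
        int lookupLong(long value);
        int lookupDouble(double value);
      }

      // Mirrors VariantLiteralFieldColumnWriter.lookupGlobalId: null maps to global id 0,
      // longs and doubles use their typed lookups, anything else is coerced to a string.
      static int lookupGlobalId(IdLookup lookup, Object value)
      {
        if (value == null) {
          return 0;
        }
        if (value instanceof Long) {
          return lookup.lookupLong((Long) value);
        } else if (value instanceof Double) {
          return lookup.lookupDouble((Double) value);
        } else {
          return lookup.lookupString(String.valueOf(value));
        }
      }
    }
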

