You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2022/09/09 01:33:03 UTC

[druid] branch master updated: improve nested column serializer (#13051)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6438f4198d improve nested column serializer (#13051)
6438f4198d is described below

commit 6438f4198db28c4accfb3524a9a1555a335d9bcf
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Thu Sep 8 18:32:53 2022 -0700

    improve nested column serializer (#13051)
    
    changes:
    * long and double value columns are now written directly, at the same time as the 'intermediary' dictionary id column (with unsorted ids) is written out
    * remove reverse value lookup from GlobalDictionaryIdLookup since it is no longer needed
---
 .../druid/segment/data/CompressedBlockReader.java  |  4 +-
 .../segment/nested/GlobalDictionaryIdLookup.java   | 31 +-------------
 .../segment/nested/NestedDataColumnSerializer.java | 49 ++++++++++------------
 3 files changed, 27 insertions(+), 57 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockReader.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockReader.java
index bace9d2dc5..f03eb80fb4 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockReader.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockReader.java
@@ -36,7 +36,7 @@ import java.util.function.Supplier;
  * Reader for a virtual contiguous address range backed by compressed blocks of data.
  *
  * Format:
- * | version (byte) | compression (byte) | num blocks (int) | block size (int) | end offsets | compressed data |
+ * | version (byte) | compression (byte) | block size (int) | num blocks (int) | end offsets | compressed data |
  *
  * This mechanism supports two modes of use, the first where callers may ask for a range of data from the underlying
  * blocks, provided by {@link #getRange(long, int)}. The {@link ByteBuffer} provided by this method may or may not
@@ -48,6 +48,8 @@ import java.util.function.Supplier;
  * {@link #seekBlock(int)} to change which block is currently loaded.
  *
  * {@link #getRange(long, int)} uses these same mechanisms internally to supply data.
+ *
+ * @see CompressedBlockSerializer for writer
  */
 public final class CompressedBlockReader implements Closeable
 {
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryIdLookup.java b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryIdLookup.java
index 64bae5cf8a..48895042cd 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryIdLookup.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryIdLookup.java
@@ -22,10 +22,6 @@ package org.apache.druid.segment.nested;
 import com.google.common.base.Preconditions;
 import it.unimi.dsi.fastutil.doubles.Double2IntLinkedOpenHashMap;
 import it.unimi.dsi.fastutil.doubles.Double2IntMap;
-import it.unimi.dsi.fastutil.ints.Int2DoubleLinkedOpenHashMap;
-import it.unimi.dsi.fastutil.ints.Int2DoubleMap;
-import it.unimi.dsi.fastutil.ints.Int2LongLinkedOpenHashMap;
-import it.unimi.dsi.fastutil.ints.Int2LongMap;
 import it.unimi.dsi.fastutil.longs.Long2IntLinkedOpenHashMap;
 import it.unimi.dsi.fastutil.longs.Long2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntLinkedOpenHashMap;
@@ -35,18 +31,15 @@ import javax.annotation.Nullable;
 
 /**
  * Ingestion time dictionary identifier lookup, used by {@link NestedDataColumnSerializer} to build a global dictionary
- * id to value mapping for the 'stacked' global value dictionaries. Also provides reverse lookup for numeric values,
- * since they serialize a value column
+ * id to value mapping for the 'stacked' global value dictionaries.
  */
 public class GlobalDictionaryIdLookup
 {
   private final Object2IntMap<String> stringLookup;
 
   private final Long2IntMap longLookup;
-  private final Int2LongMap reverseLongLookup;
 
   private final Double2IntMap doubleLookup;
-  private final Int2DoubleMap reverseDoubleLookup;
 
   private int dictionarySize;
 
@@ -54,9 +47,7 @@ public class GlobalDictionaryIdLookup
   {
     this.stringLookup = new Object2IntLinkedOpenHashMap<>();
     this.longLookup = new Long2IntLinkedOpenHashMap();
-    this.reverseLongLookup = new Int2LongLinkedOpenHashMap();
     this.doubleLookup = new Double2IntLinkedOpenHashMap();
-    this.reverseDoubleLookup = new Int2DoubleLinkedOpenHashMap();
   }
 
   public void addString(@Nullable String value)
@@ -82,7 +73,6 @@ public class GlobalDictionaryIdLookup
     );
     int id = dictionarySize++;
     longLookup.put(value, id);
-    reverseLongLookup.put(id, value);
   }
 
   public int lookupLong(@Nullable Long value)
@@ -93,20 +83,10 @@ public class GlobalDictionaryIdLookup
     return longLookup.get(value.longValue());
   }
 
-  @Nullable
-  public Long lookupLong(int id)
-  {
-    if (id == 0) {
-      return null;
-    }
-    return reverseLongLookup.get(id);
-  }
-
   public void addDouble(double value)
   {
     int id = dictionarySize++;
     doubleLookup.put(value, id);
-    reverseDoubleLookup.put(id, value);
   }
 
   public int lookupDouble(@Nullable Double value)
@@ -116,13 +96,4 @@ public class GlobalDictionaryIdLookup
     }
     return doubleLookup.get(value.doubleValue());
   }
-
-  @Nullable
-  public Double lookupDouble(int id)
-  {
-    if (id == 0) {
-      return null;
-    }
-    return reverseDoubleLookup.get(id);
-  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
index 6d113bf717..f5d782d1e0 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
@@ -63,6 +63,7 @@ import org.apache.druid.segment.serde.DictionaryEncodedColumnPartSerde;
 import org.apache.druid.segment.serde.Serializer;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
+import javax.annotation.Nullable;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -196,7 +197,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
       } else {
         writer = new VariantLiteralFieldColumnWriter();
       }
-      writer.open();
+      writer.open(field.getKey());
       fieldWriters.put(field.getKey(), writer);
     }
   }
@@ -376,6 +377,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
     {
       return (T) value;
     }
+
+    void writeValue(@Nullable T value) throws IOException
+    {
+      // do nothing, if a value column is present this method should be overridden to write the value to the serializer
+    }
+
     abstract int lookupGlobalId(T value);
 
     abstract void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException;
@@ -398,17 +405,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
       encodedValueSerializer.open();
     }
 
-    void serializeRow(int globalId, int localId) throws IOException
-    {
-      encodedValueSerializer.addValue(localId);
-    }
-
     long getSerializedColumnSize() throws IOException
     {
       return Integer.BYTES + Integer.BYTES + encodedValueSerializer.getSerializedSize();
     }
 
-    public void open() throws IOException
+    public void open(String field) throws IOException
     {
       intermediateValueWriter = new FixedIndexedIntWriter(segmentWriteOutMedium, false);
       intermediateValueWriter.open();
@@ -435,6 +437,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
       final int globalId = lookupGlobalId(value);
       final int localId = localDictionary.add(globalId);
       intermediateValueWriter.write(localId);
+      writeValue(value);
     }
 
     public void writeTo(String field, FileSmoosher smoosher) throws IOException
@@ -472,10 +475,8 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
       int rowCount = 0;
       while (rows.hasNext()) {
         final int unsortedLocalId = rows.nextInt();
-        final int globalId = unsortedToGlobal[unsortedLocalId];
         final int sortedLocalId = unsortedToSorted[unsortedLocalId];
-
-        serializeRow(globalId, sortedLocalId);
+        encodedValueSerializer.addValue(sortedLocalId);
         bitmaps[sortedLocalId].add(rowCount++);
       }
 
@@ -551,12 +552,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
     }
 
     @Override
-    void openColumnSerializer(String field, SegmentWriteOutMedium medium, int maxId) throws IOException
+    public void open(String field) throws IOException
     {
-      super.openColumnSerializer(field, medium, maxId);
+      super.open(field);
       longsSerializer = CompressionFactory.getLongSerializer(
           field,
-          medium,
+          segmentWriteOutMedium,
           StringUtils.format("%s.long_column", name),
           ByteOrder.nativeOrder(),
           indexSpec.getLongEncoding(),
@@ -566,14 +567,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
     }
 
     @Override
-    void serializeRow(int globalId, int localId) throws IOException
+    void writeValue(@Nullable Long value) throws IOException
     {
-      super.serializeRow(globalId, localId);
-      Long l = globalDictionaryIdLookup.lookupLong(globalId);
-      if (l == null) {
+      if (value == null) {
         longsSerializer.add(0L);
       } else {
-        longsSerializer.add(l);
+        longsSerializer.add(value);
       }
     }
 
@@ -603,12 +602,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
     }
 
     @Override
-    void openColumnSerializer(String field, SegmentWriteOutMedium medium, int maxId) throws IOException
+    public void open(String field) throws IOException
     {
-      super.openColumnSerializer(field, medium, maxId);
+      super.open(field);
       doublesSerializer = CompressionFactory.getDoubleSerializer(
           field,
-          medium,
+          segmentWriteOutMedium,
           StringUtils.format("%s.double_column", name),
           ByteOrder.nativeOrder(),
           indexSpec.getDimensionCompression()
@@ -617,14 +616,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
     }
 
     @Override
-    void serializeRow(int globalId, int localId) throws IOException
+    void writeValue(@Nullable Double value) throws IOException
     {
-      super.serializeRow(globalId, localId);
-      Double d = globalDictionaryIdLookup.lookupDouble(globalId);
-      if (d == null) {
+      if (value == null) {
         doublesSerializer.add(0.0);
       } else {
-        doublesSerializer.add(d);
+        doublesSerializer.add(value);
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org