You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2022/09/19 08:38:04 UTC

[druid] branch master updated: nested column serializer performance improvement for sparse columns (#13101)

This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a0e0fbe1b3 nested column serializer performance improvement for sparse columns (#13101)
a0e0fbe1b3 is described below

commit a0e0fbe1b349debd3da4a136b9a25b5316291d55
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Mon Sep 19 01:37:48 2022 -0700

    nested column serializer performance improvement for sparse columns (#13101)
---
 .../GlobalDictionaryEncodedFieldColumnWriter.java  | 35 +++++++++++++++++-----
 .../segment/nested/NestedDataColumnSerializer.java | 17 ++---------
 2 files changed, 31 insertions(+), 21 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
index ed30c84b79..a6d40471cd 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
@@ -48,12 +48,12 @@ import java.nio.channels.WritableByteChannel;
 
 /**
  * Base class for writer of global dictionary encoded nested literal columns for {@link NestedDataColumnSerializer}.
- * {@link NestedDataColumnSerializer} while processing the 'raw' nested data will call {@link #addValue(Object)} for
- * all literal writers, which for this type of writer entails building a local dictionary to map into to the global
+ * {@link NestedDataColumnSerializer} while processing the 'raw' nested data will call {@link #addValue(int, Object)}
+ * for all literal writers, which for this type of writer entails building a local dictionary to map into the global
  * dictionary ({@link #localDictionary}) and writes this unsorted localId to an intermediate integer column,
  * {@link #intermediateValueWriter}.
  *
- * When processing the 'raw' value column is complete, the {@link #writeTo(FileSmoosher)} method will sort the
+ * When processing the 'raw' value column is complete, the {@link #writeTo(int, FileSmoosher)} method will sort the
  * local ids and write them out to a local sorted dictionary, iterate over {@link #intermediateValueWriter} swapping
  * the unsorted local ids with the sorted ids and writing to the compressed id column writer
  * {@link #encodedValueSerializer} building the bitmap indexes along the way.
@@ -75,6 +75,8 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
   protected DictionaryEncodedColumnPartSerde.VERSION version = null;
   protected SingleValueColumnarIntsSerializer encodedValueSerializer;
 
+  protected int cursorPosition;
+
   protected GlobalDictionaryEncodedFieldColumnWriter(
       String columnName,
       String fieldName,
@@ -99,7 +101,7 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
   }
 
   /**
-   * Hook to allow implementors the chance to do additional operations during {@link #addValue(Object)}, such as
+   * Hook to allow implementors the chance to do additional operations during {@link #addValue(int, Object)}, such as
    * writing an additional value column
    */
   void writeValue(@Nullable T value) throws IOException
@@ -113,24 +115,40 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
   abstract int lookupGlobalId(T value);
 
   /**
-   * Open the writer so that {@link #addValue(Object)} can be called
+   * Open the writer so that {@link #addValue(int, Object)} can be called
    */
   public void open() throws IOException
   {
     intermediateValueWriter = new FixedIndexedIntWriter(segmentWriteOutMedium, false);
     intermediateValueWriter.open();
+    cursorPosition = 0;
   }
 
   /**
    * Add a value to the unsorted local dictionary and write to an intermediate column
    */
-  public void addValue(Object val) throws IOException
+  public void addValue(int row, Object val) throws IOException
   {
+    if (row > cursorPosition) {
+      fillNull(row);
+    }
     final T value = processValue(val);
     final int globalId = lookupGlobalId(value);
     final int localId = localDictionary.add(globalId);
     intermediateValueWriter.write(localId);
     writeValue(value);
+    cursorPosition++;
+  }
+
+  private void fillNull(int row) throws IOException
+  {
+    final T value = processValue(null);
+    final int localId = localDictionary.add(0);
+    while (cursorPosition < row) {
+      intermediateValueWriter.write(localId);
+      writeValue(value);
+      cursorPosition++;
+    }
   }
 
 
@@ -148,8 +166,11 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
    */
   abstract void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException;
 
-  public void writeTo(FileSmoosher smoosher) throws IOException
+  public void writeTo(int finalRowCount, FileSmoosher smoosher) throws IOException
   {
+    if (finalRowCount > cursorPosition) {
+      fillNull(finalRowCount);
+    }
     // use a child writeout medium so that we can close them when we are finished and don't leave temporary files
     // hanging out until the entire segment is done
     final SegmentWriteOutMedium tmpWriteoutMedium = segmentWriteOutMedium.makeChildWriteOutMedium();
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
index 485abf14c3..521740aada 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
@@ -55,7 +55,6 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.WritableByteChannel;
 import java.util.Map;
-import java.util.Set;
 import java.util.SortedMap;
 
 public class NestedDataColumnSerializer implements GenericColumnSerializer<StructuredData>
@@ -83,7 +82,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
       if (writer != null) {
         try {
           ExprEval<?> eval = ExprEval.bestEffortOf(fieldValue);
-          writer.addValue(eval.value());
+          writer.addValue(rowCount, eval.value());
           // serializer doesn't use size estimate
           return 0;
         }
@@ -266,17 +265,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
     }
     rawWriter.addValue(NestedDataComplexTypeSerde.INSTANCE.toBytes(data));
     if (data != null) {
-      StructuredDataProcessor.ProcessResults processed = fieldProcessor.processFields(data.getValue());
-      Set<String> set = processed.getLiteralFields();
-      for (String field : fields.keySet()) {
-        if (!set.contains(field)) {
-          fieldWriters.get(field).addValue(null);
-        }
-      }
-    } else {
-      for (String field : fields.keySet()) {
-        fieldWriters.get(field).addValue(null);
-      }
+      fieldProcessor.processFields(data.getValue());
     }
     rowCount++;
   }
@@ -349,7 +338,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
     for (Map.Entry<String, NestedLiteralTypeInfo.MutableTypeSet> field : fields.entrySet()) {
       // remove writer so that it can be collected when we are done with it
       GlobalDictionaryEncodedFieldColumnWriter<?> writer = fieldWriters.remove(field.getKey());
-      writer.writeTo(smoosher);
+      writer.writeTo(rowCount, smoosher);
     }
     log.info("Column [%s] serialized successfully with [%d] nested columns.", name, fields.size());
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org