You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@druid.apache.org by ab...@apache.org on 2022/09/19 08:38:04 UTC
[druid] branch master updated: nested column serializer performance improvement for sparse columns (#13101)
This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a0e0fbe1b3 nested column serializer performance improvement for sparse columns (#13101)
a0e0fbe1b3 is described below
commit a0e0fbe1b349debd3da4a136b9a25b5316291d55
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Mon Sep 19 01:37:48 2022 -0700
nested column serializer performance improvement for sparse columns (#13101)
---
.../GlobalDictionaryEncodedFieldColumnWriter.java | 35 +++++++++++++++++-----
.../segment/nested/NestedDataColumnSerializer.java | 17 ++---------
2 files changed, 31 insertions(+), 21 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
index ed30c84b79..a6d40471cd 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
@@ -48,12 +48,12 @@ import java.nio.channels.WritableByteChannel;
/**
* Base class for writer of global dictionary encoded nested literal columns for {@link NestedDataColumnSerializer}.
- * {@link NestedDataColumnSerializer} while processing the 'raw' nested data will call {@link #addValue(Object)} for
- * all literal writers, which for this type of writer entails building a local dictionary to map into to the global
+ * {@link NestedDataColumnSerializer} while processing the 'raw' nested data will call {@link #addValue(int, Object)}
+ * for all literal writers, which for this type of writer entails building a local dictionary to map into to the global
* dictionary ({@link #localDictionary}) and writes this unsorted localId to an intermediate integer column,
* {@link #intermediateValueWriter}.
*
- * When processing the 'raw' value column is complete, the {@link #writeTo(FileSmoosher)} method will sort the
+ * When processing the 'raw' value column is complete, the {@link #writeTo(int, FileSmoosher)} method will sort the
* local ids and write them out to a local sorted dictionary, iterate over {@link #intermediateValueWriter} swapping
* the unsorted local ids with the sorted ids and writing to the compressed id column writer
* {@link #encodedValueSerializer} building the bitmap indexes along the way.
@@ -75,6 +75,8 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
protected DictionaryEncodedColumnPartSerde.VERSION version = null;
protected SingleValueColumnarIntsSerializer encodedValueSerializer;
+ protected int cursorPosition;
+
protected GlobalDictionaryEncodedFieldColumnWriter(
String columnName,
String fieldName,
@@ -99,7 +101,7 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
}
/**
- * Hook to allow implementors the chance to do additional operations during {@link #addValue(Object)}, such as
+ * Hook to allow implementors the chance to do additional operations during {@link #addValue(int, Object)}, such as
* writing an additional value column
*/
void writeValue(@Nullable T value) throws IOException
@@ -113,24 +115,40 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
abstract int lookupGlobalId(T value);
/**
- * Open the writer so that {@link #addValue(Object)} can be called
+ * Open the writer so that {@link #addValue(int, Object)} can be called
*/
public void open() throws IOException
{
intermediateValueWriter = new FixedIndexedIntWriter(segmentWriteOutMedium, false);
intermediateValueWriter.open();
+ cursorPosition = 0;
}
/**
* Add a value to the unsorted local dictionary and write to an intermediate column
*/
- public void addValue(Object val) throws IOException
+ public void addValue(int row, Object val) throws IOException
{
+ if (row > cursorPosition) {
+ fillNull(row);
+ }
final T value = processValue(val);
final int globalId = lookupGlobalId(value);
final int localId = localDictionary.add(globalId);
intermediateValueWriter.write(localId);
writeValue(value);
+ cursorPosition++;
+ }
+
+ private void fillNull(int row) throws IOException
+ {
+ final T value = processValue(null);
+ final int localId = localDictionary.add(0);
+ while (cursorPosition < row) {
+ intermediateValueWriter.write(localId);
+ writeValue(value);
+ cursorPosition++;
+ }
}
@@ -148,8 +166,11 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
*/
abstract void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException;
- public void writeTo(FileSmoosher smoosher) throws IOException
+ public void writeTo(int finalRowCount, FileSmoosher smoosher) throws IOException
{
+ if (finalRowCount > cursorPosition) {
+ fillNull(finalRowCount);
+ }
// use a child writeout medium so that we can close them when we are finished and don't leave temporary files
// hanging out until the entire segment is done
final SegmentWriteOutMedium tmpWriteoutMedium = segmentWriteOutMedium.makeChildWriteOutMedium();
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
index 485abf14c3..521740aada 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
@@ -55,7 +55,6 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
import java.util.Map;
-import java.util.Set;
import java.util.SortedMap;
public class NestedDataColumnSerializer implements GenericColumnSerializer<StructuredData>
@@ -83,7 +82,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
if (writer != null) {
try {
ExprEval<?> eval = ExprEval.bestEffortOf(fieldValue);
- writer.addValue(eval.value());
+ writer.addValue(rowCount, eval.value());
// serializer doesn't use size estimate
return 0;
}
@@ -266,17 +265,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
}
rawWriter.addValue(NestedDataComplexTypeSerde.INSTANCE.toBytes(data));
if (data != null) {
- StructuredDataProcessor.ProcessResults processed = fieldProcessor.processFields(data.getValue());
- Set<String> set = processed.getLiteralFields();
- for (String field : fields.keySet()) {
- if (!set.contains(field)) {
- fieldWriters.get(field).addValue(null);
- }
- }
- } else {
- for (String field : fields.keySet()) {
- fieldWriters.get(field).addValue(null);
- }
+ fieldProcessor.processFields(data.getValue());
}
rowCount++;
}
@@ -349,7 +338,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
for (Map.Entry<String, NestedLiteralTypeInfo.MutableTypeSet> field : fields.entrySet()) {
// remove writer so that it can be collected when we are done with it
GlobalDictionaryEncodedFieldColumnWriter<?> writer = fieldWriters.remove(field.getKey());
- writer.writeTo(smoosher);
+ writer.writeTo(rowCount, smoosher);
}
log.info("Column [%s] serialized successfully with [%d] nested columns.", name, fields.size());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org