You are viewing a plain text version of this content. The canonical version is available at the original mailing-list archive link, which is not preserved in this plain-text copy.
Posted to commits@druid.apache.org by cw...@apache.org on 2022/09/09 01:33:03 UTC
[druid] branch master updated: improve nested column serializer (#13051)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6438f4198d improve nested column serializer (#13051)
6438f4198d is described below
commit 6438f4198db28c4accfb3524a9a1555a335d9bcf
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Thu Sep 8 18:32:53 2022 -0700
improve nested column serializer (#13051)
changes:
* long and double value columns are now written directly, at the same time as the 'intermediary' dictionary id column (which uses unsorted ids) is written out
* remove the reverse value lookup from GlobalDictionaryIdLookup, since it is no longer needed
---
.../druid/segment/data/CompressedBlockReader.java | 4 +-
.../segment/nested/GlobalDictionaryIdLookup.java | 31 +-------------
.../segment/nested/NestedDataColumnSerializer.java | 49 ++++++++++------------
3 files changed, 27 insertions(+), 57 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockReader.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockReader.java
index bace9d2dc5..f03eb80fb4 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockReader.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockReader.java
@@ -36,7 +36,7 @@ import java.util.function.Supplier;
* Reader for a virtual contiguous address range backed by compressed blocks of data.
*
* Format:
- * | version (byte) | compression (byte) | num blocks (int) | block size (int) | end offsets | compressed data |
+ * | version (byte) | compression (byte) | block size (int) | num blocks (int) | end offsets | compressed data |
*
* This mechanism supports two modes of use, the first where callers may ask for a range of data from the underlying
* blocks, provided by {@link #getRange(long, int)}. The {@link ByteBuffer} provided by this method may or may not
@@ -48,6 +48,8 @@ import java.util.function.Supplier;
* {@link #seekBlock(int)} to change which block is currently loaded.
*
* {@link #getRange(long, int)} uses these same mechanisms internally to supply data.
+ *
+ * @see CompressedBlockSerializer for writer
*/
public final class CompressedBlockReader implements Closeable
{
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryIdLookup.java b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryIdLookup.java
index 64bae5cf8a..48895042cd 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryIdLookup.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryIdLookup.java
@@ -22,10 +22,6 @@ package org.apache.druid.segment.nested;
import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.doubles.Double2IntLinkedOpenHashMap;
import it.unimi.dsi.fastutil.doubles.Double2IntMap;
-import it.unimi.dsi.fastutil.ints.Int2DoubleLinkedOpenHashMap;
-import it.unimi.dsi.fastutil.ints.Int2DoubleMap;
-import it.unimi.dsi.fastutil.ints.Int2LongLinkedOpenHashMap;
-import it.unimi.dsi.fastutil.ints.Int2LongMap;
import it.unimi.dsi.fastutil.longs.Long2IntLinkedOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntLinkedOpenHashMap;
@@ -35,18 +31,15 @@ import javax.annotation.Nullable;
/**
* Ingestion time dictionary identifier lookup, used by {@link NestedDataColumnSerializer} to build a global dictionary
- * id to value mapping for the 'stacked' global value dictionaries. Also provides reverse lookup for numeric values,
- * since they serialize a value column
+ * id to value mapping for the 'stacked' global value dictionaries.
*/
public class GlobalDictionaryIdLookup
{
private final Object2IntMap<String> stringLookup;
private final Long2IntMap longLookup;
- private final Int2LongMap reverseLongLookup;
private final Double2IntMap doubleLookup;
- private final Int2DoubleMap reverseDoubleLookup;
private int dictionarySize;
@@ -54,9 +47,7 @@ public class GlobalDictionaryIdLookup
{
this.stringLookup = new Object2IntLinkedOpenHashMap<>();
this.longLookup = new Long2IntLinkedOpenHashMap();
- this.reverseLongLookup = new Int2LongLinkedOpenHashMap();
this.doubleLookup = new Double2IntLinkedOpenHashMap();
- this.reverseDoubleLookup = new Int2DoubleLinkedOpenHashMap();
}
public void addString(@Nullable String value)
@@ -82,7 +73,6 @@ public class GlobalDictionaryIdLookup
);
int id = dictionarySize++;
longLookup.put(value, id);
- reverseLongLookup.put(id, value);
}
public int lookupLong(@Nullable Long value)
@@ -93,20 +83,10 @@ public class GlobalDictionaryIdLookup
return longLookup.get(value.longValue());
}
- @Nullable
- public Long lookupLong(int id)
- {
- if (id == 0) {
- return null;
- }
- return reverseLongLookup.get(id);
- }
-
public void addDouble(double value)
{
int id = dictionarySize++;
doubleLookup.put(value, id);
- reverseDoubleLookup.put(id, value);
}
public int lookupDouble(@Nullable Double value)
@@ -116,13 +96,4 @@ public class GlobalDictionaryIdLookup
}
return doubleLookup.get(value.doubleValue());
}
-
- @Nullable
- public Double lookupDouble(int id)
- {
- if (id == 0) {
- return null;
- }
- return reverseDoubleLookup.get(id);
- }
}
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
index 6d113bf717..f5d782d1e0 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
@@ -63,6 +63,7 @@ import org.apache.druid.segment.serde.DictionaryEncodedColumnPartSerde;
import org.apache.druid.segment.serde.Serializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -196,7 +197,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
} else {
writer = new VariantLiteralFieldColumnWriter();
}
- writer.open();
+ writer.open(field.getKey());
fieldWriters.put(field.getKey(), writer);
}
}
@@ -376,6 +377,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
{
return (T) value;
}
+
+ void writeValue(@Nullable T value) throws IOException
+ {
+ // do nothing, if a value column is present this method should be overridden to write the value to the serializer
+ }
+
abstract int lookupGlobalId(T value);
abstract void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException;
@@ -398,17 +405,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
encodedValueSerializer.open();
}
- void serializeRow(int globalId, int localId) throws IOException
- {
- encodedValueSerializer.addValue(localId);
- }
-
long getSerializedColumnSize() throws IOException
{
return Integer.BYTES + Integer.BYTES + encodedValueSerializer.getSerializedSize();
}
- public void open() throws IOException
+ public void open(String field) throws IOException
{
intermediateValueWriter = new FixedIndexedIntWriter(segmentWriteOutMedium, false);
intermediateValueWriter.open();
@@ -435,6 +437,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
final int globalId = lookupGlobalId(value);
final int localId = localDictionary.add(globalId);
intermediateValueWriter.write(localId);
+ writeValue(value);
}
public void writeTo(String field, FileSmoosher smoosher) throws IOException
@@ -472,10 +475,8 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
int rowCount = 0;
while (rows.hasNext()) {
final int unsortedLocalId = rows.nextInt();
- final int globalId = unsortedToGlobal[unsortedLocalId];
final int sortedLocalId = unsortedToSorted[unsortedLocalId];
-
- serializeRow(globalId, sortedLocalId);
+ encodedValueSerializer.addValue(sortedLocalId);
bitmaps[sortedLocalId].add(rowCount++);
}
@@ -551,12 +552,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
}
@Override
- void openColumnSerializer(String field, SegmentWriteOutMedium medium, int maxId) throws IOException
+ public void open(String field) throws IOException
{
- super.openColumnSerializer(field, medium, maxId);
+ super.open(field);
longsSerializer = CompressionFactory.getLongSerializer(
field,
- medium,
+ segmentWriteOutMedium,
StringUtils.format("%s.long_column", name),
ByteOrder.nativeOrder(),
indexSpec.getLongEncoding(),
@@ -566,14 +567,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
}
@Override
- void serializeRow(int globalId, int localId) throws IOException
+ void writeValue(@Nullable Long value) throws IOException
{
- super.serializeRow(globalId, localId);
- Long l = globalDictionaryIdLookup.lookupLong(globalId);
- if (l == null) {
+ if (value == null) {
longsSerializer.add(0L);
} else {
- longsSerializer.add(l);
+ longsSerializer.add(value);
}
}
@@ -603,12 +602,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
}
@Override
- void openColumnSerializer(String field, SegmentWriteOutMedium medium, int maxId) throws IOException
+ public void open(String field) throws IOException
{
- super.openColumnSerializer(field, medium, maxId);
+ super.open(field);
doublesSerializer = CompressionFactory.getDoubleSerializer(
field,
- medium,
+ segmentWriteOutMedium,
StringUtils.format("%s.double_column", name),
ByteOrder.nativeOrder(),
indexSpec.getDimensionCompression()
@@ -617,14 +616,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
}
@Override
- void serializeRow(int globalId, int localId) throws IOException
+ void writeValue(@Nullable Double value) throws IOException
{
- super.serializeRow(globalId, localId);
- Double d = globalDictionaryIdLookup.lookupDouble(globalId);
- if (d == null) {
+ if (value == null) {
doublesSerializer.add(0.0);
} else {
- doublesSerializer.add(d);
+ doublesSerializer.add(value);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org