You are viewing a plain text version of this content. The canonical (linked) version is available in the mailing-list archive.
Posted to commits@druid.apache.org by gi...@apache.org on 2019/03/12 01:02:07 UTC
[incubator-druid] branch master updated: Write null byte when indexing numeric dimensions with Hadoop (#7020)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new c503ba9 Write null byte when indexing numeric dimensions with Hadoop (#7020)
c503ba9 is described below
commit c503ba97799e3531753c5a26ef9c6f06c0e7e13e
Author: Ferris Tseng <fe...@gmail.com>
AuthorDate: Mon Mar 11 21:02:03 2019 -0400
Write null byte when indexing numeric dimensions with Hadoop (#7020)
* write null byte in hadoop indexing for numeric dimensions
* Add test case to check output serializing null numeric dimensions
* Remove extra line
* Add @Nullable annotations
---
.../org/apache/druid/indexer/InputRowSerde.java | 59 ++++++++++++++--------
.../apache/druid/indexer/InputRowSerdeTest.java | 55 ++++++++++++++++++++
2 files changed, 93 insertions(+), 21 deletions(-)
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
index b9c48f4..c0a42f2 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
@@ -51,6 +51,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
/**
*/
@@ -63,6 +64,34 @@ public class InputRowSerde
private static final IndexSerdeTypeHelper FLOAT_HELPER = new FloatIndexSerdeTypeHelper();
private static final IndexSerdeTypeHelper DOUBLE_HELPER = new DoubleIndexSerdeTypeHelper();
+ private static <T extends Number> void writeNullableNumeric(
+ T ret,
+ final ByteArrayDataOutput out,
+ final Supplier<T> getDefault,
+ final Consumer<T> write)
+ {
+ if (ret == null) {
+ ret = getDefault.get();
+ }
+
+ // Write the null byte only if the default numeric value is still null.
+ if (ret == null) {
+ out.writeByte(NullHandling.IS_NULL_BYTE);
+ return;
+ }
+
+ if (NullHandling.sqlCompatible()) {
+ out.writeByte(NullHandling.IS_NOT_NULL_BYTE);
+ }
+
+ write.accept(ret);
+ }
+
+  private static boolean isNullByteSet(final ByteArrayDataInput in)
+  { // Reads one marker byte only in SQL-compatible mode (&& short-circuits); otherwise reads nothing and returns false.
+    return NullHandling.sqlCompatible() && in.readByte() == NullHandling.IS_NULL_BYTE;
+  }
+
public interface IndexSerdeTypeHelper<T>
{
ValueType getType();
@@ -175,12 +204,7 @@ public class InputRowSerde
exceptionToThrow = pe;
}
- if (ret == null) {
- // remove null -> zero conversion when https://github.com/apache/incubator-druid/pull/5278 series of patches is merged
- // we'll also need to change the serialized encoding so that it can represent numeric nulls
- ret = DimensionHandlerUtils.ZERO_LONG;
- }
- out.writeLong(ret);
+ writeNullableNumeric(ret, out, NullHandling::defaultLongValue, out::writeLong);
if (exceptionToThrow != null) {
throw exceptionToThrow;
@@ -188,9 +212,10 @@ public class InputRowSerde
}
@Override
+ @Nullable
public Long deserialize(ByteArrayDataInput in)
{
- return in.readLong();
+ return isNullByteSet(in) ? null : in.readLong();
}
}
@@ -214,12 +239,7 @@ public class InputRowSerde
exceptionToThrow = pe;
}
- if (ret == null) {
- // remove null -> zero conversion when https://github.com/apache/incubator-druid/pull/5278 series of patches is merged
- // we'll also need to change the serialized encoding so that it can represent numeric nulls
- ret = DimensionHandlerUtils.ZERO_FLOAT;
- }
- out.writeFloat(ret);
+ writeNullableNumeric(ret, out, NullHandling::defaultFloatValue, out::writeFloat);
if (exceptionToThrow != null) {
throw exceptionToThrow;
@@ -227,9 +247,10 @@ public class InputRowSerde
}
@Override
+ @Nullable
public Float deserialize(ByteArrayDataInput in)
{
- return in.readFloat();
+ return isNullByteSet(in) ? null : in.readFloat();
}
}
@@ -253,12 +274,7 @@ public class InputRowSerde
exceptionToThrow = pe;
}
- if (ret == null) {
- // remove null -> zero conversion when https://github.com/apache/incubator-druid/pull/5278 series of patches is merged
- // we'll also need to change the serialized encoding so that it can represent numeric nulls
- ret = DimensionHandlerUtils.ZERO_DOUBLE;
- }
- out.writeDouble(ret);
+ writeNullableNumeric(ret, out, NullHandling::defaultDoubleValue, out::writeDouble);
if (exceptionToThrow != null) {
throw exceptionToThrow;
@@ -266,9 +282,10 @@ public class InputRowSerde
}
@Override
+ @Nullable
public Double deserialize(ByteArrayDataInput in)
{
- return in.readDouble();
+ return isNullByteSet(in) ? null : in.readDouble();
}
}
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java
index ce9b95c..3d00c15 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java
@@ -254,4 +254,59 @@ public class InputRowSerdeTest
result.getParseExceptionMessages()
);
}
+
+  @Test
+  public void testDimensionNullOrDefaultForNumerics()
+  {
+    HashMap<String, Object> eventWithNulls = new HashMap<>();
+    eventWithNulls.put("d1", null);
+    eventWithNulls.put("d2", Arrays.asList("d2v1", "d2v2"));
+    eventWithNulls.put("d3", null);
+    eventWithNulls.put("d4", null);
+    eventWithNulls.put("d5", null);
+    // d3/d4/d5 are the numeric dimensions under test; all three are null in the event.
+    InputRow in = new MapBasedInputRow(
+        timestamp,
+        dims,
+        eventWithNulls
+    );
+
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("d1"),
+            new StringDimensionSchema("d2"),
+            new LongDimensionSchema("d3"),
+            new FloatDimensionSchema("d4"),
+            new DoubleDimensionSchema("d5")
+        ),
+        null,
+        null
+    );
+
+    byte[] result = InputRowSerde.toBytes(InputRowSerde.getTypeHelperMap(dimensionsSpec), in, new AggregatorFactory[0]).getSerializedRow();
+    // Bytes 0-44 hold the timestamp, dims length, dim_non_existing, d1 and d2; d3's name occupies 45-47.
+    if (NullHandling.replaceWithDefault()) {
+      long expected = 0;
+      expected += 9; // timestamp bytes + dims length
+      expected += 18; // dim_non_existing writes: 1 16 1 bytes
+      expected += 4; // d1: writes 1 2 1 bytes
+      expected += 14; // d2: writes 1 2 1 1 4 1 4 bytes
+      expected += 11; // d3: writes 1 2 8 bytes
+      expected += 7; // d4: writes 1 2 4 bytes
+      expected += 11; // d5: writes 1 2 8 bytes
+      expected += 1; // writes aggregator length
+      // Null numerics are written as zero defaults: d3 value at 48-55, d4 at 59-62, d5 at 66-73.
+      Assert.assertEquals(expected, result.length);
+      Assert.assertArrayEquals(new byte[] {0, 0, 0, 0, 0, 0, 0, 0}, Arrays.copyOfRange(result, 48, 56));
+      Assert.assertArrayEquals(new byte[] {0, 0, 0, 0}, Arrays.copyOfRange(result, 59, 63));
+      Assert.assertArrayEquals(new byte[] {0, 0, 0, 0, 0, 0, 0, 0}, Arrays.copyOfRange(result, 66, 74));
+    } else {
+      long expected = 9 + 18 + 4 + 14 + 4 + 4 + 4 + 1;
+      // Each null numeric is 1 length byte + 2 name bytes + 1 null marker byte (at 48, 52, 56).
+      Assert.assertEquals(expected, result.length);
+      Assert.assertEquals(NullHandling.IS_NULL_BYTE, result[48]);
+      Assert.assertEquals(NullHandling.IS_NULL_BYTE, result[52]);
+      Assert.assertEquals(NullHandling.IS_NULL_BYTE, result[56]);
+    }
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org