You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text rendering.
Posted to commits@druid.apache.org by gi...@apache.org on 2019/03/12 01:02:07 UTC

[incubator-druid] branch master updated: Write null byte when indexing numeric dimensions with Hadoop (#7020)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c503ba9  Write null byte when indexing numeric dimensions with Hadoop (#7020)
c503ba9 is described below

commit c503ba97799e3531753c5a26ef9c6f06c0e7e13e
Author: Ferris Tseng <fe...@gmail.com>
AuthorDate: Mon Mar 11 21:02:03 2019 -0400

    Write null byte when indexing numeric dimensions with Hadoop (#7020)
    
    * Write null byte in Hadoop indexing for numeric dimensions
    
    * Add test case to check the serialized output for null numeric dimensions
    
    * Remove extra line
    
    * Add @Nullable annotations
---
 .../org/apache/druid/indexer/InputRowSerde.java    | 59 ++++++++++++++--------
 .../apache/druid/indexer/InputRowSerdeTest.java    | 55 ++++++++++++++++++++
 2 files changed, 93 insertions(+), 21 deletions(-)

diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
index b9c48f4..c0a42f2 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
@@ -51,6 +51,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 
 /**
  */
@@ -63,6 +64,34 @@ public class InputRowSerde
   private static final IndexSerdeTypeHelper FLOAT_HELPER = new FloatIndexSerdeTypeHelper();
   private static final IndexSerdeTypeHelper DOUBLE_HELPER = new DoubleIndexSerdeTypeHelper();
 
+  private static <T extends Number> void writeNullableNumeric(
+      T ret,
+      final ByteArrayDataOutput out,
+      final Supplier<T> getDefault,
+      final Consumer<T> write)
+  {
+    if (ret == null) {
+      ret = getDefault.get();
+    }
+
+    // Write the null byte only if the default numeric value is still null.
+    if (ret == null) {
+      out.writeByte(NullHandling.IS_NULL_BYTE);
+      return;
+    }
+
+    if (NullHandling.sqlCompatible()) {
+      out.writeByte(NullHandling.IS_NOT_NULL_BYTE);
+    }
+
+    write.accept(ret);
+  }
+
+  private static boolean isNullByteSet(final ByteArrayDataInput in)
+  {
+    return NullHandling.sqlCompatible() && in.readByte() == NullHandling.IS_NULL_BYTE;
+  }
+
   public interface IndexSerdeTypeHelper<T>
   {
     ValueType getType();
@@ -175,12 +204,7 @@ public class InputRowSerde
         exceptionToThrow = pe;
       }
 
-      if (ret == null) {
-        // remove null -> zero conversion when https://github.com/apache/incubator-druid/pull/5278 series of patches is merged
-        // we'll also need to change the serialized encoding so that it can represent numeric nulls
-        ret = DimensionHandlerUtils.ZERO_LONG;
-      }
-      out.writeLong(ret);
+      writeNullableNumeric(ret, out, NullHandling::defaultLongValue, out::writeLong);
 
       if (exceptionToThrow != null) {
         throw exceptionToThrow;
@@ -188,9 +212,10 @@ public class InputRowSerde
     }
 
     @Override
+    @Nullable
     public Long deserialize(ByteArrayDataInput in)
     {
-      return in.readLong();
+      return isNullByteSet(in) ? null : in.readLong();
     }
   }
 
@@ -214,12 +239,7 @@ public class InputRowSerde
         exceptionToThrow = pe;
       }
 
-      if (ret == null) {
-        // remove null -> zero conversion when https://github.com/apache/incubator-druid/pull/5278 series of patches is merged
-        // we'll also need to change the serialized encoding so that it can represent numeric nulls
-        ret = DimensionHandlerUtils.ZERO_FLOAT;
-      }
-      out.writeFloat(ret);
+      writeNullableNumeric(ret, out, NullHandling::defaultFloatValue, out::writeFloat);
 
       if (exceptionToThrow != null) {
         throw exceptionToThrow;
@@ -227,9 +247,10 @@ public class InputRowSerde
     }
 
     @Override
+    @Nullable
     public Float deserialize(ByteArrayDataInput in)
     {
-      return in.readFloat();
+      return isNullByteSet(in) ? null : in.readFloat();
     }
   }
 
@@ -253,12 +274,7 @@ public class InputRowSerde
         exceptionToThrow = pe;
       }
 
-      if (ret == null) {
-        // remove null -> zero conversion when https://github.com/apache/incubator-druid/pull/5278 series of patches is merged
-        // we'll also need to change the serialized encoding so that it can represent numeric nulls
-        ret = DimensionHandlerUtils.ZERO_DOUBLE;
-      }
-      out.writeDouble(ret);
+      writeNullableNumeric(ret, out, NullHandling::defaultDoubleValue, out::writeDouble);
 
       if (exceptionToThrow != null) {
         throw exceptionToThrow;
@@ -266,9 +282,10 @@ public class InputRowSerde
     }
 
     @Override
+    @Nullable
     public Double deserialize(ByteArrayDataInput in)
     {
-      return in.readDouble();
+      return isNullByteSet(in) ? null : in.readDouble();
     }
   }
 
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java
index ce9b95c..3d00c15 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java
@@ -254,4 +254,59 @@ public class InputRowSerdeTest
         result.getParseExceptionMessages()
     );
   }
+
+  @Test
+  public void testDimensionNullOrDefaultForNumerics()
+  {
+    HashMap<String, Object> eventWithNulls = new HashMap<>();
+    eventWithNulls.put("d1", null);
+    eventWithNulls.put("d2", Arrays.asList("d2v1", "d2v2"));
+    eventWithNulls.put("d3", null);
+    eventWithNulls.put("d4", null);
+    eventWithNulls.put("d5", null);
+
+    InputRow in = new MapBasedInputRow(
+        timestamp,
+        dims,
+        eventWithNulls
+    );
+
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("d1"),
+            new StringDimensionSchema("d2"),
+            new LongDimensionSchema("d3"),
+            new FloatDimensionSchema("d4"),
+            new DoubleDimensionSchema("d5")
+        ),
+        null,
+        null
+    );
+
+    byte[] result = InputRowSerde.toBytes(InputRowSerde.getTypeHelperMap(dimensionsSpec), in, new AggregatorFactory[0]).getSerializedRow();
+
+    if (NullHandling.replaceWithDefault()) {
+      long expected = 0;
+      expected += 9;  // timestamp bytes + dims length
+      expected += 18; // dim_non_existing writes: 1 16 1 bytes
+      expected += 4;  // d1: writes 1 2 1 bytes
+      expected += 14; // d2: writes 1 2 1 1 4 1 4 bytes
+      expected += 11; // d3: writes 1 2 8 bytes
+      expected += 7;  // d4: writes 1 2 4 bytes
+      expected += 11; // d5: writes 1 2 8 bytes
+      expected += 1;  // writes aggregator length
+
+      Assert.assertEquals(expected, result.length);
+      Assert.assertArrayEquals(new byte[] {0, 0, 0, 0, 0, 0, 0, 0}, Arrays.copyOfRange(result, 48, 56));
+      Assert.assertArrayEquals(new byte[] {0, 0, 0, 0}, Arrays.copyOfRange(result, 59, 63));
+      Assert.assertArrayEquals(new byte[] {0, 0, 0, 0, 0, 0, 0, 0}, Arrays.copyOfRange(result, 66, 74));
+    } else {
+      long expected = 9 + 18 + 4 + 14 + 4 + 4 + 4 + 1;
+
+      Assert.assertEquals(expected, result.length);
+      Assert.assertEquals(result[48], NullHandling.IS_NULL_BYTE);
+      Assert.assertEquals(result[52], NullHandling.IS_NULL_BYTE);
+      Assert.assertEquals(result[56], NullHandling.IS_NULL_BYTE);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org