You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2021/11/24 09:00:30 UTC

[druid] branch master updated: Handle null values in Range Partition dimension distribution (#11973)

This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 48dbe0e  Handle null values in Range Partition dimension distribution (#11973)
48dbe0e is described below

commit 48dbe0ea45aca63b352ce599a832ae2bec889957
Author: Kashif Faraz <ka...@gmail.com>
AuthorDate: Wed Nov 24 14:30:02 2021 +0530

    Handle null values in Range Partition dimension distribution (#11973)
    
    This PR adds support for handling null dimension values while creating partition boundaries
    in range partitioning.
    
    This means that we can now have partition boundaries like [null, "abc"] or ["abc", null, "def"].
---
 .../parallel/PartialDimensionDistributionTask.java |  12 ++-
 .../distribution/ArrayOfStringTuplesSerDe.java     |   2 +-
 .../distribution/ArrayOfStringsNullSafeSerde.java  | 110 +++++++++++++++++++++
 .../ArrayOfStringsNullSafeSerdeTest.java           |  91 +++++++++++++++++
 4 files changed, 209 insertions(+), 6 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index d0cce9d..f83004b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -68,8 +68,9 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
 {
   public static final String TYPE = "partial_dimension_distribution";
 
-  // Future work: StringDistribution does not handle inserting NULLs. This is the same behavior as hadoop indexing.
-  private static final boolean SKIP_NULL = true;
+  // Do not skip nulls as StringDistribution can handle null values.
+  // This behavior is different from hadoop indexing.
+  private static final boolean SKIP_NULL = false;
 
   private final int numAttempts;
   private final ParallelIndexIngestionSpec ingestionSchema;
@@ -276,9 +277,10 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
       }
       String[] values = new String[partitionDimensions.size()];
       for (int i = 0; i < partitionDimensions.size(); ++i) {
-        values[i] = Iterables.getOnlyElement(
-            inputRow.getDimension(partitionDimensions.get(i))
-        );
+        List<String> dimensionValues = inputRow.getDimension(partitionDimensions.get(i));
+        if (dimensionValues != null && !dimensionValues.isEmpty()) {
+          values[i] = Iterables.getOnlyElement(dimensionValues);
+        }
       }
       final StringTuple partitionDimensionValues = StringTuple.create(values);
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
index 1035acb..87f6284 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
@@ -33,7 +33,7 @@ import org.apache.druid.data.input.StringTuple;
  */
 public class ArrayOfStringTuplesSerDe extends ArrayOfItemsSerDe<StringTuple>
 {
-  private static final ArrayOfStringsSerDe STRINGS_SERDE = new ArrayOfStringsSerDe();
+  private static final ArrayOfStringsNullSafeSerde STRINGS_SERDE = new ArrayOfStringsNullSafeSerde();
 
   @Override
   public byte[] serializeToByteArray(StringTuple[] items)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java
new file mode 100644
index 0000000..a289b20
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel.distribution;
+
+import org.apache.datasketches.ArrayOfItemsSerDe;
+import org.apache.datasketches.ArrayOfStringsSerDe;
+import org.apache.datasketches.Util;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.java.util.common.IAE;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Null-safe serde for arrays of Strings, such as the values held by a {@link StringTuple}.
+ * <p>
+ * Each item is written as an int length prefix followed by its UTF-8 bytes, as in
+ * {@link ArrayOfStringsSerDe}; a null item is written as the prefix {@code -1} with no bytes.
+ */
+public class ArrayOfStringsNullSafeSerde extends ArrayOfItemsSerDe<String>
+{
+  // Length prefix written in place of a byte count to mark a null String.
+  private static final int NULL_STRING_LENGTH = -1;
+
+  @Override
+  public byte[] serializeToByteArray(final String[] items)
+  {
+    // First pass: UTF-8 encode each non-null String and compute the total output size
+    int length = 0;
+    final byte[][] itemsBytes = new byte[items.length][];
+    for (int i = 0; i < items.length; i++) {
+      length += Integer.BYTES; // every item, null or not, gets a length prefix
+
+      // A null String leaves itemsBytes[i] null and adds no payload bytes
+      if (items[i] != null) {
+        itemsBytes[i] = items[i].getBytes(StandardCharsets.UTF_8);
+        length += itemsBytes[i].length;
+      }
+    }
+
+    // Second pass: write each length prefix followed by its payload into a single array
+    final byte[] bytes = new byte[length];
+    final WritableMemory mem = WritableMemory.writableWrap(bytes);
+    long offsetBytes = 0;
+    for (int i = 0; i < items.length; i++) {
+      if (itemsBytes[i] != null) {
+        // Write the UTF-8 byte length of the String, then the bytes themselves
+        mem.putInt(offsetBytes, itemsBytes[i].length);
+        offsetBytes += Integer.BYTES;
+        mem.putByteArray(offsetBytes, itemsBytes[i], 0, itemsBytes[i].length);
+        offsetBytes += itemsBytes[i].length;
+      } else {
+        mem.putInt(offsetBytes, NULL_STRING_LENGTH); // null String: prefix only, no payload
+        offsetBytes += Integer.BYTES;
+      }
+    }
+
+    return bytes;
+  }
+
+  @Override
+  public String[] deserializeFromMemory(final Memory mem, final int numItems)
+  {
+    final String[] array = new String[numItems];
+    long offsetBytes = 0;
+    for (int i = 0; i < numItems; i++) {
+      // Read the length prefix of the ith String, bounds-checked against the Memory capacity
+      Util.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity());
+      final int strLength = mem.getInt(offsetBytes);
+      offsetBytes += Integer.BYTES;
+
+      if (strLength >= 0) {
+        // Non-negative prefix: read that many bytes and decode them as UTF-8
+        final byte[] bytes = new byte[strLength];
+        Util.checkBounds(offsetBytes, strLength, mem.getCapacity());
+        mem.getByteArray(offsetBytes, bytes, 0, strLength);
+        offsetBytes += strLength;
+        array[i] = new String(bytes, StandardCharsets.UTF_8);
+      } else if (strLength != NULL_STRING_LENGTH) { // NULL_STRING_LENGTH leaves array[i] null
+        throw new IAE(
+            "Illegal strLength [%s] at offset [%s]. Must be %s, 0 or a positive integer.",
+            strLength,
+            offsetBytes, // NOTE(review): already advanced past the offending length prefix
+            NULL_STRING_LENGTH
+        );
+      }
+    }
+    return array;
+  }
+
+}
+
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerdeTest.java
new file mode 100644
index 0000000..927f311
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerdeTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel.distribution;
+
+import org.apache.datasketches.memory.Memory;
+import org.apache.druid.java.util.common.IAE;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ArrayOfStringsNullSafeSerdeTest
+{
+
+  private final ArrayOfStringsNullSafeSerde serde = new ArrayOfStringsNullSafeSerde();
+
+  @Test
+  public void testStringArray()
+  {
+    testSerde("abc", "def", "xyz");
+    testSerde("abc", "123", "456.0");
+  }
+
+  @Test
+  public void testSingletonArray()
+  {
+    testSerde("abc");
+    testSerde("xyz");
+  }
+
+  @Test
+  public void testEmptyArray()
+  {
+    testSerde();
+  }
+
+  @Test
+  public void testArrayWithNullString()
+  {
+    testSerde((String) null);
+    testSerde("abc", null, "def");
+    testSerde(null, null, null);
+  }
+
+  @Test
+  public void testArrayWithEmptyString()
+  {
+    testSerde("");
+    testSerde("abc", "def", "");
+    testSerde("", "", "");
+    testSerde("", null, "abc");
+  }
+
+  @Test
+  public void testIllegalStrLength()
+  {
+    // bytes for length = -2
+    final byte[] bytes = {-2, -1, -1, -1};
+    IAE exception = Assert.assertThrows(
+        IAE.class,
+        () -> serde.deserializeFromMemory(Memory.wrap(bytes), 1)
+    );
+    Assert.assertEquals(
+        "Illegal strLength [-2] at offset [4]. Must be -1, 0 or a positive integer.",
+        exception.getMessage()
+    );
+  }
+
+  private void testSerde(String... inputArray)
+  {
+    byte[] bytes = serde.serializeToByteArray(inputArray);
+    String[] deserialized = serde.deserializeFromMemory(Memory.wrap(bytes), inputArray.length);
+    Assert.assertEquals(inputArray, deserialized);
+  }
+
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org