You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2021/11/24 09:00:30 UTC
[druid] branch master updated: Handle null values in Range Partition dimension distribution (#11973)
This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 48dbe0e Handle null values in Range Partition dimension distribution (#11973)
48dbe0e is described below
commit 48dbe0ea45aca63b352ce599a832ae2bec889957
Author: Kashif Faraz <ka...@gmail.com>
AuthorDate: Wed Nov 24 14:30:02 2021 +0530
Handle null values in Range Partition dimension distribution (#11973)
This PR adds support for handling null dimension values while creating partition boundaries
in range partitioning.
This means that we can now have partition boundaries like [null, "abc"] or ["abc", null, "def"].
---
.../parallel/PartialDimensionDistributionTask.java | 12 ++-
.../distribution/ArrayOfStringTuplesSerDe.java | 2 +-
.../distribution/ArrayOfStringsNullSafeSerde.java | 110 +++++++++++++++++++++
.../ArrayOfStringsNullSafeSerdeTest.java | 91 +++++++++++++++++
4 files changed, 209 insertions(+), 6 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index d0cce9d..f83004b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -68,8 +68,9 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
{
public static final String TYPE = "partial_dimension_distribution";
- // Future work: StringDistribution does not handle inserting NULLs. This is the same behavior as hadoop indexing.
- private static final boolean SKIP_NULL = true;
+ // Do not skip nulls as StringDistribution can handle null values.
+ // This behavior is different from hadoop indexing.
+ private static final boolean SKIP_NULL = false;
private final int numAttempts;
private final ParallelIndexIngestionSpec ingestionSchema;
@@ -276,9 +277,10 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
}
String[] values = new String[partitionDimensions.size()];
for (int i = 0; i < partitionDimensions.size(); ++i) {
- values[i] = Iterables.getOnlyElement(
- inputRow.getDimension(partitionDimensions.get(i))
- );
+ List<String> dimensionValues = inputRow.getDimension(partitionDimensions.get(i));
+ if (dimensionValues != null && !dimensionValues.isEmpty()) {
+ values[i] = Iterables.getOnlyElement(dimensionValues);
+ }
}
final StringTuple partitionDimensionValues = StringTuple.create(values);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
index 1035acb..87f6284 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
@@ -33,7 +33,7 @@ import org.apache.druid.data.input.StringTuple;
*/
public class ArrayOfStringTuplesSerDe extends ArrayOfItemsSerDe<StringTuple>
{
- private static final ArrayOfStringsSerDe STRINGS_SERDE = new ArrayOfStringsSerDe();
+ private static final ArrayOfStringsNullSafeSerde STRINGS_SERDE = new ArrayOfStringsNullSafeSerde();
@Override
public byte[] serializeToByteArray(StringTuple[] items)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java
new file mode 100644
index 0000000..a289b20
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel.distribution;
+
+import org.apache.datasketches.ArrayOfItemsSerDe;
+import org.apache.datasketches.ArrayOfStringsSerDe;
+import org.apache.datasketches.Util;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.java.util.common.IAE;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Null-safe serde for arrays of Strings, e.g. the values of a {@link StringTuple}.
+ * <p>
+ * The implementation is the same as {@link ArrayOfStringsSerDe}, except this
+ * class handles null String values as well, writing them as a length of -1.
+ */
+public class ArrayOfStringsNullSafeSerde extends ArrayOfItemsSerDe<String>
+{
+  // Length prefix written in place of a null String; real lengths are always >= 0
+  private static final int NULL_STRING_LENGTH = -1;
+
+  /** Writes each item as a 4-byte length followed by its UTF-8 bytes; a null item is only a length of -1. */
+  @Override
+  public byte[] serializeToByteArray(final String[] items)
+  {
+    // Determine the bytes for each String
+    int length = 0;
+    final byte[][] itemsBytes = new byte[items.length][];
+    for (int i = 0; i < items.length; i++) {
+      length += Integer.BYTES;
+      // Do not initialize the byte array for a null String
+      if (items[i] != null) {
+        itemsBytes[i] = items[i].getBytes(StandardCharsets.UTF_8);
+        length += itemsBytes[i].length;
+      }
+    }
+
+    // Create a single byte array for all the Strings
+    final byte[] bytes = new byte[length];
+    final WritableMemory mem = WritableMemory.writableWrap(bytes);
+    long offsetBytes = 0;
+    for (int i = 0; i < items.length; i++) {
+      if (itemsBytes[i] != null) {
+        // Write the length of the array and the array itself
+        mem.putInt(offsetBytes, itemsBytes[i].length);
+        offsetBytes += Integer.BYTES;
+        mem.putByteArray(offsetBytes, itemsBytes[i], 0, itemsBytes[i].length);
+        offsetBytes += itemsBytes[i].length;
+      } else {
+        mem.putInt(offsetBytes, NULL_STRING_LENGTH);
+        offsetBytes += Integer.BYTES;
+      }
+    }
+
+    return bytes;
+  }
+
+  /** Reads numItems Strings from mem; a length of -1 yields null and any other negative length throws IAE. */
+  @Override
+  public String[] deserializeFromMemory(final Memory mem, final int numItems)
+  {
+    final String[] array = new String[numItems];
+    long offsetBytes = 0;
+    for (int i = 0; i < numItems; i++) {
+      // Read the length of the ith String
+      Util.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity());
+      final int strLength = mem.getInt(offsetBytes);
+      offsetBytes += Integer.BYTES;
+      if (strLength >= 0) {
+        // Check bounds before allocating so that a corrupt length cannot force a huge allocation
+        Util.checkBounds(offsetBytes, strLength, mem.getCapacity());
+        final byte[] bytes = new byte[strLength];
+        mem.getByteArray(offsetBytes, bytes, 0, strLength);
+        offsetBytes += strLength;
+        array[i] = new String(bytes, StandardCharsets.UTF_8);
+      } else if (strLength != NULL_STRING_LENGTH) {
+        throw new IAE(
+            "Illegal strLength [%s] at offset [%s]. Must be %s, 0 or a positive integer.",
+            strLength,
+            offsetBytes,
+            NULL_STRING_LENGTH
+        );
+      }
+    }
+    return array;
+  }
+
+}
+
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerdeTest.java
new file mode 100644
index 0000000..927f311
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerdeTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel.distribution;
+
+import org.apache.datasketches.memory.Memory;
+import org.apache.druid.java.util.common.IAE;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ArrayOfStringsNullSafeSerdeTest
+{
+  private final ArrayOfStringsNullSafeSerde serde = new ArrayOfStringsNullSafeSerde();
+
+  @Test
+  public void testStringArray()
+  {
+    testSerde("abc", "def", "xyz");
+    testSerde("abc", "123", "456.0");
+  }
+
+  @Test
+  public void testSingletonArray()
+  {
+    testSerde("abc");
+    testSerde("xyz");
+  }
+
+  @Test
+  public void testEmptyArray()
+  {
+    testSerde();
+  }
+
+  @Test
+  public void testArrayWithNullString()
+  {
+    testSerde((String) null);
+    testSerde("abc", null, "def");
+    testSerde(null, null, null);
+  }
+
+  @Test
+  public void testArrayWithEmptyString()
+  {
+    testSerde("");
+    testSerde("abc", "def", "");
+    testSerde("", "", "");
+    testSerde("", null, "abc");
+  }
+
+  @Test
+  public void testIllegalStrLength()
+  {
+    // bytes for length = -2, least significant byte first
+    final byte[] bytes = {-2, -1, -1, -1};
+    IAE exception = Assert.assertThrows(
+        IAE.class,
+        () -> serde.deserializeFromMemory(Memory.wrap(bytes), 1)
+    );
+    Assert.assertEquals(
+        "Illegal strLength [-2] at offset [4]. Must be -1, 0 or a positive integer.",
+        exception.getMessage()
+    );
+  }
+
+  private void testSerde(String... inputArray)
+  {
+    // Round-trip the input through the serde; the result must equal the input element by element
+    byte[] bytes = serde.serializeToByteArray(inputArray);
+    String[] deserialized = serde.deserializeFromMemory(Memory.wrap(bytes), inputArray.length);
+    Assert.assertArrayEquals(inputArray, deserialized);
+  }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org