You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by al...@apache.org on 2023/10/24 00:22:44 UTC

[druid] 01/01: use datasketches-java 4.2.0

This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch datasketches-4.2.0
in repository https://gitbox.apache.org/repos/asf/druid.git

commit fa5daf5217390bea163bebe1c9cfe80a2ed6ac38
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Mon Oct 23 17:22:32 2023 -0700

    use datasketches-java 4.2.0
---
 .../kll/KllDoublesSketchAggregatorFactory.java     |  3 +-
 .../kll/KllFloatsSketchAggregatorFactory.java      |  3 +-
 .../KllDoublesSketchComplexMetricSerdeTest.java    |  6 +-
 .../kll/KllFloatsSketchComplexMetricSerdeTest.java |  4 +-
 .../QuantilesSketchKeyCollectorFactory.java        | 55 +++++++++++++--
 .../distribution/ArrayOfStringTuplesSerDe.java     | 80 +++++++++++++++++++---
 .../distribution/ArrayOfStringsNullSafeSerde.java  |  3 +-
 licenses.yaml                                      |  2 +-
 pom.xml                                            |  2 +-
 9 files changed, 133 insertions(+), 25 deletions(-)

diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java
index 815227adf55..8c6dacbdf39 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java
@@ -22,6 +22,7 @@ package org.apache.druid.query.aggregation.datasketches.kll;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.datasketches.kll.KllSketch.SketchType;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
 import org.apache.druid.query.aggregation.AggregatorUtil;
@@ -124,7 +125,8 @@ public class KllDoublesSketchAggregatorFactory extends KllSketchAggregatorFactor
   @Override
   int getMaxSerializedSizeBytes(final int k, final long n)
   {
-    return KllDoublesSketch.getMaxSerializedSizeBytes(k, n, true);
+    // 4.2.0 signature takes the sketch type explicitly; DOUBLES_SKETCH corresponds to KllDoublesSketch
+    return KllDoublesSketch.getMaxSerializedSizeBytes(k, n, SketchType.DOUBLES_SKETCH, true);
   }
 
   @Override
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java
index 9cc61524615..5620921621f 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java
@@ -22,6 +22,7 @@ package org.apache.druid.query.aggregation.datasketches.kll;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.datasketches.kll.KllFloatsSketch;
+import org.apache.datasketches.kll.KllSketch.SketchType;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
 import org.apache.druid.query.aggregation.AggregatorUtil;
@@ -124,7 +125,8 @@ public class KllFloatsSketchAggregatorFactory extends KllSketchAggregatorFactory
   @Override
   int getMaxSerializedSizeBytes(final int k, final long n)
   {
-    return KllFloatsSketch.getMaxSerializedSizeBytes(k, n, true);
+    // 4.2.0 signature takes the sketch type explicitly; FLOATS_SKETCH corresponds to KllFloatsSketch
+    return KllFloatsSketch.getMaxSerializedSizeBytes(k, n, SketchType.FLOATS_SKETCH, true);
   }
 
   @Override
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java
index d0a26307990..730fb54c541 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java
@@ -114,7 +114,7 @@ public class KllDoublesSketchComplexMetricSerdeTest
     objectStrategy.fromByteBufferSafe(buf, bytes.length).toByteArray();
 
-    // corrupted sketch should fail with a regular java buffer exception, not all subsets actually fail with the same
-    // index out of bounds exceptions, but at least this many do
+    // corrupted sketch should fail with an exception; the exact type (a java buffer exception or a sketches
+    // exception) may vary with the corruption, so only Exception is asserted, but at least this many subsets fail
     for (int subset = 3; subset < 24; subset++) {
       final byte[] garbage2 = new byte[subset];
       for (int i = 0; i < garbage2.length; i++) {
@@ -123,7 +123,7 @@ public class KllDoublesSketchComplexMetricSerdeTest
 
       final ByteBuffer buf2 = ByteBuffer.wrap(garbage2).order(ByteOrder.LITTLE_ENDIAN);
       Assert.assertThrows(
-          IndexOutOfBoundsException.class,
+          Exception.class,
           () -> objectStrategy.fromByteBufferSafe(buf2, garbage2.length).toByteArray()
       );
     }
@@ -132,7 +132,7 @@ public class KllDoublesSketchComplexMetricSerdeTest
     final byte[] garbage = new byte[]{0x01, 0x02};
     final ByteBuffer buf3 = ByteBuffer.wrap(garbage).order(ByteOrder.LITTLE_ENDIAN);
     Assert.assertThrows(
-        IndexOutOfBoundsException.class,
+        Exception.class,
         () -> objectStrategy.fromByteBufferSafe(buf3, garbage.length).toByteArray()
     );
   }
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java
index 56a39778990..ee505fe65b8 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java
@@ -123,7 +123,8 @@ public class KllFloatsSketchComplexMetricSerdeTest
 
       final ByteBuffer buf2 = ByteBuffer.wrap(garbage2).order(ByteOrder.LITTLE_ENDIAN);
+      // corrupted input may surface as different exception types, so only assert that one is thrown
       Assert.assertThrows(
-          IndexOutOfBoundsException.class,
+          Exception.class,
           () -> objectStrategy.fromByteBufferSafe(buf2, garbage2.length).toByteArray()
       );
     }
@@ -132,7 +133,7 @@ public class KllFloatsSketchComplexMetricSerdeTest
     final byte[] garbage = new byte[]{0x01, 0x02};
     final ByteBuffer buf3 = ByteBuffer.wrap(garbage).order(ByteOrder.LITTLE_ENDIAN);
     Assert.assertThrows(
-        IndexOutOfBoundsException.class,
+        Exception.class,
         () -> objectStrategy.fromByteBufferSafe(buf3, garbage.length).toByteArray()
     );
   }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java
index 3192813cfe1..0c858983875 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.JsonDeserializer;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.datasketches.common.ArrayOfItemsSerDe;
+import org.apache.datasketches.common.ByteArrayUtil;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.memory.WritableMemory;
 import org.apache.datasketches.quantiles.ItemsSketch;
@@ -126,22 +127,77 @@ public class QuantilesSketchKeyCollectorFactory
     }
 
     @Override
-    public byte[][] deserializeFromMemory(final Memory mem, final int numItems)
+    public byte[][] deserializeFromMemory(final Memory mem, long offsetBytes, final int numItems)
     {
       final byte[][] keys = new byte[numItems][];
-      long keyPosition = (long) Integer.BYTES * numItems;
+      // Layout: numItems int key lengths first, then the key bytes back to back.
+      final long start = offsetBytes;
+      offsetBytes += (long) Integer.BYTES * numItems;
 
       for (int i = 0; i < numItems; i++) {
-        final int keyLength = mem.getInt((long) Integer.BYTES * i);
+        final int keyLength = mem.getInt(start + (long) Integer.BYTES * i);
         final byte[] keyBytes = new byte[keyLength];
 
-        mem.getByteArray(keyPosition, keyBytes, 0, keyLength);
+        mem.getByteArray(offsetBytes, keyBytes, 0, keyLength);
         keys[i] = keyBytes;
 
-        keyPosition += keyLength;
+        offsetBytes += keyLength;
       }
 
       return keys;
     }
+
+    /**
+     * Serializes a single key as an int length prefix followed by the key bytes, i.e. the one-item case of the
+     * layout read by {@link #deserializeFromMemory(Memory, long, int)}.
+     */
+    @Override
+    public byte[] serializeToByteArray(final byte[] item)
+    {
+      final byte[] bytes = new byte[Integer.BYTES + item.length];
+      ByteArrayUtil.putIntLE(bytes, 0, item.length);
+      ByteArrayUtil.copyBytes(item, 0, bytes, Integer.BYTES, item.length);
+      return bytes;
+    }
+
+    @Override
+    public byte[][] deserializeFromMemory(final Memory mem, final int numItems)
+    {
+      return deserializeFromMemory(mem, 0, numItems);
+    }
+
+    @Override
+    public int sizeOf(final byte[] item)
+    {
+      return Integer.BYTES + item.length;
+    }
+
+    /**
+     * Returns the number of bytes spanned by {@code numItems} serialized keys starting at {@code offsetBytes}:
+     * the block of int length prefixes plus the sum of those lengths.
+     */
+    @Override
+    public int sizeOf(final Memory mem, final long offsetBytes, final int numItems)
+    {
+      int length = Integer.BYTES * numItems;
+      for (int i = 0; i < numItems; i++) {
+        // add each key's byte length on top of the length-prefix block
+        length += mem.getInt(offsetBytes + (long) Integer.BYTES * i);
+      }
+      return length;
+    }
+
+    @Override
+    public String toString(final byte[] item)
+    {
+      // byte[]#toString() only yields the array's identity (e.g. "[B@1b6d3586"); render the contents instead
+      return java.util.Arrays.toString(item);
+    }
+
+    @Override
+    public Class<?> getClassOfT()
+    {
+      return byte[].class;
+    }
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
index e3f76b5b92f..178e191652e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
@@ -21,8 +21,9 @@ package org.apache.druid.indexing.common.task.batch.parallel.distribution;
 
 import org.apache.datasketches.common.ArrayOfItemsSerDe;
 import org.apache.datasketches.common.ArrayOfStringsSerDe;
+import org.apache.datasketches.common.ByteArrayUtil;
+import org.apache.datasketches.common.Util;
 import org.apache.datasketches.memory.Memory;
-import org.apache.datasketches.memory.WritableMemory;
 import org.apache.datasketches.memory.internal.UnsafeUtil;
 import org.apache.druid.data.input.StringTuple;
 
@@ -36,7 +37,7 @@ public class ArrayOfStringTuplesSerDe extends ArrayOfItemsSerDe<StringTuple>
   private static final ArrayOfStringsNullSafeSerde STRINGS_SERDE = new ArrayOfStringsNullSafeSerde();
 
   @Override
-  public byte[] serializeToByteArray(StringTuple[] items)
+  public byte[] serializeToByteArray(final StringTuple[] items)
   {
     int length = 0;
     final byte[][] itemsBytes = new byte[items.length][];
@@ -49,29 +50,27 @@ public class ArrayOfStringTuplesSerDe extends ArrayOfItemsSerDe<StringTuple>
     }
 
     final byte[] bytes = new byte[length];
-    final WritableMemory mem = WritableMemory.writableWrap(bytes);
-    long offsetBytes = 0;
+    int offsetBytes = 0;
     for (int i = 0; i < items.length; i++) {
       // Add the number of items in the StringTuple
-      mem.putInt(offsetBytes, items[i].size());
+      ByteArrayUtil.putIntLE(bytes, offsetBytes, items[i].size());
       offsetBytes += Integer.BYTES;
 
       // Add the size of byte content for the StringTuple
-      mem.putInt(offsetBytes, itemsBytes[i].length);
+      ByteArrayUtil.putIntLE(bytes, offsetBytes, itemsBytes[i].length);
       offsetBytes += Integer.BYTES;
 
       // Add the byte contents of the StringTuple
-      mem.putByteArray(offsetBytes, itemsBytes[i], 0, itemsBytes[i].length);
+      ByteArrayUtil.copyBytes(itemsBytes[i], 0, bytes, offsetBytes, itemsBytes[i].length);
       offsetBytes += itemsBytes[i].length;
     }
     return bytes;
   }
 
   @Override
-  public StringTuple[] deserializeFromMemory(Memory mem, int numItems)
+  public StringTuple[] deserializeFromMemory(final Memory mem, long offsetBytes, final int numItems)
   {
     final StringTuple[] array = new StringTuple[numItems];
-    long offsetBytes = 0;
     for (int i = 0; i < numItems; i++) {
       // Read the number of items in the StringTuple
       UnsafeUtil.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity());
@@ -96,4 +95,69 @@ public class ArrayOfStringTuplesSerDe extends ArrayOfItemsSerDe<StringTuple>
       }
     return array;
   }
+
+  @Override
+  public byte[] serializeToByteArray(final StringTuple item)
+  {
+    // Same layout as one element of serializeToByteArray(StringTuple[]): [element count][content size][content]
+    final byte[] itemBytes = STRINGS_SERDE.serializeToByteArray(item.toArray());
+    final byte[] bytes = new byte[Integer.BYTES * 2 + itemBytes.length];
+    int offsetBytes = 0;
+    ByteArrayUtil.putIntLE(bytes, offsetBytes, item.size());
+    offsetBytes += Integer.BYTES;
+    ByteArrayUtil.putIntLE(bytes, offsetBytes, itemBytes.length);
+    offsetBytes += Integer.BYTES;
+    ByteArrayUtil.copyBytes(itemBytes, 0, bytes, offsetBytes, itemBytes.length);
+    return bytes;
+  }
+
+  @Override
+  public StringTuple[] deserializeFromMemory(final Memory mem, final int numItems)
+  {
+    return deserializeFromMemory(mem, 0, numItems);
+  }
+
+  /**
+   * Returns the exact number of bytes that {@link #serializeToByteArray(StringTuple)} writes for {@code item}:
+   * two int headers (element count and content size) plus the null-safe encoding of its elements.
+   */
+  @Override
+  public int sizeOf(final StringTuple item)
+  {
+    // Delegate to the null-safe array serde so null elements are sized exactly as they are written.
+    return Integer.BYTES * 2 + STRINGS_SERDE.serializeToByteArray(item.toArray()).length;
+  }
+
+  @Override
+  public int sizeOf(final Memory mem, long offsetBytes, final int numItems)
+  {
+    final long start = offsetBytes;
+    for (int i = 0; i < numItems; i++) {
+      // Skip the number of items in the StringTuple
+      Util.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity());
+      offsetBytes += Integer.BYTES;
+
+      // Read the size of byte content
+      Util.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity());
+      final int byteContentSize = mem.getInt(offsetBytes);
+      offsetBytes += Integer.BYTES;
+
+      // Skip the byte content
+      Util.checkBounds(offsetBytes, byteContentSize, mem.getCapacity());
+      offsetBytes += byteContentSize;
+    }
+    return (int) (offsetBytes - start);
+  }
+
+  @Override
+  public String toString(final StringTuple item)
+  {
+    return item.toString();
+  }
+
+  @Override
+  public Class<?> getClassOfT()
+  {
+    return StringTuple.class;
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java
index b5a8393b172..406121c4086 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel.distribution;
 
-import org.apache.datasketches.common.ArrayOfItemsSerDe;
 import org.apache.datasketches.common.ArrayOfStringsSerDe;
 import org.apache.datasketches.common.Util;
 import org.apache.datasketches.memory.Memory;
@@ -35,7 +34,9 @@ import java.nio.charset.StandardCharsets;
  * The implementation is the same as {@link ArrayOfStringsSerDe}, except this
  * class handles null String values as well.
  */
-public class ArrayOfStringsNullSafeSerde extends ArrayOfItemsSerDe<String>
+public class ArrayOfStringsNullSafeSerde extends ArrayOfStringsSerDe
 {
+  // NOTE(review): members inherited from ArrayOfStringsSerDe that are not overridden here (e.g. the single-item
+  // sizeOf/serializeToByteArray overloads) may not accept null Strings -- confirm before relying on them.
 
   private static final int NULL_STRING_LENGTH = -1;
diff --git a/licenses.yaml b/licenses.yaml
index a207fb1b0ac..eb2d63dd245 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -3477,7 +3477,7 @@ name: DataSketches
 license_category: binary
 module: java-core
 license_name: Apache License version 2.0
-version: 4.1.0
+version: 4.2.0
 libraries:
   - org.apache.datasketches: datasketches-java
 
diff --git a/pom.xml b/pom.xml
index 26f35353436..e9431770dbf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,7 +86,7 @@
              default_config.fmpp
           -->
         <calcite.version>1.35.0</calcite.version>
-        <datasketches.version>4.1.0</datasketches.version>
+        <datasketches.version>4.2.0</datasketches.version>
         <datasketches.memory.version>2.2.0</datasketches.memory.version>
         <derby.version>10.14.2.0</derby.version>
         <dropwizard.metrics.version>4.2.19</dropwizard.metrics.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org