You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by al...@apache.org on 2023/10/24 00:22:44 UTC
[druid] 01/01: use datasketches-java 4.2.0
This is an automated email from the ASF dual-hosted git repository.
alsay pushed a commit to branch datasketches-4.2.0
in repository https://gitbox.apache.org/repos/asf/druid.git
commit fa5daf5217390bea163bebe1c9cfe80a2ed6ac38
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Mon Oct 23 17:22:32 2023 -0700
use datasketches-java 4.2.0
---
.../kll/KllDoublesSketchAggregatorFactory.java | 3 +-
.../kll/KllFloatsSketchAggregatorFactory.java | 3 +-
.../KllDoublesSketchComplexMetricSerdeTest.java | 6 +-
.../kll/KllFloatsSketchComplexMetricSerdeTest.java | 4 +-
.../QuantilesSketchKeyCollectorFactory.java | 55 +++++++++++++--
.../distribution/ArrayOfStringTuplesSerDe.java | 80 +++++++++++++++++++---
.../distribution/ArrayOfStringsNullSafeSerde.java | 3 +-
licenses.yaml | 2 +-
pom.xml | 2 +-
9 files changed, 133 insertions(+), 25 deletions(-)
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java
index 815227adf55..8c6dacbdf39 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java
@@ -22,6 +22,7 @@ package org.apache.druid.query.aggregation.datasketches.kll;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.datasketches.kll.KllSketch.SketchType;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import org.apache.druid.query.aggregation.AggregatorUtil;
@@ -124,7 +125,7 @@ public class KllDoublesSketchAggregatorFactory extends KllSketchAggregatorFactor
@Override
int getMaxSerializedSizeBytes(final int k, final long n)
{
- return KllDoublesSketch.getMaxSerializedSizeBytes(k, n, true);
+ return KllDoublesSketch.getMaxSerializedSizeBytes(k, n, SketchType.DOUBLES_SKETCH, true); // datasketches 4.2.0 signature takes the sketch type explicitly; the trailing 'true' is carried over from the 4.1.0 call
}
@Override
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java
index 9cc61524615..5620921621f 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java
@@ -22,6 +22,7 @@ package org.apache.druid.query.aggregation.datasketches.kll;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.datasketches.kll.KllFloatsSketch;
+import org.apache.datasketches.kll.KllSketch.SketchType;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import org.apache.druid.query.aggregation.AggregatorUtil;
@@ -124,7 +125,7 @@ public class KllFloatsSketchAggregatorFactory extends KllSketchAggregatorFactory
@Override
int getMaxSerializedSizeBytes(final int k, final long n)
{
- return KllFloatsSketch.getMaxSerializedSizeBytes(k, n, true);
+ return KllFloatsSketch.getMaxSerializedSizeBytes(k, n, SketchType.FLOATS_SKETCH, true); // datasketches 4.2.0 signature takes the sketch type explicitly; the trailing 'true' is carried over from the 4.1.0 call
}
@Override
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java
index d0a26307990..730fb54c541 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java
@@ -114,7 +114,7 @@ public class KllDoublesSketchComplexMetricSerdeTest
objectStrategy.fromByteBufferSafe(buf, bytes.length).toByteArray();
// corrupted sketch should fail with a regular java buffer exception, not all subsets actually fail with the same
- // index out of bounds exceptions, but at least this many do
+ // exception type (a Java buffer exception or one raised by datasketches), but at least this many do
for (int subset = 3; subset < 24; subset++) {
final byte[] garbage2 = new byte[subset];
for (int i = 0; i < garbage2.length; i++) {
@@ -123,7 +123,7 @@ public class KllDoublesSketchComplexMetricSerdeTest
final ByteBuffer buf2 = ByteBuffer.wrap(garbage2).order(ByteOrder.LITTLE_ENDIAN);
Assert.assertThrows(
- IndexOutOfBoundsException.class,
+ Exception.class,
() -> objectStrategy.fromByteBufferSafe(buf2, garbage2.length).toByteArray()
);
}
@@ -132,7 +132,7 @@ public class KllDoublesSketchComplexMetricSerdeTest
final byte[] garbage = new byte[]{0x01, 0x02};
final ByteBuffer buf3 = ByteBuffer.wrap(garbage).order(ByteOrder.LITTLE_ENDIAN);
Assert.assertThrows(
- IndexOutOfBoundsException.class,
+ Exception.class,
() -> objectStrategy.fromByteBufferSafe(buf3, garbage.length).toByteArray()
);
}
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java
index 56a39778990..ee505fe65b8 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java
@@ -123,7 +123,7 @@ public class KllFloatsSketchComplexMetricSerdeTest
final ByteBuffer buf2 = ByteBuffer.wrap(garbage2).order(ByteOrder.LITTLE_ENDIAN);
Assert.assertThrows(
- IndexOutOfBoundsException.class,
+ Exception.class,
() -> objectStrategy.fromByteBufferSafe(buf2, garbage2.length).toByteArray()
);
}
@@ -132,7 +132,7 @@ public class KllFloatsSketchComplexMetricSerdeTest
final byte[] garbage = new byte[]{0x01, 0x02};
final ByteBuffer buf3 = ByteBuffer.wrap(garbage).order(ByteOrder.LITTLE_ENDIAN);
Assert.assertThrows(
- IndexOutOfBoundsException.class,
+ Exception.class,
() -> objectStrategy.fromByteBufferSafe(buf3, garbage.length).toByteArray()
);
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java
index 3192813cfe1..0c858983875 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.google.common.annotations.VisibleForTesting;
import org.apache.datasketches.common.ArrayOfItemsSerDe;
+import org.apache.datasketches.common.ByteArrayUtil;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.quantiles.ItemsSketch;
@@ -126,22 +127,66 @@ public class QuantilesSketchKeyCollectorFactory
}
@Override
- public byte[][] deserializeFromMemory(final Memory mem, final int numItems)
+ public byte[][] deserializeFromMemory(final Memory mem, long offsetBytes, final int numItems) // reads numItems keys starting at offsetBytes instead of at position 0
{
final byte[][] keys = new byte[numItems][];
- long keyPosition = (long) Integer.BYTES * numItems;
+ final long start = offsetBytes; // start of the length table: numItems int key lengths
+ offsetBytes += (long) Integer.BYTES * numItems; // key bytes follow the length table back-to-back
for (int i = 0; i < numItems; i++) {
- final int keyLength = mem.getInt((long) Integer.BYTES * i);
+ final int keyLength = mem.getInt(start + Integer.BYTES * i); // i-th entry of the length table
final byte[] keyBytes = new byte[keyLength];
- mem.getByteArray(keyPosition, keyBytes, 0, keyLength);
+ mem.getByteArray(offsetBytes, keyBytes, 0, keyLength); // copy the i-th key's bytes
keys[i] = keyBytes;
- keyPosition += keyLength;
+ offsetBytes += keyLength; // advance to the next key
}
return keys;
}
+
+ @Override
+ public byte[] serializeToByteArray(final byte[] item)
+ { // single-key layout: the key length (written with putIntLE) followed by the key bytes
+ final byte[] bytes = new byte[Integer.BYTES + item.length];
+ ByteArrayUtil.putIntLE(bytes, 0, item.length);
+ ByteArrayUtil.copyBytes(item, 0, bytes, Integer.BYTES, item.length);
+ return bytes;
+ }
+
+ @Override
+ public byte[][] deserializeFromMemory(final Memory mem, final int numItems)
+ { // convenience overload: reads the keys starting at offset 0
+ return deserializeFromMemory(mem, 0, numItems);
+ }
+
+ @Override
+ public int sizeOf(final byte[] item)
+ { // serialized size of one key: an int length prefix plus the key bytes, as in serializeToByteArray(byte[])
+ return Integer.BYTES + item.length;
+ }
+
+ @Override
+ public int sizeOf(final Memory mem, long offsetBytes, final int numItems)
+ { // bytes spanned by numItems keys at offsetBytes: the int length table plus every key's bytes
+ int length = Integer.BYTES * numItems; // the length table itself
+ for (int i = 0; i < numItems; i++) {
+ length += mem.getInt(offsetBytes + Integer.BYTES * i); // accumulate each key's length (must add, not overwrite)
+ }
+ return length;
+ }
+
+ @Override
+ public String toString(final byte[] item)
+ { // byte[] has no content-based toString(); render the byte values rather than the array identity ("[B@...")
+ return java.util.Arrays.toString(item);
+ }
+
+ @Override
+ public Class<?> getClassOfT()
+ { // item type handled by this serde
+ return byte[].class;
+ }
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
index e3f76b5b92f..178e191652e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
@@ -21,8 +21,9 @@ package org.apache.druid.indexing.common.task.batch.parallel.distribution;
import org.apache.datasketches.common.ArrayOfItemsSerDe;
import org.apache.datasketches.common.ArrayOfStringsSerDe;
+import org.apache.datasketches.common.ByteArrayUtil;
+import org.apache.datasketches.common.Util;
import org.apache.datasketches.memory.Memory;
-import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.memory.internal.UnsafeUtil;
import org.apache.druid.data.input.StringTuple;
@@ -36,7 +37,7 @@ public class ArrayOfStringTuplesSerDe extends ArrayOfItemsSerDe<StringTuple>
private static final ArrayOfStringsNullSafeSerde STRINGS_SERDE = new ArrayOfStringsNullSafeSerde();
@Override
- public byte[] serializeToByteArray(StringTuple[] items)
+ public byte[] serializeToByteArray(final StringTuple[] items)
{
int length = 0;
final byte[][] itemsBytes = new byte[items.length][];
@@ -49,29 +50,27 @@ public class ArrayOfStringTuplesSerDe extends ArrayOfItemsSerDe<StringTuple>
}
final byte[] bytes = new byte[length];
- final WritableMemory mem = WritableMemory.writableWrap(bytes);
- long offsetBytes = 0;
+ int offsetBytes = 0;
for (int i = 0; i < items.length; i++) {
// Add the number of items in the StringTuple
- mem.putInt(offsetBytes, items[i].size());
+ ByteArrayUtil.putIntLE(bytes, offsetBytes, items[i].size());
offsetBytes += Integer.BYTES;
// Add the size of byte content for the StringTuple
- mem.putInt(offsetBytes, itemsBytes[i].length);
+ ByteArrayUtil.putIntLE(bytes, offsetBytes, itemsBytes[i].length);
offsetBytes += Integer.BYTES;
// Add the byte contents of the StringTuple
- mem.putByteArray(offsetBytes, itemsBytes[i], 0, itemsBytes[i].length);
+ ByteArrayUtil.copyBytes(itemsBytes[i], 0, bytes, offsetBytes, itemsBytes[i].length);
offsetBytes += itemsBytes[i].length;
}
return bytes;
}
@Override
- public StringTuple[] deserializeFromMemory(Memory mem, int numItems)
+ public StringTuple[] deserializeFromMemory(final Memory mem, long offsetBytes, final int numItems)
{
final StringTuple[] array = new StringTuple[numItems];
- long offsetBytes = 0;
for (int i = 0; i < numItems; i++) {
// Read the number of items in the StringTuple
UnsafeUtil.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity());
@@ -96,4 +95,67 @@ public class ArrayOfStringTuplesSerDe extends ArrayOfItemsSerDe<StringTuple>
}
return array;
}
+
+ @Override
+ public byte[] serializeToByteArray(final StringTuple item)
+ { // single-tuple layout: element count, content byte length, then the strings serialized by STRINGS_SERDE
+ final byte[] itemBytes = STRINGS_SERDE.serializeToByteArray(item.toArray());
+ final byte[] bytes = new byte[Integer.BYTES * 2 + itemBytes.length];
+ int offsetBytes = 0;
+ ByteArrayUtil.putIntLE(bytes, offsetBytes, item.size()); // number of strings in the tuple
+ offsetBytes += Integer.BYTES;
+ ByteArrayUtil.putIntLE(bytes, offsetBytes, itemBytes.length); // size of the serialized string content
+ offsetBytes += Integer.BYTES;
+ ByteArrayUtil.copyBytes(itemBytes, 0, bytes, offsetBytes, itemBytes.length);
+ return bytes;
+ }
+
+ @Override
+ public StringTuple[] deserializeFromMemory(final Memory mem, final int numItems)
+ { // convenience overload: reads the tuples starting at offset 0
+ return deserializeFromMemory(mem, 0, numItems);
+ }
+
+ @Override
+ public int sizeOf(final StringTuple item)
+ { // must agree with serializeToByteArray(StringTuple) and sizeOf(Memory, long, int): two int headers plus the serialized strings
+ int length = Integer.BYTES * 2; // element-count header + content-length header
+ for (final String s : item.toArray()) {
+ length += s == null ? Integer.BYTES : STRINGS_SERDE.sizeOf(s); // NOTE(review): assumes a null string is encoded as a bare int length prefix (NULL_STRING_LENGTH) - confirm against ArrayOfStringsNullSafeSerde
+ }
+ return length;
+ }
+
+ @Override
+ public int sizeOf(final Memory mem, long offsetBytes, final int numItems)
+ { // number of bytes spanned by numItems serialized tuples starting at offsetBytes; every read/skip is bounds-checked against mem's capacity
+ final long start = offsetBytes; // remembered so the total span can be computed at the end
+ for (int i = 0; i < numItems; i++) {
+ // Skip the number of items in the StringTuple
+ Util.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity());
+ offsetBytes += Integer.BYTES;
+
+ // Read the size of byte content
+ Util.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity());
+ final int byteContentSize = mem.getInt(offsetBytes);
+ offsetBytes += Integer.BYTES;
+
+ // Skip the byte content
+ Util.checkBounds(offsetBytes, byteContentSize, mem.getCapacity());
+ offsetBytes += byteContentSize;
+ }
+ return (int) (offsetBytes - start); // NOTE(review): narrowing cast assumes the span fits in an int
+ }
+
+ @Override
+ public String toString(final StringTuple item)
+ { // delegates to StringTuple's own string representation
+ return item.toString();
+ }
+
+ @Override
+ public Class<?> getClassOfT()
+ { // item type handled by this serde
+ return StringTuple.class;
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java
index b5a8393b172..406121c4086 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java
@@ -19,7 +19,6 @@
package org.apache.druid.indexing.common.task.batch.parallel.distribution;
-import org.apache.datasketches.common.ArrayOfItemsSerDe;
import org.apache.datasketches.common.ArrayOfStringsSerDe;
import org.apache.datasketches.common.Util;
import org.apache.datasketches.memory.Memory;
@@ -35,7 +34,7 @@ import java.nio.charset.StandardCharsets;
* The implementation is the same as {@link ArrayOfStringsSerDe}, except this
* class handles null String values as well.
*/
-public class ArrayOfStringsNullSafeSerde extends ArrayOfItemsSerDe<String>
+public class ArrayOfStringsNullSafeSerde extends ArrayOfStringsSerDe
{
private static final int NULL_STRING_LENGTH = -1;
diff --git a/licenses.yaml b/licenses.yaml
index a207fb1b0ac..eb2d63dd245 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -3477,7 +3477,7 @@ name: DataSketches
license_category: binary
module: java-core
license_name: Apache License version 2.0
-version: 4.1.0
+version: 4.2.0
libraries:
- org.apache.datasketches: datasketches-java
diff --git a/pom.xml b/pom.xml
index 26f35353436..e9431770dbf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,7 +86,7 @@
default_config.fmpp
-->
<calcite.version>1.35.0</calcite.version>
- <datasketches.version>4.1.0</datasketches.version>
+ <datasketches.version>4.2.0</datasketches.version>
<datasketches.memory.version>2.2.0</datasketches.memory.version>
<derby.version>10.14.2.0</derby.version>
<dropwizard.metrics.version>4.2.19</dropwizard.metrics.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org