You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2019/04/24 22:14:02 UTC

[incubator-druid] 01/20: Adjust BufferAggregator.get() impls to return copies (#7464)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 0.14.1-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git

commit 76b7998a130f12f28b9918af03b7b236706afabe
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Fri Apr 12 19:04:07 2019 -0700

    Adjust BufferAggregator.get() impls to return copies (#7464)
    
    * Adjust BufferAggregator.get() impls to return copies
    
    * Update BufferAggregator docs, more agg fixes
    
    * Update BufferAggregator get() doc
---
 .../datasketches/hll/HllSketchBuildBufferAggregator.java      |  2 +-
 .../quantiles/DoublesSketchBuildBufferAggregator.java         |  2 +-
 .../aggregation/bloom/BaseBloomFilterBufferAggregator.java    |  6 +++++-
 .../org/apache/druid/query/aggregation/BufferAggregator.java  | 11 ++++++++++-
 4 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
index 0ec525e..ce15821 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
@@ -108,7 +108,7 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
     final Lock lock = stripedLock.getAt(lockIndex(position)).readLock();
     lock.lock();
     try {
-      return sketchCache.get(buf).get(position);
+      return sketchCache.get(buf).get(position).copy();
     }
     finally {
       lock.unlock();
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java
index ead9a6a..609a46e 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java
@@ -69,7 +69,7 @@ public class DoublesSketchBuildBufferAggregator implements BufferAggregator
   @Override
   public synchronized Object get(final ByteBuffer buffer, final int position)
   {
-    return sketches.get(buffer).get(position);
+    return sketches.get(buffer).get(position).compact();
   }
 
   @Override
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java
index 74def15..ff866f9 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java
@@ -66,7 +66,11 @@ public abstract class BaseBloomFilterBufferAggregator<TSelector extends BaseNull
     // | k (byte) | numLongs (int) | bitset (long[numLongs]) |
     int sizeBytes = 1 + Integer.BYTES + (buf.getInt(position + 1) * Long.BYTES);
     mutationBuffer.limit(position + sizeBytes);
-    return mutationBuffer.slice();
+
+    ByteBuffer resultCopy = ByteBuffer.allocate(sizeBytes);
+    resultCopy.put(mutationBuffer.slice());
+    resultCopy.rewind();
+    return resultCopy;
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java
index ecd0c11..ed77c91 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java
@@ -72,7 +72,16 @@ public interface BufferAggregator extends HotLoopCallee
    *
    * Converts the given byte buffer representation into an intermediate aggregate Object
    *
-   * <b>Implementations must not change the position, limit or mark of the given buffer</b>
+   * <b>Implementations must not change the position, limit or mark of the given buffer.</b>
+   *
+   * <b>
+   * The object returned must not have any references to the given buffer (i.e., make a copy), since the
+   * underlying buffer is a shared resource and may be given to another processing thread
+   * while the objects returned by this aggregator are still in use.
+   * </b>
+   *
+   * <b>If the corresponding {@link AggregatorFactory#combine(Object, Object)} method for this aggregator
+   * expects its inputs to be mutable, then the object returned by this method must be mutable.</b>
    *
    * @param buf byte buffer storing the byte array representation of the aggregate
    * @param position offset within the byte buffer at which the aggregate value is stored


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org