You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/08/03 16:48:48 UTC

[GitHub] himanshug commented on a change in pull request #6016: Druid 'Shapeshifting' Columns

himanshug commented on a change in pull request #6016: Druid 'Shapeshifting' Columns
URL: https://github.com/apache/incubator-druid/pull/6016#discussion_r207604617
 
 

 ##########
 File path: processing/src/main/java/io/druid/segment/CompressedPools.java
 ##########
 @@ -77,41 +102,245 @@ public BufferRecycler get()
     return outputBytesPool.take();
   }
 
-  private static final NonBlockingPool<ByteBuffer> bigEndByteBufPool = new StupidPool<ByteBuffer>(
-      "bigEndByteBufPool",
-      new Supplier<ByteBuffer>()
-      {
-        private final AtomicLong counter = new AtomicLong(0);
-
-        @Override
-        public ByteBuffer get()
+  private static NonBlockingPool<ByteBuffer> makeBufferPool(String name, int size, ByteOrder order)
+  {
+    return new StupidPool<>(
+        name,
+        new Supplier<ByteBuffer>()
         {
-          log.info("Allocating new bigEndByteBuf[%,d]", counter.incrementAndGet());
-          return ByteBuffer.allocateDirect(BUFFER_SIZE).order(ByteOrder.BIG_ENDIAN);
+          private final AtomicLong counter = new AtomicLong(0);
+
+          @Override
+          public ByteBuffer get()
+          {
+            log.info("Allocating new %s[%,d]", name, counter.incrementAndGet());
+            return ByteBuffer.allocateDirect(size).order(order);
+          }
         }
-      }
-  );
+    );
+  }
 
-  private static final NonBlockingPool<ByteBuffer> littleEndByteBufPool = new StupidPool<ByteBuffer>(
-      "littleEndByteBufPool",
-      new Supplier<ByteBuffer>()
-      {
-        private final AtomicLong counter = new AtomicLong(0);
+  private static NonBlockingPool<int[]> makeIntArrayPool(String name, int size, int maxCache)
+  {
+    return new StupidPool<>(
+        name,
+        new Supplier<int[]>()
+        {
+          private final AtomicLong counter = new AtomicLong(0);
 
-        @Override
-        public ByteBuffer get()
+          @Override
+          public int[] get()
+          {
+            log.info("Allocating new %s[%,d]", name, counter.incrementAndGet());
+            return new int[size];
+          }
+        },
+        0,
+        maxCache
+    );
+  }
+
+  private static NonBlockingPool<SkippableIntegerCODEC> makeFastpforPool(String name, int size)
+  {
+    return new StupidPool<>(
+        name,
+        new Supplier<SkippableIntegerCODEC>()
         {
-          log.info("Allocating new littleEndByteBuf[%,d]", counter.incrementAndGet());
-          return ByteBuffer.allocateDirect(BUFFER_SIZE).order(ByteOrder.LITTLE_ENDIAN);
-        }
-      }
-  );
+          private final AtomicLong counter = new AtomicLong(0);
+
+          @Override
+          public SkippableIntegerCODEC get()
+          {
+            log.info("Allocating new %s[%,d]", name, counter.incrementAndGet());
+
+            Supplier<ByteBuffer> compressionBufferSupplier =
+                Suppliers.memoize(() -> ByteBuffer.allocateDirect(1 << 14));
+            return new SkippableComposition(
+                new FastPFOR(),
+                new VariableByte() {
+                  // VariableByte allocates a buffer in compress method instead of in constructor like fastpfor
+                  // so override to re-use instead (and only allocate if indexing)
+                  @Override
+                  protected ByteBuffer makeBuffer(int sizeInBytes)
+                  {
+                    ByteBuffer theBuffer = compressionBufferSupplier.get();
+                    theBuffer.clear();
+                    return theBuffer;
+                  }
+                }
+              );
+          }
+        },
+        0,
+        LEMIRE_FASTPFOR_CODEC_POOL_MAX_CACHE
+    );
+  }
+
+  private static final NonBlockingPool<ByteBuffer> bigEndByteBufPool =
+      makeBufferPool("bigEndByteBufPool", BUFFER_SIZE, ByteOrder.BIG_ENDIAN);
+
+  private static final NonBlockingPool<ByteBuffer> littleBigEndByteBufPool =
+      makeBufferPool("littleBigEndByteBufPool", SMALLER_BUFFER_SIZE, ByteOrder.BIG_ENDIAN);
+
+  private static final NonBlockingPool<ByteBuffer> littlestBigEndByteBufPool =
+      makeBufferPool("littlestBigEndByteBufPool", SMALLEST_BUFFER_SIZE, ByteOrder.BIG_ENDIAN);
+
+  private static final NonBlockingPool<ByteBuffer> littleEndByteBufPool =
+      makeBufferPool("littleEndByteBufPool", BUFFER_SIZE, ByteOrder.LITTLE_ENDIAN);
+
+  private static final NonBlockingPool<ByteBuffer> littlerEndByteBufPool =
+      makeBufferPool("littlerEndByteBufPool", SMALLER_BUFFER_SIZE, ByteOrder.LITTLE_ENDIAN);
+
+  private static final NonBlockingPool<ByteBuffer> littlestEndByteBufPool =
+      makeBufferPool("littlestEndByteBufPool", SMALLEST_BUFFER_SIZE, ByteOrder.LITTLE_ENDIAN);
+
+  private static final NonBlockingPool<int[]> shapeshiftIntsDecodedValuesArrayPool =
+      makeIntArrayPool(
+          "shapeshiftIntsDecodedValuesArrayPool",
+          INT_ARRAY_SIZE,
+          INT_DECODED_ARRAY_POOL_MAX_CACHE
+      );
+
+  private static final NonBlockingPool<int[]> shapeshiftIntsEncodedValuesArrayPool =
+      makeIntArrayPool(
+          "shapeshiftIntsEncodedValuesArrayPool",
+          INT_ARRAY_SIZE + ENCODED_INTS_SHOULD_BE_ENOUGH,
+          INT_ENCODED_ARRAY_POOL_MAX_CACHE
+      );
+
+  private static final NonBlockingPool<int[]> shapeshiftSmallerIntsDecodedValuesArrayPool =
+      makeIntArrayPool(
+          "shapeshiftSmallerIntsDecodedValuesArrayPool",
+          SMALLER_INT_ARRAY_SIZE,
+          INT_DECODED_ARRAY_POOL_MAX_CACHE
+      );
+
+  private static final NonBlockingPool<int[]> shapeshiftSmallerIntsEncodedValuesArrayPool =
+      makeIntArrayPool(
+          "shapeshiftSmallerIntsEncodedValuesArrayPool",
+          SMALLER_INT_ARRAY_SIZE + ENCODED_INTS_SHOULD_BE_ENOUGH,
+          INT_ENCODED_ARRAY_POOL_MAX_CACHE
+      );
+
+  private static final NonBlockingPool<int[]> shapeshiftSmallestIntsDecodedValuesArrayPool =
+      makeIntArrayPool(
+          "shapeshiftSmallestIntsDecodedValuesArrayPool",
+          SMALLEST_INT_ARRAY_SIZE,
+          INT_DECODED_ARRAY_POOL_MAX_CACHE
+      );
+
+  private static final NonBlockingPool<int[]> shapeshiftSmallestIntsEncodedValuesArrayPool =
+      makeIntArrayPool(
+          "shapeshiftSmallestIntsEncodedValuesArrayPool",
+          SMALLEST_INT_ARRAY_SIZE + ENCODED_INTS_SHOULD_BE_ENOUGH,
+          INT_ENCODED_ARRAY_POOL_MAX_CACHE
+      );
 
 Review comment:
   Thanks for clarifying some of these things for me. I think it is fair to measure and do whatever performs best.
   
   I guess that not pooling temp arrays would be ok, as they are short-lived (especially with the optimizations done to G1GC). It might be ok to not pool value arrays as well, given that we are not operating at the limits of available heap. I haven't read through the FastPFOR code yet, but it sounds like it has direct buffers, which are definitely better off pooled.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org