Posted to commits@pinot.apache.org by "npawar (via GitHub)" <gi...@apache.org> on 2023/05/31 04:53:12 UTC

[GitHub] [pinot] npawar commented on a diff in pull request #10528: PinotBufferFactory and a buffer implementation that uses Unsafe

npawar commented on code in PR #10528:
URL: https://github.com/apache/pinot/pull/10528#discussion_r1210945646


##########
.github/workflows/pinot_tests.yml:
##########
@@ -120,16 +124,20 @@ jobs:
     if: github.repository == 'apache/pinot'
     runs-on: ubuntu-latest
     strategy:
+      # Changed to false in order to improve coverage using unsafe buffers
+      fail-fast: false
       matrix:
         testset: [ 1, 2 ]
-    name: Pinot Integration Test Set ${{ matrix.testset }}
+        java: [ 11, 17, 20 ]
+        distribution: [ "temurin" ]
+    name: Pinot Integration Test Set ${{ matrix.testset }} (${{matrix.distribution}}-${{matrix.java}})

Review Comment:
   Will the GitHub Actions end-to-end tests now take 6 times as long as before, or will they run in parallel? If the latter, do we know that we have enough resources there to handle the increased number of tasks?
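
For scale: GitHub Actions creates one job per combination of matrix values, so the new matrix yields 2 × 3 × 1 = 6 integration-test jobs where there used to be 2. By default, matrix jobs run in parallel as far as runner availability allows (`max-parallel` can cap this), and with `fail-fast: false` a failing combination no longer cancels the others. The short Java sketch here only enumerates the combinations to make the count explicit; the class and method names are illustrative and not part of Pinot.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: expands a GitHub Actions-style matrix into its job combinations. */
final class MatrixExpansion {
  private MatrixExpansion() {
  }

  /** Returns every combination (Cartesian product) of the given axis values, in axis order. */
  static List<List<String>> expand(List<List<String>> axes) {
    List<List<String>> combos = new ArrayList<>();
    combos.add(new ArrayList<>()); // start from a single empty combination
    for (List<String> axis : axes) {
      List<List<String>> next = new ArrayList<>();
      for (List<String> partial : combos) {
        for (String value : axis) {
          List<String> extended = new ArrayList<>(partial);
          extended.add(value);
          next.add(extended);
        }
      }
      combos = next;
    }
    return combos;
  }

  public static void main(String[] args) {
    List<List<String>> before = expand(List.of(List.of("1", "2"))); // testset only
    List<List<String>> after = expand(List.of(
        List.of("1", "2"),          // testset
        List.of("11", "17", "20"),  // java
        List.of("temurin")));       // distribution
    System.out.println(before.size() + " jobs before, " + after.size() + " jobs after");
  }
}
```

Running `main` prints `2 jobs before, 6 jobs after`. Whether the wall-clock time of the workflow grows therefore depends on how many runners are free, not on the job count alone.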



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java:
##########
@@ -99,6 +104,101 @@ public String toString() {
   private static final AtomicLong ALLOCATION_FAILURE_COUNT = new AtomicLong();
   private static final Map<PinotDataBuffer, BufferContext> BUFFER_CONTEXT_MAP = new WeakHashMap<>();
 
+  /**
+   * Configuration key used to change the offheap buffer factory used by Pinot.
+   * Value should be the fully qualified name of a class that extends {@link PinotBufferFactory} and has a public
+   * no-argument constructor.
+   */
+  private static final String OFFHEAP_BUFFER_FACTORY_CONFIG = "pinot.offheap.buffer.factory";
+  /**
+   * Boolean configuration that decides whether to allocate using {@link ByteBufferPinotBufferFactory} when the buffer
+   * to allocate fits in a {@link ByteBuffer}.
+   *
+   * Defaults to true.
+   */
+  private static final String OFFHEAP_BUFFER_PRIORITIZE_BYTE_BUFFER_CONFIG = "pinot.offheap.prioritize.bytebuffer";

Review Comment:
   So does this config exist mainly to let us switch off the default behaviour of using PinotByteBuffer for buffers under 2GB in the newer implementations, if we want to?
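
For reference, a `ByteBuffer` is `int`-indexed, so its capacity tops out at `Integer.MAX_VALUE` bytes (just under 2 GiB), which is where the "< 2GB" in the question comes from. A minimal sketch of the routing this flag appears to control, assuming (from the Javadoc above, since the factory code itself is not in this diff) that "fits in a ByteBuffer" means a size of at most `Integer.MAX_VALUE`. Factories are represented by class-name strings, and `BufferRouting` is a hypothetical helper, not Pinot code.

```java
/** Illustrative only: decides which backing factory an allocation of a given size would use. */
final class BufferRouting {
  private BufferRouting() {
  }

  /** A ByteBuffer is int-indexed, so its capacity cannot exceed Integer.MAX_VALUE bytes. */
  static boolean fitsInByteBuffer(long sizeBytes) {
    return sizeBytes >= 0 && sizeBytes <= Integer.MAX_VALUE;
  }

  /**
   * With prioritizeByteBuffer enabled, sizes that fit in a ByteBuffer go to the ByteBuffer-based
   * factory; otherwise (or for larger sizes) the configured factory, e.g. Unsafe or LArray, is used.
   */
  static String chooseFactory(long sizeBytes, boolean prioritizeByteBuffer, String configuredFactory) {
    if (prioritizeByteBuffer && fitsInByteBuffer(sizeBytes)) {
      return "ByteBufferPinotBufferFactory";
    }
    return configuredFactory;
  }
}
```

With the flag on, a 1 MiB request maps to `ByteBufferPinotBufferFactory` and a 3 GiB request to the configured factory; with the flag off, every request goes to the configured factory, which is the switch-off behaviour the question is asking about.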



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java:
##########
@@ -99,6 +104,101 @@ public String toString() {
   private static final AtomicLong ALLOCATION_FAILURE_COUNT = new AtomicLong();
   private static final Map<PinotDataBuffer, BufferContext> BUFFER_CONTEXT_MAP = new WeakHashMap<>();
 
+  /**
+   * Configuration key used to change the offheap buffer factory used by Pinot.
+   * Value should be the fully qualified name of a class that extends {@link PinotBufferFactory} and has a public
+   * no-argument constructor.
+   */
+  private static final String OFFHEAP_BUFFER_FACTORY_CONFIG = "pinot.offheap.buffer.factory";
+  /**
+   * Boolean configuration that decides whether to allocate using {@link ByteBufferPinotBufferFactory} when the buffer
+   * to allocate fits in a {@link ByteBuffer}.
+   *
+   * Defaults to true.
+   */
+  private static final String OFFHEAP_BUFFER_PRIORITIZE_BYTE_BUFFER_CONFIG = "pinot.offheap.prioritize.bytebuffer";
+
+  /**
+   * The default {@link PinotBufferFactory} used by all threads that do not define their own factory.
+   */
+  private static PinotBufferFactory _defaultFactory = createDefaultFactory();
+  /**
+   * A thread-local override of the {@link PinotBufferFactory} used by the current thread. This is mostly useful in
+   * tests.
+   */
+  private static final ThreadLocal<PinotBufferFactory> _FACTORY = new ThreadLocal<>();
+
+  /**
+   * Change the {@link PinotBufferFactory} used by the current thread.
+   *
+   * If this method is not called, the default factory configured at startup time will be used.
+   *
+   * @see #loadDefaultFactory(PinotConfiguration)
+   */
+  public static void useFactory(PinotBufferFactory factory) {
+    _FACTORY.set(factory);
+  }
+
+  /**
+   * Returns the factory the current thread should use.
+   */
+  public static PinotBufferFactory getFactory() {
+    PinotBufferFactory pinotBufferFactory = _FACTORY.get();
+    if (pinotBufferFactory == null) {
+      pinotBufferFactory = _defaultFactory;
+    }
+    return pinotBufferFactory;
+  }
+
+  public static PinotBufferFactory createDefaultFactory() {
+    return createDefaultFactory(true);
+  }
+
+  public static PinotBufferFactory createDefaultFactory(boolean prioritizeByteBuffer) {
+    String factoryClassName;
+    if (JavaVersion.VERSION < 16) {
+      LOGGER.info("Using LArray as buffer on JVM version {}", JavaVersion.VERSION);
+      factoryClassName = LArrayPinotBufferFactory.class.getCanonicalName();
+    } else {
+      LOGGER.info("Using Unsafe as buffer on JVM version {}", JavaVersion.VERSION);
+      factoryClassName = UnsafePinotBufferFactory.class.getCanonicalName();
+    }
+    return createFactory(factoryClassName, prioritizeByteBuffer);
+  }
+
+  private static PinotBufferFactory createFactory(String factoryClassName, boolean prioritizeByteBuffer) {
+    try {
+      LOGGER.info("Instantiating Pinot buffer factory class {}", factoryClassName);
+      PinotBufferFactory factory = (PinotBufferFactory) Class.forName(factoryClassName).getConstructor().newInstance();
+
+      if (prioritizeByteBuffer) {
+        factory = new SmallWithFallbackPinotBufferFactory(new ByteBufferPinotBufferFactory(), factory);
+      }
+
+      return factory;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Configures the default {@link PinotBufferFactory}.
+   *
+   * This method guarantees that threads that have not used the factory before this method is called will use the
+   * new factory. Threads that were already running when this method was called may still use other factories.
+   * Therefore, it is recommended to call this method during Pinot startup.
+   */
+  public static void loadDefaultFactory(PinotConfiguration configuration) {
+    boolean prioritizeByteBuffer = configuration.getProperty(OFFHEAP_BUFFER_PRIORITIZE_BYTE_BUFFER_CONFIG, true);
+    String factoryClassName = configuration.getProperty(OFFHEAP_BUFFER_FACTORY_CONFIG);
+    if (factoryClassName != null) {
+      _defaultFactory = createFactory(factoryClassName, prioritizeByteBuffer);
+    } else {
+      LOGGER.info("No custom Pinot buffer factory class found in configuration. Using default factory");
+      _defaultFactory = createDefaultFactory();

Review Comment:
   Don't you want to pass `prioritizeByteBuffer` to `createDefaultFactory` in this case?
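
A minimal sketch of the change being suggested, i.e. honoring `prioritizeByteBuffer` on the default-factory branch as well. To stay self-contained it assumes a plain `Map<String, String>` in place of `PinotConfiguration` and represents factories by name instead of instantiating them; `FactorySelectionSketch` and its methods are hypothetical. The JVM check uses `Runtime.version().feature()` (Java 10+) as a stand-in for Pinot's `JavaVersion.VERSION`, with the same version-16 cutoff as the diff.

```java
import java.util.Map;

/** Illustrative only: models factory selection with strings instead of real PinotBufferFactory instances. */
final class FactorySelectionSketch {
  static final String FACTORY_CONFIG = "pinot.offheap.buffer.factory";
  static final String PRIORITIZE_CONFIG = "pinot.offheap.prioritize.bytebuffer";

  // volatile so that a reload from one thread is visible to other threads
  private static volatile String _defaultFactory = defaultFactory(Runtime.version().feature(), true);
  private static final ThreadLocal<String> THREAD_FACTORY = new ThreadLocal<>();

  private FactorySelectionSketch() {
  }

  /** Same JVM-version switch as the diff: LArray before Java 16, Unsafe from 16 on. */
  static String defaultFactory(int javaFeatureVersion, boolean prioritizeByteBuffer) {
    String base = javaFeatureVersion < 16 ? "LArrayPinotBufferFactory" : "UnsafePinotBufferFactory";
    return wrap(base, prioritizeByteBuffer);
  }

  /** Optionally puts the ByteBuffer factory in front of the base factory for small buffers. */
  static String wrap(String base, boolean prioritizeByteBuffer) {
    return prioritizeByteBuffer ? "SmallWithFallback(ByteBufferPinotBufferFactory, " + base + ")" : base;
  }

  /** The suggested loadDefaultFactory: the prioritize flag is honored on both branches. */
  static void loadDefaultFactory(Map<String, String> config, int javaFeatureVersion) {
    boolean prioritize = Boolean.parseBoolean(config.getOrDefault(PRIORITIZE_CONFIG, "true"));
    String className = config.get(FACTORY_CONFIG);
    _defaultFactory = className != null
        ? wrap(className, prioritize)
        // the diff calls createDefaultFactory() here, which always passes true
        : defaultFactory(javaFeatureVersion, prioritize);
  }

  /** Per-thread override first, then the process-wide default, as in getFactory(). */
  static String getFactory() {
    String override = THREAD_FACTORY.get();
    return override != null ? override : _defaultFactory;
  }

  /** Sets a per-thread override, as in useFactory(). */
  static void useFactory(String factory) {
    THREAD_FACTORY.set(factory);
  }
}
```

With this shape, setting `pinot.offheap.prioritize.bytebuffer=false` without a custom factory yields the bare Unsafe (or LArray) factory. In the diff as written, that branch goes through `createDefaultFactory()`, which hardcodes `true`, so the flag would only take effect when a custom factory class is configured.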



##########
.github/workflows/scripts/.pinot_test.sh:
##########
@@ -79,7 +83,9 @@ else
         -pl '!:pinot-csv' \
         -pl '!:pinot-json' \
         -pl '!:pinot-segment-uploader-default' \
-        -P github-actions,no-integration-tests && exit 0 || exit 1
+        -P github-actions,no-integration-tests \
+        -Dspotless.apply.skip -Dcheckstyle.skip -Dspotless.apply.skip -Dlicense.skip=true \

Review Comment:
   Why add skips for all of these?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


For additional commands, e-mail: commits-help@pinot.apache.org