You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ti...@apache.org on 2023/05/18 00:20:36 UTC

[pinot] branch master updated: Minor Pinot Deepstore Upload Retry Task Improvements (#10752)

This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 61d6b1075d Minor Pinot Deepstore Upload Retry Task Improvements (#10752)
61d6b1075d is described below

commit 61d6b1075da2ae73d817c3154f942af2d45021b6
Author: Ankit Sultana <an...@uber.com>
AuthorDate: Thu May 18 05:50:30 2023 +0530

    Minor Pinot Deepstore Upload Retry Task Improvements (#10752)
    
    * Use Non-Random Path in Pinot Deepstore Upload Retry
    
    * Add configurable timeout + fix segment name in move
    
    * Fix test
    
    * Proper test which asserts segment file move
    
    * Address feedback
---
 .../pinot/common/utils/SimpleHttpErrorInfo.java    |  2 ++
 .../apache/pinot/controller/ControllerConf.java    |  6 ++++
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 34 ++++++++++++++++++----
 .../PinotLLCRealtimeSegmentManagerTest.java        | 31 +++++++++++++++-----
 .../manager/realtime/PinotFSSegmentUploader.java   |  8 ++++-
 .../data/manager/realtime/SegmentUploader.java     | 11 ++++++-
 .../realtime/Server2ControllerSegmentUploader.java | 13 +++++++--
 .../pinot/server/api/resources/TablesResource.java | 20 +++++++++++--
 8 files changed, 105 insertions(+), 20 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/SimpleHttpErrorInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/SimpleHttpErrorInfo.java
index 90597453d5..b424ada19f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SimpleHttpErrorInfo.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SimpleHttpErrorInfo.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.common.utils;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 
@@ -30,6 +31,7 @@ public class SimpleHttpErrorInfo {
   private String _error;
 
   @JsonCreator
+  @JsonIgnoreProperties(ignoreUnknown = true)
   public SimpleHttpErrorInfo(@JsonProperty("code") int code, @JsonProperty("error") String message) {
     _code = code;
     _error = message;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 2dda29a5b2..8595a434af 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -213,6 +213,8 @@ public class ControllerConf extends PinotConfiguration {
     // Default value is false.
     public static final String ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT =
         "controller.realtime.segment.deepStoreUploadRetryEnabled";
+    public static final String DEEP_STORE_RETRY_UPLOAD_TIMEOUT_MS =
+        "controller.realtime.segment.deepStoreUploadRetry.timeoutMs";
 
     public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
     public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
@@ -920,6 +922,10 @@ public class ControllerConf extends PinotConfiguration {
     return getProperty(ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, false);
   }
 
+  public int getDeepStoreRetryUploadTimeoutMs() {
+    return getProperty(ControllerPeriodicTasksConf.DEEP_STORE_RETRY_UPLOAD_TIMEOUT_MS, -1);
+  }
+
   public long getPinotTaskManagerInitialDelaySeconds() {
     return getPeriodicTaskInitialDelayInSeconds();
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 455ff15894..fab1a234b0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -167,6 +167,7 @@ public class PinotLLCRealtimeSegmentManager {
   private final Lock[] _idealStateUpdateLocks;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
   private final boolean _isDeepStoreLLCSegmentUploadRetryEnabled;
+  private final int _deepstoreUploadRetryTimeoutMs;
   private final FileUploadDownloadClient _fileUploadDownloadClient;
   private final AtomicInteger _numCompletingSegments = new AtomicInteger(0);
 
@@ -191,6 +192,7 @@ public class PinotLLCRealtimeSegmentManager {
     }
     _flushThresholdUpdateManager = new FlushThresholdUpdateManager();
     _isDeepStoreLLCSegmentUploadRetryEnabled = controllerConf.isDeepStoreRetryUploadLLCSegmentEnabled();
+    _deepstoreUploadRetryTimeoutMs = controllerConf.getDeepStoreRetryUploadTimeoutMs();
     _fileUploadDownloadClient = _isDeepStoreLLCSegmentUploadRetryEnabled ? initFileUploadDownloadClient() : null;
   }
 
@@ -481,12 +483,10 @@ public class PinotLLCRealtimeSegmentManager {
       LOGGER.info("No moving needed for segment on peer servers: {}", segmentLocation);
       return;
     }
-    URI segmentFileURI = URIUtils.getUri(segmentLocation);
+
     URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), rawTableName);
-    URI uriToMoveTo = URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
     PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme());
-    Preconditions.checkState(pinotFS.move(segmentFileURI, uriToMoveTo, true),
-        "Failed to move segment file for segment: %s from: %s to: %s", segmentName, segmentLocation, uriToMoveTo);
+    String uriToMoveTo = moveSegmentFile(rawTableName, segmentName, segmentLocation, pinotFS);
 
     // Cleans up tmp segment files under table dir.
     // We only clean up tmp segment files in table level dir, so there's no need to list recursively.
@@ -502,7 +502,7 @@ public class PinotLLCRealtimeSegmentManager {
     } catch (Exception e) {
       LOGGER.warn("Caught exception while deleting temporary segment files for segment: {}", segmentName, e);
     }
-    committingSegmentDescriptor.setSegmentLocation(uriToMoveTo.toString());
+    committingSegmentDescriptor.setSegmentLocation(uriToMoveTo);
   }
 
   /**
@@ -1392,6 +1392,7 @@ public class PinotLLCRealtimeSegmentManager {
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
     String realtimeTableName = tableConfig.getTableName();
+    String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
 
     // Use this retention value to avoid the data racing between segment upload and retention management.
     RetentionStrategy retentionStrategy = null;
@@ -1404,6 +1405,8 @@ public class PinotLLCRealtimeSegmentManager {
           retentionMs - MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS);
     }
 
+    PinotFS pinotFS = PinotFSFactory.create(URIUtils.getUri(_controllerConf.getDataDir()).getScheme());
+
     // Iterate through LLC segments and upload missing deep store copy by following steps:
     //  1. Ask servers which have online segment replica to upload to deep store.
     //     Servers return deep store download url after successful uploading.
@@ -1438,9 +1441,13 @@ public class PinotLLCRealtimeSegmentManager {
         // Randomly ask one server to upload
         URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
         String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
+        serverUploadRequestUrl =
+            String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
         LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
             serverUploadRequestUrl);
-        String segmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
+        String tempSegmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
+        String segmentDownloadUrl =
+            moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS);
         LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
         // Update segment ZK metadata by adding the download URL
         segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
@@ -1555,4 +1562,19 @@ public class PinotLLCRealtimeSegmentManager {
     Set<String> consumingSegments = findConsumingSegments(idealState);
     return new PauseStatus(Boolean.parseBoolean(isTablePausedStr), consumingSegments, null);
   }
+
+  @VisibleForTesting
+  String moveSegmentFile(String rawTableName, String segmentName, String segmentLocation, PinotFS pinotFS)
+      throws IOException {
+    URI segmentFileURI = URIUtils.getUri(segmentLocation);
+    URI uriToMoveTo = createSegmentPath(rawTableName, segmentName);
+    Preconditions.checkState(pinotFS.move(segmentFileURI, uriToMoveTo, true),
+        "Failed to move segment file for segment: %s from: %s to: %s", segmentName, segmentLocation, uriToMoveTo);
+    return uriToMoveTo.toString();
+  }
+
+  @VisibleForTesting
+  URI createSegmentPath(String rawTableName, String segmentName) {
+    return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
+  }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index eee1f2a8a8..2223c9e341 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -89,7 +90,12 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 
 public class PinotLLCRealtimeSegmentManagerTest {
@@ -948,6 +954,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     ControllerConf controllerConfig = new ControllerConf();
     controllerConfig.setProperty(
         ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, true);
+    controllerConfig.setDataDir(TEMP_DIR.toString());
     FakePinotLLCRealtimeSegmentManager segmentManager =
         new FakePinotLLCRealtimeSegmentManager(pinotHelixResourceManager, controllerConfig);
     Assert.assertTrue(segmentManager.isDeepStoreLLCSegmentUploadRetryEnabled());
@@ -990,11 +997,17 @@ public class PinotLLCRealtimeSegmentManagerTest {
             "segments",
             REALTIME_TABLE_NAME,
             segmentsZKMetadata.get(0).getSegmentName(),
-            "upload");
-    String segmentDownloadUrl0 = String.format("segmentDownloadUr_%s", segmentsZKMetadata.get(0)
-        .getSegmentName());
+            "upload") + "?uploadTimeoutMs=-1";
+    // tempSegmentFileLocation is the location where the segment uploader will upload the segment. This usually ends
+    // with a random UUID
+    File tempSegmentFileLocation = new File(TEMP_DIR, segmentsZKMetadata.get(0).getSegmentName() + UUID.randomUUID());
+    FileUtils.write(tempSegmentFileLocation, "test");
+    // After the deep-store retry task gets the segment location returned by Pinot server, it will move the segment to
+    // its final location. This is the expected segment location.
+    String expectedSegmentLocation = segmentManager.createSegmentPath(RAW_TABLE_NAME,
+        segmentsZKMetadata.get(0).getSegmentName()).toString();
     when(segmentManager._mockedFileUploadDownloadClient
-        .uploadToSegmentStore(serverUploadRequestUrl0)).thenReturn(segmentDownloadUrl0);
+        .uploadToSegmentStore(serverUploadRequestUrl0)).thenReturn(tempSegmentFileLocation.getPath());
 
     // Change 2nd segment status to be DONE, but with default peer download url.
     // Verify later the download url isn't fixed after upload failure.
@@ -1013,7 +1026,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
             "segments",
             REALTIME_TABLE_NAME,
             segmentsZKMetadata.get(1).getSegmentName(),
-            "upload");
+            "upload") + "?uploadTimeoutMs=-1";
     when(segmentManager._mockedFileUploadDownloadClient
         .uploadToSegmentStore(serverUploadRequestUrl1))
         .thenThrow(new HttpErrorStatusException(
@@ -1041,11 +1054,15 @@ public class PinotLLCRealtimeSegmentManagerTest {
     when(pinotHelixResourceManager.getTableConfig(REALTIME_TABLE_NAME))
         .thenReturn(segmentManager._tableConfig);
 
+
     // Verify the result
     segmentManager.uploadToDeepStoreIfMissing(segmentManager._tableConfig, segmentsZKMetadata);
     assertEquals(
         segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(0), null).getDownloadUrl(),
-        segmentDownloadUrl0);
+        expectedSegmentLocation);
+    assertFalse(tempSegmentFileLocation.exists(),
+        "Deep-store retry task should move the file from temp location to permanent location");
+
     assertEquals(
         segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(1), null).getDownloadUrl(),
         CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
index 9a34872a50..4f173521c9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
@@ -59,7 +59,13 @@ public class PinotFSSegmentUploader implements SegmentUploader {
     _serverMetrics = serverMetrics;
   }
 
+  @Override
   public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) {
+    return uploadSegment(segmentFile, segmentName, _timeoutInMs);
+  }
+
+  @Override
+  public URI uploadSegment(File segmentFile, LLCSegmentName segmentName, int timeoutInMillis) {
     if (_segmentStoreUriStr == null || _segmentStoreUriStr.isEmpty()) {
       LOGGER.error("Missing segment store uri. Failed to upload segment file {} for {}.", segmentFile.getName(),
           segmentName.getSegmentName());
@@ -89,7 +95,7 @@ public class PinotFSSegmentUploader implements SegmentUploader {
     };
     Future<URI> future = _executorService.submit(uploadTask);
     try {
-      URI segmentLocation = future.get(_timeoutInMs, TimeUnit.MILLISECONDS);
+      URI segmentLocation = future.get(timeoutInMillis, TimeUnit.MILLISECONDS);
       LOGGER.info("Successfully upload segment {} to {}.", segmentName, segmentLocation);
       _serverMetrics.addMeteredTableValue(rawTableName,
           segmentLocation == null ? ServerMeter.SEGMENT_UPLOAD_FAILURE : ServerMeter.SEGMENT_UPLOAD_SUCCESS, 1);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java
index 44d0a36067..63bbe99a5b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java
@@ -24,6 +24,15 @@ import org.apache.pinot.common.utils.LLCSegmentName;
 
 
 public interface SegmentUploader {
-  // Returns the URI of the uploaded segment. null if the upload fails.
+
+  /**
+   * Uploads the given segmentFile to the deep-store. Returns the URI of the uploaded segment, or null on failure.
+   */
   URI uploadSegment(File segmentFile, LLCSegmentName segmentName);
+
+  /**
+   * Uploads the given segmentFile to the deep-store. Returns the URI of the uploaded segment, or null on failure.
+   * The upload waits at most the specified timeout, in milliseconds.
+   */
+  URI uploadSegment(File segmentFile, LLCSegmentName segmentName, int timeoutInMillis);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
index 5aa5b8b266..4b00563125 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
@@ -64,7 +64,12 @@ public class Server2ControllerSegmentUploader implements SegmentUploader {
 
   @Override
   public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) {
-    SegmentCompletionProtocol.Response response = uploadSegmentToController(segmentFile);
+    return uploadSegment(segmentFile, segmentName, _segmentUploadRequestTimeoutMs);
+  }
+
+  @Override
+  public URI uploadSegment(File segmentFile, LLCSegmentName segmentName, int timeoutInMillis) {
+    SegmentCompletionProtocol.Response response = uploadSegmentToController(segmentFile, timeoutInMillis);
     if (response.getStatus() == SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS) {
       try {
         URI uri = new URI(response.getSegmentLocation());
@@ -79,12 +84,16 @@ public class Server2ControllerSegmentUploader implements SegmentUploader {
   }
 
   public SegmentCompletionProtocol.Response uploadSegmentToController(File segmentFile) {
+    return uploadSegmentToController(segmentFile, _segmentUploadRequestTimeoutMs);
+  }
+
+  private SegmentCompletionProtocol.Response uploadSegmentToController(File segmentFile, int timeoutInMillis) {
     SegmentCompletionProtocol.Response response;
     long startTime = System.currentTimeMillis();
     try {
       String responseStr = _fileUploadDownloadClient
           .uploadSegment(_controllerSegmentUploadCommitUrl, _segmentName, segmentFile,
-              AuthProviderUtils.toRequestHeaders(_authProvider), null, _segmentUploadRequestTimeoutMs).getResponse();
+              AuthProviderUtils.toRequestHeaders(_authProvider), null, timeoutInMillis).getResponse();
       response = SegmentCompletionProtocol.Response.fromJsonString(responseStr);
       _segmentLogger.info("Controller response {} for {}", response.toJsonString(), _controllerSegmentUploadCommitUrl);
       if (response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER)) {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index d9f0b6c76e..e518d2d6b1 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -469,7 +469,14 @@ public class TablesResource {
    * when segment store copy is unavailable for committed low level consumer segments.
    * Please note that invocation of this endpoint may cause query performance to suffer, since we tar up the segment
    * to upload it.
-   * @see <a>href="https://tinyurl.com/f63ru4sb</a>
+   *
+   * @see <a href="https://tinyurl.com/f63ru4sb">https://tinyurl.com/f63ru4sb</a>
+   * @param realtimeTableName table name with type.
+   * @param segmentName name of the segment to be uploaded
+   * @param timeoutMs timeout in milliseconds for the segment upload to the deep-store. If this is not positive,
+   *                  the default timeout is used.
+   * @return full url where the segment is uploaded
+   * @throws Exception if an error occurred during the segment upload.
    */
   @POST
   @Path("/segments/{realtimeTableName}/{segmentName}/upload")
@@ -485,7 +492,8 @@ public class TablesResource {
   public String uploadLLCSegment(
       @ApiParam(value = "Name of the REALTIME table", required = true) @PathParam("realtimeTableName")
           String realtimeTableName,
-      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") String segmentName)
+      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") String segmentName,
+      @QueryParam("uploadTimeoutMs") @DefaultValue("-1") int timeoutMs)
       throws Exception {
     LOGGER.info("Received a request to upload low level consumer segment {} for table {}", segmentName,
         realtimeTableName);
@@ -527,7 +535,13 @@ public class TablesResource {
 
       // Use segment uploader to upload the segment tar file to segment store and return the segment download url.
       SegmentUploader segmentUploader = _serverInstance.getInstanceDataManager().getSegmentUploader();
-      URI segmentDownloadUrl = segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(segmentName));
+      URI segmentDownloadUrl;
+      if (timeoutMs <= 0) {
+        // Use default timeout if passed timeout is not positive
+        segmentDownloadUrl = segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(segmentName));
+      } else {
+        segmentDownloadUrl = segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(segmentName), timeoutMs);
+      }
       if (segmentDownloadUrl == null) {
         throw new WebApplicationException(
             String.format("Failed to upload table %s segment %s to segment store", realtimeTableName, segmentName),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org