You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by mc...@apache.org on 2019/03/19 23:35:47 UTC

[incubator-pinot] branch master updated: Pinot server side change to optimize LLC segment completion with direct metadata upload. (#3941)

This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fe203b5  Pinot server side change to optimize LLC segment completion with direct metadata upload.  (#3941)
fe203b5 is described below

commit fe203b5be2bbbf6464f1c757f07cb3374fa3bf19
Author: Ting Chen <ch...@gmail.com>
AuthorDate: Tue Mar 19 16:35:41 2019 -0700

    Pinot server side change to optimize LLC segment completion with direct metadata upload.  (#3941)
    
    * The Pinot server side change to optimize the LLC segment completion protocol by uploading metadata directly.
    
    * Minor style and comment fixes.
    
    * Fix error handling and style issues.
    
    * Add missing header.
    
    * Add the config for commit end with metadata into server instance config. Combine integration tests.
    
    * Change comments.
    
    * Minor comment fix.
    
    * Use the metadata files created in the server directly.
    
    * Deprecate the old segmentCommitEnd method and change the default to commitEndWithMetadata.
    
    * Return null instead of crashing server...
    
    * Add comments.
    
    * Invert config checks.
---
 .../protocols/SegmentCompletionProtocol.java       |  6 +++
 .../apache/pinot/common/utils/CommonConstants.java |  1 +
 .../common/utils/FileUploadDownloadClient.java     | 24 +++++++++++
 .../manager/config/InstanceDataManagerConfig.java  |  2 +
 .../realtime/LLRealtimeSegmentDataManager.java     | 48 +++++++++++++++++++---
 .../segment/index/loader/IndexLoadingConfig.java   |  4 ++
 .../ServerSegmentCompletionProtocolHandler.java    | 37 +++++++++++++++++
 .../realtime/LLRealtimeSegmentDataManagerTest.java |  4 +-
 ...CRealtimeClusterSplitCommitIntegrationTest.java |  2 +-
 .../helix/HelixInstanceDataManagerConfig.java      |  7 ++++
 10 files changed, 126 insertions(+), 9 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index 60ee1e0..822ad21 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -359,6 +359,12 @@ public class SegmentCompletionProtocol {
     }
   }
 
+  public static class SegmentCommitEndWithMetadataRequest extends Request {
+    public SegmentCommitEndWithMetadataRequest(Params params) {
+      super(params, MSG_TYPE_COMMIT_END_METADATA);
+    }
+  }
+
   public static class SegmentStoppedConsuming extends Request {
     public SegmentStoppedConsuming(Params params) {
       super(params, MSG_TYPE_STOPPED_CONSUMING);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index b76f2b8..0777ad4 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -156,6 +156,7 @@ public class CommonConstants {
     public static final String CONFIG_OF_ENABLE_DEFAULT_COLUMNS = "pinot.server.instance.enable.default.columns";
     public static final String CONFIG_OF_ENABLE_SHUTDOWN_DELAY = "pinot.server.instance.enable.shutdown.delay";
     public static final String CONFIG_OF_ENABLE_SPLIT_COMMIT = "pinot.server.instance.enable.split.commit";
+    public static final String CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA = "pinot.server.instance.enable.commitend.metadata";
     public static final String CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION = "pinot.server.instance.realtime.alloc.offheap";
     public static final String CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION =
         "pinot.server.instance.realtime.alloc.offheap.direct";
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 4a37dcf..de8c76a 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -29,6 +29,7 @@ import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
+import java.util.Map;
 import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import org.apache.commons.io.IOUtils;
@@ -229,6 +230,22 @@ public class FileUploadDownloadClient implements Closeable {
         parameters, socketTimeoutMs);
   }
 
+  private static HttpUriRequest getUploadSegmentMetadataFilesRequest(URI uri, Map<String, File> metadataFiles,
+      int segmentUploadRequestTimeoutMs) {
+    MultipartEntityBuilder multipartEntityBuilder = MultipartEntityBuilder.create().
+        setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
+    for (Map.Entry<String, File> entry : metadataFiles.entrySet()) {
+      multipartEntityBuilder.addPart(entry.getKey(), getContentBody(entry.getKey(), entry.getValue()));
+    }
+    HttpEntity entity = multipartEntityBuilder.build();
+
+    // Build the POST request.
+    RequestBuilder requestBuilder =
+        RequestBuilder.create(HttpPost.METHOD_NAME).setVersion(HttpVersion.HTTP_1_1).setUri(uri).setEntity(entity);
+    setTimeout(requestBuilder, segmentUploadRequestTimeoutMs);
+    return requestBuilder.build();
+  }
+
   private static HttpUriRequest getSendSegmentUriRequest(URI uri, String downloadUri, @Nullable List<Header> headers,
       @Nullable List<NameValuePair> parameters, int socketTimeoutMs) {
     RequestBuilder requestBuilder = RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1)
@@ -388,6 +405,13 @@ public class FileUploadDownloadClient implements Closeable {
         getUploadSegmentMetadataRequest(uri, segmentName, segmentMetadataFile, headers, parameters, socketTimeoutMs));
   }
 
+  // Upload a set of segment metadata files (e.g., meta.properties and creation.meta) to controllers.
+  public SimpleHttpResponse uploadSegmentMetadataFiles(URI uri, Map<String, File> metadataFiles,
+      int segmentUploadRequestTimeoutMs)
+      throws IOException, HttpErrorStatusException {
+    return sendRequest(getUploadSegmentMetadataFilesRequest(uri, metadataFiles, segmentUploadRequestTimeoutMs));
+  }
+
   /**
    * Upload segment with segment file.
    *
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
index 2685768..ba1896f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
@@ -45,6 +45,8 @@ public interface InstanceDataManagerConfig {
 
   boolean isEnableSplitCommit();
 
+  boolean isEnableSplitCommitEndWithMetadata();
+
   boolean isRealtimeOffHeapAllocation();
 
   boolean isDirectRealtimeOffheapAllocation();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 18b1409..50ee54e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -25,7 +25,9 @@ import com.yammer.metrics.core.Meter;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -67,6 +69,7 @@ import org.apache.pinot.core.realtime.stream.StreamDecoderProvider;
 import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
 import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
 import org.apache.pinot.core.realtime.stream.TransientConsumerException;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import org.joda.time.DateTime;
@@ -130,15 +133,17 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
   protected class SegmentBuildDescriptor {
     final String _segmentTarFilePath;
+    final Map<String, File> _metadataFileMap;
     final long _offset;
     final long _waitTimeMillis;
     final long _buildTimeMillis;
     final String _segmentDirPath;
     final long _segmentSizeBytes;
 
-    SegmentBuildDescriptor(String segmentTarFilePath, long offset, String segmentDirPath, long buildTimeMillis,
-        long waitTimeMillis, long segmentSizeBytes) {
+    SegmentBuildDescriptor(String segmentTarFilePath, Map<String, File> metadataFileMap, long offset,
+        String segmentDirPath, long buildTimeMillis, long waitTimeMillis, long segmentSizeBytes) {
       _segmentTarFilePath = segmentTarFilePath;
+      _metadataFileMap = metadataFileMap;
       _offset = offset;
       _buildTimeMillis = buildTimeMillis;
       _waitTimeMillis = waitTimeMillis;
@@ -173,6 +178,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         FileUtils.deleteQuietly(new File(_segmentTarFilePath));
       }
     }
+
+    public Map<String, File> getMetadataFiles() {
+      return _metadataFileMap;
+    }
   }
 
   private static final long TIME_THRESHOLD_FOR_LOG_MINUTES = 1;
@@ -681,11 +690,32 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           TimeUnit.MILLISECONDS.toSeconds(waitTimeMillis));
 
       if (forCommit) {
+        File[] segmentfiles = destDir.listFiles();
+        if (segmentfiles == null || segmentfiles.length == 0) {
+          segmentLogger.error("The index dir is empty: {}", destDir);
+          return null;
+        }
+        // segmentfiles[0] is the sub directory with version name (e.g., V3).
+        File metadataFileName = new File(segmentfiles[0], V1Constants.MetadataKeys.METADATA_FILE_NAME);
+        if (!metadataFileName.exists()) {
+          segmentLogger
+              .error("File does not exist in {} for {}.", destDir, V1Constants.MetadataKeys.METADATA_FILE_NAME);
+          return null;
+        }
+        File creationMetaFile = new File(segmentfiles[0], V1Constants.SEGMENT_CREATION_META);
+        if (!creationMetaFile.exists()) {
+          segmentLogger.error("File does not exist in {} for {}.", destDir, V1Constants.SEGMENT_CREATION_META);
+          return null;
+        }
+
+        Map<String, File> metadataFiles = new HashMap<>();
+        metadataFiles.put(V1Constants.MetadataKeys.METADATA_FILE_NAME, metadataFileName);
+        metadataFiles.put(V1Constants.SEGMENT_CREATION_META, creationMetaFile);
         return new SegmentBuildDescriptor(destDir.getAbsolutePath() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION,
-            _currentOffset, null, buildTimeMillis, waitTimeMillis, segmentSizeBytes);
+            metadataFiles, _currentOffset, null, buildTimeMillis, waitTimeMillis, segmentSizeBytes);
       }
-      return new SegmentBuildDescriptor(null, _currentOffset, destDir.getAbsolutePath(), buildTimeMillis,
-          waitTimeMillis, segmentSizeBytes);
+      return new SegmentBuildDescriptor(null, null, _currentOffset, destDir.getAbsolutePath(),
+          buildTimeMillis, waitTimeMillis, segmentSizeBytes);
     } catch (InterruptedException e) {
       segmentLogger.error("Interrupted while waiting for semaphore");
       return null;
@@ -735,7 +765,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     if (_isOffHeap) {
       params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
     }
-    SegmentCompletionProtocol.Response commitEndResponse = _protocolHandler.segmentCommitEnd(params);
+    SegmentCompletionProtocol.Response commitEndResponse;
+    if (_indexLoadingConfig.isEnableSplitCommitEndWithMetadata()) {
+      commitEndResponse = _protocolHandler.segmentCommitEndWithMetadata(params, _segmentBuildDescriptor.getMetadataFiles());
+    } else {
+      commitEndResponse = _protocolHandler.segmentCommitEnd(params);
+    }
+
     if (!commitEndResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) {
       segmentLogger.warn("CommitEnd failed  with response {}", commitEndResponse.toJsonString());
       return SegmentCompletionProtocol.RESP_FAILED;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index 7e46d0c..04f9beb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -57,6 +57,7 @@ public class IndexLoadingConfig {
   private boolean _enableSplitCommit;
   private boolean _isRealtimeOffheapAllocation;
   private boolean _isDirectRealtimeOffheapAllocation;
+  private boolean _enableSplitCommitEndWithMetadata;
 
   public IndexLoadingConfig(@Nonnull InstanceDataManagerConfig instanceDataManagerConfig,
       @Nonnull TableConfig tableConfig) {
@@ -135,6 +136,7 @@ public class IndexLoadingConfig {
     if (avgMultiValueCount != null) {
       _realtimeAvgMultiValueCount = Integer.valueOf(avgMultiValueCount);
     }
+    _enableSplitCommitEndWithMetadata = instanceDataManagerConfig.isEnableSplitCommitEndWithMetadata();
   }
 
   /**
@@ -222,6 +224,8 @@ public class IndexLoadingConfig {
     return _enableSplitCommit;
   }
 
+  public boolean isEnableSplitCommitEndWithMetadata() { return _enableSplitCommitEndWithMetadata; }
+
   public boolean isRealtimeOffheapAllocation() {
     return _isRealtimeOffheapAllocation;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
index c411d16..2261c55 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
@@ -20,6 +20,7 @@ package org.apache.pinot.server.realtime;
 
 import java.io.File;
 import java.net.URI;
+import java.util.Map;
 import javax.net.ssl.SSLContext;
 import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.common.metrics.ServerMeter;
@@ -95,6 +96,8 @@ public class ServerSegmentCompletionProtocolHandler {
     return uploadSegment(url, params.getSegmentName(), segmentTarFile);
   }
 
+  // Replaced by segmentCommitEndWithMetadata().
+  @Deprecated
   public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params params) {
     SegmentCompletionProtocol.SegmentCommitEndRequest request =
         new SegmentCompletionProtocol.SegmentCommitEndRequest(params);
@@ -105,6 +108,17 @@ public class ServerSegmentCompletionProtocolHandler {
     return sendRequest(url);
   }
 
+  public SegmentCompletionProtocol.Response segmentCommitEndWithMetadata(
+      SegmentCompletionProtocol.Request.Params params, final Map<String, File> metadataFiles) {
+    SegmentCompletionProtocol.SegmentCommitEndWithMetadataRequest request =
+        new SegmentCompletionProtocol.SegmentCommitEndWithMetadataRequest(params);
+    String url = createSegmentCompletionUrl(request);
+    if (url == null) {
+      return SegmentCompletionProtocol.RESP_NOT_SENT;
+    }
+    return sendCommitEndWithMetadataFiles(url, metadataFiles);
+  }
+
   public SegmentCompletionProtocol.Response segmentCommit(SegmentCompletionProtocol.Request.Params params,
       final File segmentTarFile) {
     SegmentCompletionProtocol.SegmentCommitRequest request = new SegmentCompletionProtocol.SegmentCommitRequest(params);
@@ -182,6 +196,29 @@ public class ServerSegmentCompletionProtocolHandler {
     return response;
   }
 
+  private SegmentCompletionProtocol.Response sendCommitEndWithMetadataFiles(String url,
+      Map<String, File> metadataFiles) {
+    SegmentCompletionProtocol.Response response;
+    try {
+      String responseStr = _fileUploadDownloadClient
+          .uploadSegmentMetadataFiles(new URI(url), metadataFiles, SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS).getResponse();
+      response = SegmentCompletionProtocol.Response.fromJsonString(responseStr);
+      LOGGER.info("Controller response {} for {}", response.toJsonString(), url);
+      if (response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER)) {
+        ControllerLeaderLocator.getInstance().invalidateCachedControllerLeader();
+      }
+    } catch (Exception e) {
+      // Catch all exceptions, we want the protocol to handle the case assuming the request was never sent.
+      response = SegmentCompletionProtocol.RESP_NOT_SENT;
+      LOGGER.error("Could not send request {}", url, e);
+      // Invalidate controller leader cache, as exception could be because of leader being down (deployment/failure) and hence unable to send {@link SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER}
+      // If cache is not invalidated, we will not recover from exceptions until the controller comes back up
+      ControllerLeaderLocator.getInstance().invalidateCachedControllerLeader();
+    }
+    raiseSegmentCompletionProtocolResponseMetric(response);
+    return response;
+  }
+
   private SegmentCompletionProtocol.Response uploadSegment(String url, final String segmentName,
       final File segmentTarFile) {
     SegmentCompletionProtocol.Response response;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index bf584d1..e87a85a 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -766,7 +766,7 @@ public class LLRealtimeSegmentDataManagerTest {
         return null;
       }
       if (!forCommit) {
-        return new SegmentBuildDescriptor(null, getCurrentOffset(), _segmentDir, 0, 0, -1);
+        return new SegmentBuildDescriptor(null, null, getCurrentOffset(), _segmentDir, 0, 0, -1);
       }
       final String segTarFileName = _segmentDir + "/" + "segmentFile";
       File segmentTgzFile = new File(segTarFileName);
@@ -775,7 +775,7 @@ public class LLRealtimeSegmentDataManagerTest {
       } catch (IOException e) {
         Assert.fail("Could not create file " + segmentTgzFile);
       }
-      return new SegmentBuildDescriptor(segTarFileName, getCurrentOffset(), null, 0, 0, -1);
+      return new SegmentBuildDescriptor(segTarFileName, null, getCurrentOffset(), null, 0, 0, -1);
     }
 
     @Override
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterSplitCommitIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterSplitCommitIntegrationTest.java
index fca5d99..454cdb4 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterSplitCommitIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterSplitCommitIntegrationTest.java
@@ -27,7 +27,6 @@ import org.apache.pinot.controller.ControllerConf;
  * Integration test that extends LLCRealtimeClusterIntegrationTest but with split commit enabled.
  */
 public class LLCRealtimeClusterSplitCommitIntegrationTest extends LLCRealtimeClusterIntegrationTest {
-
   @Override
   public void startController() {
     ControllerConf controllerConfig = getDefaultControllerConfiguration();
@@ -39,6 +38,7 @@ public class LLCRealtimeClusterSplitCommitIntegrationTest extends LLCRealtimeClu
   public void startServer() {
     Configuration serverConfig = getDefaultServerConfiguration();
     serverConfig.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true);
+    serverConfig.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true);
     startServer(serverConfig);
   }
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index 2154d88..5439618 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -68,6 +68,8 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
 
   // Key of whether to enable split commit
   private static final String ENABLE_SPLIT_COMMIT = "enable.split.commit";
+  // Key of whether to enable split commit end with segment metadata files.
+  private static final String ENABLE_SPLIT_COMMIT_END_WITH_METADATA = "enable.commitend.metadata";
 
   // Whether memory for realtime consuming segments should be allocated off-heap.
   private static final String REALTIME_OFFHEAP_ALLOCATION = "realtime.alloc.offheap";
@@ -164,6 +166,11 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
   }
 
   @Override
+  public boolean isEnableSplitCommitEndWithMetadata() {
+    return _instanceDataManagerConfiguration.getBoolean(ENABLE_SPLIT_COMMIT_END_WITH_METADATA, true);
+  }
+
+  @Override
   public boolean isRealtimeOffHeapAllocation() {
     return _instanceDataManagerConfiguration.getBoolean(REALTIME_OFFHEAP_ALLOCATION, false);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org