You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2019/11/01 18:38:58 UTC

[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4713: Refactoring realtime segment committer

mcvsubbu commented on a change in pull request #4713: Refactoring realtime segment committer
URL: https://github.com/apache/incubator-pinot/pull/4713#discussion_r341702522
 
 

 ##########
 File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 ##########
 @@ -837,22 +781,28 @@ protected boolean commitSegment(SegmentCompletionProtocol.Response response) {
     return true;
   }
 
-  protected SegmentCompletionProtocol.Response postSegmentCommitMsg() {
-    final File segmentTarFile = new File(_segmentBuildDescriptor.getSegmentTarFilePath());
+  protected SegmentCompletionProtocol.Response commit(SegmentCompletionProtocol.Response response) {
     SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
-    params.withInstanceId(_instanceId).withOffset(_currentOffset).withSegmentName(_segmentNameStr)
-        .withNumRows(_numRowsConsumed).withInstanceId(_instanceId)
-        .withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
+
+    params.withSegmentName(_segmentNameStr).withOffset(_currentOffset).withNumRows(_numRowsConsumed)
+        .withInstanceId(_instanceId).withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
         .withSegmentSizeBytes(_segmentBuildDescriptor.getSegmentSizeBytes())
         .withWaitTimeMillis(_segmentBuildDescriptor.getWaitTimeMillis());
     if (_isOffHeap) {
       params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
     }
-    SegmentCompletionProtocol.Response response = _protocolHandler.segmentCommit(params, segmentTarFile);
-    if (!response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) {
-      segmentLogger.warn("Commit failed  with response {}", response.toJsonString());
+
+    Configuration config = new PropertiesConfiguration();
+
+    if (response.isSplitCommit() && _indexLoadingConfig.isEnableSplitCommit()) {
+      config.setProperty(CommonConstants.Segment.Realtime.COMMITTER_CLASS, SplitSegmentCommitter.class.getName());
 
 Review comment:
   Like we discussed, I think Config is a bit too much of overhead here. We really cannot avoid the if statement before creating a committer (since it still depends on the controller's response to the server's segmentConsumed() call), we can call different methods in the factory (like createDefaultCommitter() vs createSplitCommitter()). The factory itself can be constructed in the ctor of LLRealtimeSegmentDataManager with parameters that are needed for both committers (indexLoadingConfig, segmentLogger, and perhaps even protocolHandler). thanks

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org