You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by je...@apache.org on 2019/10/16 18:05:46 UTC

[incubator-pinot] branch realtimeref created (now 4e8b535)

This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a change to branch realtimeref
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 4e8b535  Refactoring realtime segment committer

This branch includes the following new commits:

     new 4e8b535  Refactoring realtime segment committer

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Refactoring realtime segment committer

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch realtimeref
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 4e8b535c84e4d029d85fa50db0e704b30c700df5
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Wed Oct 16 11:05:21 2019 -0700

    Refactoring realtime segment committer
---
 .../manager/realtime/DefaultSegmentCommitter.java  | 54 ++++++++++++++
 .../realtime/LLRealtimeSegmentDataManager.java     | 84 ++++-----------------
 .../data/manager/realtime/SegmentCommitter.java    | 29 ++++++++
 .../manager/realtime/SplitSegmentCommitter.java    | 86 ++++++++++++++++++++++
 4 files changed, 183 insertions(+), 70 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/DefaultSegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/DefaultSegmentCommitter.java
new file mode 100644
index 0000000..84bde7a
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/DefaultSegmentCommitter.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.io.File;
+import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.slf4j.Logger;
+
+
+/**
+ * Commits a realtime segment with a single segmentCommit() request that carries the segment tar file.
+ * This class only logs a failed commit and hands the controller's response back to the caller.
+ */
+public class DefaultSegmentCommitter implements SegmentCommitter {
+  private final SegmentCompletionProtocol.Request.Params _params;
+  private final ServerSegmentCompletionProtocolHandler _protocolHandler;
+  private final Logger _segmentLogger;
+
+  /** Keeps the request params to send, the handler used to send them, and the logger used to report failures. */
+  public DefaultSegmentCommitter(SegmentCompletionProtocol.Request.Params params, ServerSegmentCompletionProtocolHandler protocolHandler, Logger segmentLogger) {
+    _params = params;
+    _protocolHandler = protocolHandler;
+    _segmentLogger = segmentLogger;
+  }
+
+  /** Sends the stored params plus the built tar file; returns the response unchanged, warning if not COMMIT_SUCCESS. */
+  @Override
+  public SegmentCompletionProtocol.Response commit(long currentOffset, int numRows, LLRealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor) {
+    final File segmentTarFile = new File(segmentBuildDescriptor.getSegmentTarFilePath());
+    // currentOffset and numRows are not read here: the request is built solely from the params given at construction.
+    SegmentCompletionProtocol.Response response = _protocolHandler.segmentCommit(_params, segmentTarFile);
+    if (!response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) {
+      _segmentLogger.warn("Commit failed  with response {}", response.toJsonString());
+    }
+    return response;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 7bfb348..5c286bb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -759,58 +759,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
   }
 
-  private SegmentCompletionProtocol.Response doSplitCommit(SegmentCompletionProtocol.Response prevResponse) {
-    final File segmentTarFile = new File(_segmentBuildDescriptor.getSegmentTarFilePath());
-    SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
-
-    params.withSegmentName(_segmentNameStr).withOffset(_currentOffset).withNumRows(_numRowsConsumed)
-        .withInstanceId(_instanceId).withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
-        .withSegmentSizeBytes(_segmentBuildDescriptor.getSegmentSizeBytes())
-        .withWaitTimeMillis(_segmentBuildDescriptor.getWaitTimeMillis());
-    if (_isOffHeap) {
-      params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
-    }
-    SegmentCompletionProtocol.Response segmentCommitStartResponse = _protocolHandler.segmentCommitStart(params);
-    if (!segmentCommitStartResponse.getStatus()
-        .equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE)) {
-      segmentLogger.warn("CommitStart failed  with response {}", segmentCommitStartResponse.toJsonString());
-      return SegmentCompletionProtocol.RESP_FAILED;
-    }
-
-    params = new SegmentCompletionProtocol.Request.Params();
-    params.withOffset(_currentOffset).withSegmentName(_segmentNameStr).withInstanceId(_instanceId);
-    SegmentCompletionProtocol.Response segmentCommitUploadResponse =
-        _protocolHandler.segmentCommitUpload(params, segmentTarFile, prevResponse.getControllerVipUrl());
-    if (!segmentCommitUploadResponse.getStatus()
-        .equals(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS)) {
-      segmentLogger.warn("Segment upload failed  with response {}", segmentCommitUploadResponse.toJsonString());
-      return SegmentCompletionProtocol.RESP_FAILED;
-    }
-
-    params = new SegmentCompletionProtocol.Request.Params();
-    params.withInstanceId(_instanceId).withOffset(_currentOffset).withSegmentName(_segmentNameStr)
-        .withSegmentLocation(segmentCommitUploadResponse.getSegmentLocation()).withNumRows(_numRowsConsumed)
-        .withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
-        .withSegmentSizeBytes(_segmentBuildDescriptor.getSegmentSizeBytes())
-        .withWaitTimeMillis(_segmentBuildDescriptor.getWaitTimeMillis());
-    if (_isOffHeap) {
-      params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
-    }
-    SegmentCompletionProtocol.Response commitEndResponse;
-    if (_indexLoadingConfig.isEnableSplitCommitEndWithMetadata()) {
-      commitEndResponse =
-          _protocolHandler.segmentCommitEndWithMetadata(params, _segmentBuildDescriptor.getMetadataFiles());
-    } else {
-      commitEndResponse = _protocolHandler.segmentCommitEnd(params);
-    }
-
-    if (!commitEndResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) {
-      segmentLogger.warn("CommitEnd failed  with response {}", commitEndResponse.toJsonString());
-      return SegmentCompletionProtocol.RESP_FAILED;
-    }
-    return commitEndResponse;
-  }
-
   protected boolean commitSegment(SegmentCompletionProtocol.Response response) {
     final String segTarFileName = _segmentBuildDescriptor.getSegmentTarFilePath();
     File segTarFile = new File(segTarFileName);
@@ -818,15 +766,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       throw new RuntimeException("Segment file does not exist:" + segTarFileName);
     }
     SegmentCompletionProtocol.Response returnedResponse;
-    if (response.isSplitCommit() && _indexLoadingConfig.isEnableSplitCommit()) {
-      // Send segmentStart, segmentUpload, & segmentCommitEnd to the controller
-      // if that succeeds, swap in-memory segment with the one built.
-      returnedResponse = doSplitCommit(response);
-    } else {
-      // Send segmentCommit() to the controller
-      // if that succeeds, swap in-memory segment with the one built.
-      returnedResponse = postSegmentCommitMsg();
-    }
+
+    boolean isSplitCommit = response.isSplitCommit() && _indexLoadingConfig.isEnableSplitCommit();
+    returnedResponse = commit(response, isSplitCommit);
 
     if (!returnedResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) {
       return false;
@@ -837,22 +779,24 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     return true;
   }
 
-  protected SegmentCompletionProtocol.Response postSegmentCommitMsg() {
-    final File segmentTarFile = new File(_segmentBuildDescriptor.getSegmentTarFilePath());
+  private SegmentCompletionProtocol.Response commit(SegmentCompletionProtocol.Response response, boolean isSplitCommit) {
     SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
-    params.withInstanceId(_instanceId).withOffset(_currentOffset).withSegmentName(_segmentNameStr)
-        .withNumRows(_numRowsConsumed).withInstanceId(_instanceId)
-        .withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
+
+    params.withSegmentName(_segmentNameStr).withOffset(_currentOffset).withNumRows(_numRowsConsumed)
+        .withInstanceId(_instanceId).withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
         .withSegmentSizeBytes(_segmentBuildDescriptor.getSegmentSizeBytes())
         .withWaitTimeMillis(_segmentBuildDescriptor.getWaitTimeMillis());
     if (_isOffHeap) {
       params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
     }
-    SegmentCompletionProtocol.Response response = _protocolHandler.segmentCommit(params, segmentTarFile);
-    if (!response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) {
-      segmentLogger.warn("Commit failed  with response {}", response.toJsonString());
+
+    if (isSplitCommit) {
+      SplitSegmentCommitter splitSegmentCommitter = new SplitSegmentCommitter(params, _protocolHandler, response, _indexLoadingConfig.isEnableSplitCommitEndWithMetadata(), segmentLogger);
+      return splitSegmentCommitter.commit(_currentOffset, _numRowsConsumed, _segmentBuildDescriptor);
+    } else {
+      DefaultSegmentCommitter defaultSegmentCommitter = new DefaultSegmentCommitter(params, _protocolHandler, segmentLogger);
+      return defaultSegmentCommitter.commit(_currentOffset, _numRowsConsumed, _segmentBuildDescriptor);
     }
-    return response;
   }
 
   protected boolean buildSegmentAndReplace() {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitter.java
new file mode 100644
index 0000000..d63bcf6
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitter.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+
+
+/**
+ * Strategy for committing a built realtime segment; commit() returns the resulting SegmentCompletionProtocol.Response.
+ */
+public interface SegmentCommitter {
+  SegmentCompletionProtocol.Response commit(long currentOffset, int numRows, LLRealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor);
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
new file mode 100644
index 0000000..8fbf646
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.io.File;
+import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.slf4j.Logger;
+
+
+/**
+ * Commits a realtime segment in three requests: segmentCommitStart, segmentCommitUpload, then segmentCommitEnd.
+ * Returns RESP_FAILED as soon as one step is rejected; on success returns the commit-end response.
+ */
+public class SplitSegmentCommitter implements SegmentCommitter {
+  private final SegmentCompletionProtocol.Request.Params _params;
+  private final ServerSegmentCompletionProtocolHandler _protocolHandler;
+  private final SegmentCompletionProtocol.Response _prevResponse;
+  private final boolean _isEnableSplitCommitWithMetadata;
+  private final Logger _segmentLogger;
+
+  /** prevResponse supplies the controller VIP URL for the upload; the flag selects commit-end with metadata files. */
+  public SplitSegmentCommitter(SegmentCompletionProtocol.Request.Params params, ServerSegmentCompletionProtocolHandler protocolHandler, SegmentCompletionProtocol.Response prevResponse, boolean isEnableSplitCommitWithMetadata, Logger segmentLogger) {
+    _params = params;
+    _protocolHandler = protocolHandler;
+    _prevResponse = prevResponse;
+    _isEnableSplitCommitWithMetadata = isEnableSplitCommitWithMetadata;
+    _segmentLogger = segmentLogger;
+  }
+
+  /** Runs start, upload and end in order; note that this updates the params object passed to the constructor. */
+  @Override
+  public SegmentCompletionProtocol.Response commit(long currentOffset, int numRowsConsumed, LLRealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor) {
+    final File segmentTarFile = new File(segmentBuildDescriptor.getSegmentTarFilePath());
+    // Step 1: announce the commit using the full params supplied at construction.
+    SegmentCompletionProtocol.Response segmentCommitStartResponse = _protocolHandler.segmentCommitStart(_params);
+    if (!segmentCommitStartResponse.getStatus()
+        .equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE)) {
+      _segmentLogger.warn("CommitStart failed  with response {}", segmentCommitStartResponse.toJsonString());
+      return SegmentCompletionProtocol.RESP_FAILED;
+    }
+
+    // Step 2: upload the tar file; this request carries only instance id, offset and segment name.
+    SegmentCompletionProtocol.Request.Params uploadParams = new SegmentCompletionProtocol.Request.Params();
+    uploadParams.withInstanceId(_params.getInstanceId()).withOffset(currentOffset).withSegmentName(_params.getSegmentName());
+    SegmentCompletionProtocol.Response segmentCommitUploadResponse =
+        _protocolHandler.segmentCommitUpload(uploadParams, segmentTarFile, _prevResponse.getControllerVipUrl());
+    if (!segmentCommitUploadResponse.getStatus()
+        .equals(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS)) {
+      _segmentLogger.warn("Segment upload failed with response {}", segmentCommitUploadResponse.toJsonString());
+      return SegmentCompletionProtocol.RESP_FAILED;
+    }
+
+    // Step 3: commit-end must send the full params (now including the uploaded location), not the upload-only ones.
+    _params.withOffset(currentOffset).withSegmentLocation(segmentCommitUploadResponse.getSegmentLocation())
+        .withNumRows(numRowsConsumed);
+    SegmentCompletionProtocol.Response commitEndResponse;
+    if (_isEnableSplitCommitWithMetadata) {
+      commitEndResponse =
+          _protocolHandler.segmentCommitEndWithMetadata(_params, segmentBuildDescriptor.getMetadataFiles());
+    } else {
+      commitEndResponse = _protocolHandler.segmentCommitEnd(_params);
+    }
+    if (!commitEndResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) {
+      _segmentLogger.warn("CommitEnd failed with response {}", commitEndResponse.toJsonString());
+      return SegmentCompletionProtocol.RESP_FAILED;
+    }
+    return commitEndResponse;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org