You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ap...@apache.org on 2020/07/24 18:47:40 UTC

[incubator-pinot] 04/04: [Deepstore by-passing] Introduce a subclass of SplitSegmentCommitter which proceeds to commit even if the segment upload fails. (#5700)

This is an automated email from the ASF dual-hosted git repository.

apucher pushed a commit to branch thirdeye-temp
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit abd7de7cb414bb8899bb4fe1be469ad6b97d8703
Author: Ting Chen <ti...@uber.com>
AuthorDate: Thu Jul 23 17:13:34 2020 -0700

    [Deepstore by-passing] Introduce a subclass of SplitSegmentCommitter which proceeds to commit even if the segment upload fails. (#5700)
    
    * First commit to enable bypassing the segment store in LLC.
    
    * Fix a compilation issue.
    
    * Fix a unit test.
    
    * Refactor the segment committer factory api.
    
    * Introduce a new constant for peer download scheme.
    
    * Fix a typo.
    
    * Add a TODO on how to control split commit behavior and refactor the uploadSegment method.
    
    * Remove unused vars.
---
 .../apache/pinot/common/utils/CommonConstants.java |  1 +
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  8 ++--
 .../PinotLLCRealtimeSegmentManagerTest.java        |  3 +-
 .../helix/core/realtime/SegmentCompletionTest.java |  3 +-
 .../realtime/LLRealtimeSegmentDataManager.java     | 24 +++--------
 .../realtime/PeerSchemeSplitSegmentCommitter.java  | 48 ++++++++++++++++++++++
 .../manager/realtime/PinotFSSegmentUploader.java   |  2 +
 .../manager/realtime/SegmentCommitterFactory.java  | 37 +++++++++++++----
 .../manager/realtime/SplitSegmentCommitter.java    | 14 ++++++-
 .../segment/index/loader/IndexLoadingConfig.java   |  7 ++++
 .../realtime/LLRealtimeSegmentDataManagerTest.java |  2 +
 11 files changed, 117 insertions(+), 32 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 7c4a7b2..3e25512 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -343,6 +343,7 @@ public class CommonConstants {
     public static final String FLUSH_THRESHOLD_SIZE = "segment.flush.threshold.size";
     public static final String FLUSH_THRESHOLD_TIME = "segment.flush.threshold.time";
     public static final String PARTITION_METADATA = "segment.partition.metadata";
+    public static final String PEER_SEGMENT_DOWNLOAD_SCHEME = "peer://";
     /**
      * This field is used for parallel push protection to lock the segment globally.
      * We put the segment upload start timestamp so that if the previous push failed without unlock the segment, the
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 1691553..06cc58d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -398,8 +398,9 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   private boolean isPeerSegmentDownloadScheme(CommittingSegmentDescriptor committingSegmentDescriptor) {
-    return !(committingSegmentDescriptor == null) && !(committingSegmentDescriptor.getSegmentLocation() == null) &&
-        committingSegmentDescriptor.getSegmentLocation().toLowerCase().startsWith("peer://");
+    return !(committingSegmentDescriptor == null) && !(committingSegmentDescriptor.getSegmentLocation() == null)
+        && committingSegmentDescriptor.getSegmentLocation().toLowerCase()
+        .startsWith(CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME);
   }
 
   /**
@@ -514,7 +515,8 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   private boolean isPeerURL(String segmentLocation) {
-    return segmentLocation != null && segmentLocation.toLowerCase().startsWith("peer://");
+    return segmentLocation != null && segmentLocation.toLowerCase()
+        .startsWith(CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME);
   }
 
   /**
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index acd4664..e15d28c 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -45,6 +45,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.CommonConstants.Helix;
 import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
@@ -801,7 +802,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     // Test case 2: segment location with peer format: peer://segment1, verify that an empty string is stored in zk.
     committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
-    String peerSegmentLocation = "peer:///segment1";
+    String peerSegmentLocation = CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME + "/segment1";
     committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
         new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, peerSegmentLocation);
     committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 2723063..0166f9f 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -320,7 +320,8 @@ public class SegmentCompletionTest {
   @Test
   public void testHappyPathSplitCommitWithPeerDownloadScheme()
       throws Exception {
-    testHappyPathSplitCommit(5L, "peer:///segment1", "peer:///segment1");
+    testHappyPathSplitCommit(5L, CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME + "/segment1",
+        CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME + "/segment1");
   }
 
   @Test
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 0a8b27d..8944e15 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -846,24 +846,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
 
     SegmentCommitter segmentCommitter;
-
-    if (isSplitCommit) {
-      // TODO: make segment uploader used in the segment committer configurable.
-      SegmentUploader segmentUploader;
-      try {
-        segmentUploader =
-            new Server2ControllerSegmentUploader(segmentLogger, _protocolHandler.getFileUploadDownloadClient(),
-                _protocolHandler.getSegmentCommitUploadURL(params, controllerVipUrl), _segmentNameStr,
-                ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), _serverMetrics);
-      } catch (URISyntaxException e) {
-        segmentLogger.error("Segment commit upload url error: ", e);
-        return SegmentCompletionProtocol.RESP_NOT_SENT;
-      }
-      segmentCommitter = _segmentCommitterFactory.createSplitSegmentCommitter(params, segmentUploader);
-    } else {
-      segmentCommitter = _segmentCommitterFactory.createDefaultSegmentCommitter(params);
+    try {
+      segmentCommitter = _segmentCommitterFactory.createSegmentCommitter(isSplitCommit, params, controllerVipUrl);
+    } catch (URISyntaxException e) {
+      segmentLogger.error("Failed to create a segment committer: ", e);
+      return SegmentCompletionProtocol.RESP_NOT_SENT;
     }
-
     return segmentCommitter.commit(_segmentBuildDescriptor);
   }
 
@@ -1268,7 +1256,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       _consumeEndTime = now + minConsumeTimeMillis;
     }
 
-    _segmentCommitterFactory = new SegmentCommitterFactory(segmentLogger, _protocolHandler);
+    _segmentCommitterFactory = new SegmentCommitterFactory(segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics);
 
     segmentLogger
         .info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", _llcSegmentName,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PeerSchemeSplitSegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PeerSchemeSplitSegmentCommitter.java
new file mode 100644
index 0000000..118f0ff
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PeerSchemeSplitSegmentCommitter.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.io.File;
+import java.net.URI;
+import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.slf4j.Logger;
+
+
+public class PeerSchemeSplitSegmentCommitter extends SplitSegmentCommitter {
+  public PeerSchemeSplitSegmentCommitter(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler,
+      SegmentCompletionProtocol.Request.Params params, SegmentUploader segmentUploader) {
+    super(segmentLogger, protocolHandler, params, segmentUploader);
+  }
+
+  // Overrides the parent's upload step: never returns null, so a failed upload does not fail the commit.
+  // If the uploader returns a null URI, the location becomes peer:///segment_name, which notifies the
+  // controller that the peer download scheme is in use; otherwise the uploaded URI's string form is returned.
+  protected String uploadSegment(File segmentTarFile, SegmentUploader segmentUploader,
+      SegmentCompletionProtocol.Request.Params params) {
+    URI segmentLocation = segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(params.getSegmentName()));
+    if (segmentLocation == null) {
+      return StringUtil.join("/", CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME, params.getSegmentName());
+    }
+    return segmentLocation.toString();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
index 5dae4e4..75a7fff 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
@@ -40,6 +40,8 @@ import org.slf4j.LoggerFactory;
  */
 public class PinotFSSegmentUploader implements SegmentUploader {
   private Logger LOGGER = LoggerFactory.getLogger(PinotFSSegmentUploader.class);
+  public static final int DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS = 10 * 1000;
+
   private String _segmentStoreUriStr;
   private ExecutorService _executorService = Executors.newCachedThreadPool();
   private int _timeoutInMs;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
index df0f6b8..2d8154e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
@@ -18,8 +18,12 @@
  */
 package org.apache.pinot.core.data.manager.realtime;
 
+import java.net.URISyntaxException;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.slf4j.Logger;
 
 
@@ -29,18 +33,37 @@ import org.slf4j.Logger;
 public class SegmentCommitterFactory {
   private static Logger LOGGER;
   private final ServerSegmentCompletionProtocolHandler _protocolHandler;
+  private final TableConfig _tableConfig;
+  private final ServerMetrics _serverMetrics;
+  private final IndexLoadingConfig _indexLoadingConfig;
 
-  public SegmentCommitterFactory(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler) {
+  public SegmentCommitterFactory(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler,
+      TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig, ServerMetrics serverMetrics) {
     LOGGER = segmentLogger;
     _protocolHandler = protocolHandler;
+    _tableConfig = tableConfig;
+    _indexLoadingConfig = indexLoadingConfig;
+    _serverMetrics = serverMetrics;
   }
 
-  public SegmentCommitter createSplitSegmentCommitter(SegmentCompletionProtocol.Request.Params params,
-      SegmentUploader segmentUploader) {
-    return new SplitSegmentCommitter(LOGGER, _protocolHandler, params, segmentUploader);
-  }
+  public SegmentCommitter createSegmentCommitter(boolean isSplitCommit, SegmentCompletionProtocol.Request.Params params,
+      String controllerVipUrl)
+      throws URISyntaxException {
+    if (!isSplitCommit) {
+      return new DefaultSegmentCommitter(LOGGER, _protocolHandler, params);
+    }
+    SegmentUploader segmentUploader;
+    // TODO Instead of using a peer segment download scheme to control how the servers do split commit, we should use
+    // other configs such as server or controller configs or controller responses to the servers.
+    if (_tableConfig.getValidationConfig().getPeerSegmentDownloadScheme() != null) {
+      segmentUploader = new PinotFSSegmentUploader(_indexLoadingConfig.getSegmentStoreURI(),
+          PinotFSSegmentUploader.DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS);
+      return new PeerSchemeSplitSegmentCommitter(LOGGER, _protocolHandler, params, segmentUploader);
+    }
 
-  public SegmentCommitter createDefaultSegmentCommitter(SegmentCompletionProtocol.Request.Params params) {
-    return new DefaultSegmentCommitter(LOGGER, _protocolHandler, params);
+    segmentUploader = new Server2ControllerSegmentUploader(LOGGER, _protocolHandler.getFileUploadDownloadClient(),
+        _protocolHandler.getSegmentCommitUploadURL(params, controllerVipUrl), params.getSegmentName(),
+        ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), _serverMetrics);
+    return new SplitSegmentCommitter(LOGGER, _protocolHandler, params, segmentUploader);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
index 8d73498..33b2ac0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
@@ -56,11 +56,11 @@ public class SplitSegmentCommitter implements SegmentCommitter {
       return SegmentCompletionProtocol.RESP_FAILED;
     }
 
-    URI segmentLocation = _segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(_params.getSegmentName()));
+    String segmentLocation = uploadSegment(segmentTarFile, _segmentUploader, _params);
     if (segmentLocation == null) {
       return SegmentCompletionProtocol.RESP_FAILED;
     }
-    _params.withSegmentLocation(segmentLocation.toString());
+    _params.withSegmentLocation(segmentLocation);
 
     SegmentCompletionProtocol.Response commitEndResponse =
         _protocolHandler.segmentCommitEndWithMetadata(_params, segmentBuildDescriptor.getMetadataFiles());
@@ -71,4 +71,14 @@ public class SplitSegmentCommitter implements SegmentCommitter {
     }
     return commitEndResponse;
   }
+
+  // Upload hook (overridable): returns the uploaded segment's URI as a string, or null iff the upload fails.
+  protected String uploadSegment(File segmentTarFile, SegmentUploader segmentUploader,
+      SegmentCompletionProtocol.Request.Params params) {
+    URI segmentLocation = segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(params.getSegmentName()));
+    if (segmentLocation != null) {
+      return segmentLocation.toString();
+    }
+    return null;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index 6ad0f3f..3c3fb7e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -41,6 +41,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
  */
 public class IndexLoadingConfig {
   private static final int DEFAULT_REALTIME_AVG_MULTI_VALUE_COUNT = 2;
+  private static final String SEGMENT_STORE_URI = "segment.store.uri";
 
   private ReadMode _readMode = ReadMode.DEFAULT_MODE;
   private List<String> _sortedColumns = Collections.emptyList();
@@ -62,6 +63,7 @@ public class IndexLoadingConfig {
   private boolean _isRealtimeOffheapAllocation;
   private boolean _isDirectRealtimeOffheapAllocation;
   private boolean _enableSplitCommitEndWithMetadata;
+  private String _segmentStoreURI;
 
   // constructed from FieldConfig
   private Map<String, Map<String, String>> _columnProperties = new HashMap<>();
@@ -185,6 +187,7 @@ public class IndexLoadingConfig {
       _realtimeAvgMultiValueCount = Integer.valueOf(avgMultiValueCount);
     }
     _enableSplitCommitEndWithMetadata = instanceDataManagerConfig.isEnableSplitCommitEndWithMetadata();
+    _segmentStoreURI = instanceDataManagerConfig.getConfig().getProperty(SEGMENT_STORE_URI);
   }
 
   /**
@@ -332,6 +335,10 @@ public class IndexLoadingConfig {
     return _columnMinMaxValueGeneratorMode;
   }
 
+
+  public String getSegmentStoreURI() { return _segmentStoreURI; }
+
+
   /**
    * For tests only.
    */
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 646ef0a..44af74f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -44,6 +44,7 @@ import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
 import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
 import org.apache.pinot.spi.stream.PermanentConsumerException;
@@ -782,6 +783,7 @@ public class LLRealtimeSegmentDataManagerTest {
       when(dataManagerConfig.getSegmentFormatVersion()).thenReturn(null);
       when(dataManagerConfig.isEnableSplitCommit()).thenReturn(false);
       when(dataManagerConfig.isRealtimeOffHeapAllocation()).thenReturn(false);
+      when(dataManagerConfig.getConfig()).thenReturn(new PinotConfiguration());
       return dataManagerConfig;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org