You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ap...@apache.org on 2020/07/24 18:47:40 UTC
[incubator-pinot] 04/04: [Deepstore by-passing] Introduce a
SplitSegmentCommitter subclass that proceeds to commit even if the
segment upload fails. (#5700)
This is an automated email from the ASF dual-hosted git repository.
apucher pushed a commit to branch thirdeye-temp
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit abd7de7cb414bb8899bb4fe1be469ad6b97d8703
Author: Ting Chen <ti...@uber.com>
AuthorDate: Thu Jul 23 17:13:34 2020 -0700
[Deepstore by-passing] Introduce a SplitSegmentCommitter subclass that proceeds to commit even if the segment upload fails. (#5700)
* First commit to enable bypassing the segment store in LLC.
* Fix a compilation issue.
* Fix a unit test.
* Refactor the segment committer factory API.
* Introduce a new constant for the peer download scheme.
* Fix a typo.
* Add a TODO on how to control split commit behavior and refactor the uploadSegment method.
* Remove unused vars.
---
.../apache/pinot/common/utils/CommonConstants.java | 1 +
.../realtime/PinotLLCRealtimeSegmentManager.java | 8 ++--
.../PinotLLCRealtimeSegmentManagerTest.java | 3 +-
.../helix/core/realtime/SegmentCompletionTest.java | 3 +-
.../realtime/LLRealtimeSegmentDataManager.java | 24 +++--------
.../realtime/PeerSchemeSplitSegmentCommitter.java | 48 ++++++++++++++++++++++
.../manager/realtime/PinotFSSegmentUploader.java | 2 +
.../manager/realtime/SegmentCommitterFactory.java | 37 +++++++++++++----
.../manager/realtime/SplitSegmentCommitter.java | 14 ++++++-
.../segment/index/loader/IndexLoadingConfig.java | 7 ++++
.../realtime/LLRealtimeSegmentDataManagerTest.java | 2 +
11 files changed, 117 insertions(+), 32 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 7c4a7b2..3e25512 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -343,6 +343,7 @@ public class CommonConstants {
public static final String FLUSH_THRESHOLD_SIZE = "segment.flush.threshold.size";
public static final String FLUSH_THRESHOLD_TIME = "segment.flush.threshold.time";
public static final String PARTITION_METADATA = "segment.partition.metadata";
+ public static final String PEER_SEGMENT_DOWNLOAD_SCHEME = "peer://";
/**
* This field is used for parallel push protection to lock the segment globally.
* We put the segment upload start timestamp so that if the previous push failed without unlock the segment, the
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 1691553..06cc58d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -398,8 +398,9 @@ public class PinotLLCRealtimeSegmentManager {
}
private boolean isPeerSegmentDownloadScheme(CommittingSegmentDescriptor committingSegmentDescriptor) {
- return !(committingSegmentDescriptor == null) && !(committingSegmentDescriptor.getSegmentLocation() == null) &&
- committingSegmentDescriptor.getSegmentLocation().toLowerCase().startsWith("peer://");
+ return !(committingSegmentDescriptor == null) && !(committingSegmentDescriptor.getSegmentLocation() == null)
+ && committingSegmentDescriptor.getSegmentLocation().toLowerCase()
+ .startsWith(CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME);
}
/**
@@ -514,7 +515,8 @@ public class PinotLLCRealtimeSegmentManager {
}
private boolean isPeerURL(String segmentLocation) {
- return segmentLocation != null && segmentLocation.toLowerCase().startsWith("peer://");
+ return segmentLocation != null && segmentLocation.toLowerCase()
+ .startsWith(CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME);
}
/**
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index acd4664..e15d28c 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -45,6 +45,7 @@ import org.apache.helix.model.IdealState;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.CommonConstants.Helix;
import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
@@ -801,7 +802,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Test case 2: segment location with peer format: peer://segment1, verify that an empty string is stored in zk.
committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
- String peerSegmentLocation = "peer:///segment1";
+ String peerSegmentLocation = CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME + "/segment1";
committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, peerSegmentLocation);
committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 2723063..0166f9f 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -320,7 +320,8 @@ public class SegmentCompletionTest {
@Test
public void testHappyPathSplitCommitWithPeerDownloadScheme()
throws Exception {
- testHappyPathSplitCommit(5L, "peer:///segment1", "peer:///segment1");
+ testHappyPathSplitCommit(5L, CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME + "/segment1",
+ CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME + "/segment1");
}
@Test
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 0a8b27d..8944e15 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -846,24 +846,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
SegmentCommitter segmentCommitter;
-
- if (isSplitCommit) {
- // TODO: make segment uploader used in the segment committer configurable.
- SegmentUploader segmentUploader;
- try {
- segmentUploader =
- new Server2ControllerSegmentUploader(segmentLogger, _protocolHandler.getFileUploadDownloadClient(),
- _protocolHandler.getSegmentCommitUploadURL(params, controllerVipUrl), _segmentNameStr,
- ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), _serverMetrics);
- } catch (URISyntaxException e) {
- segmentLogger.error("Segment commit upload url error: ", e);
- return SegmentCompletionProtocol.RESP_NOT_SENT;
- }
- segmentCommitter = _segmentCommitterFactory.createSplitSegmentCommitter(params, segmentUploader);
- } else {
- segmentCommitter = _segmentCommitterFactory.createDefaultSegmentCommitter(params);
+ try {
+ segmentCommitter = _segmentCommitterFactory.createSegmentCommitter(isSplitCommit, params, controllerVipUrl);
+ } catch (URISyntaxException e) {
+ segmentLogger.error("Failed to create a segment committer: ", e);
+ return SegmentCompletionProtocol.RESP_NOT_SENT;
}
-
return segmentCommitter.commit(_segmentBuildDescriptor);
}
@@ -1268,7 +1256,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_consumeEndTime = now + minConsumeTimeMillis;
}
- _segmentCommitterFactory = new SegmentCommitterFactory(segmentLogger, _protocolHandler);
+ _segmentCommitterFactory = new SegmentCommitterFactory(segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics);
segmentLogger
.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", _llcSegmentName,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PeerSchemeSplitSegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PeerSchemeSplitSegmentCommitter.java
new file mode 100644
index 0000000..118f0ff
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PeerSchemeSplitSegmentCommitter.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.io.File;
+import java.net.URI;
+import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.slf4j.Logger;
+
+
+/** A {@link SplitSegmentCommitter} that still commits with a peer download location if the segment upload fails. */
+public class PeerSchemeSplitSegmentCommitter extends SplitSegmentCommitter {
+ public PeerSchemeSplitSegmentCommitter(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler,
+ SegmentCompletionProtocol.Request.Params params, SegmentUploader segmentUploader) {
+ super(segmentLogger, protocolHandler, params, segmentUploader);
+ }
+
+ // Never returns null: on upload failure, returns peer:///<segmentName> so the controller uses peer download.
+ @Override
+ protected String uploadSegment(File segmentTarFile, SegmentUploader segmentUploader,
+ SegmentCompletionProtocol.Request.Params params) {
+ URI segmentLocation = segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(params.getSegmentName()));
+ if (segmentLocation == null) {
+ return StringUtil.join("/", CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME, params.getSegmentName());
+ }
+ return segmentLocation.toString();
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
index 5dae4e4..75a7fff 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
@@ -40,6 +40,8 @@ import org.slf4j.LoggerFactory;
*/
public class PinotFSSegmentUploader implements SegmentUploader {
private Logger LOGGER = LoggerFactory.getLogger(PinotFSSegmentUploader.class);
+ public static final int DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS = 10 * 1000;
+
private String _segmentStoreUriStr;
private ExecutorService _executorService = Executors.newCachedThreadPool();
private int _timeoutInMs;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
index df0f6b8..2d8154e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
@@ -18,8 +18,12 @@
*/
package org.apache.pinot.core.data.manager.realtime;
+import java.net.URISyntaxException;
+import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.slf4j.Logger;
@@ -29,18 +33,37 @@ import org.slf4j.Logger;
public class SegmentCommitterFactory {
private static Logger LOGGER;
private final ServerSegmentCompletionProtocolHandler _protocolHandler;
+ private final TableConfig _tableConfig;
+ private final ServerMetrics _serverMetrics;
+ private final IndexLoadingConfig _indexLoadingConfig;
- public SegmentCommitterFactory(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler) {
+ public SegmentCommitterFactory(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler,
+ TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig, ServerMetrics serverMetrics) {
LOGGER = segmentLogger;
_protocolHandler = protocolHandler;
+ _tableConfig = tableConfig;
+ _indexLoadingConfig = indexLoadingConfig;
+ _serverMetrics = serverMetrics;
}
- public SegmentCommitter createSplitSegmentCommitter(SegmentCompletionProtocol.Request.Params params,
- SegmentUploader segmentUploader) {
- return new SplitSegmentCommitter(LOGGER, _protocolHandler, params, segmentUploader);
- }
+ public SegmentCommitter createSegmentCommitter(boolean isSplitCommit, SegmentCompletionProtocol.Request.Params params,
+ String controllerVipUrl)
+ throws URISyntaxException {
+ if (!isSplitCommit) {
+ return new DefaultSegmentCommitter(LOGGER, _protocolHandler, params);
+ }
+ SegmentUploader segmentUploader;
+ // TODO Instead of using a peer segment download scheme to control how the servers do split commit, we should use
+ // other configs such as server or controller configs or controller responses to the servers.
+ if (_tableConfig.getValidationConfig().getPeerSegmentDownloadScheme() != null) {
+ segmentUploader = new PinotFSSegmentUploader(_indexLoadingConfig.getSegmentStoreURI(),
+ PinotFSSegmentUploader.DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS);
+ return new PeerSchemeSplitSegmentCommitter(LOGGER, _protocolHandler, params, segmentUploader);
+ }
- public SegmentCommitter createDefaultSegmentCommitter(SegmentCompletionProtocol.Request.Params params) {
- return new DefaultSegmentCommitter(LOGGER, _protocolHandler, params);
+ segmentUploader = new Server2ControllerSegmentUploader(LOGGER, _protocolHandler.getFileUploadDownloadClient(),
+ _protocolHandler.getSegmentCommitUploadURL(params, controllerVipUrl), params.getSegmentName(),
+ ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), _serverMetrics);
+ return new SplitSegmentCommitter(LOGGER, _protocolHandler, params, segmentUploader);
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
index 8d73498..33b2ac0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
@@ -56,11 +56,11 @@ public class SplitSegmentCommitter implements SegmentCommitter {
return SegmentCompletionProtocol.RESP_FAILED;
}
- URI segmentLocation = _segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(_params.getSegmentName()));
+ String segmentLocation = uploadSegment(segmentTarFile, _segmentUploader, _params);
if (segmentLocation == null) {
return SegmentCompletionProtocol.RESP_FAILED;
}
- _params.withSegmentLocation(segmentLocation.toString());
+ _params.withSegmentLocation(segmentLocation);
SegmentCompletionProtocol.Response commitEndResponse =
_protocolHandler.segmentCommitEndWithMetadata(_params, segmentBuildDescriptor.getMetadataFiles());
@@ -71,4 +71,14 @@ public class SplitSegmentCommitter implements SegmentCommitter {
}
return commitEndResponse;
}
+
+ // Uploads the segment tar file and returns its location as a string. Returns null iff the upload fails;
+ // subclasses may override this to supply a fallback location instead.
+ protected String uploadSegment(File segmentTarFile, SegmentUploader segmentUploader,
+ SegmentCompletionProtocol.Request.Params params) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(params.getSegmentName());
+ URI uploadedUri = segmentUploader.uploadSegment(segmentTarFile, llcSegmentName);
+ // A null URI signals a failed upload and is propagated as a null location.
+ return uploadedUri == null ? null : uploadedUri.toString();
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index 6ad0f3f..3c3fb7e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -41,6 +41,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
*/
public class IndexLoadingConfig {
private static final int DEFAULT_REALTIME_AVG_MULTI_VALUE_COUNT = 2;
+ private static final String SEGMENT_STORE_URI = "segment.store.uri";
private ReadMode _readMode = ReadMode.DEFAULT_MODE;
private List<String> _sortedColumns = Collections.emptyList();
@@ -62,6 +63,7 @@ public class IndexLoadingConfig {
private boolean _isRealtimeOffheapAllocation;
private boolean _isDirectRealtimeOffheapAllocation;
private boolean _enableSplitCommitEndWithMetadata;
+ private String _segmentStoreURI;
// constructed from FieldConfig
private Map<String, Map<String, String>> _columnProperties = new HashMap<>();
@@ -185,6 +187,7 @@ public class IndexLoadingConfig {
_realtimeAvgMultiValueCount = Integer.valueOf(avgMultiValueCount);
}
_enableSplitCommitEndWithMetadata = instanceDataManagerConfig.isEnableSplitCommitEndWithMetadata();
+ _segmentStoreURI = instanceDataManagerConfig.getConfig().getProperty(SEGMENT_STORE_URI);
}
/**
@@ -332,6 +335,10 @@ public class IndexLoadingConfig {
return _columnMinMaxValueGeneratorMode;
}
+ public String getSegmentStoreURI() {
+ return _segmentStoreURI;
+ }
+
/**
* For tests only.
*/
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 646ef0a..44af74f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -44,6 +44,7 @@ import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
import org.apache.pinot.spi.stream.PermanentConsumerException;
@@ -782,6 +783,7 @@ public class LLRealtimeSegmentDataManagerTest {
when(dataManagerConfig.getSegmentFormatVersion()).thenReturn(null);
when(dataManagerConfig.isEnableSplitCommit()).thenReturn(false);
when(dataManagerConfig.isRealtimeOffHeapAllocation()).thenReturn(false);
+ when(dataManagerConfig.getConfig()).thenReturn(new PinotConfiguration());
return dataManagerConfig;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org