You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by mc...@apache.org on 2019/03/19 23:35:47 UTC
[incubator-pinot] branch master updated: Pinot server side change
to optimize LLC segment completion with direct metadata upload. (#3941)
This is an automated email from the ASF dual-hosted git repository.
mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fe203b5 Pinot server side change to optimize LLC segment completion with direct metadata upload. (#3941)
fe203b5 is described below
commit fe203b5be2bbbf6464f1c757f07cb3374fa3bf19
Author: Ting Chen <ch...@gmail.com>
AuthorDate: Tue Mar 19 16:35:41 2019 -0700
Pinot server side change to optimize LLC segment completion with direct metadata upload. (#3941)
* The Pinot server side change to optimize LLC segment complete protocol by uploading metadata directly.
* Minor style and comment fixes.
* Fix error handling and style issues.
* Add missing header.
* Add the config for commit end with metadata into server instance config. Combine integration tests.
* Change comments.
* Minor comment fix.
* Use the metadata files created in the server directly.
* Deprecate the old segmentCommitEnd method and change the default to commitEndWithMetadata.
* Return null instead of crashing the server.
* Add comments.
* Invert config checks.
---
.../protocols/SegmentCompletionProtocol.java | 6 +++
.../apache/pinot/common/utils/CommonConstants.java | 1 +
.../common/utils/FileUploadDownloadClient.java | 24 +++++++++++
.../manager/config/InstanceDataManagerConfig.java | 2 +
.../realtime/LLRealtimeSegmentDataManager.java | 48 +++++++++++++++++++---
.../segment/index/loader/IndexLoadingConfig.java | 4 ++
.../ServerSegmentCompletionProtocolHandler.java | 37 +++++++++++++++++
.../realtime/LLRealtimeSegmentDataManagerTest.java | 4 +-
...CRealtimeClusterSplitCommitIntegrationTest.java | 2 +-
.../helix/HelixInstanceDataManagerConfig.java | 7 ++++
10 files changed, 126 insertions(+), 9 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index 60ee1e0..822ad21 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -359,6 +359,12 @@ public class SegmentCompletionProtocol {
}
}
+ public static class SegmentCommitEndWithMetadataRequest extends Request {
+ public SegmentCommitEndWithMetadataRequest(Params params) {
+ super(params, MSG_TYPE_COMMIT_END_METADATA);
+ }
+ }
+
public static class SegmentStoppedConsuming extends Request {
public SegmentStoppedConsuming(Params params) {
super(params, MSG_TYPE_STOPPED_CONSUMING);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index b76f2b8..0777ad4 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -156,6 +156,7 @@ public class CommonConstants {
public static final String CONFIG_OF_ENABLE_DEFAULT_COLUMNS = "pinot.server.instance.enable.default.columns";
public static final String CONFIG_OF_ENABLE_SHUTDOWN_DELAY = "pinot.server.instance.enable.shutdown.delay";
public static final String CONFIG_OF_ENABLE_SPLIT_COMMIT = "pinot.server.instance.enable.split.commit";
+ public static final String CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA = "pinot.server.instance.enable.commitend.metadata";
public static final String CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION = "pinot.server.instance.realtime.alloc.offheap";
public static final String CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION =
"pinot.server.instance.realtime.alloc.offheap.direct";
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 4a37dcf..de8c76a 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -29,6 +29,7 @@ import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
+import java.util.Map;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import org.apache.commons.io.IOUtils;
@@ -229,6 +230,22 @@ public class FileUploadDownloadClient implements Closeable {
parameters, socketTimeoutMs);
}
+ private static HttpUriRequest getUploadSegmentMetadataFilesRequest(URI uri, Map<String, File> metadataFiles,
+ int segmentUploadRequestTimeoutMs) {
+ MultipartEntityBuilder multipartEntityBuilder = MultipartEntityBuilder.create().
+ setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
+ for (Map.Entry<String, File> entry : metadataFiles.entrySet()) {
+ multipartEntityBuilder.addPart(entry.getKey(), getContentBody(entry.getKey(), entry.getValue()));
+ }
+ HttpEntity entity = multipartEntityBuilder.build();
+
+ // Build the POST request.
+ RequestBuilder requestBuilder =
+ RequestBuilder.create(HttpPost.METHOD_NAME).setVersion(HttpVersion.HTTP_1_1).setUri(uri).setEntity(entity);
+ setTimeout(requestBuilder, segmentUploadRequestTimeoutMs);
+ return requestBuilder.build();
+ }
+
private static HttpUriRequest getSendSegmentUriRequest(URI uri, String downloadUri, @Nullable List<Header> headers,
@Nullable List<NameValuePair> parameters, int socketTimeoutMs) {
RequestBuilder requestBuilder = RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1)
@@ -388,6 +405,13 @@ public class FileUploadDownloadClient implements Closeable {
getUploadSegmentMetadataRequest(uri, segmentName, segmentMetadataFile, headers, parameters, socketTimeoutMs));
}
+ // Upload a set of segment metadata files (e.g., meta.properties and creation.meta) to controllers.
+ public SimpleHttpResponse uploadSegmentMetadataFiles(URI uri, Map<String, File> metadataFiles,
+ int segmentUploadRequestTimeoutMs)
+ throws IOException, HttpErrorStatusException {
+ return sendRequest(getUploadSegmentMetadataFilesRequest(uri, metadataFiles, segmentUploadRequestTimeoutMs));
+ }
+
/**
* Upload segment with segment file.
*
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
index 2685768..ba1896f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
@@ -45,6 +45,8 @@ public interface InstanceDataManagerConfig {
boolean isEnableSplitCommit();
+ boolean isEnableSplitCommitEndWithMetadata();
+
boolean isRealtimeOffHeapAllocation();
boolean isDirectRealtimeOffheapAllocation();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 18b1409..50ee54e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -25,7 +25,9 @@ import com.yammer.metrics.core.Meter;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -67,6 +69,7 @@ import org.apache.pinot.core.realtime.stream.StreamDecoderProvider;
import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
import org.apache.pinot.core.realtime.stream.TransientConsumerException;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.joda.time.DateTime;
@@ -130,15 +133,17 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
protected class SegmentBuildDescriptor {
final String _segmentTarFilePath;
+ final Map<String, File> _metadataFileMap;
final long _offset;
final long _waitTimeMillis;
final long _buildTimeMillis;
final String _segmentDirPath;
final long _segmentSizeBytes;
- SegmentBuildDescriptor(String segmentTarFilePath, long offset, String segmentDirPath, long buildTimeMillis,
- long waitTimeMillis, long segmentSizeBytes) {
+ SegmentBuildDescriptor(String segmentTarFilePath, Map<String, File> metadataFileMap, long offset,
+ String segmentDirPath, long buildTimeMillis, long waitTimeMillis, long segmentSizeBytes) {
_segmentTarFilePath = segmentTarFilePath;
+ _metadataFileMap = metadataFileMap;
_offset = offset;
_buildTimeMillis = buildTimeMillis;
_waitTimeMillis = waitTimeMillis;
@@ -173,6 +178,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
FileUtils.deleteQuietly(new File(_segmentTarFilePath));
}
}
+
+ public Map<String, File> getMetadataFiles() {
+ return _metadataFileMap;
+ }
}
private static final long TIME_THRESHOLD_FOR_LOG_MINUTES = 1;
@@ -681,11 +690,32 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
TimeUnit.MILLISECONDS.toSeconds(waitTimeMillis));
if (forCommit) {
+ File[] segmentfiles = destDir.listFiles();
+ if (segmentfiles == null || segmentfiles.length == 0) {
+ segmentLogger.error("The index dir is empty: {}", destDir);
+ return null;
+ }
+ // segmentfiles[0] is the sub directory with version name (e.g., V3).
+ File metadataFileName = new File(segmentfiles[0], V1Constants.MetadataKeys.METADATA_FILE_NAME);
+ if (!metadataFileName.exists()) {
+ segmentLogger
+ .error("File does not exist in {} for {}.", destDir, V1Constants.MetadataKeys.METADATA_FILE_NAME);
+ return null;
+ }
+ File creationMetaFile = new File(segmentfiles[0], V1Constants.SEGMENT_CREATION_META);
+ if (!creationMetaFile.exists()) {
+ segmentLogger.error("File does not exist in {} for {}.", destDir, V1Constants.SEGMENT_CREATION_META);
+ return null;
+ }
+
+ Map<String, File> metadataFiles = new HashMap<>();
+ metadataFiles.put(V1Constants.MetadataKeys.METADATA_FILE_NAME, metadataFileName);
+ metadataFiles.put(V1Constants.SEGMENT_CREATION_META, creationMetaFile);
return new SegmentBuildDescriptor(destDir.getAbsolutePath() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION,
- _currentOffset, null, buildTimeMillis, waitTimeMillis, segmentSizeBytes);
+ metadataFiles, _currentOffset, null, buildTimeMillis, waitTimeMillis, segmentSizeBytes);
}
- return new SegmentBuildDescriptor(null, _currentOffset, destDir.getAbsolutePath(), buildTimeMillis,
- waitTimeMillis, segmentSizeBytes);
+ return new SegmentBuildDescriptor(null, null, _currentOffset, destDir.getAbsolutePath(),
+ buildTimeMillis, waitTimeMillis, segmentSizeBytes);
} catch (InterruptedException e) {
segmentLogger.error("Interrupted while waiting for semaphore");
return null;
@@ -735,7 +765,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
if (_isOffHeap) {
params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
}
- SegmentCompletionProtocol.Response commitEndResponse = _protocolHandler.segmentCommitEnd(params);
+ SegmentCompletionProtocol.Response commitEndResponse;
+ if (_indexLoadingConfig.isEnableSplitCommitEndWithMetadata()) {
+ commitEndResponse = _protocolHandler.segmentCommitEndWithMetadata(params, _segmentBuildDescriptor.getMetadataFiles());
+ } else {
+ commitEndResponse = _protocolHandler.segmentCommitEnd(params);
+ }
+
if (!commitEndResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) {
segmentLogger.warn("CommitEnd failed with response {}", commitEndResponse.toJsonString());
return SegmentCompletionProtocol.RESP_FAILED;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index 7e46d0c..04f9beb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -57,6 +57,7 @@ public class IndexLoadingConfig {
private boolean _enableSplitCommit;
private boolean _isRealtimeOffheapAllocation;
private boolean _isDirectRealtimeOffheapAllocation;
+ private boolean _enableSplitCommitEndWithMetadata;
public IndexLoadingConfig(@Nonnull InstanceDataManagerConfig instanceDataManagerConfig,
@Nonnull TableConfig tableConfig) {
@@ -135,6 +136,7 @@ public class IndexLoadingConfig {
if (avgMultiValueCount != null) {
_realtimeAvgMultiValueCount = Integer.valueOf(avgMultiValueCount);
}
+ _enableSplitCommitEndWithMetadata = instanceDataManagerConfig.isEnableSplitCommitEndWithMetadata();
}
/**
@@ -222,6 +224,8 @@ public class IndexLoadingConfig {
return _enableSplitCommit;
}
+ public boolean isEnableSplitCommitEndWithMetadata() { return _enableSplitCommitEndWithMetadata; }
+
public boolean isRealtimeOffheapAllocation() {
return _isRealtimeOffheapAllocation;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
index c411d16..2261c55 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
@@ -20,6 +20,7 @@ package org.apache.pinot.server.realtime;
import java.io.File;
import java.net.URI;
+import java.util.Map;
import javax.net.ssl.SSLContext;
import org.apache.commons.configuration.Configuration;
import org.apache.pinot.common.metrics.ServerMeter;
@@ -95,6 +96,8 @@ public class ServerSegmentCompletionProtocolHandler {
return uploadSegment(url, params.getSegmentName(), segmentTarFile);
}
+ // Replaced by segmentCommitEndWithMetadata().
+ @Deprecated
public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params params) {
SegmentCompletionProtocol.SegmentCommitEndRequest request =
new SegmentCompletionProtocol.SegmentCommitEndRequest(params);
@@ -105,6 +108,17 @@ public class ServerSegmentCompletionProtocolHandler {
return sendRequest(url);
}
+ public SegmentCompletionProtocol.Response segmentCommitEndWithMetadata(
+ SegmentCompletionProtocol.Request.Params params, final Map<String, File> metadataFiles) {
+ SegmentCompletionProtocol.SegmentCommitEndWithMetadataRequest request =
+ new SegmentCompletionProtocol.SegmentCommitEndWithMetadataRequest(params);
+ String url = createSegmentCompletionUrl(request);
+ if (url == null) {
+ return SegmentCompletionProtocol.RESP_NOT_SENT;
+ }
+ return sendCommitEndWithMetadataFiles(url, metadataFiles);
+ }
+
public SegmentCompletionProtocol.Response segmentCommit(SegmentCompletionProtocol.Request.Params params,
final File segmentTarFile) {
SegmentCompletionProtocol.SegmentCommitRequest request = new SegmentCompletionProtocol.SegmentCommitRequest(params);
@@ -182,6 +196,29 @@ public class ServerSegmentCompletionProtocolHandler {
return response;
}
+ private SegmentCompletionProtocol.Response sendCommitEndWithMetadataFiles(String url,
+ Map<String, File> metadataFiles) {
+ SegmentCompletionProtocol.Response response;
+ try {
+ String responseStr = _fileUploadDownloadClient
+ .uploadSegmentMetadataFiles(new URI(url), metadataFiles, SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS).getResponse();
+ response = SegmentCompletionProtocol.Response.fromJsonString(responseStr);
+ LOGGER.info("Controller response {} for {}", response.toJsonString(), url);
+ if (response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER)) {
+ ControllerLeaderLocator.getInstance().invalidateCachedControllerLeader();
+ }
+ } catch (Exception e) {
+ // Catch all exceptions, we want the protocol to handle the case assuming the request was never sent.
+ response = SegmentCompletionProtocol.RESP_NOT_SENT;
+ LOGGER.error("Could not send request {}", url, e);
+ // Invalidate controller leader cache, as exception could be because of leader being down (deployment/failure) and hence unable to send {@link SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER}
+ // If cache is not invalidated, we will not recover from exceptions until the controller comes back up
+ ControllerLeaderLocator.getInstance().invalidateCachedControllerLeader();
+ }
+ raiseSegmentCompletionProtocolResponseMetric(response);
+ return response;
+ }
+
private SegmentCompletionProtocol.Response uploadSegment(String url, final String segmentName,
final File segmentTarFile) {
SegmentCompletionProtocol.Response response;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index bf584d1..e87a85a 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -766,7 +766,7 @@ public class LLRealtimeSegmentDataManagerTest {
return null;
}
if (!forCommit) {
- return new SegmentBuildDescriptor(null, getCurrentOffset(), _segmentDir, 0, 0, -1);
+ return new SegmentBuildDescriptor(null, null, getCurrentOffset(), _segmentDir, 0, 0, -1);
}
final String segTarFileName = _segmentDir + "/" + "segmentFile";
File segmentTgzFile = new File(segTarFileName);
@@ -775,7 +775,7 @@ public class LLRealtimeSegmentDataManagerTest {
} catch (IOException e) {
Assert.fail("Could not create file " + segmentTgzFile);
}
- return new SegmentBuildDescriptor(segTarFileName, getCurrentOffset(), null, 0, 0, -1);
+ return new SegmentBuildDescriptor(segTarFileName, null, getCurrentOffset(), null, 0, 0, -1);
}
@Override
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterSplitCommitIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterSplitCommitIntegrationTest.java
index fca5d99..454cdb4 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterSplitCommitIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterSplitCommitIntegrationTest.java
@@ -27,7 +27,6 @@ import org.apache.pinot.controller.ControllerConf;
* Integration test that extends LLCRealtimeClusterIntegrationTest but with split commit enabled.
*/
public class LLCRealtimeClusterSplitCommitIntegrationTest extends LLCRealtimeClusterIntegrationTest {
-
@Override
public void startController() {
ControllerConf controllerConfig = getDefaultControllerConfiguration();
@@ -39,6 +38,7 @@ public class LLCRealtimeClusterSplitCommitIntegrationTest extends LLCRealtimeClu
public void startServer() {
Configuration serverConfig = getDefaultServerConfiguration();
serverConfig.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true);
+ serverConfig.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true);
startServer(serverConfig);
}
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index 2154d88..5439618 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -68,6 +68,8 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
// Key of whether to enable split commit
private static final String ENABLE_SPLIT_COMMIT = "enable.split.commit";
+ // Key of whether to enable split commit end with segment metadata files.
+ private static final String ENABLE_SPLIT_COMMIT_END_WITH_METADATA = "enable.commitend.metadata";
// Whether memory for realtime consuming segments should be allocated off-heap.
private static final String REALTIME_OFFHEAP_ALLOCATION = "realtime.alloc.offheap";
@@ -164,6 +166,11 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
}
@Override
+ public boolean isEnableSplitCommitEndWithMetadata() {
+ return _instanceDataManagerConfiguration.getBoolean(ENABLE_SPLIT_COMMIT_END_WITH_METADATA, true);
+ }
+
+ @Override
public boolean isRealtimeOffHeapAllocation() {
return _instanceDataManagerConfiguration.getBoolean(REALTIME_OFFHEAP_ALLOCATION, false);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org