You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by mc...@apache.org on 2019/03/13 22:24:22 UTC
[incubator-pinot] branch master updated: Pinot controller side
change to enhance LLC segment metadata upload. (#3877)
This is an automated email from the ASF dual-hosted git repository.
mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2a70bb9 Pinot controller side change to enhance LLC segment metadata upload. (#3877)
2a70bb9 is described below
commit 2a70bb913f006d52a55aa686a86464b9acf4473f
Author: Ting Chen <ch...@gmail.com>
AuthorDate: Wed Mar 13 15:24:17 2019 -0700
Pinot controller side change to enhance LLC segment metadata upload. (#3877)
* Pinot controller side change to enhance LLC segment metadata upload.
* Remove an unused lib.
* Revise based on reviews.
* Let the LLCSegmentCompletionHandlers handle the metadata extraction.
* Clean up error case behaviors for metadata extraction based on review.
* Add more logging for error cases based on review feedback.
* Fix integration failure by passing the right segment location in controller for metadata extraction.
* Use PinotFS to copy segment files for metadata extraction.
* Construct segment file URI directly.
* Further revision to address reviews.
* (1) Remove the prefix java.net. (2) Add test to show the diff between Apache URI and java.net.URI.
* Fix a comment typo.
* Revise logging and lib inclusion based on reviews.
* Return failure directly if there is no metadata file in the input form.
* Refactor the upload function based on Subu's comments.
* Fix the log error level.
* Apply Subba's refactoring and a few of my tweaks.
* Address reviewer feedback about format and error logs.
* Fix a compilation error.
* Fix style issues and eliminate a local method.
* Fix redundant line.
---
.../protocols/SegmentCompletionProtocol.java | 1 +
.../resources/LLCSegmentCompletionHandlers.java | 364 ++++++++++++++++-----
.../realtime/PinotLLCRealtimeSegmentManager.java | 69 +---
.../core/realtime/SegmentCompletionManager.java | 13 +-
.../segment/CommittingSegmentDescriptor.java | 18 +
.../PinotLLCRealtimeSegmentManagerTest.java | 61 ++--
.../helix/core/realtime/SegmentCompletionTest.java | 30 +-
7 files changed, 382 insertions(+), 174 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index cedc8d6..60ee1e0 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -112,6 +112,7 @@ public class SegmentCompletionProtocol {
public static final String MSG_TYPE_COMMIT_START = "segmentCommitStart";
public static final String MSG_TYPE_SEGMENT_UPLOAD = "segmentUpload";
public static final String MSG_TYPE_COMMIT_END = "segmentCommitEnd";
+ public static final String MSG_TYPE_COMMIT_END_METADATA = "segmentCommitEndWithMetadata";
public static final String MSG_TYPE_STOPPED_CONSUMING = "segmentStoppedConsuming";
public static final String MSG_TYPE_EXTEND_BUILD_TIME = "extendBuildTime";
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 0d5de21..19e2123 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -20,14 +20,18 @@ package org.apache.pinot.controller.api.resources;
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
@@ -36,15 +40,20 @@ import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
-import org.apache.commons.httpclient.URI;
+
+import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
+import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.controller.util.SegmentCompletionUtils;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
import org.apache.pinot.filesystem.PinotFS;
import org.apache.pinot.filesystem.PinotFSFactory;
import org.glassfish.jersey.media.multipart.FormDataBodyPart;
@@ -58,8 +67,10 @@ import org.slf4j.LoggerFactory;
@Path("/")
public class LLCSegmentCompletionHandlers {
+ private static final String SEGMENT_TMP_DIR = "segment.tmp";
private static Logger LOGGER = LoggerFactory.getLogger(LLCSegmentCompletionHandlers.class);
private static final String SCHEME = "file://";
+ private static final String METADATA_TEMP_DIR_SUFFIX = ".metadata.tmp";
@Inject
ControllerConf _controllerConf;
@@ -197,6 +208,16 @@ public class LLCSegmentCompletionHandlers {
return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
}
+ SegmentMetadataImpl segmentMetadata;
+ try {
+ segmentMetadata = extractMetadataFromSegmentFile(segmentName, new URI(segmentLocation));
+ } catch (URISyntaxException e) {
+ LOGGER.error("Invalid segment location: {} for segment {}", segmentLocation, segmentName);
+ return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+ }
+ if (segmentMetadata == null) {
+ return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+ }
SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params();
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
.withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes)
@@ -207,8 +228,10 @@ public class LLCSegmentCompletionHandlers {
final boolean isSuccess = true;
final boolean isSplitCommit = true;
- SegmentCompletionProtocol.Response response =
- SegmentCompletionManager.getInstance().segmentCommitEnd(requestParams, isSuccess, isSplitCommit);
+ CommittingSegmentDescriptor committingSegmentDescriptor =
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams, segmentMetadata);
+ SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance()
+ .segmentCommitEnd(requestParams, isSuccess, isSplitCommit, committingSegmentDescriptor);
final String responseStr = response.toJsonString();
LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
return responseStr;
@@ -234,13 +257,68 @@ public class LLCSegmentCompletionHandlers {
final SegmentCompletionManager segmentCompletionManager = SegmentCompletionManager.getInstance();
SegmentCompletionProtocol.Response response = segmentCompletionManager.segmentCommitStart(requestParams);
- if (response.equals(SegmentCompletionProtocol.RESP_COMMIT_CONTINUE)) {
- // Get the segment and put it in the right place.
- boolean success = uploadSegment(multiPart, instanceId, segmentName, false) != null;
- response = segmentCompletionManager.segmentCommitEnd(requestParams, success, false);
+ CommittingSegmentDescriptor committingSegmentDescriptor =
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(requestParams);
+ boolean success = false;
+
+ if (response.equals(SegmentCompletionProtocol.RESP_COMMIT_CONTINUE)) {
+ File localTmpFile = null;
+ try {
+ // Get the segment from the form input and put it in a tmp area in the local file system.
+ localTmpFile = uploadFileToLocalTmpFile(multiPart, instanceId, segmentName);
+ if (localTmpFile == null) {
+ LOGGER.error("Unable to get the segment file from multipart input to local file {}", segmentName);
+ } else {
+ // Extract the segment metadata from the segment file.
+ SegmentMetadataImpl segmentMetadata =
+ getSegmentMetadataFromLocalFile(new LLCSegmentName(segmentName), localTmpFile);
+ if (segmentMetadata == null) {
+ LOGGER.error("Unable to extract segment metadata from segment data: {}", segmentName);
+ } else {
+ // Store the segment file to Pinot FS.
+ try {
+ FileUploadPathProvider provider = new FileUploadPathProvider(_controllerConf);
+ final String rawTableName = new LLCSegmentName(segmentName).getTableName();
+ URI segmentFileURI = ControllerConf.getUriFromPath(
+ StringUtil.join("/", provider.getBaseDataDirURI().toString(), rawTableName, segmentName));
+ PinotFS pinotFS = PinotFSFactory.create(provider.getBaseDataDirURI().getScheme());
+ // Multiple threads can reach this point at the same time, if the following scenario happens
+ // The server that was asked to commit did so very slowly (due to network speeds). Meanwhile the FSM in
+ // SegmentCompletionManager timed out, and allowed another server to commit, which did so very quickly (somehow
+ // the network speeds changed). The second server made it through the FSM and reached this point.
+ // The synchronization below takes care that exactly one file gets moved in place.
+ // There are still corner conditions that are not handled correctly. For example,
+ // 1. What if the offset of the faster server was different?
+ // 2. We know that only the faster server will get to complete the COMMIT call successfully. But it is possible
+ // that the race to this statement is won by the slower server, and so the real segment that is in there is that
+ // of the slower server.
+ // In order to overcome controller restarts after the segment is moved to PinotFS, but before it is committed, we DO need to
+ // check for existing segment file and remove it. So, the block cannot be removed altogether.
+ // For now, we live with these corner cases. Once we have split-commit enabled and working, this code will no longer
+ // be used.
+ synchronized (SegmentCompletionManager.getInstance()) {
+ if (pinotFS.exists(segmentFileURI)) {
+ LOGGER.warn("Segment file {} exists. Replacing with upload from {} for segment {}",
+ segmentFileURI.toString(), instanceId, segmentName);
+ pinotFS.delete(segmentFileURI, true);
+ }
+ pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
+ }
+ committingSegmentDescriptor =
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams, segmentMetadata);
+ success = true;
+ } catch (Exception e) {
+ LOGGER.error("Could not save segment {} to PinotFS", segmentName, e);
+ }
+ }
+ }
+ } finally {
+ FileUtils.deleteQuietly(localTmpFile);
+ }
}
+ response = segmentCompletionManager.segmentCommitEnd(requestParams, success, false, committingSegmentDescriptor);
LOGGER.info("Response to segmentCommit: instance={} segment={} status={} offset={}", requestParams.getInstanceId(),
requestParams.getSegmentName(), response.getStatus(), response.getOffset());
@@ -260,25 +338,202 @@ public class LLCSegmentCompletionHandlers {
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset);
LOGGER.info("Processing segmentUpload:{}", requestParams.toString());
- final String segmentLocation = uploadSegment(multiPart, instanceId, segmentName, true);
- if (segmentLocation == null) {
+ // Get the segment from the form input and put it in the right place.
+ File localTmpFile = uploadFileToLocalTmpFile(multiPart, instanceId, segmentName);
+ if (localTmpFile == null) {
+ LOGGER.error("Unable to get the segment file from multipart input to local file {}", segmentName);
return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
}
- SegmentCompletionProtocol.Response.Params responseParams =
- new SegmentCompletionProtocol.Response.Params().withOffset(requestParams.getOffset())
- .withSegmentLocation(segmentLocation)
- .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS);
+ try {
+ FileUploadPathProvider provider = new FileUploadPathProvider(_controllerConf);
+ URI uri = localSegementFileToPinotFsTmpLocation(provider, localTmpFile, segmentName);
+ if (uri == null) {
+ LOGGER.error("Unable to upload local segment file {} to Pinot storage for segment ", localTmpFile.toPath(),
+ segmentName);
+ return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+ }
+ SegmentCompletionProtocol.Response.Params responseParams =
+ new SegmentCompletionProtocol.Response.Params().withOffset(requestParams.getOffset())
+ .withSegmentLocation(uri.toString())
+ .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS);
+
+ String response = new SegmentCompletionProtocol.Response(responseParams).toJsonString();
- String response = new SegmentCompletionProtocol.Response(responseParams).toJsonString();
+ LOGGER.info("Response to segmentUpload:{}", response);
- LOGGER.info("Response to segmentUpload:{}", response);
+ return response;
+ } catch (Exception e) {
- return response;
+ return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+ } finally {
+ FileUtils.deleteQuietly(localTmpFile);
+ }
}
- @Nullable
- private String uploadSegment(FormDataMultiPart multiPart, String instanceId, String segmentName,
- boolean isSplitCommit) {
+ @POST
+ @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT_END_METADATA)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public String segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String instanceId,
+ @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
+ @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String segmentLocation,
+ @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+ @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long memoryUsedBytes,
+ @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long buildTimeMillis,
+ @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long waitTimeMillis,
+ @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
+ @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long segmentSizeBytes,
+ FormDataMultiPart metadataFiles) {
+ if (instanceId == null || segmentName == null || offset == -1 || segmentLocation == null || metadataFiles == null) {
+ LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, segmentLocation={}", offset, segmentName,
+ instanceId, segmentLocation);
+ // TODO: memoryUsedInBytes = 0 if not present in params. Add validation when we start using it
+ return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+ }
+
+ SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params();
+ requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
+ .withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes)
+ .withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
+ .withMemoryUsedBytes(memoryUsedBytes);
+ LOGGER.info("Processing segmentCommitEnd:{}", requestParams.toString());
+
+ final boolean isSuccess = true;
+ final boolean isSplitCommit = true;
+ SegmentMetadataImpl segmentMetadata = extractMetadataFromInput(metadataFiles, segmentName);
+ // If it fails to extract metadata from the input form, return failure.
+ if (segmentMetadata == null) {
+ LOGGER.error("Segment metadata extraction failure for segment {}", segmentName);
+ return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+ }
+ SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance()
+ .segmentCommitEnd(requestParams, isSuccess, isSplitCommit,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams, segmentMetadata));
+ final String responseStr = response.toJsonString();
+ LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
+ return responseStr;
+ }
+
+ /**
+ * Extract and return the segment metadata from the two input form data files (metadata file and creation meta).
+ * Return null if any of the two files is missing or there is exception during parsing and extraction.
+ */
+ private SegmentMetadataImpl extractMetadataFromInput(FormDataMultiPart metadataFiles, String segmentNameStr) {
+ String tempMetadataDirStr = StringUtil.join("/", _controllerConf.getLocalTempDir(),
+ segmentNameStr + METADATA_TEMP_DIR_SUFFIX + String.valueOf(System.currentTimeMillis()));
+ File tempMetadataDir = new File(tempMetadataDirStr);
+ try {
+ Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create directory: %s", tempMetadataDirStr);
+ // Extract metadata.properties from the metadataFiles.
+ if (!extractMetadataFromInputField(metadataFiles, tempMetadataDirStr,
+ V1Constants.MetadataKeys.METADATA_FILE_NAME, segmentNameStr)) {
+ return null;
+ }
+ // Extract creation.meta from the metadataFiles.
+ if (!extractMetadataFromInputField(metadataFiles, tempMetadataDirStr, V1Constants.SEGMENT_CREATION_META,
+ segmentNameStr)) {
+ return null;
+ }
+ // Load segment metadata
+ return new SegmentMetadataImpl(tempMetadataDir);
+ } catch (Exception e) {
+ LOGGER.error("Exception extracting and reading segment metadata for {}", segmentNameStr, e);
+ return null;
+ } finally {
+ FileUtils.deleteQuietly(tempMetadataDir);
+ }
+ }
+
+ /**
+ *
+ * Extract a single file with name metaFileName from the input FormDataMultiPart and put it under the path
+ * tempMetadataDirStr + metaFileName.
+ * Return true iff the extraction and copy is successful.
+ */
+ private boolean extractMetadataFromInputField(FormDataMultiPart metadataFiles, String tempMetadataDirStr,
+ String metaFileName, String segmentName) {
+ FormDataBodyPart metadataFilesField = metadataFiles.getField(metaFileName);
+ Preconditions.checkNotNull(metadataFilesField, "The metadata input field %s does not exist.", metaFileName);
+
+ try (InputStream metadataPropertiesInputStream = metadataFilesField.getValueAs(InputStream.class)) {
+ Preconditions.checkNotNull(metadataPropertiesInputStream, "Unable to parse %s from input.", metaFileName);
+ java.nio.file.Path metadataPropertiesPath = FileSystems.getDefault().getPath(tempMetadataDirStr, metaFileName);
+ Files.copy(metadataPropertiesInputStream, metadataPropertiesPath);
+ return true;
+ } catch (IOException e) {
+ LOGGER.error("Failed to extract metadata property file: {} for segment {}", metaFileName, segmentName, e);
+ }
+ return false;
+ }
+
+ /**
+ * Extract metadata from a segment found in a URI (i.e., segmentLocation) in PinotFS.
+ * <p>We extract the metadata.properties and creation.meta into a temporary metadata directory:
+ * DATADIR/rawTableName/segmentName.metadata.tmp, and load metadata from there.
+ *
+ * @param segmentNameStr Name of the segment
+ * @param segmentLocation the location of the segment file in PinotFS.
+ * @return SegmentMetadataImpl if it is able to extract the metadata file from the tar-zipped segment file.
+ */
+ private SegmentMetadataImpl extractMetadataFromSegmentFile(final String segmentNameStr, final URI segmentLocation) {
+ LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
+ String baseDirStr = StringUtil.join("/", _controllerConf.getDataDir(), segmentName.getTableName());
+ String tempSegmentDataDirStr =
+ StringUtil.join("/", baseDirStr, segmentNameStr + SEGMENT_TMP_DIR + String.valueOf(System.currentTimeMillis()));
+ File tempSegmentDataDir = new File(tempSegmentDataDirStr);
+ File segDstFile = new File(StringUtil.join("/", tempSegmentDataDirStr, segmentNameStr));
+ // Use PinotFS to copy the segment file to local fs for metadata extraction.
+ PinotFS pinotFS = PinotFSFactory.create(ControllerConf.getUriFromPath(_controllerConf.getDataDir()).getScheme());
+ try {
+ Preconditions.checkState(tempSegmentDataDir.mkdirs(), "Failed to create directory: %s", tempSegmentDataDir);
+ pinotFS.copyToLocalFile(segmentLocation, segDstFile);
+ return getSegmentMetadataFromLocalFile(segmentName, segDstFile);
+ } catch (Exception e) {
+ LOGGER.error("Exception in extracting segment file to local {}", segmentNameStr, e);
+ return null;
+ } finally {
+ FileUtils.deleteQuietly(tempSegmentDataDir);
+ }
+ }
+
+ private SegmentMetadataImpl getSegmentMetadataFromLocalFile(LLCSegmentName segmentName, File segmentFile) {
+ String baseDirStr = StringUtil.join("/", _controllerConf.getDataDir(), segmentName.getTableName());
+ String tempMetadataDirStr = StringUtil.join("/", baseDirStr,
+ segmentName.getSegmentName() + METADATA_TEMP_DIR_SUFFIX + String.valueOf(System.currentTimeMillis()));
+ File tempMetadataDir = new File(tempMetadataDirStr);
+ try (// Extract metadata.properties
+ InputStream metadataPropertiesInputStream = TarGzCompressionUtils
+ .unTarOneFile(new FileInputStream(segmentFile), V1Constants.MetadataKeys.METADATA_FILE_NAME);
+ // Extract creation.meta
+ InputStream creationMetaInputStream = TarGzCompressionUtils
+ .unTarOneFile(new FileInputStream(segmentFile), V1Constants.SEGMENT_CREATION_META)) {
+ Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create directory: %s", tempMetadataDirStr);
+ Preconditions.checkNotNull(metadataPropertiesInputStream, "%s does not exist",
+ V1Constants.MetadataKeys.METADATA_FILE_NAME);
+ java.nio.file.Path metadataPropertiesPath =
+ FileSystems.getDefault().getPath(tempMetadataDirStr, V1Constants.MetadataKeys.METADATA_FILE_NAME);
+ Files.copy(metadataPropertiesInputStream, metadataPropertiesPath);
+
+ Preconditions.checkNotNull(creationMetaInputStream, "%s does not exist", V1Constants.SEGMENT_CREATION_META);
+ java.nio.file.Path creationMetaPath =
+ FileSystems.getDefault().getPath(tempMetadataDirStr, V1Constants.SEGMENT_CREATION_META);
+ Files.copy(creationMetaInputStream, creationMetaPath);
+ // Load segment metadata
+ return new SegmentMetadataImpl(tempMetadataDir);
+ } catch (Exception e) {
+ LOGGER.error("Exception extracting and reading segment metadata for {}", segmentName.getSegmentName(), e);
+ return null;
+ } finally {
+ FileUtils.deleteQuietly(tempMetadataDir);
+ }
+ }
+
+ /**
+ *
+ * Copy the uploaded segment file in the input form to a local tmp file and return the tmp file.
+ * Return null when there is any error during the process.
+ */
+ private File uploadFileToLocalTmpFile(FormDataMultiPart multiPart, String instanceId, String segmentName) {
try {
Map<String, List<FormDataBodyPart>> map = multiPart.getFields();
if (!PinotSegmentUploadRestletResource.validateMultiPart(map, segmentName)) {
@@ -297,59 +552,7 @@ public class LLCSegmentCompletionHandlers {
OutputStream outputStream = new FileOutputStream(localTmpFile)) {
IOUtils.copyLarge(inputStream, outputStream);
}
-
- LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
- final String rawTableName = llcSegmentName.getTableName();
- final java.net.URI tableDirURI =
- ControllerConf.getUriFromPath(StringUtil.join("/", provider.getBaseDataDirURI().toString(), rawTableName));
- java.net.URI segmentFileURI;
- if (isSplitCommit) {
- // We only clean up tmp segment file under table dir, so don't create any sub-dir under table dir.
- // See PinotLLCRealtimeSegmentManager.commitSegmentFile().
- // TODO: move tmp file logic into SegmentCompletionUtils.
- String uniqueSegmentFileName = SegmentCompletionUtils.generateSegmentFileName(segmentName);
- segmentFileURI =
- ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), uniqueSegmentFileName));
- } else {
- segmentFileURI = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), segmentName));
- }
-
- PinotFS pinotFS = PinotFSFactory.create(provider.getBaseDataDirURI().getScheme());
- try {
- if (isSplitCommit) {
- pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
- } else {
- // Multiple threads can reach this point at the same time, if the following scenario happens
- // The server that was asked to commit did so very slowly (due to network speeds). Meanwhile the FSM in
- // SegmentCompletionManager timed out, and allowed another server to commit, which did so very quickly (somehow
- // the network speeds changed). The second server made it through the FSM and reached this point.
- // The synchronization below takes care that exactly one file gets moved in place.
- // There are still corner conditions that are not handled correctly. For example,
- // 1. What if the offset of the faster server was different?
- // 2. We know that only the faster server will get to complete the COMMIT call successfully. But it is possible
- // that the race to this statement is won by the slower server, and so the real segment that is in there is that
- // of the slower server.
- // In order to overcome controller restarts after the segment is renamed, but before it is committed, we DO need to
- // check for existing segment file and remove it. So, the block cannot be removed altogether.
- // For now, we live with these corner cases. Once we have split-commit enabled and working, this code will no longer
- // be used.
- synchronized (SegmentCompletionManager.getInstance()) {
- if (pinotFS.exists(segmentFileURI)) {
- LOGGER
- .warn("Segment file {} exists. Replacing with upload from {}", segmentFileURI.toString(), instanceId);
- pinotFS.delete(segmentFileURI, true);
- }
- pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
- }
- }
- } catch (Exception e) {
- LOGGER.error("Could not copy from {} to {}", localTmpFile.getAbsolutePath(), segmentFileURI.toString());
- } finally {
- FileUtils.deleteQuietly(localTmpFile);
- }
-
- LOGGER.info("Moved file {} to {}", localTmpFile.getAbsolutePath(), segmentFileURI.toString());
- return new URI(SCHEME + segmentFileURI.toString(), /* boolean escaped */ false).toString();
+ return localTmpFile;
} catch (InvalidControllerConfigException e) {
LOGGER.error("Invalid controller config exception from instance {} for segment {}", instanceId, segmentName, e);
return null;
@@ -360,4 +563,19 @@ public class LLCSegmentCompletionHandlers {
multiPart.cleanup();
}
}
+
+ private URI localSegementFileToPinotFsTmpLocation(FileUploadPathProvider provider, File localTmpFile,
+ String segmentName)
+ throws Exception {
+ final String rawTableName = new LLCSegmentName(segmentName).getTableName();
+ // We only clean up tmp segment file under table dir, so don't create any sub-dir under table dir.
+ // See PinotLLCRealtimeSegmentManager.commitSegmentFile().
+ // TODO: move tmp file logic into SegmentCompletionUtils.
+ String uniqueSegmentFileName = SegmentCompletionUtils.generateSegmentFileName(segmentName);
+ URI segmentFileURI = ControllerConf.getUriFromPath(
+ StringUtil.join("/", provider.getBaseDataDirURI().toString(), rawTableName, uniqueSegmentFileName));
+ PinotFS pinotFS = PinotFSFactory.create(provider.getBaseDataDirURI().getScheme());
+ pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
+ return segmentFileURI;
+ }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 82d2e31..7a883ca 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -23,13 +23,7 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
import java.net.URI;
-import java.nio.file.FileSystems;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -45,7 +39,6 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.FileUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
@@ -70,7 +63,6 @@ import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.common.utils.StringUtil;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.retry.RetryPolicies;
import org.apache.pinot.controller.ControllerConf;
@@ -89,7 +81,6 @@ import org.apache.pinot.core.realtime.stream.OffsetCriteria;
import org.apache.pinot.core.realtime.stream.PartitionOffsetFetcher;
import org.apache.pinot.core.realtime.stream.StreamConfig;
import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
-import org.apache.pinot.core.segment.creator.impl.V1Constants;
import org.apache.pinot.core.segment.index.ColumnMetadata;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
import org.apache.pinot.filesystem.PinotFS;
@@ -486,7 +477,8 @@ public class PinotLLCRealtimeSegmentManager {
}
// Step-1
- boolean success = updateOldSegmentMetadataZNRecord(realtimeTableName, committingLLCSegmentName, nextOffset);
+ boolean success = updateOldSegmentMetadataZNRecord(realtimeTableName, committingLLCSegmentName, nextOffset,
+ committingSegmentDescriptor);
if (!success) {
return false;
}
@@ -532,10 +524,12 @@ public class PinotLLCRealtimeSegmentManager {
* @param realtimeTableName - table name for which segment is being committed
* @param committingLLCSegmentName - name of the segment being committed
* @param nextOffset - the end offset for this committing segment
+ * @param committingSegmentDescriptor - the metadata of the commit segment.
* @return
*/
protected boolean updateOldSegmentMetadataZNRecord(String realtimeTableName, LLCSegmentName committingLLCSegmentName,
- long nextOffset) {
+ long nextOffset,
+ CommittingSegmentDescriptor committingSegmentDescriptor) {
String committingSegmentNameStr = committingLLCSegmentName.getSegmentName();
Stat stat = new Stat();
@@ -547,6 +541,12 @@ public class PinotLLCRealtimeSegmentManager {
committingSegmentNameStr, realtimeTableName, committingSegmentMetadata.getStatus());
return false;
}
+ if (committingSegmentDescriptor.getSegmentMetadata() == null) {
+ LOGGER.error("No segment metadata found in descriptor for committing segment {} for table {}", committingLLCSegmentName,
+ realtimeTableName);
+ return false;
+ }
+ SegmentMetadataImpl segmentMetadata = committingSegmentDescriptor.getSegmentMetadata();
// TODO: set number of rows to end consumption in new segment metadata, based on memory used and number of rows from old segment
committingSegmentMetadata.setEndOffset(nextOffset);
@@ -554,8 +554,6 @@ public class PinotLLCRealtimeSegmentManager {
String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
committingSegmentMetadata.setDownloadUrl(
ControllerConf.constructDownloadUrl(rawTableName, committingSegmentNameStr, _controllerConf.generateVipUrl()));
- // Pull segment metadata from incoming segment and set it in zk segment metadata
- SegmentMetadataImpl segmentMetadata = extractSegmentMetadata(rawTableName, committingSegmentNameStr);
committingSegmentMetadata.setCrc(Long.valueOf(segmentMetadata.getCrc()));
committingSegmentMetadata.setStartTime(segmentMetadata.getTimeInterval().getStartMillis());
committingSegmentMetadata.setEndTime(segmentMetadata.getTimeInterval().getEndMillis());
@@ -683,51 +681,6 @@ public class PinotLLCRealtimeSegmentManager {
return commitTimeoutMS;
}
- /**
- * Extract the segment metadata files from the tar-zipped segment file that is expected to be in the directory for the
- * table.
- * <p>Segment tar-zipped file path: DATADIR/rawTableName/segmentName.
- * <p>We extract the metadata.properties and creation.meta into a temporary metadata directory:
- * DATADIR/rawTableName/segmentName.metadata.tmp, and load metadata from there.
- *
- * @param rawTableName Name of the table (not including the REALTIME extension)
- * @param segmentNameStr Name of the segment
- * @return SegmentMetadataImpl if it is able to extract the metadata file from the tar-zipped segment file.
- */
- protected SegmentMetadataImpl extractSegmentMetadata(final String rawTableName, final String segmentNameStr) {
- String baseDirStr = StringUtil.join("/", _controllerConf.getDataDir(), rawTableName);
- String segFileStr = StringUtil.join("/", baseDirStr, segmentNameStr);
- String tempMetadataDirStr = StringUtil.join("/", baseDirStr, segmentNameStr + METADATA_TEMP_DIR_SUFFIX);
- File tempMetadataDir = new File(tempMetadataDirStr);
-
- try {
- Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create directory: %s", tempMetadataDirStr);
-
- // Extract metadata.properties
- InputStream metadataPropertiesInputStream = TarGzCompressionUtils
- .unTarOneFile(new FileInputStream(new File(segFileStr)), V1Constants.MetadataKeys.METADATA_FILE_NAME);
- Preconditions.checkNotNull(metadataPropertiesInputStream, "%s does not exist",
- V1Constants.MetadataKeys.METADATA_FILE_NAME);
- Path metadataPropertiesPath =
- FileSystems.getDefault().getPath(tempMetadataDirStr, V1Constants.MetadataKeys.METADATA_FILE_NAME);
- Files.copy(metadataPropertiesInputStream, metadataPropertiesPath);
-
- // Extract creation.meta
- InputStream creationMetaInputStream = TarGzCompressionUtils
- .unTarOneFile(new FileInputStream(new File(segFileStr)), V1Constants.SEGMENT_CREATION_META);
- Preconditions.checkNotNull(creationMetaInputStream, "%s does not exist", V1Constants.SEGMENT_CREATION_META);
- Path creationMetaPath = FileSystems.getDefault().getPath(tempMetadataDirStr, V1Constants.SEGMENT_CREATION_META);
- Files.copy(creationMetaInputStream, creationMetaPath);
-
- // Load segment metadata
- return new SegmentMetadataImpl(tempMetadataDir);
- } catch (Exception e) {
- throw new RuntimeException("Exception extracting and reading segment metadata for " + segmentNameStr, e);
- } finally {
- FileUtils.deleteQuietly(tempMetadataDir);
- }
- }
-
public LLCRealtimeSegmentZKMetadata getRealtimeSegmentZKMetadata(String realtimeTableName, String segmentName,
Stat stat) {
ZNRecord znRecord = _propertyStore
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index e3e3003..b499e3e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -290,7 +290,7 @@ public class SegmentCompletionManager {
* @return
*/
public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
- boolean success, boolean isSplitCommit) {
+ boolean success, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) {
if (!isLeader() || !_helixManager.isConnected()) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
return SegmentCompletionProtocol.RESP_NOT_LEADER;
@@ -301,7 +301,7 @@ public class SegmentCompletionManager {
SegmentCompletionProtocol.Response response = SegmentCompletionProtocol.RESP_FAILED;
try {
fsm = lookupOrCreateFsm(segmentName, SegmentCompletionProtocol.MSG_TYPE_COMMIT);
- response = fsm.segmentCommitEnd(reqParams, success, isSplitCommit);
+ response = fsm.segmentCommitEnd(reqParams, success, isSplitCommit, committingSegmentDescriptor);
} catch (Exception e) {
LOGGER.error("Caught exception in segmentCommitEnd for segment {}", segmentNameStr, e);
}
@@ -605,7 +605,7 @@ public class SegmentCompletionManager {
* the _winner.
*/
public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
- boolean success, boolean isSplitCommit) {
+ boolean success, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) {
String instanceId = reqParams.getInstanceId();
long offset = reqParams.getOffset();
synchronized (this) {
@@ -624,7 +624,7 @@ public class SegmentCompletionManager {
LOGGER.error("Segment upload failed");
return abortAndReturnFailed();
}
- SegmentCompletionProtocol.Response response = commitSegment(reqParams, isSplitCommit);
+ SegmentCompletionProtocol.Response response = commitSegment(reqParams, isSplitCommit, committingSegmentDescriptor);
if (!response.equals(SegmentCompletionProtocol.RESP_COMMIT_SUCCESS)) {
return abortAndReturnFailed();
} else {
@@ -1005,7 +1005,8 @@ public class SegmentCompletionManager {
}
private SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtocol.Request.Params reqParams,
- boolean isSplitCommit) {
+ boolean isSplitCommit,
+ CommittingSegmentDescriptor committingSegmentDescriptor) {
boolean success;
String instanceId = reqParams.getInstanceId();
long offset = reqParams.getOffset();
@@ -1019,8 +1020,6 @@ public class SegmentCompletionManager {
_state = State.COMMITTING;
// In case of splitCommit, the segment is uploaded to a unique file name indicated by segmentLocation,
// so we need to move the segment file to its permanent location first before committing the metadata.
- CommittingSegmentDescriptor committingSegmentDescriptor =
- CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
if (isSplitCommit) {
if (!_segmentManager.commitSegmentFile(_segmentName.getTableName(), committingSegmentDescriptor)) {
return SegmentCompletionProtocol.RESP_FAILED;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java
index 82304d3..50c4e3f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java
@@ -19,6 +19,7 @@
package org.apache.pinot.controller.helix.core.realtime.segment;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
/**
@@ -29,6 +30,7 @@ public class CommittingSegmentDescriptor {
private long _segmentSizeBytes;
private String _segmentLocation;
private long _nextOffset;
+ private SegmentMetadataImpl _segmentMetadata;
public static CommittingSegmentDescriptor fromSegmentCompletionReqParams(
SegmentCompletionProtocol.Request.Params reqParams) {
@@ -39,6 +41,14 @@ public class CommittingSegmentDescriptor {
return committingSegmentDescriptor;
}
+ public static CommittingSegmentDescriptor fromSegmentCompletionReqParamsAndMetadata(
+ SegmentCompletionProtocol.Request.Params reqParams, SegmentMetadataImpl metadata) {
+ CommittingSegmentDescriptor committingSegmentDescriptor =
+ fromSegmentCompletionReqParams(reqParams);
+ committingSegmentDescriptor.setSegmentMetadata(metadata);
+ return committingSegmentDescriptor;
+ }
+
public CommittingSegmentDescriptor(String segmentName, long nextOffset, long segmentSizeBytes) {
_segmentName = segmentName;
_nextOffset = nextOffset;
@@ -82,4 +92,12 @@ public class CommittingSegmentDescriptor {
public void setNextOffset(long nextOffset) {
_nextOffset = nextOffset;
}
+
+ public SegmentMetadataImpl getSegmentMetadata() {
+ return _segmentMetadata;
+ }
+
+ public void setSegmentMetadata(SegmentMetadataImpl segmentMetadata) {
+ _segmentMetadata = segmentMetadata;
+ }
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 1e68bbc..8153559 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -347,11 +347,11 @@ public class PinotLLCRealtimeSegmentManagerTest {
String tableName = tableConfig.getTableName();
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
- segmentManager.updateOldSegmentMetadataZNRecord(tableName, llcSegmentName, nextOffset);
LLCSegmentName newLlcSegmentName =
new LLCSegmentName(rawTableName, partition, nextSeqNum, System.currentTimeMillis());
CommittingSegmentDescriptor committingSegmentDescriptor =
new CommittingSegmentDescriptor(segmentName, nextOffset, 0);
+ segmentManager.updateOldSegmentMetadataZNRecord(tableName, llcSegmentName, nextOffset, committingSegmentDescriptor);
segmentManager.createNewSegmentMetadataZNRecord(tableConfig, llcSegmentName, newLlcSegmentName, partitionAssignment,
committingSegmentDescriptor, false);
segmentManager.updateIdealStateOnSegmentCompletion(idealState, segmentName, newLlcSegmentName.getSegmentName(),
@@ -552,13 +552,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
LLCSegmentName latestSegment = partitionToLatestSegments.get(String.valueOf(randomlySelectedPartition));
LLCRealtimeSegmentZKMetadata latestMetadata =
segmentManager.getRealtimeSegmentZKMetadata(tableName, latestSegment.getSegmentName(), null);
- segmentManager
- .updateOldSegmentMetadataZNRecord(tableName, latestSegment, latestMetadata.getStartOffset() + 100);
LLCSegmentName newLlcSegmentName =
new LLCSegmentName(rawTableName, randomlySelectedPartition, latestSegment.getSequenceNumber() + 1,
System.currentTimeMillis());
CommittingSegmentDescriptor committingSegmentDescriptor =
new CommittingSegmentDescriptor(latestSegment.getSegmentName(), latestMetadata.getStartOffset() + 100, 0);
+ segmentManager.updateOldSegmentMetadataZNRecord(tableName, latestSegment,
+ latestMetadata.getStartOffset() + 100, committingSegmentDescriptor);
segmentManager.createNewSegmentMetadataZNRecord(tableConfig, latestSegment, newLlcSegmentName,
expectedPartitionAssignment, committingSegmentDescriptor, false);
@@ -632,7 +632,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
LLCRealtimeSegmentZKMetadata latestMetadata =
segmentManager.getRealtimeSegmentZKMetadata(tableName, latestSegment.getSegmentName(), null);
segmentManager
- .updateOldSegmentMetadataZNRecord(tableName, latestSegment, latestMetadata.getStartOffset() + 100);
+ .updateOldSegmentMetadataZNRecord(tableName, latestSegment, latestMetadata.getStartOffset() + 100,
+ new CommittingSegmentDescriptor(latestSegment.getSegmentName(), latestMetadata.getStartOffset() + 100, 0));
idealState = idealStateBuilder.setSegmentState(latestSegment.getSegmentName(), "ONLINE").build();
// get old state
@@ -666,7 +667,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
LLCRealtimeSegmentZKMetadata latestMetadata =
segmentManager.getRealtimeSegmentZKMetadata(tableName, latestSegment.getSegmentName(), null);
segmentManager
- .updateOldSegmentMetadataZNRecord(tableName, latestSegment, latestMetadata.getStartOffset() + 100);
+ .updateOldSegmentMetadataZNRecord(tableName, latestSegment, latestMetadata.getStartOffset() + 100,
+ new CommittingSegmentDescriptor(latestSegment.getSegmentName(), latestMetadata.getStartOffset() + 100, 0));
// get old state
nPartitions = expectedPartitionAssignment.getNumPartitions();
@@ -894,7 +896,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentManager.IS_CONNECTED = false;
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
CommittingSegmentDescriptor committingSegmentDescriptor =
- CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
+ segmentManager.newMockSegmentMetadata());
boolean status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
Assert.assertFalse(status);
Assert.assertEquals(segmentManager._nCallsToUpdateHelix, 0); // Idealstate not updated
@@ -950,7 +953,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
Set<String> prevInstances = idealState.getInstanceSet(committingSegmentMetadata.getSegmentName());
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
CommittingSegmentDescriptor committingSegmentDescriptor =
- CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams, segmentManager.newMockSegmentMetadata());
boolean status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
segmentManager.verifyMetadataInteractions();
Assert.assertTrue(status);
@@ -973,7 +976,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentManager._records.clear();
prevInstances = idealState.getInstanceSet(committingSegmentMetadata.getSegmentName());
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
- committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+ committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
+ segmentManager.newMockSegmentMetadata());
status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
segmentManager.verifyMetadataInteractions();
Assert.assertTrue(status);
@@ -991,7 +995,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentManager._paths.clear();
segmentManager._records.clear();
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
- committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+ // We do not expect the segment metadata to be used. Thus reuse the current metadata.
+ committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
+ segmentManager.getMockSegmentMetadata());
status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
segmentManager.verifyMetadataInteractions();
Assert.assertFalse(status);
@@ -1006,7 +1012,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentManager._paths.clear();
segmentManager._records.clear();
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
- committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+ // We do not expect the segment metadata to be used. Thus reuse the current metadata.
+ committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
+ segmentManager.getMockSegmentMetadata());
status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
segmentManager.verifyMetadataInteractions();
Assert.assertFalse(status);
@@ -1021,7 +1029,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
committingSegmentMetadata = new LLCRealtimeSegmentZKMetadata(newZnRec);
prevInstances = idealState.getInstanceSet(committingSegmentMetadata.getSegmentName());
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
- committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+ committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
+ segmentManager.newMockSegmentMetadata());
status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
segmentManager.verifyMetadataInteractions();
Assert.assertTrue(status);
@@ -1091,7 +1100,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
CommittingSegmentDescriptor committingSegmentDescriptor =
- CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
+ segmentManager1.newMockSegmentMetadata());
boolean status = segmentManager1.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
Assert.assertTrue(status); // Committing segment metadata succeeded.
@@ -1365,6 +1375,19 @@ public class PinotLLCRealtimeSegmentManagerTest {
_tableConfigStore = new TableConfigStore();
}
+ private SegmentMetadataImpl newMockSegmentMetadata() {
+ segmentMetadata = mock(SegmentMetadataImpl.class);
+ when(segmentMetadata.getCrc()).thenReturn(FakePinotLLCRealtimeSegmentManager.CRC);
+ when(segmentMetadata.getTimeInterval()).thenReturn(FakePinotLLCRealtimeSegmentManager.INTERVAL);
+ when(segmentMetadata.getVersion()).thenReturn(FakePinotLLCRealtimeSegmentManager.SEGMENT_VERSION);
+ when(segmentMetadata.getTotalRawDocs()).thenReturn(FakePinotLLCRealtimeSegmentManager.NUM_DOCS);
+ return segmentMetadata;
+ }
+
+ private SegmentMetadataImpl getMockSegmentMetadata() {
+ return segmentMetadata;
+ }
+
void addTableToStore(String tableName, TableConfig tableConfig, int nStreamPartitions) {
_tableConfigStore.addTable(tableName, tableConfig, nStreamPartitions);
}
@@ -1457,11 +1480,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
partitionAssignment, committingSegmentDescriptor, isNewTableSetup);
}
- @Override
- protected boolean updateOldSegmentMetadataZNRecord(String realtimeTableName,
- LLCSegmentName committingLLCSegmentName, long nextOffset) {
- return super.updateOldSegmentMetadataZNRecord(realtimeTableName, committingLLCSegmentName, nextOffset);
- }
@Override
public LLCRealtimeSegmentZKMetadata getRealtimeSegmentZKMetadata(String realtimeTableName, String segmentName,
@@ -1482,15 +1500,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
return metadata;
}
- @Override
- protected SegmentMetadataImpl extractSegmentMetadata(final String rawTableName, final String segmentNameStr) {
- segmentMetadata = mock(SegmentMetadataImpl.class);
- when(segmentMetadata.getCrc()).thenReturn(CRC);
- when(segmentMetadata.getTimeInterval()).thenReturn(INTERVAL);
- when(segmentMetadata.getVersion()).thenReturn(SEGMENT_VERSION);
- when(segmentMetadata.getTotalRawDocs()).thenReturn(NUM_DOCS);
- return segmentMetadata;
- }
public void verifyMetadataInteractions() {
verify(segmentMetadata, times(1)).getCrc();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 6ebd34f..3b8503e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -151,7 +151,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
- response = segmentCompletionMgr.segmentCommitEnd(params, true, false);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// Now the FSM should have disappeared from the map
@@ -220,7 +221,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
- response = segmentCompletionMgr.segmentCommitEnd(params, true, false);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// Now the FSM should have disappeared from the map
@@ -330,7 +332,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
.withSegmentLocation("doNotCommitMe");
- response = segmentCompletionMgr.segmentCommitEnd(params, true, true);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
// Now the FSM should have aborted
@@ -363,7 +366,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr)
.withSegmentLocation("location");
- response = segmentCompletionMgr.segmentCommitEnd(params, true, true);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT_SUCCESS);
// And the FSM should be removed.
Assert.assertFalse(fsmMap.containsKey(segmentNameStr));
@@ -415,7 +419,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
.withSegmentLocation("location");
- response = segmentCompletionMgr.segmentCommitEnd(params, true, true);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// Now the FSM should have disappeared from the map
@@ -464,7 +469,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr)
.withSegmentLocation("location");
- response = segmentCompletionMgr.segmentCommitEnd(params, true, true);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
// Now the FSM should have disappeared from the map
@@ -523,7 +529,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
- response = segmentCompletionMgr.segmentCommitEnd(params, true, false);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// Now the FSM should have disappeared from the map
@@ -595,7 +602,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
- response = segmentCompletionMgr.segmentCommitEnd(params, true, false);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// We ask S2 to keep the segment
params = new Request.Params().withInstanceId(s2).withOffset(s1Offset).withSegmentName(segmentNameStr)
@@ -661,7 +669,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
.withSegmentLocation("location");
- response = segmentCompletionMgr.segmentCommitEnd(params, true, isSplitCommit);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, isSplitCommit,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// Now the FSM should have disappeared from the map
Assert.assertFalse(fsmMap.containsKey(segmentNameStr));
@@ -855,7 +864,8 @@ public class SegmentCompletionTest {
long commitTimeMs = (segmentCompletionMgr._secconds - startTime) * 1000;
Assert.assertEquals(commitTimeMap.get(tableName).longValue(), commitTimeMs);
segmentCompletionMgr._secconds += 55;
- response = segmentCompletionMgr.segmentCommitEnd(params, true, false);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// now FSM should be out of the map.
Assert.assertFalse((fsmMap.containsKey(segmentNameStr)));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org