You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by mc...@apache.org on 2019/03/13 22:24:22 UTC

[incubator-pinot] branch master updated: Pinot controller side change to enhance LLC segment metadata upload. (#3877)

This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a70bb9  Pinot controller side change to enhance LLC segment metadata upload. (#3877)
2a70bb9 is described below

commit 2a70bb913f006d52a55aa686a86464b9acf4473f
Author: Ting Chen <ch...@gmail.com>
AuthorDate: Wed Mar 13 15:24:17 2019 -0700

    Pinot controller side change to enhance LLC segment metadata upload. (#3877)
    
    * Pinot controller side change to enhance LLC segment metadata upload.
    
    * Remove an unused lib.
    
    * Revise based on reviews.
    
    * Let the LLCSegmentCompletionHandlers handle the metadata extraction.
    
    * Clean up error case behaviors for metadata extraction based on review.
    
    * Add more logging for error cases based on review feedback.
    
    * Fix integration failure by passing the right segment location in controller for metadata extraction.
    
    * Use PinotFS to copy segment files for metadata extraction.
    
    * Construct segment file URI directly.
    
    * Further revision to address reviews.
    
    * (1) Remove the java.net. prefix. (2) Add a test showing the difference between Apache URI and java.net.URI.
    
    * Fix a comment typo.
    
    * Revise logging and lib inclusion based on reviews.
    
    * Return failure directly if there is no metadata file in the input form.
    
    * Refactor the upload function based on Subu's comments.
    
    * Fix the log error level.
    
    * Apply Subba's refactoring and a few of my own tweaks.
    
    * Address reviewer feedback on formatting and error logs.
    
    * Fix a compilation error.
    
    * Fix style issues and eliminate a local method.
    
    * Remove a redundant line.
---
 .../protocols/SegmentCompletionProtocol.java       |   1 +
 .../resources/LLCSegmentCompletionHandlers.java    | 364 ++++++++++++++++-----
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  69 +---
 .../core/realtime/SegmentCompletionManager.java    |  13 +-
 .../segment/CommittingSegmentDescriptor.java       |  18 +
 .../PinotLLCRealtimeSegmentManagerTest.java        |  61 ++--
 .../helix/core/realtime/SegmentCompletionTest.java |  30 +-
 7 files changed, 382 insertions(+), 174 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index cedc8d6..60ee1e0 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -112,6 +112,7 @@ public class SegmentCompletionProtocol {
   public static final String MSG_TYPE_COMMIT_START = "segmentCommitStart";
   public static final String MSG_TYPE_SEGMENT_UPLOAD = "segmentUpload";
   public static final String MSG_TYPE_COMMIT_END = "segmentCommitEnd";
+  public static final String MSG_TYPE_COMMIT_END_METADATA = "segmentCommitEndWithMetadata";
   public static final String MSG_TYPE_STOPPED_CONSUMING = "segmentStoppedConsuming";
   public static final String MSG_TYPE_EXTEND_BUILD_TIME = "extendBuildTime";
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 0d5de21..19e2123 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -20,14 +20,18 @@ package org.apache.pinot.controller.api.resources;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import javax.annotation.Nullable;
 import javax.inject.Inject;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
@@ -36,15 +40,20 @@ import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
-import org.apache.commons.httpclient.URI;
+
+import com.google.common.base.Preconditions;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
+import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.pinot.controller.util.SegmentCompletionUtils;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
 import org.apache.pinot.filesystem.PinotFS;
 import org.apache.pinot.filesystem.PinotFSFactory;
 import org.glassfish.jersey.media.multipart.FormDataBodyPart;
@@ -58,8 +67,10 @@ import org.slf4j.LoggerFactory;
 @Path("/")
 public class LLCSegmentCompletionHandlers {
 
+  private static final String SEGMENT_TMP_DIR = "segment.tmp";
   private static Logger LOGGER = LoggerFactory.getLogger(LLCSegmentCompletionHandlers.class);
   private static final String SCHEME = "file://";
+  private static final String METADATA_TEMP_DIR_SUFFIX = ".metadata.tmp";
 
   @Inject
   ControllerConf _controllerConf;
@@ -197,6 +208,16 @@ public class LLCSegmentCompletionHandlers {
       return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
     }
 
+    SegmentMetadataImpl segmentMetadata;
+    try {
+      segmentMetadata = extractMetadataFromSegmentFile(segmentName, new URI(segmentLocation));
+    } catch (URISyntaxException e) {
+      LOGGER.error("Invalid segment location: {} for segment {}", segmentLocation, segmentName);
+      return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+    }
+    if (segmentMetadata == null) {
+      return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+    }
     SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params();
     requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
         .withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes)
@@ -207,8 +228,10 @@ public class LLCSegmentCompletionHandlers {
     final boolean isSuccess = true;
     final boolean isSplitCommit = true;
 
-    SegmentCompletionProtocol.Response response =
-        SegmentCompletionManager.getInstance().segmentCommitEnd(requestParams, isSuccess, isSplitCommit);
+    CommittingSegmentDescriptor committingSegmentDescriptor =
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams, segmentMetadata);
+    SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance()
+        .segmentCommitEnd(requestParams, isSuccess, isSplitCommit, committingSegmentDescriptor);
     final String responseStr = response.toJsonString();
     LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
     return responseStr;
@@ -234,13 +257,68 @@ public class LLCSegmentCompletionHandlers {
 
     final SegmentCompletionManager segmentCompletionManager = SegmentCompletionManager.getInstance();
     SegmentCompletionProtocol.Response response = segmentCompletionManager.segmentCommitStart(requestParams);
-    if (response.equals(SegmentCompletionProtocol.RESP_COMMIT_CONTINUE)) {
-      // Get the segment and put it in the right place.
-      boolean success = uploadSegment(multiPart, instanceId, segmentName, false) != null;
 
-      response = segmentCompletionManager.segmentCommitEnd(requestParams, success, false);
+    CommittingSegmentDescriptor committingSegmentDescriptor =
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(requestParams);
+    boolean success = false;
+
+    if (response.equals(SegmentCompletionProtocol.RESP_COMMIT_CONTINUE)) {
+      File localTmpFile = null;
+      try {
+        // Get the segment from the form input and put it in a tmp area in the local file system.
+        localTmpFile = uploadFileToLocalTmpFile(multiPart, instanceId, segmentName);
+        if (localTmpFile == null) {
+          LOGGER.error("Unable to get the segment file from multipart input to local file {}", segmentName);
+        } else {
+          // Extract the segment metadata from the segment file.
+          SegmentMetadataImpl segmentMetadata =
+              getSegmentMetadataFromLocalFile(new LLCSegmentName(segmentName), localTmpFile);
+          if (segmentMetadata == null) {
+            LOGGER.error("Unable to extract segment metadata from segment data: {}", segmentName);
+          } else {
+            // Store the segment file to Pinot FS.
+            try {
+              FileUploadPathProvider provider = new FileUploadPathProvider(_controllerConf);
+              final String rawTableName = new LLCSegmentName(segmentName).getTableName();
+              URI segmentFileURI = ControllerConf.getUriFromPath(
+                  StringUtil.join("/", provider.getBaseDataDirURI().toString(), rawTableName, segmentName));
+              PinotFS pinotFS = PinotFSFactory.create(provider.getBaseDataDirURI().getScheme());
+              // Multiple threads can reach this point at the same time, if the following scenario happens
+              // The server that was asked to commit did so very slowly (due to network speeds). Meanwhile the FSM in
+              // SegmentCompletionManager timed out, and allowed another server to commit, which did so very quickly (somehow
+              // the network speeds changed). The second server made it through the FSM and reached this point.
+              // The synchronization below takes care that exactly one file gets moved in place.
+              // There are still corner conditions that are not handled correctly. For example,
+              // 1. What if the offset of the faster server was different?
+              // 2. We know that only the faster server will get to complete the COMMIT call successfully. But it is possible
+              //    that the race to this statement is won by the slower server, and so the real segment that is in there is that
+              //    of the slower server.
+              // In order to overcome controller restarts after the segment is moved to PinotFS, but before it is committed, we DO need to
+              // check for existing segment file and remove it. So, the block cannot be removed altogether.
+              // For now, we live with these corner cases. Once we have split-commit enabled and working, this code will no longer
+              // be used.
+              synchronized (SegmentCompletionManager.getInstance()) {
+                if (pinotFS.exists(segmentFileURI)) {
+                  LOGGER.warn("Segment file {} exists. Replacing with upload from {} for segment {}",
+                      segmentFileURI.toString(), instanceId, segmentName);
+                  pinotFS.delete(segmentFileURI, true);
+                }
+                pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
+              }
+              committingSegmentDescriptor =
+                  CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams, segmentMetadata);
+              success = true;
+            } catch (Exception e) {
+              LOGGER.error("Could not save segment {} to PinotFS", segmentName, e);
+            }
+          }
+        }
+      } finally {
+        FileUtils.deleteQuietly(localTmpFile);
+      }
     }
 
+    response = segmentCompletionManager.segmentCommitEnd(requestParams, success, false, committingSegmentDescriptor);
     LOGGER.info("Response to segmentCommit: instance={}  segment={} status={} offset={}", requestParams.getInstanceId(),
         requestParams.getSegmentName(), response.getStatus(), response.getOffset());
 
@@ -260,25 +338,202 @@ public class LLCSegmentCompletionHandlers {
     requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset);
     LOGGER.info("Processing segmentUpload:{}", requestParams.toString());
 
-    final String segmentLocation = uploadSegment(multiPart, instanceId, segmentName, true);
-    if (segmentLocation == null) {
+    // Get the segment from the form input and put it in the right place.
+    File localTmpFile = uploadFileToLocalTmpFile(multiPart, instanceId, segmentName);
+    if (localTmpFile == null) {
+      LOGGER.error("Unable to get the segment file from multipart input to local file {}", segmentName);
       return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
     }
-    SegmentCompletionProtocol.Response.Params responseParams =
-        new SegmentCompletionProtocol.Response.Params().withOffset(requestParams.getOffset())
-            .withSegmentLocation(segmentLocation)
-            .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS);
+    try {
+      FileUploadPathProvider provider = new FileUploadPathProvider(_controllerConf);
+      URI uri = localSegementFileToPinotFsTmpLocation(provider, localTmpFile, segmentName);
+      if (uri == null) {
+        LOGGER.error("Unable to upload local segment file {} to Pinot storage for segment ", localTmpFile.toPath(),
+            segmentName);
+        return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+      }
+      SegmentCompletionProtocol.Response.Params responseParams =
+          new SegmentCompletionProtocol.Response.Params().withOffset(requestParams.getOffset())
+              .withSegmentLocation(uri.toString())
+              .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS);
+
+      String response = new SegmentCompletionProtocol.Response(responseParams).toJsonString();
 
-    String response = new SegmentCompletionProtocol.Response(responseParams).toJsonString();
+      LOGGER.info("Response to segmentUpload:{}", response);
 
-    LOGGER.info("Response to segmentUpload:{}", response);
+      return response;
+    } catch (Exception e) {
 
-    return response;
+      return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+    } finally {
+      FileUtils.deleteQuietly(localTmpFile);
+    }
   }
 
-  @Nullable
-  private String uploadSegment(FormDataMultiPart multiPart, String instanceId, String segmentName,
-      boolean isSplitCommit) {
+  @POST
+  @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT_END_METADATA)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  public String segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String instanceId,
+      @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
+      @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String segmentLocation,
+      @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+      @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long memoryUsedBytes,
+      @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long buildTimeMillis,
+      @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long waitTimeMillis,
+      @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
+      @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long segmentSizeBytes,
+      FormDataMultiPart metadataFiles) {
+    if (instanceId == null || segmentName == null || offset == -1 || segmentLocation == null || metadataFiles == null) {
+      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, segmentLocation={}", offset, segmentName,
+          instanceId, segmentLocation);
+      // TODO: memoryUsedInBytes = 0 if not present in params. Add validation when we start using it
+      return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+    }
+
+    SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params();
+    requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
+        .withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes)
+        .withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
+        .withMemoryUsedBytes(memoryUsedBytes);
+    LOGGER.info("Processing segmentCommitEnd:{}", requestParams.toString());
+
+    final boolean isSuccess = true;
+    final boolean isSplitCommit = true;
+    SegmentMetadataImpl segmentMetadata = extractMetadataFromInput(metadataFiles, segmentName);
+    // If it fails to extract metadata from the input form, return failure.
+    if (segmentMetadata == null) {
+      LOGGER.error("Segment metadata extraction failure for segment {}", segmentName);
+      return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+    }
+    SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance()
+        .segmentCommitEnd(requestParams, isSuccess, isSplitCommit,
+            CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams, segmentMetadata));
+    final String responseStr = response.toJsonString();
+    LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
+    return responseStr;
+  }
+
+  /**
+   * Extract and return the segment metadata from the two input form data files (metadata file and creation meta).
+   * Return null if any of the two files is missing or there is exception during parsing and extraction.
+   */
+  private SegmentMetadataImpl extractMetadataFromInput(FormDataMultiPart metadataFiles, String segmentNameStr) {
+    String tempMetadataDirStr = StringUtil.join("/", _controllerConf.getLocalTempDir(),
+        segmentNameStr + METADATA_TEMP_DIR_SUFFIX + String.valueOf(System.currentTimeMillis()));
+    File tempMetadataDir = new File(tempMetadataDirStr);
+    try {
+      Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create directory: %s", tempMetadataDirStr);
+      // Extract metadata.properties from the metadataFiles.
+      if (!extractMetadataFromInputField(metadataFiles, tempMetadataDirStr,
+          V1Constants.MetadataKeys.METADATA_FILE_NAME, segmentNameStr)) {
+        return null;
+      }
+      // Extract creation.meta from the metadataFiles.
+      if (!extractMetadataFromInputField(metadataFiles, tempMetadataDirStr, V1Constants.SEGMENT_CREATION_META,
+          segmentNameStr)) {
+        return null;
+      }
+      // Load segment metadata
+      return new SegmentMetadataImpl(tempMetadataDir);
+    } catch (Exception e) {
+      LOGGER.error("Exception extracting and reading segment metadata for {}", segmentNameStr, e);
+      return null;
+    } finally {
+      FileUtils.deleteQuietly(tempMetadataDir);
+    }
+  }
+
+  /**
+   *
+   * Extract a single file with name metaFileName from the input FormDataMultiPart and put it under the path
+   * tempMetadataDirStr + metaFileName.
+   * Return true iff the extraction and copy is successful.
+   */
+  private boolean extractMetadataFromInputField(FormDataMultiPart metadataFiles, String tempMetadataDirStr,
+      String metaFileName, String segmentName) {
+    FormDataBodyPart metadataFilesField = metadataFiles.getField(metaFileName);
+    Preconditions.checkNotNull(metadataFilesField, "The metadata input field %s does not exist.", metaFileName);
+
+    try (InputStream metadataPropertiesInputStream = metadataFilesField.getValueAs(InputStream.class)) {
+      Preconditions.checkNotNull(metadataPropertiesInputStream, "Unable to parse %s from input.", metaFileName);
+      java.nio.file.Path metadataPropertiesPath = FileSystems.getDefault().getPath(tempMetadataDirStr, metaFileName);
+      Files.copy(metadataPropertiesInputStream, metadataPropertiesPath);
+      return true;
+    } catch (IOException e) {
+      LOGGER.error("Failed to extract metadata property file: {} for segment {}", metaFileName, segmentName, e);
+    }
+    return false;
+  }
+
+  /**
+   * Extract metadata from a segment found in a URI (i.e., segmentLocation) in PinotFS.
+   * <p>We extract the metadata.properties and creation.meta into a temporary metadata directory:
+   * DATADIR/rawTableName/segmentName.metadata.tmp, and load metadata from there.
+   *
+   * @param segmentNameStr Name of the segment
+   * @param segmentLocation the location of the segment file in PinotFS.
+   * @return SegmentMetadataImpl if it is able to extract the metadata file from the tar-zipped segment file.
+   */
+  private SegmentMetadataImpl extractMetadataFromSegmentFile(final String segmentNameStr, final URI segmentLocation) {
+    LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
+    String baseDirStr = StringUtil.join("/", _controllerConf.getDataDir(), segmentName.getTableName());
+    String tempSegmentDataDirStr =
+        StringUtil.join("/", baseDirStr, segmentNameStr + SEGMENT_TMP_DIR + String.valueOf(System.currentTimeMillis()));
+    File tempSegmentDataDir = new File(tempSegmentDataDirStr);
+    File segDstFile = new File(StringUtil.join("/", tempSegmentDataDirStr, segmentNameStr));
+    // Use PinotFS to copy the segment file to local fs for metadata extraction.
+    PinotFS pinotFS = PinotFSFactory.create(ControllerConf.getUriFromPath(_controllerConf.getDataDir()).getScheme());
+    try {
+      Preconditions.checkState(tempSegmentDataDir.mkdirs(), "Failed to create directory: %s", tempSegmentDataDir);
+      pinotFS.copyToLocalFile(segmentLocation, segDstFile);
+      return getSegmentMetadataFromLocalFile(segmentName, segDstFile);
+    } catch (Exception e) {
+      LOGGER.error("Exception in extracting segment file to local {}", segmentNameStr, e);
+      return null;
+    } finally {
+      FileUtils.deleteQuietly(tempSegmentDataDir);
+    }
+  }
+
+  private SegmentMetadataImpl getSegmentMetadataFromLocalFile(LLCSegmentName segmentName, File segmentFile) {
+    String baseDirStr = StringUtil.join("/", _controllerConf.getDataDir(), segmentName.getTableName());
+    String tempMetadataDirStr = StringUtil.join("/", baseDirStr,
+        segmentName.getSegmentName() + METADATA_TEMP_DIR_SUFFIX + String.valueOf(System.currentTimeMillis()));
+    File tempMetadataDir = new File(tempMetadataDirStr);
+    try (// Extract metadata.properties
+        InputStream metadataPropertiesInputStream = TarGzCompressionUtils
+            .unTarOneFile(new FileInputStream(segmentFile), V1Constants.MetadataKeys.METADATA_FILE_NAME);
+        // Extract creation.meta
+        InputStream creationMetaInputStream = TarGzCompressionUtils
+            .unTarOneFile(new FileInputStream(segmentFile), V1Constants.SEGMENT_CREATION_META)) {
+      Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create directory: %s", tempMetadataDirStr);
+      Preconditions.checkNotNull(metadataPropertiesInputStream, "%s does not exist",
+          V1Constants.MetadataKeys.METADATA_FILE_NAME);
+      java.nio.file.Path metadataPropertiesPath =
+          FileSystems.getDefault().getPath(tempMetadataDirStr, V1Constants.MetadataKeys.METADATA_FILE_NAME);
+      Files.copy(metadataPropertiesInputStream, metadataPropertiesPath);
+
+      Preconditions.checkNotNull(creationMetaInputStream, "%s does not exist", V1Constants.SEGMENT_CREATION_META);
+      java.nio.file.Path creationMetaPath =
+          FileSystems.getDefault().getPath(tempMetadataDirStr, V1Constants.SEGMENT_CREATION_META);
+      Files.copy(creationMetaInputStream, creationMetaPath);
+      // Load segment metadata
+      return new SegmentMetadataImpl(tempMetadataDir);
+    } catch (Exception e) {
+      LOGGER.error("Exception extracting and reading segment metadata for {}", segmentName.getSegmentName(), e);
+      return null;
+    } finally {
+      FileUtils.deleteQuietly(tempMetadataDir);
+    }
+  }
+
+  /**
+   *
+   * Copy the uploaded segment file in the input form to a local tmp file and return the tmp file.
+   * Return null when there is any error during the process.
+   */
+  private File uploadFileToLocalTmpFile(FormDataMultiPart multiPart, String instanceId, String segmentName) {
     try {
       Map<String, List<FormDataBodyPart>> map = multiPart.getFields();
       if (!PinotSegmentUploadRestletResource.validateMultiPart(map, segmentName)) {
@@ -297,59 +552,7 @@ public class LLCSegmentCompletionHandlers {
           OutputStream outputStream = new FileOutputStream(localTmpFile)) {
         IOUtils.copyLarge(inputStream, outputStream);
       }
-
-      LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
-      final String rawTableName = llcSegmentName.getTableName();
-      final java.net.URI tableDirURI =
-          ControllerConf.getUriFromPath(StringUtil.join("/", provider.getBaseDataDirURI().toString(), rawTableName));
-      java.net.URI segmentFileURI;
-      if (isSplitCommit) {
-        // We only clean up tmp segment file under table dir, so don't create any sub-dir under table dir.
-        // See PinotLLCRealtimeSegmentManager.commitSegmentFile().
-        // TODO: move tmp file logic into SegmentCompletionUtils.
-        String uniqueSegmentFileName = SegmentCompletionUtils.generateSegmentFileName(segmentName);
-        segmentFileURI =
-            ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), uniqueSegmentFileName));
-      } else {
-        segmentFileURI = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), segmentName));
-      }
-
-      PinotFS pinotFS = PinotFSFactory.create(provider.getBaseDataDirURI().getScheme());
-      try {
-        if (isSplitCommit) {
-          pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
-        } else {
-          // Multiple threads can reach this point at the same time, if the following scenario happens
-          // The server that was asked to commit did so very slowly (due to network speeds). Meanwhile the FSM in
-          // SegmentCompletionManager timed out, and allowed another server to commit, which did so very quickly (somehow
-          // the network speeds changed). The second server made it through the FSM and reached this point.
-          // The synchronization below takes care that exactly one file gets moved in place.
-          // There are still corner conditions that are not handled correctly. For example,
-          // 1. What if the offset of the faster server was different?
-          // 2. We know that only the faster server will get to complete the COMMIT call successfully. But it is possible
-          //    that the race to this statement is won by the slower server, and so the real segment that is in there is that
-          //    of the slower server.
-          // In order to overcome controller restarts after the segment is renamed, but before it is committed, we DO need to
-          // check for existing segment file and remove it. So, the block cannot be removed altogether.
-          // For now, we live with these corner cases. Once we have split-commit enabled and working, this code will no longer
-          // be used.
-          synchronized (SegmentCompletionManager.getInstance()) {
-            if (pinotFS.exists(segmentFileURI)) {
-              LOGGER
-                  .warn("Segment file {} exists. Replacing with upload from {}", segmentFileURI.toString(), instanceId);
-              pinotFS.delete(segmentFileURI, true);
-            }
-            pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
-          }
-        }
-      } catch (Exception e) {
-        LOGGER.error("Could not copy from {} to {}", localTmpFile.getAbsolutePath(), segmentFileURI.toString());
-      } finally {
-        FileUtils.deleteQuietly(localTmpFile);
-      }
-
-      LOGGER.info("Moved file {} to {}", localTmpFile.getAbsolutePath(), segmentFileURI.toString());
-      return new URI(SCHEME + segmentFileURI.toString(), /* boolean escaped */ false).toString();
+      return localTmpFile;
     } catch (InvalidControllerConfigException e) {
       LOGGER.error("Invalid controller config exception from instance {} for segment {}", instanceId, segmentName, e);
       return null;
@@ -360,4 +563,19 @@ public class LLCSegmentCompletionHandlers {
       multiPart.cleanup();
     }
   }
+
+  private URI localSegementFileToPinotFsTmpLocation(FileUploadPathProvider provider, File localTmpFile,
+      String segmentName)
+      throws Exception {
+    final String rawTableName = new LLCSegmentName(segmentName).getTableName();
+    // We only clean up tmp segment file under table dir, so don't create any sub-dir under table dir.
+    // See PinotLLCRealtimeSegmentManager.commitSegmentFile().
+    // TODO: move tmp file logic into SegmentCompletionUtils.
+    String uniqueSegmentFileName = SegmentCompletionUtils.generateSegmentFileName(segmentName);
+    URI segmentFileURI = ControllerConf.getUriFromPath(
+        StringUtil.join("/", provider.getBaseDataDirURI().toString(), rawTableName, uniqueSegmentFileName));
+    PinotFS pinotFS = PinotFSFactory.create(provider.getBaseDataDirURI().getScheme());
+    pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
+    return segmentFileURI;
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 82d2e31..7a883ca 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -23,13 +23,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
 import java.net.URI;
-import java.nio.file.FileSystems;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -45,7 +39,6 @@ import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.FileUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
@@ -70,7 +63,6 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.SegmentName;
 import org.apache.pinot.common.utils.StringUtil;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.controller.ControllerConf;
@@ -89,7 +81,6 @@ import org.apache.pinot.core.realtime.stream.OffsetCriteria;
 import org.apache.pinot.core.realtime.stream.PartitionOffsetFetcher;
 import org.apache.pinot.core.realtime.stream.StreamConfig;
 import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
-import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.index.ColumnMetadata;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
 import org.apache.pinot.filesystem.PinotFS;
@@ -486,7 +477,8 @@ public class PinotLLCRealtimeSegmentManager {
     }
 
     // Step-1
-    boolean success = updateOldSegmentMetadataZNRecord(realtimeTableName, committingLLCSegmentName, nextOffset);
+    boolean success = updateOldSegmentMetadataZNRecord(realtimeTableName, committingLLCSegmentName, nextOffset,
+            committingSegmentDescriptor);
     if (!success) {
       return false;
     }
@@ -532,10 +524,12 @@ public class PinotLLCRealtimeSegmentManager {
    * @param realtimeTableName - table name for which segment is being committed
    * @param committingLLCSegmentName - name of the segment being committed
    * @param nextOffset - the end offset for this committing segment
+   * @param committingSegmentDescriptor - the metadata of the commit segment.
    * @return
    */
   protected boolean updateOldSegmentMetadataZNRecord(String realtimeTableName, LLCSegmentName committingLLCSegmentName,
-      long nextOffset) {
+                                                     long nextOffset,
+                                                     CommittingSegmentDescriptor committingSegmentDescriptor) {
 
     String committingSegmentNameStr = committingLLCSegmentName.getSegmentName();
     Stat stat = new Stat();
@@ -547,6 +541,12 @@ public class PinotLLCRealtimeSegmentManager {
           committingSegmentNameStr, realtimeTableName, committingSegmentMetadata.getStatus());
       return false;
     }
+    if (committingSegmentDescriptor.getSegmentMetadata() == null) {
+      LOGGER.error("No segment metadata found in descriptor for committing segment {} for table {}", committingLLCSegmentName,
+              realtimeTableName);
+      return false;
+    }
+    SegmentMetadataImpl segmentMetadata = committingSegmentDescriptor.getSegmentMetadata();
 
     // TODO: set number of rows to end consumption in new segment metadata, based on memory used and number of rows from old segment
     committingSegmentMetadata.setEndOffset(nextOffset);
@@ -554,8 +554,6 @@ public class PinotLLCRealtimeSegmentManager {
     String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
     committingSegmentMetadata.setDownloadUrl(
         ControllerConf.constructDownloadUrl(rawTableName, committingSegmentNameStr, _controllerConf.generateVipUrl()));
-    // Pull segment metadata from incoming segment and set it in zk segment metadata
-    SegmentMetadataImpl segmentMetadata = extractSegmentMetadata(rawTableName, committingSegmentNameStr);
     committingSegmentMetadata.setCrc(Long.valueOf(segmentMetadata.getCrc()));
     committingSegmentMetadata.setStartTime(segmentMetadata.getTimeInterval().getStartMillis());
     committingSegmentMetadata.setEndTime(segmentMetadata.getTimeInterval().getEndMillis());
@@ -683,51 +681,6 @@ public class PinotLLCRealtimeSegmentManager {
     return commitTimeoutMS;
   }
 
-  /**
-   * Extract the segment metadata files from the tar-zipped segment file that is expected to be in the directory for the
-   * table.
-   * <p>Segment tar-zipped file path: DATADIR/rawTableName/segmentName.
-   * <p>We extract the metadata.properties and creation.meta into a temporary metadata directory:
-   * DATADIR/rawTableName/segmentName.metadata.tmp, and load metadata from there.
-   *
-   * @param rawTableName Name of the table (not including the REALTIME extension)
-   * @param segmentNameStr Name of the segment
-   * @return SegmentMetadataImpl if it is able to extract the metadata file from the tar-zipped segment file.
-   */
-  protected SegmentMetadataImpl extractSegmentMetadata(final String rawTableName, final String segmentNameStr) {
-    String baseDirStr = StringUtil.join("/", _controllerConf.getDataDir(), rawTableName);
-    String segFileStr = StringUtil.join("/", baseDirStr, segmentNameStr);
-    String tempMetadataDirStr = StringUtil.join("/", baseDirStr, segmentNameStr + METADATA_TEMP_DIR_SUFFIX);
-    File tempMetadataDir = new File(tempMetadataDirStr);
-
-    try {
-      Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create directory: %s", tempMetadataDirStr);
-
-      // Extract metadata.properties
-      InputStream metadataPropertiesInputStream = TarGzCompressionUtils
-          .unTarOneFile(new FileInputStream(new File(segFileStr)), V1Constants.MetadataKeys.METADATA_FILE_NAME);
-      Preconditions.checkNotNull(metadataPropertiesInputStream, "%s does not exist",
-          V1Constants.MetadataKeys.METADATA_FILE_NAME);
-      Path metadataPropertiesPath =
-          FileSystems.getDefault().getPath(tempMetadataDirStr, V1Constants.MetadataKeys.METADATA_FILE_NAME);
-      Files.copy(metadataPropertiesInputStream, metadataPropertiesPath);
-
-      // Extract creation.meta
-      InputStream creationMetaInputStream = TarGzCompressionUtils
-          .unTarOneFile(new FileInputStream(new File(segFileStr)), V1Constants.SEGMENT_CREATION_META);
-      Preconditions.checkNotNull(creationMetaInputStream, "%s does not exist", V1Constants.SEGMENT_CREATION_META);
-      Path creationMetaPath = FileSystems.getDefault().getPath(tempMetadataDirStr, V1Constants.SEGMENT_CREATION_META);
-      Files.copy(creationMetaInputStream, creationMetaPath);
-
-      // Load segment metadata
-      return new SegmentMetadataImpl(tempMetadataDir);
-    } catch (Exception e) {
-      throw new RuntimeException("Exception extracting and reading segment metadata for " + segmentNameStr, e);
-    } finally {
-      FileUtils.deleteQuietly(tempMetadataDir);
-    }
-  }
-
   public LLCRealtimeSegmentZKMetadata getRealtimeSegmentZKMetadata(String realtimeTableName, String segmentName,
       Stat stat) {
     ZNRecord znRecord = _propertyStore
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index e3e3003..b499e3e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -290,7 +290,7 @@ public class SegmentCompletionManager {
    * @return
    */
   public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
-      boolean success, boolean isSplitCommit) {
+      boolean success, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) {
     if (!isLeader() || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
@@ -301,7 +301,7 @@ public class SegmentCompletionManager {
     SegmentCompletionProtocol.Response response = SegmentCompletionProtocol.RESP_FAILED;
     try {
       fsm = lookupOrCreateFsm(segmentName, SegmentCompletionProtocol.MSG_TYPE_COMMIT);
-      response = fsm.segmentCommitEnd(reqParams, success, isSplitCommit);
+      response = fsm.segmentCommitEnd(reqParams, success, isSplitCommit, committingSegmentDescriptor);
     } catch (Exception e) {
       LOGGER.error("Caught exception in segmentCommitEnd for segment {}", segmentNameStr, e);
     }
@@ -605,7 +605,7 @@ public class SegmentCompletionManager {
      * the _winner.
      */
     public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
-        boolean success, boolean isSplitCommit) {
+        boolean success, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) {
       String instanceId = reqParams.getInstanceId();
       long offset = reqParams.getOffset();
       synchronized (this) {
@@ -624,7 +624,7 @@ public class SegmentCompletionManager {
           LOGGER.error("Segment upload failed");
           return abortAndReturnFailed();
         }
-        SegmentCompletionProtocol.Response response = commitSegment(reqParams, isSplitCommit);
+        SegmentCompletionProtocol.Response response = commitSegment(reqParams, isSplitCommit, committingSegmentDescriptor);
         if (!response.equals(SegmentCompletionProtocol.RESP_COMMIT_SUCCESS)) {
           return abortAndReturnFailed();
         } else {
@@ -1005,7 +1005,8 @@ public class SegmentCompletionManager {
     }
 
     private SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtocol.Request.Params reqParams,
-        boolean isSplitCommit) {
+                                                             boolean isSplitCommit,
+                                                             CommittingSegmentDescriptor committingSegmentDescriptor) {
       boolean success;
       String instanceId = reqParams.getInstanceId();
       long offset = reqParams.getOffset();
@@ -1019,8 +1020,6 @@ public class SegmentCompletionManager {
       _state = State.COMMITTING;
       // In case of splitCommit, the segment is uploaded to a unique file name indicated by segmentLocation,
       // so we need to move the segment file to its permanent location first before committing the metadata.
-      CommittingSegmentDescriptor committingSegmentDescriptor =
-          CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
       if (isSplitCommit) {
         if (!_segmentManager.commitSegmentFile(_segmentName.getTableName(), committingSegmentDescriptor)) {
           return SegmentCompletionProtocol.RESP_FAILED;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java
index 82304d3..50c4e3f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller.helix.core.realtime.segment;
 
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
 
 
 /**
@@ -29,6 +30,7 @@ public class CommittingSegmentDescriptor {
   private long _segmentSizeBytes;
   private String _segmentLocation;
   private long _nextOffset;
+  private SegmentMetadataImpl _segmentMetadata;
 
   public static CommittingSegmentDescriptor fromSegmentCompletionReqParams(
       SegmentCompletionProtocol.Request.Params reqParams) {
@@ -39,6 +41,14 @@ public class CommittingSegmentDescriptor {
     return committingSegmentDescriptor;
   }
 
+  public static CommittingSegmentDescriptor fromSegmentCompletionReqParamsAndMetadata(
+          SegmentCompletionProtocol.Request.Params reqParams, SegmentMetadataImpl metadata) {
+    CommittingSegmentDescriptor committingSegmentDescriptor =
+            fromSegmentCompletionReqParams(reqParams);
+    committingSegmentDescriptor.setSegmentMetadata(metadata);
+    return committingSegmentDescriptor;
+  }
+
   public CommittingSegmentDescriptor(String segmentName, long nextOffset, long segmentSizeBytes) {
     _segmentName = segmentName;
     _nextOffset = nextOffset;
@@ -82,4 +92,12 @@ public class CommittingSegmentDescriptor {
   public void setNextOffset(long nextOffset) {
     _nextOffset = nextOffset;
   }
+
+  public SegmentMetadataImpl getSegmentMetadata() {
+    return _segmentMetadata;
+  }
+
+  public void setSegmentMetadata(SegmentMetadataImpl segmentMetadata) {
+    _segmentMetadata = segmentMetadata;
+  }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 1e68bbc..8153559 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -347,11 +347,11 @@ public class PinotLLCRealtimeSegmentManagerTest {
     String tableName = tableConfig.getTableName();
     String rawTableName = TableNameBuilder.extractRawTableName(tableName);
     LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
-    segmentManager.updateOldSegmentMetadataZNRecord(tableName, llcSegmentName, nextOffset);
     LLCSegmentName newLlcSegmentName =
         new LLCSegmentName(rawTableName, partition, nextSeqNum, System.currentTimeMillis());
     CommittingSegmentDescriptor committingSegmentDescriptor =
         new CommittingSegmentDescriptor(segmentName, nextOffset, 0);
+    segmentManager.updateOldSegmentMetadataZNRecord(tableName, llcSegmentName, nextOffset, committingSegmentDescriptor);
     segmentManager.createNewSegmentMetadataZNRecord(tableConfig, llcSegmentName, newLlcSegmentName, partitionAssignment,
         committingSegmentDescriptor, false);
     segmentManager.updateIdealStateOnSegmentCompletion(idealState, segmentName, newLlcSegmentName.getSegmentName(),
@@ -552,13 +552,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
           LLCSegmentName latestSegment = partitionToLatestSegments.get(String.valueOf(randomlySelectedPartition));
           LLCRealtimeSegmentZKMetadata latestMetadata =
               segmentManager.getRealtimeSegmentZKMetadata(tableName, latestSegment.getSegmentName(), null);
-          segmentManager
-              .updateOldSegmentMetadataZNRecord(tableName, latestSegment, latestMetadata.getStartOffset() + 100);
           LLCSegmentName newLlcSegmentName =
               new LLCSegmentName(rawTableName, randomlySelectedPartition, latestSegment.getSequenceNumber() + 1,
                   System.currentTimeMillis());
           CommittingSegmentDescriptor committingSegmentDescriptor =
               new CommittingSegmentDescriptor(latestSegment.getSegmentName(), latestMetadata.getStartOffset() + 100, 0);
+          segmentManager.updateOldSegmentMetadataZNRecord(tableName, latestSegment,
+                  latestMetadata.getStartOffset() + 100, committingSegmentDescriptor);
           segmentManager.createNewSegmentMetadataZNRecord(tableConfig, latestSegment, newLlcSegmentName,
               expectedPartitionAssignment, committingSegmentDescriptor, false);
 
@@ -632,7 +632,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
             LLCRealtimeSegmentZKMetadata latestMetadata =
                 segmentManager.getRealtimeSegmentZKMetadata(tableName, latestSegment.getSegmentName(), null);
             segmentManager
-                .updateOldSegmentMetadataZNRecord(tableName, latestSegment, latestMetadata.getStartOffset() + 100);
+                .updateOldSegmentMetadataZNRecord(tableName, latestSegment, latestMetadata.getStartOffset() + 100,
+                        new CommittingSegmentDescriptor(latestSegment.getSegmentName(), latestMetadata.getStartOffset() + 100, 0));
             idealState = idealStateBuilder.setSegmentState(latestSegment.getSegmentName(), "ONLINE").build();
 
             // get old state
@@ -666,7 +667,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
               LLCRealtimeSegmentZKMetadata latestMetadata =
                   segmentManager.getRealtimeSegmentZKMetadata(tableName, latestSegment.getSegmentName(), null);
               segmentManager
-                  .updateOldSegmentMetadataZNRecord(tableName, latestSegment, latestMetadata.getStartOffset() + 100);
+                  .updateOldSegmentMetadataZNRecord(tableName, latestSegment, latestMetadata.getStartOffset() + 100,
+                          new CommittingSegmentDescriptor(latestSegment.getSegmentName(), latestMetadata.getStartOffset() + 100, 0));
 
               // get old state
               nPartitions = expectedPartitionAssignment.getNumPartitions();
@@ -894,7 +896,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
     segmentManager.IS_CONNECTED = false;
     reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
     CommittingSegmentDescriptor committingSegmentDescriptor =
-        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
+                segmentManager.newMockSegmentMetadata());
     boolean status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
     Assert.assertFalse(status);
     Assert.assertEquals(segmentManager._nCallsToUpdateHelix, 0);  // Idealstate not updated
@@ -950,7 +953,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     Set<String> prevInstances = idealState.getInstanceSet(committingSegmentMetadata.getSegmentName());
     reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
     CommittingSegmentDescriptor committingSegmentDescriptor =
-        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams, segmentManager.newMockSegmentMetadata());
     boolean status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
     segmentManager.verifyMetadataInteractions();
     Assert.assertTrue(status);
@@ -973,7 +976,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
     segmentManager._records.clear();
     prevInstances = idealState.getInstanceSet(committingSegmentMetadata.getSegmentName());
     reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
-    committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+    committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
+            segmentManager.newMockSegmentMetadata());
     status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
     segmentManager.verifyMetadataInteractions();
     Assert.assertTrue(status);
@@ -991,7 +995,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
     segmentManager._paths.clear();
     segmentManager._records.clear();
     reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
-    committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+    // We do not expect the segment metadata to be used. Thus reuse the current metadata.
+    committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
+            segmentManager.getMockSegmentMetadata());
     status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
     segmentManager.verifyMetadataInteractions();
     Assert.assertFalse(status);
@@ -1006,7 +1012,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
     segmentManager._paths.clear();
     segmentManager._records.clear();
     reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
-    committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+    // We do not expect the segment metadata to be used. Thus reuse the current metadata.
+    committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
+            segmentManager.getMockSegmentMetadata());
     status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
     segmentManager.verifyMetadataInteractions();
     Assert.assertFalse(status);
@@ -1021,7 +1029,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
     committingSegmentMetadata = new LLCRealtimeSegmentZKMetadata(newZnRec);
     prevInstances = idealState.getInstanceSet(committingSegmentMetadata.getSegmentName());
     reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
-    committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+    committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
+            segmentManager.newMockSegmentMetadata());
     status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
     segmentManager.verifyMetadataInteractions();
     Assert.assertTrue(status);
@@ -1091,7 +1100,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
     CommittingSegmentDescriptor committingSegmentDescriptor =
-        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
+                segmentManager1.newMockSegmentMetadata());
     boolean status = segmentManager1.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
     Assert.assertTrue(status);  // Committing segment metadata succeeded.
 
@@ -1365,6 +1375,19 @@ public class PinotLLCRealtimeSegmentManagerTest {
       _tableConfigStore = new TableConfigStore();
     }
 
+    private SegmentMetadataImpl newMockSegmentMetadata() {
+      segmentMetadata = mock(SegmentMetadataImpl.class);
+      when(segmentMetadata.getCrc()).thenReturn(FakePinotLLCRealtimeSegmentManager.CRC);
+      when(segmentMetadata.getTimeInterval()).thenReturn(FakePinotLLCRealtimeSegmentManager.INTERVAL);
+      when(segmentMetadata.getVersion()).thenReturn(FakePinotLLCRealtimeSegmentManager.SEGMENT_VERSION);
+      when(segmentMetadata.getTotalRawDocs()).thenReturn(FakePinotLLCRealtimeSegmentManager.NUM_DOCS);
+      return segmentMetadata;
+    }
+
+    private SegmentMetadataImpl getMockSegmentMetadata() {
+      return segmentMetadata;
+    }
+
     void addTableToStore(String tableName, TableConfig tableConfig, int nStreamPartitions) {
       _tableConfigStore.addTable(tableName, tableConfig, nStreamPartitions);
     }
@@ -1457,11 +1480,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
           partitionAssignment, committingSegmentDescriptor, isNewTableSetup);
     }
 
-    @Override
-    protected boolean updateOldSegmentMetadataZNRecord(String realtimeTableName,
-        LLCSegmentName committingLLCSegmentName, long nextOffset) {
-      return super.updateOldSegmentMetadataZNRecord(realtimeTableName, committingLLCSegmentName, nextOffset);
-    }
 
     @Override
     public LLCRealtimeSegmentZKMetadata getRealtimeSegmentZKMetadata(String realtimeTableName, String segmentName,
@@ -1482,15 +1500,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
       return metadata;
     }
 
-    @Override
-    protected SegmentMetadataImpl extractSegmentMetadata(final String rawTableName, final String segmentNameStr) {
-      segmentMetadata = mock(SegmentMetadataImpl.class);
-      when(segmentMetadata.getCrc()).thenReturn(CRC);
-      when(segmentMetadata.getTimeInterval()).thenReturn(INTERVAL);
-      when(segmentMetadata.getVersion()).thenReturn(SEGMENT_VERSION);
-      when(segmentMetadata.getTotalRawDocs()).thenReturn(NUM_DOCS);
-      return segmentMetadata;
-    }
 
     public void verifyMetadataInteractions() {
       verify(segmentMetadata, times(1)).getCrc();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 6ebd34f..3b8503e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -151,7 +151,8 @@ public class SegmentCompletionTest {
 
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
-    response = segmentCompletionMgr.segmentCommitEnd(params, true, false);
+    response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
 
     // Now the FSM should have disappeared from the map
@@ -220,7 +221,8 @@ public class SegmentCompletionTest {
 
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
-    response = segmentCompletionMgr.segmentCommitEnd(params, true, false);
+    response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
 
     // Now the FSM should have disappeared from the map
@@ -330,7 +332,8 @@ public class SegmentCompletionTest {
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
         .withSegmentLocation("doNotCommitMe");
-    response = segmentCompletionMgr.segmentCommitEnd(params, true, true);
+    response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
 
     // Now the FSM should have aborted
@@ -363,7 +366,8 @@ public class SegmentCompletionTest {
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr)
         .withSegmentLocation("location");
-    response = segmentCompletionMgr.segmentCommitEnd(params, true, true);
+    response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT_SUCCESS);
     // And the FSM should be removed.
     Assert.assertFalse(fsmMap.containsKey(segmentNameStr));
@@ -415,7 +419,8 @@ public class SegmentCompletionTest {
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
         .withSegmentLocation("location");
-    response = segmentCompletionMgr.segmentCommitEnd(params, true, true);
+    response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
 
     // Now the FSM should have disappeared from the map
@@ -464,7 +469,8 @@ public class SegmentCompletionTest {
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr)
         .withSegmentLocation("location");
-    response = segmentCompletionMgr.segmentCommitEnd(params, true, true);
+    response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
 
     // Now the FSM should have disappeared from the map
@@ -523,7 +529,8 @@ public class SegmentCompletionTest {
 
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
-    response = segmentCompletionMgr.segmentCommitEnd(params, true, false);
+    response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
 
     // Now the FSM should have disappeared from the map
@@ -595,7 +602,8 @@ public class SegmentCompletionTest {
 
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
-    response = segmentCompletionMgr.segmentCommitEnd(params, true, false);
+    response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
     // We ask S2 to keep the segment
     params = new Request.Params().withInstanceId(s2).withOffset(s1Offset).withSegmentName(segmentNameStr)
@@ -661,7 +669,8 @@ public class SegmentCompletionTest {
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
         .withSegmentLocation("location");
-    response = segmentCompletionMgr.segmentCommitEnd(params, true, isSplitCommit);
+    response = segmentCompletionMgr.segmentCommitEnd(params, true, isSplitCommit,
+            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
     // Now the FSM should have disappeared from the map
     Assert.assertFalse(fsmMap.containsKey(segmentNameStr));
@@ -855,7 +864,8 @@ public class SegmentCompletionTest {
     long commitTimeMs = (segmentCompletionMgr._secconds - startTime) * 1000;
     Assert.assertEquals(commitTimeMap.get(tableName).longValue(), commitTimeMs);
     segmentCompletionMgr._secconds += 55;
-    response = segmentCompletionMgr.segmentCommitEnd(params, true, false);
+    response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
     // now FSM should be out of the map.
     Assert.assertFalse((fsmMap.containsKey(segmentNameStr)));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org