Posted to commits@pinot.apache.org by ne...@apache.org on 2022/06/15 21:39:24 UTC

[pinot] branch master updated: Allow moveToFinalLocation in METADATA push based on config (#8823)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 378bdec11c Allow moveToFinalLocation in METADATA push based on config (#8823)
378bdec11c is described below

commit 378bdec11c68a54366cd98b0f2eda5807e454e2f
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Wed Jun 15 14:39:18 2022 -0700

    Allow moveToFinalLocation in METADATA push based on config (#8823)
    
    METADATA push did not support the moveSegmentToFinalLocation option, so segments generated in a location other than the deep store could not be moved into the deep store without manual scripting.
---
 .../common/utils/FileUploadDownloadClient.java     |   6 +
 .../PinotSegmentUploadDownloadRestletResource.java |  46 +++--
 .../pinot/controller/api/upload/ZKOperator.java    |  79 ++++---
 .../controller/api/upload/ZKOperatorTest.java      | 141 ++++++++++---
 .../tests/ClusterIntegrationTestUtils.java         |  20 +-
 .../tests/SegmentUploadIntegrationTest.java        | 229 +++++++++++++++++++++
 .../segment/local/utils/SegmentPushUtils.java      |   4 +
 .../spi/ingestion/batch/spec/PushJobSpec.java      |  13 ++
 8 files changed, 472 insertions(+), 66 deletions(-)
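
For batch ingestion users, the new behavior is opted into through PushJobSpec. Below is a
minimal sketch of a standalone metadata push job that asks the controller to copy segments
into the deep store; the controller URI, table name, and segment directory are placeholders,
and the calls mirror the SegmentUploadIntegrationTest added in this commit.

    import java.util.Collections;
    import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner;
    import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
    import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
    import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
    import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
    import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;

    public class MetadataPushExample {
      public static void main(String[] args)
          throws Exception {
        PushJobSpec pushJobSpec = new PushJobSpec();
        // New in this commit: have the controller copy segments to the deep store.
        pushJobSpec.setCopyToDeepStoreForMetadataPush(true);

        PinotFSSpec fsSpec = new PinotFSSpec();
        fsSpec.setScheme("file");
        fsSpec.setClassName("org.apache.pinot.spi.filesystem.LocalPinotFS");

        TableSpec tableSpec = new TableSpec();
        tableSpec.setTableName("myTable"); // placeholder

        PinotClusterSpec clusterSpec = new PinotClusterSpec();
        clusterSpec.setControllerURI("http://localhost:9000"); // placeholder

        SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
        jobSpec.setPushJobSpec(pushJobSpec);
        jobSpec.setPinotFSSpecs(Collections.singletonList(fsSpec));
        jobSpec.setOutputDirURI("/path/to/segment/tars"); // placeholder
        jobSpec.setTableSpec(tableSpec);
        jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec});

        SegmentMetadataPushJobRunner runner = new SegmentMetadataPushJobRunner();
        runner.init(jobSpec);
        runner.run();
      }
    }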

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 55931077b7..767e0f16c4 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -73,6 +73,12 @@ public class FileUploadDownloadClient implements AutoCloseable {
     public static final String UPLOAD_TYPE = "UPLOAD_TYPE";
     public static final String REFRESH_ONLY = "REFRESH_ONLY";
     public static final String DOWNLOAD_URI = "DOWNLOAD_URI";
+
+    /**
+     * This header is only used for METADATA push; it allows the controller to copy the segment to
+     * the deep store if the segment was not placed there to begin with.
+     */
+    public static final String COPY_SEGMENT_TO_DEEP_STORE = "COPY_SEGMENT_TO_DEEP_STORE";
     public static final String SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER = "Pinot-SegmentZKMetadataCustomMapModifier";
     public static final String CRYPTER = "CRYPTER";
   }
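
With the new constant in place, clients that build the upload request by hand attach the
header the same way the SegmentPushUtils change further down does. A sketch (the download
URI is illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.http.Header;
    import org.apache.http.message.BasicHeader;
    import org.apache.pinot.common.utils.FileUploadDownloadClient;

    List<Header> headers = new ArrayList<>();
    headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
        FileUploadDownloadClient.FileUploadType.METADATA.toString()));
    // Where the complete segment currently lives (illustrative URI).
    headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
        "s3://bucket/staging/mySegment.tar.gz"));
    // New header: ask the controller to copy the segment into its deep store.
    headers.add(new BasicHeader(
        FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE, "true"));
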
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index e675b7637e..cda4e37b50 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -197,7 +197,7 @@ public class PinotSegmentUploadDownloadRestletResource {
   }
 
   private SuccessResponse uploadSegment(@Nullable String tableName, TableType tableType,
-      @Nullable FormDataMultiPart multiPart, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection,
+      @Nullable FormDataMultiPart multiPart, boolean copySegmentToFinalLocation, boolean enableParallelPushProtection,
       boolean allowRefresh, HttpHeaders headers, Request request) {
     if (StringUtils.isNotEmpty(tableName)) {
       TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName);
@@ -213,13 +213,15 @@ public class PinotSegmentUploadDownloadRestletResource {
     extractHttpHeader(headers, CommonConstants.Controller.TABLE_NAME_HTTP_HEADER);
 
     String uploadTypeStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE);
-    String downloadURI = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
+    String sourceDownloadURIStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
     String crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER);
     String ingestionDescriptor = extractHttpHeader(headers, CommonConstants.Controller.INGESTION_DESCRIPTOR);
 
     File tempEncryptedFile = null;
     File tempDecryptedFile = null;
     File tempSegmentDir = null;
+    // The download URI to write into the segment ZK metadata
+    String segmentDownloadURIStr = sourceDownloadURIStr;
     try {
       ControllerFilePathProvider provider = ControllerFilePathProvider.getInstance();
       String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID();
@@ -238,20 +240,22 @@ public class PinotSegmentUploadDownloadRestletResource {
                 "Segment file (as multipart/form-data) is required for SEGMENT upload mode",
                 Response.Status.BAD_REQUEST);
           }
-          if (!moveSegmentToFinalLocation && StringUtils.isEmpty(downloadURI)) {
+          if (!copySegmentToFinalLocation && StringUtils.isEmpty(sourceDownloadURIStr)) {
             throw new ControllerApplicationException(LOGGER,
-                "Download URI is required if segment should not be copied to the deep store",
+                "Source download URI is required in header field 'DOWNLOAD_URI' if segment should not be copied to "
+                    + "the deep store",
                 Response.Status.BAD_REQUEST);
           }
           createSegmentFileFromMultipart(multiPart, destFile);
           segmentSizeInBytes = destFile.length();
           break;
         case URI:
-          if (StringUtils.isEmpty(downloadURI)) {
-            throw new ControllerApplicationException(LOGGER, "Download URI is required for URI upload mode",
+          if (StringUtils.isEmpty(sourceDownloadURIStr)) {
+            throw new ControllerApplicationException(LOGGER,
+                "Source download URI is required in header field 'DOWNLOAD_URI' for URI upload mode",
                 Response.Status.BAD_REQUEST);
           }
-          downloadSegmentFileFromURI(downloadURI, destFile, tableName);
+          downloadSegmentFileFromURI(sourceDownloadURIStr, destFile, tableName);
           segmentSizeInBytes = destFile.length();
           break;
         case METADATA:
@@ -260,14 +264,19 @@ public class PinotSegmentUploadDownloadRestletResource {
                 "Segment metadata file (as multipart/form-data) is required for METADATA upload mode",
                 Response.Status.BAD_REQUEST);
           }
-          if (StringUtils.isEmpty(downloadURI)) {
-            throw new ControllerApplicationException(LOGGER, "Download URI is required for METADATA upload mode",
+          if (StringUtils.isEmpty(sourceDownloadURIStr)) {
+            throw new ControllerApplicationException(LOGGER,
+                "Source download URI is required in header field 'DOWNLOAD_URI' for METADATA upload mode",
                 Response.Status.BAD_REQUEST);
           }
-          moveSegmentToFinalLocation = false;
+          // Override copySegmentToFinalLocation if the COPY_SEGMENT_TO_DEEP_STORE header is provided;
+          // otherwise default to false for backward compatibility
+          String copySegmentToDeepStore =
+              extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE);
+          copySegmentToFinalLocation = Boolean.parseBoolean(copySegmentToDeepStore);
           createSegmentFileFromMultipart(multiPart, destFile);
           try {
-            URI segmentURI = new URI(downloadURI);
+            URI segmentURI = new URI(sourceDownloadURIStr);
             PinotFS pinotFS = PinotFSFactory.create(segmentURI.getScheme());
             segmentSizeInBytes = pinotFS.length(segmentURI);
           } catch (Exception e) {
@@ -332,24 +341,25 @@ public class PinotSegmentUploadDownloadRestletResource {
 
       // Update download URI if controller is responsible for moving the segment to the deep store
       URI finalSegmentLocationURI = null;
-      if (moveSegmentToFinalLocation) {
+      if (copySegmentToFinalLocation) {
         URI dataDirURI = provider.getDataDirURI();
         String dataDirPath = dataDirURI.toString();
         String encodedSegmentName = URIUtils.encode(segmentName);
         String finalSegmentLocationPath = URIUtils.getPath(dataDirPath, rawTableName, encodedSegmentName);
         if (dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) {
-          downloadURI = URIUtils.getPath(provider.getVip(), "segments", rawTableName, encodedSegmentName);
+          segmentDownloadURIStr = URIUtils.getPath(provider.getVip(), "segments", rawTableName, encodedSegmentName);
         } else {
-          downloadURI = finalSegmentLocationPath;
+          segmentDownloadURIStr = finalSegmentLocationPath;
         }
         finalSegmentLocationURI = URIUtils.getUri(finalSegmentLocationPath);
       }
-      LOGGER.info("Using download URI: {} for segment: {} of table: {} (move segment: {})", downloadURI, segmentFile,
-          tableNameWithType, moveSegmentToFinalLocation);
+      LOGGER.info("Using segment download URI: {} for segment: {} of table: {} (move segment: {})",
+          segmentDownloadURIStr, segmentFile, tableNameWithType, copySegmentToFinalLocation);
 
       ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
-      zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, finalSegmentLocationURI, segmentFile,
-          downloadURI, crypterName, segmentSizeInBytes, enableParallelPushProtection, allowRefresh, headers);
+      zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI,
+          segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes,
+          enableParallelPushProtection, allowRefresh, headers);
 
       return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + tableNameWithType);
     } catch (WebApplicationException e) {
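
One detail worth noting: extractHttpHeader returns null when a header is absent, and
Boolean.parseBoolean is null-safe, so METADATA pushes from clients that predate this change
keep the old no-copy behavior. A tiny standalone illustration:

    public class HeaderDefaultDemo {
      public static void main(String[] args) {
        // Boolean.parseBoolean(null) is false, which preserves backward
        // compatibility when COPY_SEGMENT_TO_DEEP_STORE is not sent.
        System.out.println(Boolean.parseBoolean(null));    // false -> old behavior
        System.out.println(Boolean.parseBoolean("true"));  // true  -> copy to deep store
        System.out.println(Boolean.parseBoolean("TRUE"));  // true  -> case-insensitive
        System.out.println(Boolean.parseBoolean("yes"));   // false -> only "true" enables
      }
    }
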
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index b75d112ce2..62bc770846 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.api.upload;
 
+import com.google.common.base.Preconditions;
 import java.io.File;
 import java.net.URI;
 import javax.annotation.Nullable;
@@ -29,6 +30,7 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifi
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.FileUploadDownloadClient.FileUploadType;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
@@ -60,7 +62,8 @@ public class ZKOperator {
   }
 
   public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata,
-      @Nullable URI finalSegmentLocationURI, File segmentFile, String downloadUrl, @Nullable String crypterName,
+      FileUploadType uploadType, @Nullable URI finalSegmentLocationURI, File segmentFile,
+      @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr, @Nullable String crypterName,
       long segmentSizeInBytes, boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers)
       throws Exception {
     String segmentName = segmentMetadata.getName();
@@ -76,8 +79,9 @@ public class ZKOperator {
             Response.Status.GONE);
       }
       LOGGER.info("Adding new segment: {} to table: {}", segmentName, tableNameWithType);
-      processNewSegment(tableNameWithType, segmentMetadata, finalSegmentLocationURI, segmentFile, downloadUrl,
-          crypterName, segmentSizeInBytes, enableParallelPushProtection, headers);
+      processNewSegment(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI, segmentFile,
+          sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes, enableParallelPushProtection,
+          headers);
     } else {
       // Refresh an existing segment
       if (!allowRefresh) {
@@ -89,16 +93,16 @@ public class ZKOperator {
                 tableNameWithType), Response.Status.CONFLICT);
       }
       LOGGER.info("Segment: {} already exists in table: {}, refreshing it", segmentName, tableNameWithType);
-      processExistingSegment(tableNameWithType, segmentMetadata, existingSegmentMetadataZNRecord,
-          finalSegmentLocationURI, segmentFile, downloadUrl, crypterName, segmentSizeInBytes,
-          enableParallelPushProtection, headers);
+      processExistingSegment(tableNameWithType, segmentMetadata, uploadType, existingSegmentMetadataZNRecord,
+          finalSegmentLocationURI, segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName,
+          segmentSizeInBytes, enableParallelPushProtection, headers);
     }
   }
 
   private void processExistingSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
-      ZNRecord existingSegmentMetadataZNRecord, @Nullable URI finalSegmentLocationURI, File segmentFile,
-      String downloadUrl, @Nullable String crypterName, long segmentSizeInBytes, boolean enableParallelPushProtection,
-      HttpHeaders headers)
+      FileUploadType uploadType, ZNRecord existingSegmentMetadataZNRecord, @Nullable URI finalSegmentLocationURI,
+      File segmentFile, @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr,
+      @Nullable String crypterName, long segmentSizeInBytes, boolean enableParallelPushProtection, HttpHeaders headers)
       throws Exception {
     String segmentName = segmentMetadata.getName();
     int expectedVersion = existingSegmentMetadataZNRecord.getVersion();
@@ -179,8 +183,7 @@ public class ZKOperator {
             "New segment crc {} is different than the existing segment crc {}. Updating ZK metadata and refreshing "
                 + "segment {}", newCrc, existingCrc, segmentName);
         if (finalSegmentLocationURI != null) {
-          moveSegmentToPermanentDirectory(segmentFile, finalSegmentLocationURI);
-          LOGGER.info("Moved segment: {} of table: {} to final location: {}", segmentName, tableNameWithType,
+          copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr,
               finalSegmentLocationURI);
         }
 
@@ -191,12 +194,12 @@ public class ZKOperator {
         if (customMapModifier == null) {
           // If no modifier is provided, use the custom map from the segment metadata
           segmentZKMetadata.setCustomMap(null);
-          ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl,
-              crypterName, segmentSizeInBytes);
+          ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata,
+              segmentDownloadURIStr, crypterName, segmentSizeInBytes);
         } else {
           // If modifier is provided, first set the custom map from the segment metadata, then apply the modifier
-          ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl,
-              crypterName, segmentSizeInBytes);
+          ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata,
+              segmentDownloadURIStr, crypterName, segmentSizeInBytes);
           segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap()));
         }
         if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) {
@@ -237,16 +240,17 @@ public class ZKOperator {
     }
   }
 
-  private void processNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
-      @Nullable URI finalSegmentLocationURI, File segmentFile, String downloadUrl, @Nullable String crypterName,
-      long segmentSizeInBytes, boolean enableParallelPushProtection, HttpHeaders headers)
+  private void processNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, FileUploadType uploadType,
+      @Nullable URI finalSegmentLocationURI, File segmentFile, @Nullable String sourceDownloadURIStr,
+      String segmentDownloadURIStr, @Nullable String crypterName, long segmentSizeInBytes,
+      boolean enableParallelPushProtection, HttpHeaders headers)
       throws Exception {
     String segmentName = segmentMetadata.getName();
     SegmentZKMetadata newSegmentZKMetadata;
     try {
       newSegmentZKMetadata =
-          ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata, downloadUrl, crypterName,
-              segmentSizeInBytes);
+          ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata, segmentDownloadURIStr,
+              crypterName, segmentSizeInBytes);
     } catch (IllegalArgumentException e) {
       throw new ControllerApplicationException(LOGGER,
           String.format("Got invalid segment metadata when adding segment: %s for table: %s, reason: %s", segmentName,
@@ -274,8 +278,7 @@ public class ZKOperator {
 
     if (finalSegmentLocationURI != null) {
       try {
-        moveSegmentToPermanentDirectory(segmentFile, finalSegmentLocationURI);
-        LOGGER.info("Moved segment: {} of table: {} to final location: {}", segmentName, tableNameWithType,
+        copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr,
             finalSegmentLocationURI);
       } catch (Exception e) {
         // Cleanup the Zk entry and the segment from the permanent directory if it exists.
@@ -310,9 +313,39 @@ public class ZKOperator {
     }
   }
 
-  private void moveSegmentToPermanentDirectory(File segmentFile, URI finalSegmentLocationURI)
+  private void copySegmentToDeepStore(String tableNameWithType, String segmentName, FileUploadType uploadType,
+      File segmentFile, String sourceDownloadURIStr, URI finalSegmentLocationURI)
+      throws Exception {
+    if (uploadType == FileUploadType.METADATA) {
+      // In METADATA push, the local segmentFile contains only the segment metadata.
+      // Copy the segment from the sourceDownloadURI to the final location.
+      copyFromSegmentURIToDeepStore(new URI(sourceDownloadURIStr), finalSegmentLocationURI);
+      LOGGER.info("Copied segment: {} of table: {} to final location: {}", segmentName, tableNameWithType,
+          finalSegmentLocationURI);
+    } else {
+      // In push types other than METADATA, the local segmentFile contains the complete segment.
+      // Copy the local segment file to the final location.
+      copyFromSegmentFileToDeepStore(segmentFile, finalSegmentLocationURI);
+      LOGGER.info("Copied segment: {} of table: {} to final location: {}", segmentName, tableNameWithType,
+          finalSegmentLocationURI);
+    }
+  }
+
+  private void copyFromSegmentFileToDeepStore(File segmentFile, URI finalSegmentLocationURI)
       throws Exception {
     LOGGER.info("Copying segment from: {} to: {}", segmentFile.getAbsolutePath(), finalSegmentLocationURI);
     PinotFSFactory.create(finalSegmentLocationURI.getScheme()).copyFromLocalFile(segmentFile, finalSegmentLocationURI);
   }
+
+  private void copyFromSegmentURIToDeepStore(URI sourceDownloadURI, URI finalSegmentLocationURI)
+      throws Exception {
+    if (sourceDownloadURI.equals(finalSegmentLocationURI)) {
+      LOGGER.info("Skip copying segment as sourceDownloadURI: {} is the same as finalSegmentLocationURI",
+          sourceDownloadURI);
+    } else {
+      Preconditions.checkState(sourceDownloadURI.getScheme().equals(finalSegmentLocationURI.getScheme()));
+      LOGGER.info("Copying segment from: {} to: {}", sourceDownloadURI, finalSegmentLocationURI);
+      PinotFSFactory.create(finalSegmentLocationURI.getScheme()).copy(sourceDownloadURI, finalSegmentLocationURI);
+    }
+  }
 }
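
The copyFromSegmentURIToDeepStore path above skips the copy when the source already is the
final location, and requires both URIs to share a scheme, i.e. a METADATA push can only copy
within a single filesystem. A rough sketch of that decision logic in isolation (the URIs are
made up):

    import java.net.URI;

    public class DeepStoreCopyDecision {
      public static void main(String[] args)
          throws Exception {
        URI source = new URI("s3://staging-bucket/myTable/seg_0.tar.gz"); // hypothetical
        URI dest = new URI("s3://deep-store/myTable/seg_0");              // hypothetical
        if (source.equals(dest)) {
          // Segment was already written to its final location; nothing to do.
          System.out.println("Skip copy: source equals final location");
        } else if (!source.getScheme().equals(dest.getScheme())) {
          // Mirrors the Preconditions.checkState in ZKOperator: cross-scheme
          // copies (e.g. hdfs -> s3) are rejected rather than attempted.
          throw new IllegalStateException("Source and destination schemes differ");
        } else {
          // In ZKOperator this is PinotFSFactory.create(scheme).copy(source, dest).
          System.out.println("Copy " + source + " -> " + dest);
        }
      }
    }
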
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
index 28c33eb256..ef9910677d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
@@ -18,29 +18,39 @@
  */
 package org.apache.pinot.controller.api.upload;
 
+import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.net.URI;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.FileUploadDownloadClient.FileUploadType;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -51,6 +61,9 @@ import static org.testng.Assert.*;
 
 
 public class ZKOperatorTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "ZKOperatorTest");
+  private static final File SEGMENT_DIR = new File(TEMP_DIR, "segmentDir");
+  private static final File DATA_DIR = new File(TEMP_DIR, "dataDir");
   private static final String RAW_TABLE_NAME = "testTable";
   private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
   private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
@@ -66,6 +79,7 @@ public class ZKOperatorTest {
   @BeforeClass
   public void setUp()
       throws Exception {
+    FileUtils.deleteQuietly(TEMP_DIR);
     TEST_INSTANCE.setupSharedStateAndValidate();
     _resourceManager = TEST_INSTANCE.getHelixResourceManager();
 
@@ -93,6 +107,87 @@ public class ZKOperatorTest {
     return streamConfigs;
   }
 
+  private File generateSegment()
+      throws Exception {
+    Schema schema =
+        new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).addSingleValueDimension("colA", DataType.INT).build();
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+    File outputDir = new File(SEGMENT_DIR, "segment");
+    config.setOutDir(outputDir.getAbsolutePath());
+    config.setSegmentName(SEGMENT_NAME);
+    GenericRow row = new GenericRow();
+    row.putValue("colA", "100");
+    List<GenericRow> rows = ImmutableList.of(row);
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(config, new GenericRowRecordReader(rows));
+    driver.build();
+    File segmentTar = new File(SEGMENT_DIR, SEGMENT_NAME + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(new File(outputDir, SEGMENT_NAME), segmentTar);
+    FileUtils.deleteQuietly(outputDir);
+    return segmentTar;
+  }
+
+  private void checkSegmentZkMetadata(String segmentName, long crc, long creationTime) {
+    SegmentZKMetadata segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, segmentName);
+    assertNotNull(segmentZKMetadata);
+    assertEquals(segmentZKMetadata.getCrc(), crc);
+    assertEquals(segmentZKMetadata.getCreationTime(), creationTime);
+    long pushTime = segmentZKMetadata.getPushTime();
+    assertTrue(pushTime > 0);
+    assertEquals(segmentZKMetadata.getRefreshTime(), Long.MIN_VALUE);
+    assertEquals(segmentZKMetadata.getDownloadUrl(), "downloadUrl");
+    assertEquals(segmentZKMetadata.getCrypterName(), "crypter");
+    assertEquals(segmentZKMetadata.getSegmentUploadStartTime(), -1);
+    assertEquals(segmentZKMetadata.getSizeInBytes(), 10);
+  }
+
+  @Test
+  public void testMetadataUploadType()
+      throws Exception {
+    String segmentName = "metadataTest";
+    FileUtils.deleteQuietly(TEMP_DIR);
+    ZKOperator zkOperator = new ZKOperator(_resourceManager, mock(ControllerConf.class), mock(ControllerMetrics.class));
+
+    SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
+    when(segmentMetadata.getName()).thenReturn(segmentName);
+    when(segmentMetadata.getCrc()).thenReturn("12345");
+    when(segmentMetadata.getIndexCreationTime()).thenReturn(123L);
+    HttpHeaders httpHeaders = mock(HttpHeaders.class);
+
+    File segmentTar = generateSegment();
+    String sourceDownloadURIStr = segmentTar.toURI().toString();
+    File segmentFile = new File("metadataOnly");
+
+    // with finalSegmentLocation not null
+    File finalSegmentLocation = new File(DATA_DIR, segmentName);
+    Assert.assertFalse(finalSegmentLocation.exists());
+    zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.METADATA,
+        finalSegmentLocation.toURI(), segmentFile, sourceDownloadURIStr, "downloadUrl", "crypter", 10, true, true,
+        httpHeaders);
+    Assert.assertTrue(finalSegmentLocation.exists());
+    Assert.assertTrue(segmentTar.exists());
+    checkSegmentZkMetadata(segmentName, 12345L, 123L);
+
+    _resourceManager.deleteSegment(OFFLINE_TABLE_NAME, segmentName);
+    // Wait for the segment Zk entry to be deleted.
+    TestUtils.waitForCondition(aVoid -> {
+      SegmentZKMetadata segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, segmentName);
+      return segmentZKMetadata == null;
+    }, 30_000L, "Failed to delete segmentZkMetadata.");
+
+    FileUtils.deleteQuietly(DATA_DIR);
+    // with finalSegmentLocation null
+    zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.METADATA, null,
+        segmentFile, sourceDownloadURIStr, "downloadUrl", "crypter", 10, true, true, httpHeaders);
+    Assert.assertFalse(finalSegmentLocation.exists());
+    Assert.assertTrue(segmentTar.exists());
+    checkSegmentZkMetadata(segmentName, 12345L, 123L);
+  }
+
   @Test
   public void testCompleteSegmentOperations()
       throws Exception {
@@ -110,9 +205,8 @@ public class ZKOperatorTest {
       URI finalSegmentLocationURI =
           URIUtils.getUri("mockPath", OFFLINE_TABLE_NAME, URIUtils.encode(segmentMetadata.getName()));
       File segmentFile = new File(new File("foo/bar"), "mockChild");
-
-      zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, finalSegmentLocationURI, segmentFile,
-          "downloadUrl", "crypter", 10, true, true, httpHeaders);
+      zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT,
+          finalSegmentLocationURI, segmentFile, "downloadUrl", "downloadUrl", "crypter", 10, true, true, httpHeaders);
       fail();
     } catch (Exception e) {
       // Expected
@@ -124,8 +218,8 @@ public class ZKOperatorTest {
       return segmentZKMetadata == null;
     }, 30_000L, "Failed to delete segmentZkMetadata.");
 
-    zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "downloadUrl", "crypter", 10,
-        true, true, httpHeaders);
+    zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+        "downloadUrl", "downloadUrl", "crypter", 10, true, true, httpHeaders);
 
     SegmentZKMetadata segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
     assertNotNull(segmentZKMetadata);
@@ -141,8 +235,8 @@ public class ZKOperatorTest {
 
     // Upload the same segment with allowRefresh = false. Validate that an exception is thrown.
     try {
-      zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "otherDownloadUrl",
-          "otherCrypter", 10, true, false, httpHeaders);
+      zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+          "otherDownloadUrl", "otherDownloadUrl", "otherCrypter", 10, true, false, httpHeaders);
       fail();
     } catch (Exception e) {
       // Expected
@@ -151,8 +245,8 @@ public class ZKOperatorTest {
     // Refresh the segment with unmatched IF_MATCH field
     when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("123");
     try {
-      zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "otherDownloadUrl",
-          "otherCrypter", 10, true, true, httpHeaders);
+      zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+          "otherDownloadUrl", "otherDownloadUrl", "otherCrypter", 10, true, true, httpHeaders);
       fail();
     } catch (Exception e) {
       // Expected
@@ -162,8 +256,8 @@ public class ZKOperatorTest {
     // downloadURL and crypter
     when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("12345");
     when(segmentMetadata.getIndexCreationTime()).thenReturn(456L);
-    zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "otherDownloadUrl",
-        "otherCrypter", 10, true, true, httpHeaders);
+    zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+        "otherDownloadUrl", "otherDownloadUrl", "otherCrypter", 10, true, true, httpHeaders);
 
     segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
     assertNotNull(segmentZKMetadata);
@@ -185,8 +279,8 @@ public class ZKOperatorTest {
     when(segmentMetadata.getIndexCreationTime()).thenReturn(789L);
     // Add a tiny sleep to guarantee that refresh time is different from the previous round
     Thread.sleep(10);
-    zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "otherDownloadUrl",
-        "otherCrypter", 100, true, true, httpHeaders);
+    zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+        "otherDownloadUrl", "otherDownloadUrl", "otherCrypter", 100, true, true, httpHeaders);
 
     segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
     assertNotNull(segmentZKMetadata);
@@ -209,8 +303,8 @@ public class ZKOperatorTest {
     SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
     when(segmentMetadata.getName()).thenReturn(SEGMENT_NAME);
     when(segmentMetadata.getCrc()).thenReturn("12345");
-    zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, null, null, "downloadUrl", null, 10,
-        true, true, mock(HttpHeaders.class));
+    zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+        "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class));
 
     SegmentZKMetadata segmentZKMetadata = _resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, SEGMENT_NAME);
     assertNotNull(segmentZKMetadata);
@@ -222,8 +316,8 @@ public class ZKOperatorTest {
     when(segmentMetadata.getName()).thenReturn(LLC_SEGMENT_NAME);
     when(segmentMetadata.getCrc()).thenReturn("23456");
     try {
-      zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, null, null, "downloadUrl", null, 10,
-          true, true, mock(HttpHeaders.class));
+      zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+          "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class));
       fail();
     } catch (ControllerApplicationException e) {
       assertEquals(e.getResponse().getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
@@ -233,8 +327,8 @@ public class ZKOperatorTest {
     // Uploading a segment with LLC segment name and start/end offsets should succeed
     when(segmentMetadata.getStartOffset()).thenReturn("0");
     when(segmentMetadata.getEndOffset()).thenReturn("1234");
-    zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, null, null, "downloadUrl", null, 10,
-        true, true, mock(HttpHeaders.class));
+    zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+        "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class));
 
     segmentZKMetadata = _resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, LLC_SEGMENT_NAME);
     assertNotNull(segmentZKMetadata);
@@ -246,8 +340,8 @@ public class ZKOperatorTest {
     when(segmentMetadata.getCrc()).thenReturn("34567");
     when(segmentMetadata.getStartOffset()).thenReturn(null);
     when(segmentMetadata.getEndOffset()).thenReturn(null);
-    zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, null, null, "downloadUrl", null, 10,
-        true, true, mock(HttpHeaders.class));
+    zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+        "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class));
 
     segmentZKMetadata = _resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, LLC_SEGMENT_NAME);
     assertNotNull(segmentZKMetadata);
@@ -259,8 +353,8 @@ public class ZKOperatorTest {
     when(segmentMetadata.getCrc()).thenReturn("45678");
     when(segmentMetadata.getStartOffset()).thenReturn("1234");
     when(segmentMetadata.getEndOffset()).thenReturn("2345");
-    zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, null, null, "downloadUrl", null, 10,
-        true, true, mock(HttpHeaders.class));
+    zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+        "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class));
 
     segmentZKMetadata = _resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, LLC_SEGMENT_NAME);
     assertNotNull(segmentZKMetadata);
@@ -271,6 +365,7 @@ public class ZKOperatorTest {
 
   @AfterClass
   public void tearDown() {
+    FileUtils.deleteQuietly(TEMP_DIR);
     TEST_INSTANCE.cleanup();
   }
 }
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index fbea4e6220..21abfd6430 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -283,12 +283,28 @@ public class ClusterIntegrationTestUtils {
   public static void buildSegmentFromAvro(File avroFile, TableConfig tableConfig,
       org.apache.pinot.spi.data.Schema schema, int segmentIndex, File segmentDir, File tarDir)
       throws Exception {
+    // Test segment with space and special character in the file name
+    buildSegmentFromAvro(avroFile, tableConfig, schema, segmentIndex + " %", segmentDir, tarDir);
+  }
+
+  /**
+   * Builds one Pinot segment from the given Avro file.
+   *
+   * @param avroFile Avro file
+   * @param tableConfig Pinot table config
+   * @param schema Pinot schema
+   * @param segmentNamePostfix Segment name postfix
+   * @param segmentDir Output directory for the un-tarred segments
+   * @param tarDir Output directory for the tarred segments
+   */
+  public static void buildSegmentFromAvro(File avroFile, TableConfig tableConfig,
+      org.apache.pinot.spi.data.Schema schema, String segmentNamePostfix, File segmentDir, File tarDir)
+      throws Exception {
     SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
     segmentGeneratorConfig.setInputFilePath(avroFile.getPath());
     segmentGeneratorConfig.setOutDir(segmentDir.getPath());
     segmentGeneratorConfig.setTableName(tableConfig.getTableName());
-    // Test segment with space and special character in the file name
-    segmentGeneratorConfig.setSegmentNamePostfix(segmentIndex + " %");
+    segmentGeneratorConfig.setSegmentNamePostfix(segmentNamePostfix);
 
     // Build the segment
     SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
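
Callers that need a deterministic segment name can now pass the postfix directly, while the
old index-based overload keeps its special-character behavior by delegating. A hypothetical
call, with inputs as in any existing caller:

    // Builds one segment whose name ends in "_with_move", exactly as the new
    // SegmentUploadIntegrationTest below does.
    ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, tableConfig, schema,
        "_with_move", segmentDir, tarDir);
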
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java
new file mode 100644
index 0000000000..ae9d3989b6
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Test for advanced push types.
+ * Currently only tests METADATA push type.
+ * TODO: add test for URI push
+ */
+public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest {
+
+  @Override
+  protected Map<String, String> getStreamConfigs() {
+    return null;
+  }
+
+  @Override
+  protected String getSortedColumn() {
+    return null;
+  }
+
+  @Override
+  protected List<String> getInvertedIndexColumns() {
+    return null;
+  }
+
+  @Override
+  protected List<String> getNoDictionaryColumns() {
+    return null;
+  }
+
+  @Override
+  protected List<String> getRangeIndexColumns() {
+    return null;
+  }
+
+  @Override
+  protected List<String> getBloomFilterColumns() {
+    return null;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+    // Start Zk and Kafka
+    startZk();
+
+    // Start the Pinot cluster
+    startController();
+    startBroker();
+    startServer();
+  }
+
+  @Test
+  public void testUploadAndQuery()
+      throws Exception {
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig offlineTableConfig = createOfflineTableConfig();
+    addTableConfig(offlineTableConfig);
+
+    List<File> avroFiles = getAllAvroFiles();
+
+    // Create 1 segment, for METADATA push WITH move to final location
+    ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(0), offlineTableConfig, schema, "_with_move",
+        _segmentDir, _tarDir);
+
+    SegmentMetadataPushJobRunner runner = new SegmentMetadataPushJobRunner();
+    SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    // Set copyToDeepStoreForMetadataPush to true
+    pushJobSpec.setCopyToDeepStoreForMetadataPush(true);
+    jobSpec.setPushJobSpec(pushJobSpec);
+    PinotFSSpec fsSpec = new PinotFSSpec();
+    fsSpec.setScheme("file");
+    fsSpec.setClassName("org.apache.pinot.spi.filesystem.LocalPinotFS");
+    jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec));
+    jobSpec.setOutputDirURI(_tarDir.getAbsolutePath());
+    TableSpec tableSpec = new TableSpec();
+    tableSpec.setTableName(DEFAULT_TABLE_NAME);
+    jobSpec.setTableSpec(tableSpec);
+    PinotClusterSpec clusterSpec = new PinotClusterSpec();
+    clusterSpec.setControllerURI(_controllerBaseApiUrl);
+    jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec});
+
+    File dataDir = new File(_controllerConfig.getDataDir());
+    File dataDirSegments = new File(dataDir, DEFAULT_TABLE_NAME);
+
+    // Not present in dataDir, only present in sourceDir
+    Assert.assertFalse(dataDirSegments.exists());
+    Assert.assertEquals(_tarDir.listFiles().length, 1);
+
+    runner.init(jobSpec);
+    runner.run();
+
+    // Segment should be seen in dataDir
+    Assert.assertTrue(dataDirSegments.exists());
+    Assert.assertEquals(dataDirSegments.listFiles().length, 1);
+    Assert.assertEquals(_tarDir.listFiles().length, 1);
+
+    // test segment loaded
+    JsonNode segmentsList = getSegmentsList();
+    Assert.assertEquals(segmentsList.size(), 1);
+    String segmentNameWithMove = segmentsList.get(0).asText();
+    Assert.assertTrue(segmentNameWithMove.endsWith("_with_move"));
+    long numDocs = getNumDocs(segmentNameWithMove);
+    testCountStar(numDocs);
+
+    // Clear segment and tar dir
+    for (File segment : _segmentDir.listFiles()) {
+      FileUtils.deleteQuietly(segment);
+    }
+    for (File tar : _tarDir.listFiles()) {
+      FileUtils.deleteQuietly(tar);
+    }
+
+    // Create 1 segment, for METADATA push WITHOUT move to final location
+    ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(1), offlineTableConfig, schema, "_without_move",
+        _segmentDir, _tarDir);
+    jobSpec.setPushJobSpec(new PushJobSpec());
+    runner = new SegmentMetadataPushJobRunner();
+
+    Assert.assertEquals(dataDirSegments.listFiles().length, 1);
+    Assert.assertEquals(_tarDir.listFiles().length, 1);
+
+    runner.init(jobSpec);
+    runner.run();
+
+    // should not see new segments in dataDir
+    Assert.assertEquals(dataDirSegments.listFiles().length, 1);
+    Assert.assertEquals(_tarDir.listFiles().length, 1);
+
+    // test segment loaded
+    segmentsList = getSegmentsList();
+    Assert.assertEquals(segmentsList.size(), 2);
+    String segmentNameWithoutMove = null;
+    for (JsonNode segment : segmentsList) {
+      if (segment.asText().endsWith("_without_move")) {
+        segmentNameWithoutMove = segment.asText();
+      }
+    }
+    Assert.assertNotNull(segmentNameWithoutMove);
+    numDocs += getNumDocs(segmentNameWithoutMove);
+    testCountStar(numDocs);
+  }
+
+  private long getNumDocs(String segmentName)
+      throws IOException {
+    return JsonUtils.stringToJsonNode(
+            sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(DEFAULT_TABLE_NAME, segmentName)))
+        .get("segment.total.docs").asLong();
+  }
+
+  private JsonNode getSegmentsList()
+      throws IOException {
+    return JsonUtils.stringToJsonNode(sendGetRequest(
+            _controllerRequestURLBuilder.forSegmentListAPIWithTableType(DEFAULT_TABLE_NAME,
+                TableType.OFFLINE.toString())))
+        .get(0).get("OFFLINE");
+  }
+
+  protected void testCountStar(final long countStarResult) {
+    TestUtils.waitForCondition(new Function<Void, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable Void aVoid) {
+        try {
+          return getCurrentCountStarResult() == countStarResult;
+        } catch (Exception e) {
+          return null;
+        }
+      }
+    }, 100L, 300_000, "Failed to load " + countStarResult + " documents", true);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    dropOfflineTable(getTableName());
+    stopServer();
+    stopBroker();
+    stopController();
+    stopZk();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
index 1d91e5bb4e..b756c7f760 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
@@ -264,6 +264,10 @@ public class SegmentPushUtils implements Serializable {
               headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath));
               headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
                   FileUploadDownloadClient.FileUploadType.METADATA.toString()));
+              if (spec.getPushJobSpec() != null) {
+                headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE,
+                    String.valueOf(spec.getPushJobSpec().getCopyToDeepStoreForMetadataPush())));
+              }
               headers.addAll(AuthProviderUtils.toRequestHeaders(authProvider));
 
               SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
index 40944978ca..04481baf1f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
@@ -41,6 +41,11 @@ public class PushJobSpec implements Serializable {
    */
   private long _pushRetryIntervalMillis = 1000;
 
+  /**
+   * Applicable for URI and METADATA push types.
+   * If true, and if the segment was not already in the deep store, copy it to the deep store.
+   */
+  private boolean _copyToDeepStoreForMetadataPush;
   /**
    * Used in SegmentUriPushJobRunner, which is used to composite the segment uri to send to pinot controller.
    * The URI sends to controller is in the format ${segmentUriPrefix}${segmentPath}${segmentUriSuffix}
@@ -121,4 +126,12 @@ public class PushJobSpec implements Serializable {
   public void setPushParallelism(int pushParallelism) {
     _pushParallelism = pushParallelism;
   }
+
+  public boolean getCopyToDeepStoreForMetadataPush() {
+    return _copyToDeepStoreForMetadataPush;
+  }
+
+  public void setCopyToDeepStoreForMetadataPush(boolean copyToDeepStoreForMetadataPush) {
+    _copyToDeepStoreForMetadataPush = copyToDeepStoreForMetadataPush;
+  }
 }
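
For jobs launched from a YAML ingestion spec rather than programmatically, the new field
should surface under pushJobSpec via the usual bean-property mapping; a sketch (this YAML
shape is an assumption, not part of this commit):

    # Assumed job-spec YAML; field names follow the PushJobSpec getters/setters.
    pushJobSpec:
      pushRetryIntervalMillis: 1000
      copyToDeepStoreForMetadataPush: true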

