Posted to commits@pinot.apache.org by xi...@apache.org on 2020/09/30 04:47:44 UTC

[incubator-pinot] branch master updated: Adding push job type of segment metadata only mode (#5967)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f2e767  Adding push job type of segment metadata only mode (#5967)
4f2e767 is described below

commit 4f2e767f5ccd123e689ecab719a83f0c16f41b2c
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Tue Sep 29 21:47:27 2020 -0700

    Adding push job type of segment metadata only mode (#5967)
    
    * Adding push job type of segment metadata only mode
    
    * Adding clean up for temp files
    
    * Address comments
    
    * Adding integration tests
---
 .../common/utils/FileUploadDownloadClient.java     |  13 +-
 .../PinotSegmentUploadDownloadRestletResource.java |  23 ++--
 .../pinot/integration/tests/ClusterTest.java       |  44 +++++-
 .../ingestion/batch/common/SegmentPushUtils.java   | 147 +++++++++++++++++++++
 .../hadoop/HadoopSegmentMetadataPushJobRunner.java | 104 +++++++++++++++
 .../spark/SparkSegmentMetadataPushJobRunner.java   | 131 ++++++++++++++++++
 .../segmentCreationAndMetadataPushJobSpec.yaml     |  53 ++++++++
 .../standalone/SegmentMetadataPushJobRunner.java   |  91 +++++++++++++
 .../spi/ingestion/batch/IngestionJobLauncher.java  |   9 +-
 .../ingestion/batch/runner/IngestionJobRunner.java |   1 +
 .../batch/spec/ExecutionFrameworkSpec.java         |  19 ++-
 .../batch/spec/SegmentGenerationJobSpec.java       |   2 +
 12 files changed, 606 insertions(+), 31 deletions(-)
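
For readers who want to try the new mode end to end, here is a minimal sketch of driving the standalone runner added in this commit. Setter names on the spec POJOs are assumed to mirror the getters used in this diff, and all paths and values are placeholders; see the YAML example later in this diff for a complete spec:

    import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner;
    import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
    import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;

    public class MetadataPushSketch {
      public static void main(String[] args) {
        SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
        spec.setJobType("SegmentMetadataPush");
        spec.setOutputDirURI("file:///path/to/output");  // directory holding the segment .tar.gz files
        spec.setPushJobSpec(new PushJobSpec());          // required: the runner rejects a missing PushJobSpec
        // ... also set pinotFSSpecs, tableSpec and pinotClusterSpecs, as in the YAML spec below ...

        SegmentMetadataPushJobRunner runner = new SegmentMetadataPushJobRunner();
        runner.init(spec);  // throws if PushJobSpec is missing
        runner.run();       // lists *.tar.gz under outputDirURI and pushes segment metadata only
      }
    }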

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index b18f1e2..4c2e30e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -87,7 +87,7 @@ public class FileUploadDownloadClient implements Closeable {
   }
 
   public enum FileUploadType {
-    URI, JSON, SEGMENT;
+    URI, JSON, SEGMENT, METADATA;
 
     public static FileUploadType getDefaultUploadType() {
       return SEGMENT;
@@ -103,7 +103,6 @@ public class FileUploadDownloadClient implements Closeable {
   private static final String SCHEMA_PATH = "/schemas";
   private static final String OLD_SEGMENT_PATH = "/segments";
   private static final String SEGMENT_PATH = "/v2/segments";
-  private static final String SEGMENT_METADATA_PATH = "/segmentmetadata";
   private static final String TABLES_PATH = "/tables";
   private static final String TYPE_DELIMITER = "?type=";
 
@@ -194,16 +193,6 @@ public class FileUploadDownloadClient implements Closeable {
     return getURI(HTTP, host, port, SEGMENT_PATH);
   }
 
-  public static URI getUploadSegmentMetadataHttpURI(String host, int port)
-      throws URISyntaxException {
-    return getURI(HTTP, host, port, SEGMENT_METADATA_PATH);
-  }
-
-  public static URI getUploadSegmentMetadataHttpsURI(String host, int port)
-      throws URISyntaxException {
-    return getURI(HTTPS, host, port, SEGMENT_METADATA_PATH);
-  }
-
   public static URI getUploadSegmentHttpsURI(String host, int port)
       throws URISyntaxException {
     return getURI(HTTPS, host, port, SEGMENT_PATH);
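
Note that the dedicated /segmentmetadata endpoint helpers are removed: metadata uploads now go through the regular /v2/segments endpoint and are distinguished only by the upload-type header. A minimal client-side sketch, using only identifiers that appear in this commit (host, port, table name, download URI and the local metadata tar file are placeholders):

    import java.io.File;
    import java.net.URI;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.http.Header;
    import org.apache.http.NameValuePair;
    import org.apache.http.message.BasicHeader;
    import org.apache.http.message.BasicNameValuePair;
    import org.apache.pinot.common.utils.FileUploadDownloadClient;

    public class MetadataUploadSketch {
      public static void main(String[] args) throws Exception {
        try (FileUploadDownloadClient client = new FileUploadDownloadClient()) {
          URI uploadURI = FileUploadDownloadClient.getUploadSegmentHttpURI("localhost", 9000); // /v2/segments
          List<Header> headers = Arrays.asList(
              new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
                  FileUploadDownloadClient.FileUploadType.METADATA.toString()),
              // Required in METADATA mode: where the full segment can be downloaded from
              new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
                  "s3://my-bucket/segments/mySegment.tar.gz"));
          List<NameValuePair> parameters = Arrays.asList(
              new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, "myTable"));
          File metadataTar = new File("/tmp/mySegment.metadata.tar.gz"); // only metadata.properties + creation.meta
          client.uploadSegmentMetadata(uploadURI, "mySegment", metadataTar, headers, parameters,
              FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
        }
      }
    }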
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 8da5b53..2ae966d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -195,7 +195,6 @@ public class PinotSegmentUploadDownloadRestletResource {
       crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER);
       downloadUri = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
     }
-
     File tempEncryptedFile = null;
     File tempDecryptedFile = null;
     File tempSegmentDir = null;
@@ -207,9 +206,8 @@ public class PinotSegmentUploadDownloadRestletResource {
       tempSegmentDir = new File(provider.getUntarredFileTempDir(), tempFileName);
 
       boolean uploadedSegmentIsEncrypted = !Strings.isNullOrEmpty(crypterClassNameInHeader);
-
-      File dstFile = uploadedSegmentIsEncrypted ? tempEncryptedFile : tempDecryptedFile;
       FileUploadDownloadClient.FileUploadType uploadType = getUploadType(uploadTypeStr);
+      File dstFile = uploadedSegmentIsEncrypted ? tempEncryptedFile : tempDecryptedFile;
       switch (uploadType) {
         case URI:
           downloadSegmentFileFromURI(downloadUri, dstFile, tableName);
@@ -217,6 +215,11 @@ public class PinotSegmentUploadDownloadRestletResource {
         case SEGMENT:
           createSegmentFileFromMultipart(multiPart, dstFile);
           break;
+        case METADATA:
+          moveSegmentToFinalLocation = false;
+          Preconditions.checkState(downloadUri != null, "Download URI is required in segment metadata upload mode");
+          createSegmentFileFromMultipart(multiPart, dstFile);
+          break;
         default:
           throw new UnsupportedOperationException("Unsupported upload type: " + uploadType);
       }
@@ -235,11 +238,11 @@ public class PinotSegmentUploadDownloadRestletResource {
       String rawTableName;
       if (tableName != null && !tableName.isEmpty()) {
         rawTableName = TableNameBuilder.extractRawTableName(tableName);
-        LOGGER.info("Uploading segment {} to table: {}, (Derived from API parameter)", segmentName, tableName);
+        LOGGER.info("Uploading a segment {} to table: {}, push type {}, (Derived from API parameter)", segmentName, tableName, uploadType);
       } else {
         // TODO: remove this when we completely deprecate the table name from segment metadata
         rawTableName = segmentMetadata.getTableName();
-        LOGGER.info("Uploading a segment {} to table: {}, (Derived from segment metadata)", segmentName, tableName);
+        LOGGER.info("Uploading a segment {} to table: {}, push type {}, (Derived from segment metadata)", segmentName, tableName, uploadType);
       }
 
       String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
@@ -247,10 +250,12 @@ public class PinotSegmentUploadDownloadRestletResource {
       LOGGER.info("Processing upload request for segment: {} of table: {} from client: {}, ingestion descriptor: {}",
           segmentName, offlineTableName, clientAddress, ingestionDescriptor);
 
-      // Validate segment
-      new SegmentValidator(_pinotHelixResourceManager, _controllerConf, _executor, _connectionManager,
-          _controllerMetrics, _leadControllerManager.isLeaderForTable(offlineTableName))
-          .validateOfflineSegment(offlineTableName, segmentMetadata, tempSegmentDir);
+      // Skip segment validation when only segment metadata is uploaded
+      if (uploadType != FileUploadDownloadClient.FileUploadType.METADATA) {
+        // Validate segment
+        new SegmentValidator(_pinotHelixResourceManager, _controllerConf, _executor, _connectionManager,
+            _controllerMetrics, _leadControllerManager.isLeaderForTable(offlineTableName)).validateOfflineSegment(offlineTableName, segmentMetadata, tempSegmentDir);
+      }
 
       // Encrypt segment
       String crypterClassNameInTableConfig =
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index d603fee..6ffc836 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -22,9 +22,14 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
+import com.google.common.collect.ImmutableList;
 import java.io.File;
+import java.io.IOException;
 import java.net.URI;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,8 +47,13 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.commons.io.FileUtils;
+import org.apache.http.Header;
 import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.message.BasicNameValuePair;
 import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.core.requesthandler.PinotQueryRequest;
 import org.apache.pinot.common.utils.CommonConstants.Broker;
 import org.apache.pinot.common.utils.CommonConstants.Helix;
@@ -253,17 +263,25 @@ public abstract class ClusterTest extends ControllerTest {
     try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
       if (numSegments == 1) {
         File segmentTarFile = segmentTarFiles[0];
-        assertEquals(fileUploadDownloadClient
-                .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName).getStatusCode(),
-            HttpStatus.SC_OK);
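+        // Arbitrarily pick (based on current-time parity) between the regular segment upload and the
+        // metadata-only upload so that both paths get exercised across test runs.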
+        if (System.currentTimeMillis() % 2 == 0) {
+          assertEquals(fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName).getStatusCode(),
+              HttpStatus.SC_OK);
+        } else {
+          assertEquals(
+              uploadSegmentWithOnlyMetadata(tableName, uploadSegmentHttpURI, fileUploadDownloadClient, segmentTarFile), HttpStatus.SC_OK);
+        }
       } else {
         // Upload all segments in parallel
         ExecutorService executorService = Executors.newFixedThreadPool(numSegments);
         List<Future<Integer>> futures = new ArrayList<>(numSegments);
         for (File segmentTarFile : segmentTarFiles) {
-          futures.add(executorService.submit(() -> fileUploadDownloadClient
-              .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName)
-              .getStatusCode()));
+          futures.add(executorService.submit(() -> {
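+            // Same arbitrary choice between full segment upload and metadata-only upload as above.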
+            if (System.currentTimeMillis() % 2 == 0) {
+              return fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName).getStatusCode();
+            } else {
+              return uploadSegmentWithOnlyMetadata(tableName, uploadSegmentHttpURI, fileUploadDownloadClient, segmentTarFile);
+            }
+          }));
         }
         executorService.shutdown();
         for (Future<Integer> future : futures) {
@@ -273,6 +291,20 @@ public abstract class ClusterTest extends ControllerTest {
     }
   }
 
+  private int uploadSegmentWithOnlyMetadata(String tableName, URI uploadSegmentHttpURI,
+      FileUploadDownloadClient fileUploadDownloadClient, File segmentTarFile)
+      throws IOException, HttpErrorStatusException {
+    List<Header> headers = ImmutableList.of(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
+            "file://" + segmentTarFile.getParentFile().getAbsolutePath() + "/" + URLEncoder.encode(segmentTarFile.getName(), StandardCharsets.UTF_8.toString())),
+        new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, FileUploadDownloadClient.FileUploadType.METADATA.toString()));
+    // Add table name as a request parameter
+    NameValuePair
+        tableNameValuePair = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName);
+    List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
+    return fileUploadDownloadClient
+        .uploadSegmentMetadata(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, headers, parameters, fileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode();
+  }
+
   public static class AvroFileSchemaKafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
     private static final Logger LOGGER = LoggerFactory.getLogger(AvroFileSchemaKafkaAvroMessageDecoder.class);
     public static File avroFile;
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
index bc5e91a..71ccfbe 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
@@ -19,17 +19,28 @@
 package org.apache.pinot.plugin.ingestion.batch.common;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.NameValuePair;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.message.BasicNameValuePair;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
@@ -177,4 +188,140 @@ public class SegmentPushUtils implements Serializable {
       }
     }
   }
+
+  /**
+   * Takes a map of segment download URI to the corresponding tar file path, and pushes those segments in metadata mode.
+   * The steps are:
+   * 1. Download the segment tar file from the tar file path;
+   * 2. Untar the segment metadata and creation meta files from the tar file into a segment metadata directory;
+   * 3. Tar this segment metadata directory into a new tar file;
+   * 4. Generate a POST request with the segment download URI in the header to push the metadata tar file to the Pinot controller.
+   *
+   * @param spec the segment generation job spec
+   * @param fileSystem the PinotFS used to copy the segment tar file
+   * @param segmentUriToTarPathMap the map of segment download URI to segment tar file path
+   * @throws Exception
+   */
+  public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    LOGGER.info("Start pushing segment metadata: {} to locations: {} for table {}",
+        segmentUriToTarPathMap,
+        Arrays.toString(spec.getPinotClusterSpecs()), tableName);
+    for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+      String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+      String fileName = new File(tarFilePath).getName();
+      Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
+      String segmentName = fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length());
+      File segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath));
+      try {
+        for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+          URI controllerURI;
+          try {
+            controllerURI = new URI(pinotClusterSpec.getControllerURI());
+          } catch (URISyntaxException e) {
+            throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'");
+          }
+          LOGGER.info("Pushing segment: {} to location: {} for table {}", segmentName, controllerURI, tableName);
+          int attempts = 1;
+          if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushAttempts() > 0) {
+            attempts = spec.getPushJobSpec().getPushAttempts();
+          }
+          long retryWaitMs = 1000L;
+          if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+            retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+          }
+          RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> {
+            try {
+              List<Header> headers = ImmutableList.of(
+                  new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath),
+                  new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, FileUploadDownloadClient.FileUploadType.METADATA.toString()));
+              // Add table name as a request parameter
+              NameValuePair tableNameValuePair =
+                  new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName);
+              List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
+              SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadata(FileUploadDownloadClient.getUploadSegmentURI(controllerURI),
+                  segmentName, segmentMetadataFile, headers, parameters, FILE_UPLOAD_DOWNLOAD_CLIENT.DEFAULT_SOCKET_TIMEOUT_MS);
+              LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName,
+                  controllerURI, response.getStatusCode(), response.getResponse());
+              return true;
+            } catch (HttpErrorStatusException e) {
+              int statusCode = e.getStatusCode();
+              if (statusCode >= 500) {
+                // Temporary exception
+                LOGGER
+                    .warn("Caught temporary exception while pushing table: {} segment: {} to {}, will retry", tableName,
+                        segmentName, controllerURI, e);
+                return false;
+              } else {
+                // Permanent exception
+                LOGGER.error("Caught permanent exception while pushing table: {} segment: {} to {}, won't retry",
+                    tableName, segmentName, controllerURI, e);
+                throw e;
+              }
+            }
+          });
+        }
+      } finally {
+        FileUtils.deleteQuietly(segmentMetadataFile);
+      }
+    }
+  }
+
+  public static Map<String, String> getSegmentUriToTarPathMap(URI outputDirURI, String uriPrefix, String uriSuffix, String[] files) {
+    Map<String, String> segmentUriToTarPathMap = new HashMap<>();
+    for (String file : files) {
+      URI uri = URI.create(file);
+      if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
+        URI updatedURI = SegmentPushUtils.generateSegmentTarURI(outputDirURI, uri, uriPrefix, uriSuffix);
+        segmentUriToTarPathMap.put(updatedURI.toString(), file);
+      }
+    }
+    return segmentUriToTarPathMap;
+  }
+
+  /**
+   * Generates a metadata-only tar file, which contains just the metadata.properties and creation.meta files.
+   * The purpose is to create a lean tar file to push to the Pinot controller, so segments can be added without
+   * downloading and untarring the complete segment tarball.
+   *
+   * 1. Download segment tar file to temp dir;
+   * 2. Extract only metadata.properties and creation.meta files from the segment tar file;
+   * 3. Tar both files into a segment metadata file.
+   *
+   */
+  private static File generateSegmentMetadataFile(PinotFS fileSystem, URI tarFileURI)
+      throws Exception {
+    String uuid = UUID.randomUUID().toString();
+    File tarFile = new File(FileUtils.getTempDirectory(), "segmentTar-" + uuid + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    File segmentMetadataDir = new File(FileUtils.getTempDirectory(), "segmentMetadataDir-" + uuid);
+    try {
+      fileSystem.copyToLocalFile(tarFileURI, tarFile);
+      if (segmentMetadataDir.exists()) {
+        FileUtils.forceDelete(segmentMetadataDir);
+      }
+      FileUtils.forceMkdir(segmentMetadataDir);
+
+      // Extract metadata.properties
+      LOGGER.info("Trying to untar Metadata file from: [{}] to [{}]", tarFile, segmentMetadataDir);
+      TarGzCompressionUtils.untarOneFile(tarFile, V1Constants.MetadataKeys.METADATA_FILE_NAME,
+          new File(segmentMetadataDir, V1Constants.MetadataKeys.METADATA_FILE_NAME));
+
+      // Extract creation.meta
+      LOGGER.info("Trying to untar CreationMeta file from: [{}] to [{}]", tarFile, segmentMetadataDir);
+      TarGzCompressionUtils.untarOneFile(tarFile, V1Constants.SEGMENT_CREATION_META,
+          new File(segmentMetadataDir, V1Constants.SEGMENT_CREATION_META));
+
+      File segmentMetadataTarFile = new File(FileUtils.getTempDirectory(), "segmentMetadata-" + uuid + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+      if (segmentMetadataTarFile.exists()) {
+        FileUtils.forceDelete(segmentMetadataTarFile);
+      }
+      LOGGER.info("Trying to tar segment metadata dir [{}] to [{}]", segmentMetadataDir, segmentMetadataTarFile);
+      TarGzCompressionUtils.createTarGzFile(segmentMetadataDir, segmentMetadataTarFile);
+      return segmentMetadataTarFile;
+    } finally {
+      FileUtils.deleteQuietly(tarFile);
+      FileUtils.deleteQuietly(segmentMetadataDir);
+    }
+  }
 }
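
The standalone, Hadoop and Spark runners below all reduce to the same three-call pattern against these utilities. A condensed sketch, assuming outputDirFS, outputDirURI and spec are initialized as in those runners (the prefix/suffix rewrite is delegated to generateSegmentTarURI, which is not shown in this diff):

    // Condensed push flow shared by the metadata push runners in this commit.
    static void pushSegmentMetadata(PinotFS outputDirFS, URI outputDirURI, SegmentGenerationJobSpec spec)
        throws Exception {
      String[] files = outputDirFS.listFiles(outputDirURI, true);
      Map<String, String> segmentUriToTarPathMap = SegmentPushUtils.getSegmentUriToTarPathMap(
          outputDirURI,
          spec.getPushJobSpec().getSegmentUriPrefix(),  // optional URI rewrite inputs (may be null)
          spec.getPushJobSpec().getSegmentUriSuffix(),
          files);
      // Keys: download URIs sent to the controller in the DOWNLOAD_URI header.
      // Values: tar paths from which metadata.properties and creation.meta are extracted locally.
      SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputDirFS, segmentUriToTarPathMap);
    }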
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentMetadataPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentMetadataPushJobRunner.java
new file mode 100644
index 0000000..d605afa
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentMetadataPushJobRunner.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.ingestion.batch.hadoop;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+
+
+public class HadoopSegmentMetadataPushJobRunner implements IngestionJobRunner, Serializable {
+  private SegmentGenerationJobSpec _spec;
+
+  public HadoopSegmentMetadataPushJobRunner() {
+  }
+
+  public HadoopSegmentMetadataPushJobRunner(SegmentGenerationJobSpec spec) {
+    init(spec);
+  }
+
+  @Override
+  public void init(SegmentGenerationJobSpec spec) {
+    _spec = spec;
+  }
+
+  @Override
+  public void run() {
+    //init all file systems
+    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
+    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+      PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
+    }
+
+    //Get outputFS for writing output pinot segments
+    URI outputDirURI;
+    try {
+      outputDirURI = new URI(_spec.getOutputDirURI());
+      if (outputDirURI.getScheme() == null) {
+        outputDirURI = new File(_spec.getOutputDirURI()).toURI();
+      }
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("outputDirURI is not valid - '" + _spec.getOutputDirURI() + "'");
+    }
+    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
+    //Get list of files to process
+    String[] files;
+    try {
+      files = outputDirFS.listFiles(outputDirURI, true);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to list all files under outputDirURI - '" + outputDirURI + "'");
+    }
+
+    List<String> segmentsToPush = new ArrayList<>();
+    for (String file : files) {
+      if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
+        segmentsToPush.add(file);
+      }
+    }
+
+    int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
+    if (pushParallelism < 1) {
+      pushParallelism = segmentsToPush.size();
+    }
+    // Push from the driver (this runner does not fan out; the pushParallelism computed above is unused here)
+
+    try {
+      Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
+          .getSegmentUriToTarPathMap(outputDirURI, _spec.getPushJobSpec().getSegmentUriPrefix(),
+              _spec.getPushJobSpec().getSegmentUriSuffix(), segmentsToPush.toArray(new String[0]));
+      SegmentPushUtils
+          .sendSegmentUriAndMetadata(_spec, PinotFSFactory.create(outputDirURI.getScheme()), segmentUriToTarPathMap);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
new file mode 100644
index 0000000..5b07db4
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.ingestion.batch.spark;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.VoidFunction;
+
+
+public class SparkSegmentMetadataPushJobRunner implements IngestionJobRunner, Serializable {
+  private SegmentGenerationJobSpec _spec;
+
+  public SparkSegmentMetadataPushJobRunner() {
+  }
+
+  public SparkSegmentMetadataPushJobRunner(SegmentGenerationJobSpec spec) {
+    init(spec);
+  }
+
+  @Override
+  public void init(SegmentGenerationJobSpec spec) {
+    _spec = spec;
+  }
+
+  @Override
+  public void run() {
+    //init all file systems
+    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
+    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+      PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
+    }
+
+    //Get outputFS for writing output pinot segments
+    URI outputDirURI;
+    try {
+      outputDirURI = new URI(_spec.getOutputDirURI());
+      if (outputDirURI.getScheme() == null) {
+        outputDirURI = new File(_spec.getOutputDirURI()).toURI();
+      }
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("outputDirURI is not valid - '" + _spec.getOutputDirURI() + "'");
+    }
+    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
+    //Get list of files to process
+    String[] files;
+    try {
+      files = outputDirFS.listFiles(outputDirURI, true);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to list all files under outputDirURI - '" + outputDirURI + "'");
+    }
+
+    List<String> segmentsToPush = new ArrayList<>();
+    for (String file : files) {
+      if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
+        segmentsToPush.add(file);
+      }
+    }
+
+    int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
+    if (pushParallelism < 1) {
+      pushParallelism = segmentsToPush.size();
+    }
+    if (pushParallelism == 1) {
+      // Push from driver
+      try {
+        SegmentPushUtils.pushSegments(_spec, outputDirFS, segmentsToPush);
+      } catch (RetriableOperationException | AttemptsExceededException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
+      JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush, pushParallelism);
+      URI finalOutputDirURI = outputDirURI;
+      // Avoid lambda expressions in Spark to prevent potential serialization exceptions; use an anonymous inner class instead.
+      pathRDD.foreach(new VoidFunction<String>() {
+        @Override
+        public void call(String segmentTarPath)
+            throws Exception {
+          for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+            PinotFSFactory
+                .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
+          }
+          try {
+            Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
+                .getSegmentUriToTarPathMap(finalOutputDirURI, _spec.getPushJobSpec().getSegmentUriPrefix(),
+                    _spec.getPushJobSpec().getSegmentUriSuffix(), new String[]{segmentTarPath});
+            SegmentPushUtils.sendSegmentUriAndMetadata(_spec, PinotFSFactory.create(finalOutputDirURI.getScheme()),
+                segmentUriToTarPathMap);
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/resources/segmentCreationAndMetadataPushJobSpec.yaml b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/resources/segmentCreationAndMetadataPushJobSpec.yaml
new file mode 100644
index 0000000..e6a5d32
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/resources/segmentCreationAndMetadataPushJobSpec.yaml
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+executionFrameworkSpec:
+  name: 'spark'
+  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
+  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
+  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'
+  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner'
+jobType: SegmentCreationAndMetadataPush
+inputDirURI: 'file:///path/to/input'
+includeFileNamePattern: 'glob:**/*.parquet'
+excludeFileNamePattern: 'glob:**/*.avro'
+outputDirURI: 'file:///path/to/output'
+overwriteOutput: true
+pinotFSSpecs:
+  - scheme: file
+    className: org.apache.pinot.spi.filesystem.LocalPinotFS
+recordReaderSpec:
+  dataFormat: 'parquet'
+  className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader'
+tableSpec:
+  tableName: 'myTable'
+  schemaURI: 'http://localhost:9000/tables/myTable/schema'
+  tableConfigURI: 'http://localhost:9000/tables/myTable'
+pinotClusterSpecs:
+  - controllerURI: 'http://localhost:9000'
+pushJobSpec:
+
+  # pushParallelism: push job parallelism, default is 1.
+  pushParallelism: 2
+
+  # pushAttempts: number of attempts for push job, default is 1, which means no retry.
+  pushAttempts: 2
+
+  # pushRetryIntervalMillis: retry wait in milliseconds, defaults to 1000 (1 second).
+  pushRetryIntervalMillis: 1000
\ No newline at end of file
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentMetadataPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentMetadataPushJobRunner.java
new file mode 100644
index 0000000..cad8cf6
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentMetadataPushJobRunner.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.ingestion.batch.standalone;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+
+
+public class SegmentMetadataPushJobRunner implements IngestionJobRunner {
+
+  private SegmentGenerationJobSpec _spec;
+
+  public SegmentMetadataPushJobRunner() {
+  }
+
+  public SegmentMetadataPushJobRunner(SegmentGenerationJobSpec spec) {
+    init(spec);
+  }
+
+  @Override
+  public void init(SegmentGenerationJobSpec spec) {
+    _spec = spec;
+    if (_spec.getPushJobSpec() == null) {
+      throw new RuntimeException("Missing PushJobSpec");
+    }
+  }
+
+  @Override
+  public void run() {
+    //init all file systems
+    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
+    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+      PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
+    }
+
+    //Get outputFS for writing output Pinot segments
+    URI outputDirURI;
+    try {
+      outputDirURI = new URI(_spec.getOutputDirURI());
+      if (outputDirURI.getScheme() == null) {
+        outputDirURI = new File(_spec.getOutputDirURI()).toURI();
+      }
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("outputDirURI is not valid - '" + _spec.getOutputDirURI() + "'");
+    }
+    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
+
+    //Get list of files to process
+    String[] files;
+    try {
+      files = outputDirFS.listFiles(outputDirURI, true);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to list all files under outputDirURI - '" + outputDirURI + "'");
+    }
+    Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
+        .getSegmentUriToTarPathMap(outputDirURI, _spec.getPushJobSpec().getSegmentUriPrefix(),
+            _spec.getPushJobSpec().getSegmentUriSuffix(), files);
+    try {
+      SegmentPushUtils.sendSegmentUriAndMetadata(_spec, outputDirFS, segmentUriToTarPathMap);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
index 9412fc2..63ffcd7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
@@ -106,6 +106,9 @@ public class IngestionJobLauncher {
       case SegmentUriPush:
         kickoffIngestionJob(spec, executionFramework.getSegmentUriPushJobRunnerClassName());
         break;
+      case SegmentMetadataPush:
+        kickoffIngestionJob(spec, executionFramework.getSegmentMetadataPushJobRunnerClassName());
+        break;
       case SegmentCreationAndTarPush:
         kickoffIngestionJob(spec, executionFramework.getSegmentGenerationJobRunnerClassName());
         kickoffIngestionJob(spec, executionFramework.getSegmentTarPushJobRunnerClassName());
@@ -114,6 +117,10 @@ public class IngestionJobLauncher {
         kickoffIngestionJob(spec, executionFramework.getSegmentGenerationJobRunnerClassName());
         kickoffIngestionJob(spec, executionFramework.getSegmentUriPushJobRunnerClassName());
         break;
+      case SegmentCreationAndMetadataPush:
+        kickoffIngestionJob(spec, executionFramework.getSegmentGenerationJobRunnerClassName());
+        kickoffIngestionJob(spec, executionFramework.getSegmentMetadataPushJobRunnerClassName());
+        break;
       default:
         LOGGER.error("Unsupported job type - {}. Support job types: {}", spec.getJobType(),
             Arrays.toString(PinotIngestionJobType.values()));
@@ -139,6 +146,6 @@ public class IngestionJobLauncher {
   }
 
   enum PinotIngestionJobType {
-    SegmentCreation, SegmentTarPush, SegmentUriPush, SegmentCreationAndTarPush, SegmentCreationAndUriPush,
+    SegmentCreation, SegmentTarPush, SegmentUriPush, SegmentMetadataPush, SegmentCreationAndTarPush, SegmentCreationAndUriPush, SegmentCreationAndMetadataPush,
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/runner/IngestionJobRunner.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/runner/IngestionJobRunner.java
index 51b2f6e..4917cbf 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/runner/IngestionJobRunner.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/runner/IngestionJobRunner.java
@@ -28,6 +28,7 @@ import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
  *    SegmentGenerationJobRunner
  *    SegmentTarPushJobRunner
  *    SegmentUriPushJobRunner
+ *    SegmentMetadataPushJobRunner
  *
  */
 public interface IngestionJobRunner {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/ExecutionFrameworkSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/ExecutionFrameworkSpec.java
index 7e379c7..68a11bf 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/ExecutionFrameworkSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/ExecutionFrameworkSpec.java
@@ -32,21 +32,26 @@ public class ExecutionFrameworkSpec implements Serializable {
   private String _name;
 
   /**
-   * The class implements org.apache.pinot.spi.batch.ingestion.runner.SegmentGenerationJobRunner interface.
+   * The class implements the org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
    */
   private String _segmentGenerationJobRunnerClassName;
 
   /**
-   * The class implements org.apache.pinot.spi.batch.ingestion.runner.SegmentTarPushJobRunner interface.
+   * The class implements the org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
    */
   private String _segmentTarPushJobRunnerClassName;
 
   /**
-   * The class implements org.apache.pinot.spi.batch.ingestion.runner.SegmentUriPushJobRunner interface.
+   * The class implements the org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
    */
   private String _segmentUriPushJobRunnerClassName;
 
   /**
+   * The class implements the org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
+   */
+  private String _segmentMetadataPushJobRunnerClassName;
+
+  /**
    * Extra configs for execution framework.
    */
   private Map<String, String> _extraConfigs;
@@ -90,4 +95,12 @@ public class ExecutionFrameworkSpec implements Serializable {
   public void setExtraConfigs(Map<String, String> extraConfigs) {
     _extraConfigs = extraConfigs;
   }
+
+  public String getSegmentMetadataPushJobRunnerClassName() {
+    return _segmentMetadataPushJobRunnerClassName;
+  }
+
+  public void setSegmentMetadataPushJobRunnerClassName(String segmentMetadataPushJobRunnerClassName) {
+    _segmentMetadataPushJobRunnerClassName = segmentMetadataPushJobRunnerClassName;
+  }
 }
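
For completeness, a sketch of wiring the new runner class programmatically rather than via YAML (setName and setExecutionFrameworkSpec are assumed to be the standard accessors on these spec POJOs; values mirror the YAML example above):

    ExecutionFrameworkSpec framework = new ExecutionFrameworkSpec();
    framework.setName("standalone");  // assumed setter paired with the _name field above
    framework.setSegmentMetadataPushJobRunnerClassName(
        "org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner");
    SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
    spec.setExecutionFrameworkSpec(framework);  // assumed setter on SegmentGenerationJobSpec
    spec.setJobType("SegmentMetadataPush");     // routed by the IngestionJobLauncher switch added above
    IngestionJobLauncher.runIngestionJob(spec);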
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
index e41d5a4..ab199bc 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
@@ -38,8 +38,10 @@ public class SegmentGenerationJobSpec implements Serializable {
    *  'SegmentCreation'
    *  'SegmentTarPush'
    *  'SegmentUriPush'
+   *  'SegmentMetadataPush'
    *  'SegmentCreationAndTarPush'
    *  'SegmentCreationAndUriPush'
+   *  'SegmentCreationAndMetadataPush'
    */
   private String _jobType;
 

