You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2022/12/06 12:04:14 UTC

[pinot] branch master updated: Allow segment upload via Metadata in MergeRollup Minion task (#9825)

This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fb48288ef6 Allow segment upload via Metadata in MergeRollup Minion task (#9825)
fb48288ef6 is described below

commit fb48288ef6a22c80a516087d95fea841f69c9af7
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Tue Dec 6 17:34:02 2022 +0530

    Allow segment upload via Metadata in MergeRollup Minion task (#9825)
    
    * Add code to push metadata in minion tasks
    
    * Fix metadata push
    
    * Add config for overwrite dir
    
    * Fix download url to generate outputDir
    
    * Fix linting
    
    * Remove MinionPushUtils class
    
    * Do not copy in case of tar push
    
    * Remove duplicate code for filesystem
    
    * Add metadata push to realtime to offline task
    
    * Use data dir as output dir
    
    * Add tests for realtime to offline metadata push
    
    * use controller data dir for output
    
    * Remove redundant code for URI push
    
    * Enforce outputDir in metadata push
    
    * Use SegmentConversionUtils method to push TAR files for backward compatibility
    
    * Do not catch exception since it is already handled
    
    Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
---
 .../tests/BaseClusterIntegrationTest.java          |   6 +-
 .../MergeRollupMinionClusterIntegrationTest.java   | 145 ++++++++++++++++++++-
 ...fflineSegmentsMinionClusterIntegrationTest.java | 133 ++++++++++++++++++-
 .../BaseMultipleSegmentsConversionExecutor.java    | 105 ++++++++++++++-
 .../pinot/plugin/minion/tasks/MinionTaskUtils.java | 118 +++++++++++++++++
 .../mergerollup/MergeRollupTaskGenerator.java      |  11 +-
 .../RealtimeToOfflineSegmentsTaskGenerator.java    |   4 +-
 .../SegmentGenerationAndPushTaskExecutor.java      |   9 +-
 .../SegmentGenerationAndPushTaskGenerator.java     |   3 +-
 .../SegmentGenerationAndPushTaskUtils.java         |  73 -----------
 .../segment/local/utils/SegmentPushUtils.java      |  74 ++++++++---
 11 files changed, 568 insertions(+), 113 deletions(-)

diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index b133a27551..11ffabe953 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -607,13 +607,17 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   }
 
   protected void waitForDocsLoaded(long timeoutMs, boolean raiseError) {
+    waitForDocsLoaded(timeoutMs, raiseError, getTableName());
+  }
+
+  protected void waitForDocsLoaded(long timeoutMs, boolean raiseError, String tableName) {
     final long countStarResult = getCountStarResult();
     TestUtils.waitForCondition(new Function<Void, Boolean>() {
       @Nullable
       @Override
       public Boolean apply(@Nullable Void aVoid) {
         try {
-          return getCurrentCountStarResult() == countStarResult;
+          return getCurrentCountStarResult(tableName) == countStarResult;
         } catch (Exception e) {
           return null;
         }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index 5846eba2d3..9602275068 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -50,6 +50,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
@@ -70,6 +71,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
   private static final String SINGLE_LEVEL_CONCAT_TEST_TABLE = "myTable1";
   private static final String SINGLE_LEVEL_ROLLUP_TEST_TABLE = "myTable2";
   private static final String MULTI_LEVEL_CONCAT_TEST_TABLE = "myTable3";
+  private static final String SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE = "myTable4";
   private static final long TIMEOUT_IN_MS = 10_000L;
 
   protected PinotHelixTaskResourceManager _helixTaskResourceManager;
@@ -79,15 +81,17 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
   protected final File _segmentDir1 = new File(_tempDir, "segmentDir1");
   protected final File _segmentDir2 = new File(_tempDir, "segmentDir2");
   protected final File _segmentDir3 = new File(_tempDir, "segmentDir3");
+  protected final File _segmentDir4 = new File(_tempDir, "segmentDir4");
   protected final File _tarDir1 = new File(_tempDir, "tarDir1");
   protected final File _tarDir2 = new File(_tempDir, "tarDir2");
   protected final File _tarDir3 = new File(_tempDir, "tarDir3");
+  protected final File _tarDir4 = new File(_tempDir, "tarDir4");
 
   @BeforeClass
   public void setUp()
       throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir1, _segmentDir2, _segmentDir3, _tarDir1, _tarDir2,
-        _tarDir3);
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir1, _segmentDir2, _segmentDir3, _segmentDir4,
+        _tarDir1, _tarDir2, _tarDir3, _tarDir4);
 
     // Start the Pinot cluster
     startZk();
@@ -105,9 +109,12 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
             getMultiColumnsSegmentPartitionConfig());
     TableConfig multiLevelConcatTableConfig =
         createOfflineTableConfig(MULTI_LEVEL_CONCAT_TEST_TABLE, getMultiLevelConcatTaskConfig());
+    TableConfig singleLevelConcatMetadataTableConfig =
+        createOfflineTableConfig(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE, getSingleLevelConcatMetadataTaskConfig());
     addTableConfig(singleLevelConcatTableConfig);
     addTableConfig(singleLevelRollupTableConfig);
     addTableConfig(multiLevelConcatTableConfig);
+    addTableConfig(singleLevelConcatMetadataTableConfig);
 
     // Unpack the Avro files
     List<File> avroFiles = unpackAvroData(_tempDir);
@@ -119,9 +126,13 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
     buildSegmentsFromAvroWithPostfix(avroFiles, singleLevelRollupTableConfig, schema, 0, _segmentDir2, _tarDir2, "2");
     ClusterIntegrationTestUtils
         .buildSegmentsFromAvro(avroFiles, multiLevelConcatTableConfig, schema, 0, _segmentDir3, _tarDir3);
+    ClusterIntegrationTestUtils
+        .buildSegmentsFromAvro(avroFiles, singleLevelConcatMetadataTableConfig, schema, 0, _segmentDir4, _tarDir4);
     uploadSegments(SINGLE_LEVEL_CONCAT_TEST_TABLE, _tarDir1);
     uploadSegments(SINGLE_LEVEL_ROLLUP_TEST_TABLE, _tarDir2);
     uploadSegments(MULTI_LEVEL_CONCAT_TEST_TABLE, _tarDir3);
+    uploadSegments(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE, _tarDir4);
+
 
     // Set up the H2 connection
     setUpH2Connection(avroFiles);
@@ -160,6 +171,21 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
     tableTaskConfigs.put("100days.maxNumRecordsPerTask", "15000");
     tableTaskConfigs.put("ActualElapsedTime.aggregationType", "min");
     tableTaskConfigs.put("WeatherDelay.aggregationType", "sum");
+    tableTaskConfigs.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
+    return new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs));
+  }
+
+  private TableTaskConfig getSingleLevelConcatMetadataTaskConfig() {
+    Map<String, String> tableTaskConfigs = new HashMap<>();
+    tableTaskConfigs.put("100days.mergeType", "concat");
+    tableTaskConfigs.put("100days.bufferTimePeriod", "1d");
+    tableTaskConfigs.put("100days.bucketTimePeriod", "100d");
+    tableTaskConfigs.put("100days.maxNumRecordsPerSegment", "15000");
+    tableTaskConfigs.put("100days.maxNumRecordsPerTask", "15000");
+    tableTaskConfigs.put("ActualElapsedTime.aggregationType", "min");
+    tableTaskConfigs.put("WeatherDelay.aggregationType", "sum");
+    tableTaskConfigs.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
+    tableTaskConfigs.put(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.METADATA.toString());
     return new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs));
   }
 
@@ -169,6 +195,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
     tableTaskConfigs.put("150days.bufferTimePeriod", "1d");
     tableTaskConfigs.put("150days.bucketTimePeriod", "150d");
     tableTaskConfigs.put("150days.roundBucketTimePeriod", "7d");
+    tableTaskConfigs.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
     return new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs));
   }
 
@@ -185,6 +212,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
     tableTaskConfigs.put("90days.bucketTimePeriod", "90d");
     tableTaskConfigs.put("90days.maxNumRecordsPerSegment", "100000");
     tableTaskConfigs.put("90days.maxNumRecordsPerTask", "100000");
+    tableTaskConfigs.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
     return new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs));
   }
 
@@ -345,6 +373,119 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
     verifyTableDelete(offlineTableName);
   }
 
+  /**
+   * Test single level concat task with maxNumRecordsPerTask and maxNumRecordsPerSegment constraints.
+   * The push type is set to METADATA.
+   */
+  @Test
+  public void testSingleLevelConcatWithMetadataPush()
+      throws Exception {
+    // The original segments are time partitioned by month:
+    // segmentName (totalDocs)
+    // myTable1_16071_16101_3 (9746)
+    // myTable1_16102_16129_4 (8690)
+    // myTable1_16130_16159_5 (9621)
+    // myTable1_16160_16189_6 (9454)
+    // myTable1_16190_16220_7 (10329)
+    // myTable1_16221_16250_8 (10468)
+    // myTable1_16251_16281_9 (10499)
+    // myTable1_16282_16312_10 (10196)
+    // myTable1_16313_16342_11 (9136)
+    // myTable1_16343_16373_0 (9292)
+    // myTable1_16374_16404_1 (8736)
+    // myTable1_16405_16435_2 (9378)
+
+    // Expected merge tasks and result segments:
+    // 1.
+    //    {myTable1_16071_16101_3}
+    //      -> {merged_100days_T1_0_myTable1_16071_16099_0, merged_100days_T1_0_myTable1_16100_16101_1}
+    // 2.
+    //    {merged_100days_T1_0_myTable1_16100_16101_1, myTable1_16102_16129_4, myTable1_16130_16159_5}
+    //      -> {merged_100days_T2_0_myTable1_16100_???_0(15000), merged_100days_T2_0_myTable1_???_16159_1}
+    //    {myTable1_16160_16189_6, myTable1_16190_16220_7}
+    //      -> {merged_100days_T2_1_myTable1_16160_16199_0, merged_100days_T2_1_myTable1_16200_16220_1}
+    // 3.
+    //    {merged_100days_T2_1_myTable1_16200_16220_1, myTable1_16221_16250_8}
+    //      -> {merged_100days_T3_0_myTable1_16200_???_0(15000), merged_100days_T3_0_myTable1_???_16250_1}
+    //    {myTable1_16251_16281_9, myTable1_16282_16312_10}
+    //      -> {merged_100days_T3_1_myTable1_16251_???_0(15000), merged_100days_T3_1_myTable1_???_16299_1,
+    //      merged_100days_T3_1_myTable1_16300_16312_2}
+    // 4.
+    //    {merged_100days_T3_1_myTable1_16300_16312_2, myTable1_16313_16342_11, myTable1_16343_16373_0}
+    //      -> {merged_100days_T4_0_myTable1_16300_???_0(15000), merged_100days_T4_0_myTable1_???_16373_1}
+    //    {myTable1_16374_16404_1}
+    //      -> {merged_100days_T4_1_16374_16399_0, merged_100days_T4_1_16400_16404_1}
+    // 5.
+    //    {merged_100days_T4_1_16400_16404_1, myTable1_16405_16435_2}
+    //      -> {merged_100days_T5_0_myTable1_16400_16435_0}
+
+    String sqlQuery = "SELECT count(*) FROM myTable4"; // 115545 rows for the test table
+    JsonNode expectedJson = postQuery(sqlQuery, _brokerBaseApiUrl);
+    int[] expectedNumSubTasks = {1, 2, 2, 2, 1};
+    int[] expectedNumSegmentsQueried = {13, 12, 13, 13, 12};
+    long expectedWatermark = 16000 * 86_400_000L;
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE);
+    int numTasks = 0;
+    for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
+        tasks != null; tasks =
+        _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
+      assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
+      assertTrue(_helixTaskResourceManager.getTaskQueues()
+          .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
+      // Will not schedule task if there's incomplete task
+      assertNull(
+          _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      waitForTaskToComplete();
+
+      // Check watermark
+      MergeRollupTaskMetadata minionTaskMetadata = MergeRollupTaskMetadata.fromZNRecord(
+          _taskManager.getClusterInfoAccessor()
+              .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, offlineTableName));
+      assertNotNull(minionTaskMetadata);
+      assertEquals((long) minionTaskMetadata.getWatermarkMap().get("100days"), expectedWatermark);
+      expectedWatermark += 100 * 86_400_000L;
+
+      // Check metadata of merged segments
+      for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+        if (metadata.getSegmentName().startsWith("merged")) {
+          // Check merged segment zk metadata
+          assertNotNull(metadata.getCustomMap());
+          assertEquals("100days",
+              metadata.getCustomMap().get(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY));
+          // Check merged segments are time partitioned
+          assertEquals(metadata.getEndTimeMs() / (86_400_000L * 100), metadata.getStartTimeMs() / (86_400_000L * 100));
+        }
+      }
+
+      final int finalNumTasks = numTasks;
+      TestUtils.waitForCondition(aVoid -> {
+        try {
+          // Check that the total doc count of the merged segments matches that of the original segments
+          JsonNode actualJson = postQuery(sqlQuery, _brokerBaseApiUrl);
+          if (!SqlResultComparator.areEqual(actualJson, expectedJson, sqlQuery)) {
+            return false;
+          }
+          // Check query routing
+          int numSegmentsQueried = actualJson.get("numSegmentsQueried").asInt();
+          return numSegmentsQueried == expectedNumSegmentsQueried[finalNumTasks];
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }, TIMEOUT_IN_MS, "Timeout while validating segments");
+    }
+    // Check total tasks
+    assertEquals(numTasks, 5);
+
+    assertTrue(_controllerStarter.getControllerMetrics()
+        .containsGauge("mergeRollupTaskDelayInNumBuckets.myTable4_OFFLINE.100days"));
+
+    // Drop the table
+    dropOfflineTable(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE);
+
+    // Check if the task metadata is cleaned up on table deletion
+    verifyTableDelete(offlineTableName);
+  }
+
   /**
    * Test single level rollup task with duplicate data (original segments * 2)
    */
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
index a8eb0a4c40..852230188b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -38,6 +39,7 @@ import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TimestampConfig;
 import org.apache.pinot.spi.config.table.TimestampIndexGranularity;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
@@ -45,7 +47,9 @@ import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
@@ -68,7 +72,11 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
   private PinotTaskManager _taskManager;
   private String _realtimeTableName;
   private String _offlineTableName;
+
+  private String _realtimeMetadataTableName;
+  private String _offlineMetadataTableName;
   private long _dataSmallestTimeMs;
+  private long _dataSmallestMetadataTableTimeMs;
 
   @Override
   protected SegmentPartitionConfig getSegmentPartitionConfig() {
@@ -113,14 +121,35 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
             new TimestampConfig(Arrays.asList(TimestampIndexGranularity.HOUR, TimestampIndexGranularity.DAY,
                 TimestampIndexGranularity.WEEK, TimestampIndexGranularity.MONTH)), null);
     realtimeTableConfig.setFieldConfigList(Collections.singletonList(tsFieldConfig));
+
+    Map<String, String> taskConfigs = new HashMap<>();
+    taskConfigs.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
     realtimeTableConfig.setTaskConfig(new TableTaskConfig(
-        Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>())));
+        Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigs)));
     addTableConfig(realtimeTableConfig);
 
     TableConfig offlineTableConfig = createOfflineTableConfig();
     offlineTableConfig.setFieldConfigList(Collections.singletonList(tsFieldConfig));
     addTableConfig(offlineTableConfig);
 
+    Map<String, String> taskConfigsWithMetadata = new HashMap<>();
+    taskConfigsWithMetadata.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
+    taskConfigsWithMetadata.put(
+        BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.METADATA.toString());
+    String tableWithMetadataPush = "myTable2";
+    TableConfig realtimeMetadataTableConfig = createRealtimeTableConfig(avroFiles.get(0), tableWithMetadataPush,
+        new TableTaskConfig(Collections.singletonMap(
+            MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigsWithMetadata)));
+    realtimeMetadataTableConfig.setIngestionConfig(ingestionConfig);
+    realtimeMetadataTableConfig.setFieldConfigList(Collections.singletonList(tsFieldConfig));
+    addTableConfig(realtimeMetadataTableConfig);
+
+    TableConfig offlineMetadataTableConfig =
+        createOfflineTableConfig(tableWithMetadataPush, null, getSegmentPartitionConfig());
+    offlineMetadataTableConfig.setFieldConfigList(Collections.singletonList(tsFieldConfig));
+    addTableConfig(offlineMetadataTableConfig);
+
+
     // Push data into Kafka
     pushAvroIntoKafka(avroFiles);
 
@@ -130,11 +159,17 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
     // Wait for all documents loaded
     waitForAllDocsLoaded(600_000L);
 
+    waitForDocsLoaded(600_000L, true, tableWithMetadataPush);
+
+
     _taskResourceManager = _controllerStarter.getHelixTaskResourceManager();
     _taskManager = _controllerStarter.getTaskManager();
     _realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
     _offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
 
+    _realtimeMetadataTableName = TableNameBuilder.REALTIME.tableNameWithType(tableWithMetadataPush);
+    _offlineMetadataTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableWithMetadataPush);
+
     List<SegmentZKMetadata> segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(_realtimeTableName);
     long minSegmentTimeMs = Long.MAX_VALUE;
     for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
@@ -143,8 +178,42 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
       }
     }
     _dataSmallestTimeMs = minSegmentTimeMs;
+
+   segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(_realtimeMetadataTableName);
+   minSegmentTimeMs = Long.MAX_VALUE;
+    for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
+      if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
+        minSegmentTimeMs = Math.min(minSegmentTimeMs, segmentZKMetadata.getStartTimeMs());
+      }
+    }
+    _dataSmallestMetadataTableTimeMs = minSegmentTimeMs;
+  }
+
+  private TableConfig createOfflineTableConfig(String tableName, @Nullable TableTaskConfig taskConfig,
+      @Nullable SegmentPartitionConfig partitionConfig) {
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setSchemaName(getSchemaName())
+        .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
+        .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
+        .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
+        .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
+        .setLoadMode(getLoadMode()).setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant())
+        .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
+        .setNullHandlingEnabled(getNullHandlingEnabled()).setSegmentPartitionConfig(partitionConfig).build();
+  }
+
+  protected TableConfig createRealtimeTableConfig(File sampleAvroFile, String tableName, TableTaskConfig taskConfig) {
+    AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
+    return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setSchemaName(getSchemaName())
+        .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
+        .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
+        .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
+        .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
+        .setLoadMode(getLoadMode()).setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant())
+        .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryconfig())
+        .setLLC(useLlc()).setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled()).build();
   }
 
+
   @Test
   public void testRealtimeToOfflineSegmentsTask()
       throws Exception {
@@ -163,14 +232,16 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
     long expectedWatermark = _dataSmallestTimeMs + 86400000;
     for (int i = 0; i < 3; i++) {
       // Schedule task
-      assertNotNull(_taskManager.scheduleTasks().get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      assertNotNull(_taskManager.scheduleTasks(_realtimeTableName)
+          .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       assertTrue(_taskResourceManager.getTaskQueues().contains(
           PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
       // Should not generate more tasks
-      assertNull(_taskManager.scheduleTasks().get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      assertNull(_taskManager.scheduleTasks(_realtimeTableName)
+          .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
 
       // Wait at most 600 seconds for all tasks COMPLETED
-      waitForTaskToComplete(expectedWatermark);
+      waitForTaskToComplete(expectedWatermark, _realtimeTableName);
       // check segment is in offline
       segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(_offlineTableName);
       assertEquals(segmentsZKMetadata.size(), (numOfflineSegmentsPerTask * (i + 1)));
@@ -194,7 +265,57 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
     testHardcodedQueries();
   }
 
-  private void waitForTaskToComplete(long expectedWatermark) {
+  @Test
+  public void testRealtimeToOfflineSegmentsMetadataPushTask()
+      throws Exception {
+    List<SegmentZKMetadata> segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(_offlineMetadataTableName);
+    assertTrue(segmentsZKMetadata.isEmpty());
+
+    // The number of offline segments would be equal to the product of number of partitions for all the
+    // partition columns if segment partitioning is configured.
+    SegmentPartitionConfig segmentPartitionConfig =
+        getOfflineTableConfig().getIndexingConfig().getSegmentPartitionConfig();
+    int numOfflineSegmentsPerTask =
+        segmentPartitionConfig != null ? segmentPartitionConfig.getColumnPartitionMap().values().stream()
+            .map(ColumnPartitionConfig::getNumPartitions).reduce((a, b) -> a * b)
+            .orElseThrow(() -> new RuntimeException("Expected accumulated result but not found.")) : 1;
+
+    long expectedWatermark = _dataSmallestMetadataTableTimeMs + 86400000;
+    _taskManager.cleanUpTask();
+    for (int i = 0; i < 3; i++) {
+      // Schedule task
+      assertNotNull(_taskManager.scheduleTasks(_realtimeMetadataTableName)
+          .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      assertTrue(_taskResourceManager.getTaskQueues().contains(
+          PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
+      // Should not generate more tasks
+      assertNull(_taskManager.scheduleTasks(_realtimeMetadataTableName)
+          .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+
+      // Wait at most 600 seconds for all tasks COMPLETED
+      waitForTaskToComplete(expectedWatermark, _realtimeMetadataTableName);
+      // check segment is in offline
+      segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(_offlineMetadataTableName);
+      assertEquals(segmentsZKMetadata.size(), (numOfflineSegmentsPerTask * (i + 1)));
+
+      long expectedOfflineSegmentTimeMs = expectedWatermark - 86400000;
+      for (int j = (numOfflineSegmentsPerTask * i); j < segmentsZKMetadata.size(); j++) {
+        SegmentZKMetadata segmentZKMetadata = segmentsZKMetadata.get(j);
+        assertEquals(segmentZKMetadata.getStartTimeMs(), expectedOfflineSegmentTimeMs);
+        assertEquals(segmentZKMetadata.getEndTimeMs(), expectedOfflineSegmentTimeMs);
+        if (segmentPartitionConfig != null) {
+          assertEquals(segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().keySet(),
+              segmentPartitionConfig.getColumnPartitionMap().keySet());
+          for (String partitionColumn : segmentPartitionConfig.getColumnPartitionMap().keySet()) {
+            assertEquals(segmentZKMetadata.getPartitionMetadata().getPartitions(partitionColumn).size(), 1);
+          }
+        }
+      }
+      expectedWatermark += 86400000;
+    }
+  }
+
+  private void waitForTaskToComplete(long expectedWatermark, String realtimeTableName) {
     TestUtils.waitForCondition(input -> {
       // Check task state
       for (TaskState taskState : _taskResourceManager.getTaskStates(
@@ -208,7 +329,7 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
 
     // Check segment ZK metadata
     ZNRecord znRecord = _taskManager.getClusterInfoAccessor()
-        .getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, _realtimeTableName);
+        .getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, realtimeTableName);
     RealtimeToOfflineSegmentsTaskMetadata minionTaskMetadata =
         znRecord != null ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord) : null;
     assertNotNull(minionTaskMetadata);
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index 5b64c58420..810f07b6d8 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -21,6 +21,7 @@ package org.apache.pinot.plugin.minion.tasks;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.io.File;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -46,7 +47,14 @@ import org.apache.pinot.minion.MinionConf;
 import org.apache.pinot.minion.event.MinionEventObserver;
 import org.apache.pinot.minion.event.MinionEventObservers;
 import org.apache.pinot.minion.exception.TaskCancelledException;
+import org.apache.pinot.segment.local.utils.SegmentPushUtils;
 import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +74,10 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
   private static final Logger LOGGER = LoggerFactory.getLogger(BaseMultipleSegmentsConversionExecutor.class);
   private static final String CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID = "lineageEntryId";
 
+  private static final int DEFUALT_PUSH_ATTEMPTS = 5;
+  private static final int DEFAULT_PUSH_PARALLELISM = 1;
+  private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000L;
+
   protected MinionConf _minionConf;
 
   // Tracking finer grained progress status.
@@ -242,6 +254,17 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
             new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
                 segmentZKMetadataCustomMapModifier.toJsonString());
 
+        String pushMode = configs.get(BatchConfigProperties.PUSH_MODE);
+        URI outputSegmentTarURI;
+        if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())
+            != BatchConfigProperties.SegmentPushType.TAR) {
+          outputSegmentTarURI = moveSegmentToOutputPinotFS(configs, convertedTarredSegmentFile);
+          LOGGER.info("Moved generated segment from [{}] to location: [{}]", convertedTarredSegmentFile,
+              outputSegmentTarURI);
+        } else {
+          outputSegmentTarURI = convertedTarredSegmentFile.toURI();
+        }
+
         List<Header> httpHeaders = new ArrayList<>();
         httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
         httpHeaders.addAll(AuthProviderUtils.toRequestHeaders(authProvider));
@@ -253,9 +276,8 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
             TableNameBuilder.extractRawTableName(tableNameWithType));
         List<NameValuePair> parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter);
 
-        SegmentConversionUtils
-            .uploadSegment(configs, httpHeaders, parameters, tableNameWithType, resultSegmentName, uploadURL,
-                convertedTarredSegmentFile);
+        pushSegment(tableNameParameter.getValue(), configs, outputSegmentTarURI, httpHeaders, parameters,
+            segmentConversionResult);
         if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
           LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath());
         }
@@ -276,6 +298,83 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters, SegmentConversionResult segmentConversionResult)
+      throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
+
+    switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+      case TAR:
+          File tarFile = new File(outputSegmentTarURI);
+          String segmentName = segmentConversionResult.getSegmentName();
+          String tableNameWithType = segmentConversionResult.getTableNameWithType();
+          String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
+          SegmentConversionUtils.uploadSegment(taskConfigs, headers, parameters, tableNameWithType, segmentName,
+              uploadURL, tarFile);
+        break;
+      case METADATA:
+        if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
+          URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+          try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+            Map<String, String> segmentUriToTarPathMap =
+                SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec,
+                    new String[]{outputSegmentTarURI.toString()});
+            SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters);
+          }
+        } else {
+          throw new RuntimeException("Output dir URI missing for metadata push");
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("Unrecognized push mode - " + pushMode);
+    }
+  }
+
+  private SegmentGenerationJobSpec generatePushJobSpec(String tableName, Map<String, String> taskConfigs,
+      PushJobSpec pushJobSpec) {
+
+    TableSpec tableSpec = new TableSpec();
+    tableSpec.setTableName(tableName);
+
+    PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
+    pinotClusterSpec.setControllerURI(taskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI));
+    PinotClusterSpec[] pinotClusterSpecs = new PinotClusterSpec[]{pinotClusterSpec};
+
+    SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
+    spec.setPushJobSpec(pushJobSpec);
+    spec.setTableSpec(tableSpec);
+    spec.setPinotClusterSpecs(pinotClusterSpecs);
+    spec.setAuthToken(taskConfigs.get(BatchConfigProperties.AUTH_TOKEN));
+
+    return spec;
+  }
+
+  private URI moveSegmentToOutputPinotFS(Map<String, String> taskConfigs, File localSegmentTarFile)
+      throws Exception {
+    URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+      URI outputSegmentTarURI = URI.create(outputSegmentDirURI + localSegmentTarFile.getName());
+      if (!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT)) && outputFileFS.exists(
+          outputSegmentTarURI)) {
+        throw new RuntimeException(String.format("Output file: %s already exists. "
+                + "Set 'overwriteOutput' to true to ignore this error", outputSegmentTarURI));
+      } else {
+        outputFileFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+      }
+      return outputSegmentTarURI;
+    }
+  }
+
   // SegmentUploadContext holds the info to conduct certain actions
   // before and after uploading multiple segments.
   protected static class SegmentUploadContext {
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
new file mode 100644
index 0000000000..0d8c1e375c
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.LocalPinotFS;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class MinionTaskUtils {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MinionTaskUtils.class);
+
+  private MinionTaskUtils() {
+  }
+
+  public static PinotFS getInputPinotFS(Map<String, String> taskConfigs, URI fileURI)
+      throws Exception {
+    String fileURIScheme = fileURI.getScheme();
+    if (fileURIScheme == null) {
+      return new LocalPinotFS();
+    }
+    // Try to create PinotFS using given Input FileSystem config always
+    String fsClass = taskConfigs.get(BatchConfigProperties.INPUT_FS_CLASS);
+    if (fsClass != null) {
+      PinotFS pinotFS = PluginManager.get().createInstance(fsClass);
+      PinotConfiguration fsProps = IngestionConfigUtils.getInputFsProps(taskConfigs);
+      pinotFS.init(fsProps);
+      return pinotFS;
+    }
+    return PinotFSFactory.create(fileURIScheme);
+  }
+
+  public static PinotFS getOutputPinotFS(Map<String, String> taskConfigs, URI fileURI)
+      throws Exception {
+    String fileURIScheme = (fileURI == null) ? null : fileURI.getScheme();
+    if (fileURIScheme == null) {
+      return new LocalPinotFS();
+    }
+    // Try to create PinotFS using given Output FileSystem config always
+    String fsClass = taskConfigs.get(BatchConfigProperties.OUTPUT_FS_CLASS);
+    if (fsClass != null) {
+      PinotFS pinotFS = PluginManager.get().createInstance(fsClass);
+      PinotConfiguration fsProps = IngestionConfigUtils.getOutputFsProps(taskConfigs);
+      pinotFS.init(fsProps);
+      return pinotFS;
+    }
+    return PinotFSFactory.create(fileURIScheme);
+  }
+
+  public static Map<String, String> getPushTaskConfig(String tableName, Map<String, String> taskConfigs,
+      ClusterInfoAccessor clusterInfoAccessor) {
+    try {
+      String pushMode = IngestionConfigUtils.getPushMode(taskConfigs);
+
+      Map<String, String> singleFileGenerationTaskConfig = new HashMap<>(taskConfigs);
+      if (pushMode == null
+          || pushMode.toUpperCase().contentEquals(BatchConfigProperties.SegmentPushType.TAR.toString())) {
+        singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
+            BatchConfigProperties.SegmentPushType.TAR.toString());
+      } else {
+        URI outputDirURI = URI.create(clusterInfoAccessor.getDataDir() + "/" + tableName);
+        String outputDirURIScheme = outputDirURI.getScheme();
+
+        if (!isLocalOutputDir(outputDirURIScheme)) {
+          singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI, outputDirURI.toString());
+          if (pushMode.toUpperCase().contentEquals(BatchConfigProperties.SegmentPushType.URI.toString())) {
+            LOGGER.warn("URI push type is not supported in this task. Switching to METADATA push");
+            pushMode = BatchConfigProperties.SegmentPushType.METADATA.toString();
+          }
+          singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE, pushMode);
+        } else {
+          LOGGER.warn("segment upload with METADATA push is not supported with local output dir: {}."
+              + " Switching to TAR push.", outputDirURI);
+          singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
+              BatchConfigProperties.SegmentPushType.TAR.toString());
+        }
+      }
+      singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_CONTROLLER_URI, clusterInfoAccessor.getVipUrl());
+      return singleFileGenerationTaskConfig;
+    } catch (Exception e) {
+      return taskConfigs;
+    }
+  }
+
+  public static boolean isLocalOutputDir(String outputDirURIScheme) {
+    return outputDirURIScheme == null || outputDirURIScheme.startsWith("file");
+  }
+
+  public static PinotFS getLocalPinotFs() {
+    return new LocalPinotFS();
+  }
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index fc7be90657..6043320495 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -46,11 +46,13 @@ import org.apache.pinot.core.common.MinionConstants.MergeRollupTask;
 import org.apache.pinot.core.common.MinionConstants.MergeTask;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
 import org.apache.pinot.spi.annotations.minion.TaskGenerator;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -558,12 +560,13 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
     String partitionSuffix = partitionSuffixBuilder.toString();
 
     for (int i = 0; i < segmentNamesList.size(); i++) {
-      Map<String, String> configs = new HashMap<>();
+      String downloadURL = StringUtils.join(downloadURLsList.get(i), MinionConstants.URL_SEPARATOR);
+      Map<String, String> configs = MinionTaskUtils.getPushTaskConfig(offlineTableName, taskConfigs,
+          _clusterInfoAccessor);
       configs.put(MinionConstants.TABLE_NAME_KEY, offlineTableName);
       configs.put(MinionConstants.SEGMENT_NAME_KEY,
           StringUtils.join(segmentNamesList.get(i), MinionConstants.SEGMENT_NAME_SEPARATOR));
-      configs.put(MinionConstants.DOWNLOAD_URL_KEY,
-          StringUtils.join(downloadURLsList.get(i), MinionConstants.URL_SEPARATOR));
+      configs.put(MinionConstants.DOWNLOAD_URL_KEY, downloadURL);
       configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
       configs.put(MinionConstants.ENABLE_REPLACE_SEGMENTS_KEY, "true");
 
@@ -573,6 +576,8 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
         }
       }
 
+      configs.put(BatchConfigProperties.OVERWRITE_OUTPUT,
+          taskConfigs.getOrDefault(BatchConfigProperties.OVERWRITE_OUTPUT, "false"));
       configs.put(MergeRollupTask.MERGE_TYPE_KEY, mergeConfigs.get(MergeTask.MERGE_TYPE_KEY));
       configs.put(MergeRollupTask.MERGE_LEVEL_KEY, mergeLevel);
       configs.put(MergeTask.PARTITION_BUCKET_TIME_PERIOD_KEY, mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY));
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
index f718821082..ff65c704ba 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -37,6 +37,7 @@ import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtil
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
 import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
 import org.apache.pinot.spi.annotations.minion.TaskGenerator;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
@@ -204,7 +205,8 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends BaseTaskGenerator {
         continue;
       }
 
-      Map<String, String> configs = new HashMap<>();
+      Map<String, String> configs = MinionTaskUtils.getPushTaskConfig(realtimeTableName, taskConfigs,
+          _clusterInfoAccessor);
       configs.put(MinionConstants.TABLE_NAME_KEY, realtimeTableName);
       configs.put(MinionConstants.SEGMENT_NAME_KEY, StringUtils.join(segmentNames, ","));
       configs.put(MinionConstants.DOWNLOAD_URL_KEY, StringUtils.join(downloadURLs, MinionConstants.URL_SEPARATOR));
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java
index a7d5127c09..6475d6f895 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java
@@ -36,6 +36,7 @@ import org.apache.pinot.minion.event.MinionEventObserver;
 import org.apache.pinot.minion.event.MinionEventObservers;
 import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
 import org.apache.pinot.plugin.minion.tasks.BaseTaskExecutor;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
 import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
 import org.apache.pinot.segment.local.utils.SegmentPushUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -176,10 +177,10 @@ public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor {
     if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
       outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
     }
-    try (PinotFS outputFileFS = SegmentGenerationAndPushTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
       switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
         case TAR:
-          try (PinotFS pinotFS = SegmentGenerationAndPushTaskUtils.getLocalPinotFs()) {
+          try (PinotFS pinotFS = MinionTaskUtils.getLocalPinotFs()) {
             SegmentPushUtils.pushSegments(spec, pinotFS, Arrays.asList(outputSegmentTarURI.toString()));
           } catch (RetriableOperationException | AttemptsExceededException e) {
             throw new RuntimeException(e);
@@ -237,7 +238,7 @@ public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor {
       return localSegmentTarFile.toURI();
     }
     URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
-    try (PinotFS outputFileFS = SegmentGenerationAndPushTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
       URI outputSegmentTarURI = URI.create(outputSegmentDirURI + localSegmentTarFile.getName());
       if (!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT)) && outputFileFS.exists(
           outputSegmentDirURI)) {
@@ -269,7 +270,7 @@ public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor {
     SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
     URI inputFileURI = URI.create(taskConfigs.get(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY));
 
-    try (PinotFS inputFileFS = SegmentGenerationAndPushTaskUtils.getInputPinotFS(taskConfigs, inputFileURI)) {
+    try (PinotFS inputFileFS = MinionTaskUtils.getInputPinotFS(taskConfigs, inputFileURI)) {
       File localInputTempDir = new File(localTempDir, "input");
       FileUtils.forceMkdir(localInputTempDir);
       File localOutputTempDir = new File(localTempDir, "output");
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java
index eff429ec4a..e1bdd97cac 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java
@@ -41,6 +41,7 @@ import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator
 import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
 import org.apache.pinot.spi.annotations.minion.TaskGenerator;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
@@ -323,7 +324,7 @@ public class SegmentGenerationAndPushTaskGenerator extends BaseTaskGenerator {
   private List<URI> getInputFilesFromDirectory(Map<String, String> batchConfigMap, URI inputDirURI,
       Set<String> existingSegmentInputFileURIs)
       throws Exception {
-    try (PinotFS inputDirFS = SegmentGenerationAndPushTaskUtils.getInputPinotFS(batchConfigMap, inputDirURI)) {
+    try (PinotFS inputDirFS = MinionTaskUtils.getInputPinotFS(batchConfigMap, inputDirURI)) {
 
       String includeFileNamePattern = batchConfigMap.get(BatchConfigProperties.INCLUDE_FILE_NAME_PATTERN);
       String excludeFileNamePattern = batchConfigMap.get(BatchConfigProperties.EXCLUDE_FILE_NAME_PATTERN);
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskUtils.java
deleted file mode 100644
index 913184943f..0000000000
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskUtils.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.minion.tasks.segmentgenerationandpush;
-
-import java.net.URI;
-import java.util.Map;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.filesystem.LocalPinotFS;
-import org.apache.pinot.spi.filesystem.PinotFS;
-import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
-import org.apache.pinot.spi.plugin.PluginManager;
-import org.apache.pinot.spi.utils.IngestionConfigUtils;
-
-
-public class SegmentGenerationAndPushTaskUtils {
-  private SegmentGenerationAndPushTaskUtils() {
-  }
-
-  static PinotFS getInputPinotFS(Map<String, String> taskConfigs, URI fileURI)
-      throws Exception {
-    String fileURIScheme = fileURI.getScheme();
-    if (fileURIScheme == null) {
-      return new LocalPinotFS();
-    }
-    // Try to create PinotFS using given Input FileSystem config always
-    String fsClass = taskConfigs.get(BatchConfigProperties.INPUT_FS_CLASS);
-    if (fsClass != null) {
-      PinotFS pinotFS = PluginManager.get().createInstance(fsClass);
-      PinotConfiguration fsProps = IngestionConfigUtils.getInputFsProps(taskConfigs);
-      pinotFS.init(fsProps);
-      return pinotFS;
-    }
-    return PinotFSFactory.create(fileURIScheme);
-  }
-
-  static PinotFS getOutputPinotFS(Map<String, String> taskConfigs, URI fileURI)
-      throws Exception {
-    String fileURIScheme = (fileURI == null) ? null : fileURI.getScheme();
-    if (fileURIScheme == null) {
-      return new LocalPinotFS();
-    }
-    // Try to create PinotFS using given Input FileSystem config always
-    String fsClass = taskConfigs.get(BatchConfigProperties.OUTPUT_FS_CLASS);
-    if (fsClass != null) {
-      PinotFS pinotFS = PluginManager.get().createInstance(fsClass);
-      PinotConfiguration fsProps = IngestionConfigUtils.getOutputFsProps(taskConfigs);
-      pinotFS.init(fsProps);
-      return pinotFS;
-    }
-    return PinotFSFactory.create(fileURIScheme);
-  }
-
-  static PinotFS getLocalPinotFs() {
-    return new LocalPinotFS();
-  }
-}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
index cee66b2b9a..4f6423fadc 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
@@ -27,7 +27,6 @@ import java.net.URISyntaxException;
 import java.nio.file.FileSystems;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -36,6 +35,7 @@ import java.util.UUID;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.Header;
+import org.apache.http.NameValuePair;
 import org.apache.http.message.BasicHeader;
 import org.apache.pinot.common.auth.AuthProviderUtils;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
@@ -97,6 +97,49 @@ public class SegmentPushUtils implements Serializable {
   public static void pushSegments(SegmentGenerationJobSpec spec, PinotFS fileSystem, List<String> tarFilePaths)
       throws RetriableOperationException, AttemptsExceededException {
     String tableName = spec.getTableSpec().getTableName();
+    AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
+    List<Header> headers = AuthProviderUtils.toRequestHeaders(authProvider);
+    List<NameValuePair> parameters = FileUploadDownloadClient.makeTableParam(tableName);
+    pushSegments(spec, fileSystem, tarFilePaths, headers, parameters);
+  }
+
+  public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> segmentUris)
+      throws RetriableOperationException, AttemptsExceededException {
+    String tableName = spec.getTableSpec().getTableName();
+    AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
+    List<Header> headers = AuthProviderUtils.toRequestHeaders(authProvider);
+    List<NameValuePair> parameters = FileUploadDownloadClient.makeTableParam(tableName);
+    sendSegmentUris(spec, segmentUris, headers, parameters);
+  }
+
+  /**
+   * This method takes a map of segment downloadURI to corresponding tar file path, and pushes those segments in
+   * metadata mode.
+   * The steps are:
+   * 1. Download segment from tar file path;
+   * 2. Untar segment metadata and creation meta files from the tar file to a segment metadata directory;
+   * 3. Tar this segment metadata directory into a tar file;
+   * 4. Generate a POST request with segmentDownloadURI in header to push tar file to Pinot controller.
+   *
+   * @param spec is the segment generation job spec
+   * @param fileSystem is the PinotFs used to copy segment tar file
+   * @param segmentUriToTarPathMap contains the map of segment DownloadURI to segment tar file path
+   * @throws Exception
+   */
+  public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem,
+      Map<String, String> segmentUriToTarPathMap)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
+    List<Header> headers = AuthProviderUtils.toRequestHeaders(authProvider);
+    List<NameValuePair> parameters = FileUploadDownloadClient.makeTableParam(tableName);
+    sendSegmentUriAndMetadata(spec, fileSystem, segmentUriToTarPathMap, headers, parameters);
+  }
+
+  public static void pushSegments(SegmentGenerationJobSpec spec, PinotFS fileSystem, List<String> tarFilePaths,
+      List<Header> headers, List<NameValuePair> parameters)
+      throws RetriableOperationException, AttemptsExceededException {
+    String tableName = spec.getTableSpec().getTableName();
     TableType tableType = tableName.endsWith("_" + TableType.REALTIME.name()) ? TableType.REALTIME : TableType.OFFLINE;
     boolean cleanUpOutputDir = spec.isCleanUpOutputDir();
     LOGGER.info("Start pushing segments: {}... to locations: {} for table {}",
@@ -108,7 +151,6 @@ public class SegmentPushUtils implements Serializable {
       String fileName = tarFile.getName();
       Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
       String segmentName = fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length());
-      AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
       for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
         URI controllerURI;
         try {
@@ -129,8 +171,8 @@ public class SegmentPushUtils implements Serializable {
           try (InputStream inputStream = fileSystem.open(tarFileURI)) {
             SimpleHttpResponse response =
                 FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerURI),
-                    segmentName, inputStream, AuthProviderUtils.toRequestHeaders(authProvider),
-                    FileUploadDownloadClient.makeTableParam(tableName), tableName, tableType);
+                    segmentName, inputStream, headers,
+                    parameters, tableName, tableType);
             LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName,
                 controllerURI, response.getStatusCode(), response.getResponse());
             return true;
@@ -157,7 +199,8 @@ public class SegmentPushUtils implements Serializable {
     }
   }
 
-  public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> segmentUris)
+  public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> segmentUris,
+      List<Header> headers, List<NameValuePair> parameters)
       throws RetriableOperationException, AttemptsExceededException {
     String tableName = spec.getTableSpec().getTableName();
     LOGGER.info("Start sending table {} segment URIs: {} to locations: {}", tableName,
@@ -166,7 +209,6 @@ public class SegmentPushUtils implements Serializable {
     for (String segmentUri : segmentUris) {
       URI segmentURI = URI.create(segmentUri);
       PinotFS outputDirFS = PinotFSFactory.create(segmentURI.getScheme());
-      AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
       for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
         URI controllerURI;
         try {
@@ -187,8 +229,7 @@ public class SegmentPushUtils implements Serializable {
           try {
             SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
                 .sendSegmentUri(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentUri,
-                    AuthProviderUtils.toRequestHeaders(authProvider),
-                    FileUploadDownloadClient.makeTableParam(tableName), HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+                    headers, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
             LOGGER.info("Response for pushing table {} segment uri {} to location {} - {}: {}", tableName, segmentUri,
                 controllerURI, response.getStatusCode(), response.getResponse());
             return true;
@@ -230,7 +271,7 @@ public class SegmentPushUtils implements Serializable {
    * @throws Exception
    */
   public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem,
-      Map<String, String> segmentUriToTarPathMap)
+      Map<String, String> segmentUriToTarPathMap, List<Header> headers, List<NameValuePair> parameters)
       throws Exception {
     String tableName = spec.getTableSpec().getTableName();
     LOGGER.info("Start pushing segment metadata: {} to locations: {} for table {}", segmentUriToTarPathMap,
@@ -243,7 +284,6 @@ public class SegmentPushUtils implements Serializable {
           ? fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()) : fileName;
       SegmentNameUtils.validatePartialOrFullSegmentName(segmentName);
       File segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath));
-      AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
       try {
         for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
           URI controllerURI;
@@ -263,7 +303,6 @@ public class SegmentPushUtils implements Serializable {
           }
           RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> {
             try {
-              List<Header> headers = new ArrayList<>();
               headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath));
               headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
                   FileUploadDownloadClient.FileUploadType.METADATA.toString()));
@@ -271,12 +310,10 @@ public class SegmentPushUtils implements Serializable {
                 headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE,
                     String.valueOf(spec.getPushJobSpec().getCopyToDeepStoreForMetadataPush())));
               }
-              headers.addAll(AuthProviderUtils.toRequestHeaders(authProvider));
 
-              SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
-                  .uploadSegmentMetadata(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentName,
-                      segmentMetadataFile, headers, FileUploadDownloadClient.makeTableParam(tableName),
-                      HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+              SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadata(
+                  FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentName,
+                  segmentMetadataFile, headers, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
               LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName,
                   controllerURI, response.getStatusCode(), response.getResponse());
               return true;
@@ -284,9 +321,8 @@ public class SegmentPushUtils implements Serializable {
               int statusCode = e.getStatusCode();
               if (statusCode >= 500) {
                 // Temporary exception
-                LOGGER
-                    .warn("Caught temporary exception while pushing table: {} segment: {} to {}, will retry", tableName,
-                        segmentName, controllerURI, e);
+                LOGGER.warn("Caught temporary exception while pushing table: {} segment: {} to {}, will retry",
+                    tableName, segmentName, controllerURI, e);
                 return false;
               } else {
                 // Permanent exception


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org