Posted to commits@pinot.apache.org by ja...@apache.org on 2019/02/12 05:12:22 UTC

[incubator-pinot] branch master updated: Refactor Hadoop Jobs (#3813)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new eacb020  Refactor Hadoop Jobs (#3813)
eacb020 is described below

commit eacb0200d811374b097e504fee2870d9ff0b2b64
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Feb 11 21:12:16 2019 -0800

    Refactor Hadoop Jobs (#3813)
    
    - Make Hadoop Jobs easier to extend
    - Add interface ControllerRestApi to provide APIs for config fetching and segment pushing
    - Add BaseSegmentJob abstract class for common segment related jobs
    - In SegmentCreationJob, allow a pluggable mapper class and additional job properties (see the sketch after this list)
    - In SegmentCreationMapper, allow additional segment generator configs
    - Tested with Hadoop integration test
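    
    For reference, a minimal sketch of how a derived job could use the new extension
    hooks (this example is not part of the commit; MySegmentCreationJob,
    MySegmentCreationMapper and "my.custom.property" are hypothetical names):
    
    import java.util.Properties;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.pinot.hadoop.job.SegmentCreationJob;
    import org.apache.pinot.hadoop.job.mapper.SegmentCreationMapper;
    
    public class MySegmentCreationJob extends SegmentCreationJob {
    
      public MySegmentCreationJob(Properties properties) {
        super(properties);
      }
    
      // New hook: plug in a custom mapper class
      @Override
      protected Class<? extends Mapper<LongWritable, Text, LongWritable, Text>> getMapperClass() {
        return MySegmentCreationMapper.class;
      }
    
      // New hook: set additional job properties before the job is submitted
      @Override
      protected void addAdditionalJobProperties(Job job) {
        job.getConfiguration().set("my.custom.property", "value");  // hypothetical property
      }
    
      // Hypothetical mapper extending the SegmentCreationMapper added in this commit,
      // e.g. to set additional segment generator configs
      public static class MySegmentCreationMapper extends SegmentCreationMapper {
      }
    }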
---
 .../common/utils/FileUploadDownloadClient.java     |  32 +-
 .../data/readers/ThriftRecordReaderConfig.java     |   2 +-
 .../generator/SegmentGeneratorConfig.java          |   2 +-
 .../pinot/hadoop/PinotHadoopJobLauncher.java       |  26 +-
 .../apache/pinot/hadoop/io/PinotRecordWriter.java  |   5 +-
 .../apache/pinot/hadoop/job/BaseSegmentJob.java    |  84 ++++
 .../apache/pinot/hadoop/job/ControllerRestApi.java |  84 +---
 .../pinot/hadoop/job/DefaultControllerRestApi.java | 142 +++++++
 .../pinot/hadoop/job/JobConfigConstants.java       |   8 +-
 .../pinot/hadoop/job/SegmentCreationJob.java       | 427 +++++++++------------
 .../apache/pinot/hadoop/job/SegmentTarPushJob.java |  85 ++--
 .../apache/pinot/hadoop/job/SegmentUriPushJob.java |  98 ++---
 .../mapper/HadoopSegmentCreationMapReduceJob.java  | 333 ----------------
 .../hadoop/job/mapper/SegmentCreationMapper.java   | 248 ++++++++++++
 .../apache/pinot/hadoop/utils/PushLocation.java    |  40 +-
 ...mentBuildPushOfflineClusterIntegrationTest.java |   4 +-
 16 files changed, 754 insertions(+), 866 deletions(-)
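
As shown in the PinotHadoopJobLauncher diff below, the job constructors now take only the job
Properties (no job name). A minimal sketch of driving a create-and-push run programmatically
(the driver class name is hypothetical; property values come from a standard job.properties file):

import java.io.FileInputStream;
import java.util.Properties;
import org.apache.pinot.hadoop.job.SegmentCreationJob;
import org.apache.pinot.hadoop.job.SegmentTarPushJob;

public class CreateAndPushDriver {
  public static void main(String[] args) throws Exception {
    // Load a job.properties file with path.to.input, path.to.output, segment.table.name,
    // and push.to.hosts / push.to.port for the push step
    Properties jobConf = new Properties();
    try (FileInputStream fileInputStream = new FileInputStream(args[0])) {
      jobConf.load(fileInputStream);
    }

    // Equivalent to the SegmentCreationAndTarPush job type in the launcher
    new SegmentCreationJob(jobConf).run();
    new SegmentTarPushJob(jobConf).run();
  }
}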

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 75017cf..4a37dcf 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -97,7 +97,6 @@ public class FileUploadDownloadClient implements Closeable {
   private static final String SEGMENT_METADATA_PATH = "/segmentmetadata";
   private static final String TABLES_PATH = "/tables";
   private static final String TYPE_DELIMITER = "?type=";
-  private static final String SLASH = "/";
 
   private final CloseableHttpClient _httpClient;
 
@@ -122,16 +121,14 @@ public class FileUploadDownloadClient implements Closeable {
     return new URI(scheme, null, host, port, path, null, null);
   }
 
-  public static URI getRetrieveTableConfigURI(String host, int port, String tableName)
+  public static URI getRetrieveTableConfigHttpURI(String host, int port, String rawTableName)
       throws URISyntaxException {
-    String path = TABLES_PATH + SLASH + tableName;
-    return getURI(HTTP, host, port, path);
+    return getURI(HTTP, host, port, TABLES_PATH + "/" + rawTableName);
   }
 
-  public static URI getRetrieveSchemaHttpURI(String host, int port, String tableName)
+  public static URI getRetrieveSchemaHttpURI(String host, int port, String schemaName)
       throws URISyntaxException {
-    String path = SCHEMA_PATH + SLASH + tableName;
-    return getURI(HTTP, host, port, path);
+    return getURI(HTTP, host, port, SCHEMA_PATH + "/" + schemaName);
   }
 
   public static URI getUploadSchemaHttpURI(String host, int port)
@@ -336,26 +333,7 @@ public class FileUploadDownloadClient implements Closeable {
     return errorMessage;
   }
 
-  /**
-   * Get request to retrieve table config
-   * @param uri URI
-   * @return Response
-   * @throws IOException
-   * @throws HttpErrorStatusException
-   */
-  public SimpleHttpResponse getTableConfig(URI uri)
-      throws IOException, HttpErrorStatusException {
-    return sendRequest(constructGetRequest(uri));
-  }
-
-  /**
-   * Get request to retrieve schema
-   * @param uri URI
-   * @return Response
-   * @throws IOException
-   * @throws HttpErrorStatusException
-   */
-  public SimpleHttpResponse getSchema(URI uri)
+  public SimpleHttpResponse sendGetRequest(URI uri)
       throws IOException, HttpErrorStatusException {
     return sendRequest(constructGetRequest(uri));
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReaderConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReaderConfig.java
index 2f48f98..a2e532b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReaderConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReaderConfig.java
@@ -31,7 +31,7 @@ public class ThriftRecordReaderConfig implements RecordReaderConfig {
   }
 
   public void setThriftClass(String thriftClass) {
-    this._thriftClass = thriftClass;
+    _thriftClass = thriftClass;
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index 5b41e31..61cfbac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -162,7 +162,7 @@ public class SegmentGeneratorConfig {
    * @param tableConfig
    * @param schema
    */
-  public SegmentGeneratorConfig(TableConfig tableConfig, Schema schema) {
+  public SegmentGeneratorConfig(@Nullable TableConfig tableConfig, Schema schema) {
     Preconditions.checkNotNull(schema);
     _schema = schema;
 
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/PinotHadoopJobLauncher.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/PinotHadoopJobLauncher.java
index bf84a0e..2c7b0da 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/PinotHadoopJobLauncher.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/PinotHadoopJobLauncher.java
@@ -21,7 +21,6 @@ package org.apache.pinot.hadoop;
 import java.io.FileInputStream;
 import java.util.Arrays;
 import java.util.Properties;
-import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.hadoop.job.SegmentCreationJob;
 import org.apache.pinot.hadoop.job.SegmentTarPushJob;
 import org.apache.pinot.hadoop.job.SegmentUriPushJob;
@@ -36,13 +35,6 @@ public class PinotHadoopJobLauncher {
   private static final String USAGE = "usage: [job_type] [job.properties]";
   private static final String SUPPORT_JOB_TYPES =
       "\tsupport job types: " + Arrays.toString(PinotHadoopJobType.values());
-  private static final String SEGMENT_CREATION_JOB_NAME = PinotHadoopJobType.SegmentCreation.toString();
-  private static final String SEGMENT_PUSH_TAR_JOB_NAME = PinotHadoopJobType.SegmentTarPush.toString();
-  private static final String SEGMENT_PUSH_URI_JOB_NAME = PinotHadoopJobType.SegmentUriPush.toString();
-  private static final String SEGMENT_CREATION_AND_TAR_PUSH_JOB_NAME =
-      PinotHadoopJobType.SegmentCreationAndTarPush.toString();
-  private static final String SEGMENT_CREATION_AND_URI_PUSH_JOB_NAME =
-      PinotHadoopJobType.SegmentCreationAndUriPush.toString();
 
   private static void usage() {
     System.err.println(USAGE);
@@ -53,25 +45,21 @@ public class PinotHadoopJobLauncher {
       throws Exception {
     switch (jobType) {
       case SegmentCreation:
-        new SegmentCreationJob(SEGMENT_CREATION_JOB_NAME, jobConf).run();
+        new SegmentCreationJob(jobConf).run();
         break;
       case SegmentTarPush:
-        new SegmentTarPushJob(SEGMENT_PUSH_TAR_JOB_NAME, jobConf).run();
+        new SegmentTarPushJob(jobConf).run();
         break;
       case SegmentUriPush:
-        new SegmentUriPushJob(SEGMENT_PUSH_URI_JOB_NAME, jobConf).run();
+        new SegmentUriPushJob(jobConf).run();
         break;
       case SegmentCreationAndTarPush:
-        new SegmentCreationJob(StringUtil.join(":", SEGMENT_CREATION_JOB_NAME, SEGMENT_CREATION_AND_TAR_PUSH_JOB_NAME),
-            jobConf).run();
-        new SegmentTarPushJob(StringUtil.join(":", SEGMENT_PUSH_TAR_JOB_NAME, SEGMENT_CREATION_AND_TAR_PUSH_JOB_NAME),
-            jobConf).run();
+        new SegmentCreationJob(jobConf).run();
+        new SegmentTarPushJob(jobConf).run();
         break;
       case SegmentCreationAndUriPush:
-        new SegmentCreationJob(StringUtil.join(":", SEGMENT_CREATION_JOB_NAME, SEGMENT_CREATION_AND_URI_PUSH_JOB_NAME),
-            jobConf).run();
-        new SegmentUriPushJob(StringUtil.join(":", SEGMENT_PUSH_TAR_JOB_NAME, SEGMENT_CREATION_AND_URI_PUSH_JOB_NAME),
-            jobConf).run();
+        new SegmentCreationJob(jobConf).run();
+        new SegmentUriPushJob(jobConf).run();
         break;
       default:
         throw new RuntimeException("Not a valid jobType - " + jobType);
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordWriter.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordWriter.java
index 08468d9..6585295 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordWriter.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordWriter.java
@@ -88,7 +88,8 @@ public class PinotRecordWriter<K, V> extends RecordWriter<K, V> {
     String localTarPath = getLocalTarFile(PinotOutputFormat.getTempSegmentDir(context));
     LOGGER.info("Trying to tar the segment to: {}", localTarPath);
     TarGzCompressionUtils.createTarGzOfDirectory(localSegmentPath, localTarPath);
-    String hdfsTarPath = _workDir + "/segmentTar/" + _segmentConfig.getSegmentName() + JobConfigConstants.TARGZ;
+    String hdfsTarPath =
+        _workDir + "/segmentTar/" + _segmentConfig.getSegmentName() + JobConfigConstants.TAR_GZ_FILE_EXT;
 
     LOGGER.info("*********************************************************************");
     LOGGER.info("Copy from : {} to {}", localTarPath, hdfsTarPath);
@@ -113,7 +114,7 @@ public class PinotRecordWriter<K, V> extends RecordWriter<K, V> {
     if (!f.exists()) {
       f.mkdirs();
     }
-    return localTarDir + "/" + _segmentConfig.getSegmentName() + JobConfigConstants.TARGZ;
+    return localTarDir + "/" + _segmentConfig.getSegmentName() + JobConfigConstants.TAR_GZ_FILE_EXT;
   }
 
   /**
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java
new file mode 100644
index 0000000..488c587
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import javax.annotation.Nullable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class BaseSegmentJob extends Configured {
+  protected final Logger _logger = LoggerFactory.getLogger(getClass());
+  protected final Properties _properties;
+  protected final Configuration _conf;
+
+  protected BaseSegmentJob(Properties properties) {
+    _properties = properties;
+    _conf = new Configuration();
+    setConf(_conf);
+    Utils.logVersions();
+    logProperties();
+  }
+
+  protected void logProperties() {
+    _logger.info("*********************************************************************");
+    _logger.info("Job Properties: {}", _properties);
+    _logger.info("*********************************************************************");
+  }
+
+  @Nullable
+  protected Path getPathFromProperty(String key) {
+    String value = _properties.getProperty(key);
+    return value != null ? new Path(value) : null;
+  }
+
+  protected List<Path> getDataFilePaths(Path pathPattern)
+      throws IOException {
+    List<Path> tarFilePaths = new ArrayList<>();
+    FileSystem fileSystem = FileSystem.get(_conf);
+    getDataFilePathsHelper(fileSystem, fileSystem.globStatus(pathPattern), tarFilePaths);
+    return tarFilePaths;
+  }
+
+  protected void getDataFilePathsHelper(FileSystem fileSystem, FileStatus[] fileStatuses, List<Path> tarFilePaths)
+      throws IOException {
+    for (FileStatus fileStatus : fileStatuses) {
+      Path path = fileStatus.getPath();
+      if (fileStatus.isDirectory()) {
+        getDataFilePathsHelper(fileSystem, fileSystem.listStatus(path), tarFilePaths);
+      } else {
+        if (isDataFile(path.getName())) {
+          tarFilePaths.add(path);
+        }
+      }
+    }
+  }
+
+  protected abstract boolean isDataFile(String fileName);
+}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
index 1568710..319b55b 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
@@ -18,87 +18,21 @@
  */
 package org.apache.pinot.hadoop.job;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
+import java.io.Closeable;
 import java.util.List;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.common.utils.SimpleHttpResponse;
-import org.apache.pinot.hadoop.utils.PushLocation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pinot.common.data.Schema;
 
 
-public class ControllerRestApi {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ControllerRestApi.class);
-  private final List<PushLocation> _pushLocations;
-  private final String _tableName;
+public interface ControllerRestApi extends Closeable {
 
-  private static final String OFFLINE = "OFFLINE";
+  TableConfig getTableConfig();
 
-  public ControllerRestApi(List<PushLocation> pushLocations, String tableName) {
-    LOGGER.info("Push Locations are: " + pushLocations);
-    _pushLocations = pushLocations;
-    _tableName = tableName;
-  }
+  Schema getSchema();
 
-  public TableConfig getTableConfig() {
-    FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient();
-    List<URI> tableConfigURIs = new ArrayList<>();
-    try {
-      for (PushLocation pushLocation : _pushLocations) {
-        tableConfigURIs.add(FileUploadDownloadClient
-            .getRetrieveTableConfigURI(pushLocation.getHost(), pushLocation.getPort(), _tableName));
-      }
-    } catch (URISyntaxException e) {
-      LOGGER.error("Could not construct table config URI for table {}", _tableName);
-      throw new RuntimeException(e);
-    }
+  void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths);
 
-    // Return the first table config it can retrieve
-    for (URI uri : tableConfigURIs) {
-      try {
-        SimpleHttpResponse response = fileUploadDownloadClient.getTableConfig(uri);
-        JsonNode offlineTableConfig = JsonUtils.stringToJsonNode(response.getResponse()).get(OFFLINE);
-        if (offlineTableConfig != null) {
-          LOGGER.info("Got table config {}", offlineTableConfig);
-          return TableConfig.fromJSONConfig(offlineTableConfig);
-        }
-      } catch (Exception e) {
-        LOGGER.warn("Caught exception while trying to get table config for " + _tableName + " " + e);
-      }
-    }
-    LOGGER.error("Could not get table configs from any push locations provided for " + _tableName);
-    throw new RuntimeException("Could not get table config for table " + _tableName);
-  }
-
-  public String getSchema() {
-    FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient();
-    List<URI> schemaURIs = new ArrayList<>();
-    try {
-      for (PushLocation pushLocation : _pushLocations) {
-        schemaURIs.add(FileUploadDownloadClient
-            .getRetrieveSchemaHttpURI(pushLocation.getHost(), pushLocation.getPort(), _tableName));
-      }
-    } catch (URISyntaxException e) {
-      LOGGER.error("Could not construct schema URI for table {}", _tableName);
-      throw new RuntimeException(e);
-    }
-
-    for (URI schemaURI : schemaURIs) {
-      try {
-        SimpleHttpResponse response = fileUploadDownloadClient.getSchema(schemaURI);
-        if (response != null) {
-          return response.getResponse();
-        }
-      } catch (Exception e) {
-        LOGGER.warn("Caught exception while trying to get schema for " + _tableName + " " + e);
-      }
-    }
-    LOGGER.error("Could not get schema configs for any push locations provided for " + _tableName);
-    throw new RuntimeException("Could not get schema for table " + _tableName);
-  }
+  void sendSegmentUris(List<String> segmentUris);
 }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
new file mode 100644
index 0000000..efc4896
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.hadoop.utils.PushLocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DefaultControllerRestApi implements ControllerRestApi {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultControllerRestApi.class);
+
+  private final List<PushLocation> _pushLocations;
+  private final String _rawTableName;
+  private final FileUploadDownloadClient _fileUploadDownloadClient = new FileUploadDownloadClient();
+
+  private static final String OFFLINE = "OFFLINE";
+
+  public DefaultControllerRestApi(List<PushLocation> pushLocations, String rawTableName) {
+    LOGGER.info("Push locations are: {} for table: {}", pushLocations, rawTableName);
+    _pushLocations = pushLocations;
+    _rawTableName = rawTableName;
+  }
+
+  @Override
+  public TableConfig getTableConfig() {
+    for (PushLocation pushLocation : _pushLocations) {
+      try {
+        SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(FileUploadDownloadClient
+            .getRetrieveTableConfigHttpURI(pushLocation.getHost(), pushLocation.getPort(), _rawTableName));
+        JsonNode offlineJsonTableConfig = JsonUtils.stringToJsonNode(response.getResponse()).get(OFFLINE);
+        if (offlineJsonTableConfig != null) {
+          TableConfig offlineTableConfig = TableConfig.fromJSONConfig(offlineJsonTableConfig);
+          LOGGER.info("Got table config: {}", offlineTableConfig);
+          return offlineTableConfig;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while fetching table config for table: {} from push location: {}", _rawTableName,
+            pushLocation, e);
+      }
+    }
+    String errorMessage = String
+        .format("Failed to get table config from push locations: %s for table: %s", _pushLocations, _rawTableName);
+    LOGGER.error(errorMessage);
+    throw new RuntimeException(errorMessage);
+  }
+
+  @Override
+  public Schema getSchema() {
+    for (PushLocation pushLocation : _pushLocations) {
+      try {
+        SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(FileUploadDownloadClient
+            .getRetrieveSchemaHttpURI(pushLocation.getHost(), pushLocation.getPort(), _rawTableName));
+        Schema schema = Schema.fromString(response.getResponse());
+        LOGGER.info("Got schema: {}", schema);
+        return schema;
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while fetching schema for table: {} from push location: {}", _rawTableName,
+            pushLocation, e);
+      }
+    }
+    String errorMessage =
+        String.format("Failed to get schema from push locations: %s for table: %s", _pushLocations, _rawTableName);
+    LOGGER.error(errorMessage);
+    throw new RuntimeException(errorMessage);
+  }
+
+  @Override
+  public void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths) {
+    LOGGER.info("Start pushing segments: {} to locations: {}", tarFilePaths, _pushLocations);
+    for (Path tarFilePath : tarFilePaths) {
+      String fileName = tarFilePath.getName();
+      Preconditions.checkArgument(fileName.endsWith(JobConfigConstants.TAR_GZ_FILE_EXT));
+      String segmentName = fileName.substring(0, fileName.length() - JobConfigConstants.TAR_GZ_FILE_EXT.length());
+      for (PushLocation pushLocation : _pushLocations) {
+        LOGGER.info("Pushing segment: {} to location: {}", segmentName, pushLocation);
+        try (InputStream inputStream = fileSystem.open(tarFilePath)) {
+          SimpleHttpResponse response = _fileUploadDownloadClient.uploadSegment(
+              FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), pushLocation.getPort()),
+              segmentName, inputStream);
+          LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while pushing segment: {} to location: {}", segmentName, pushLocation, e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void sendSegmentUris(List<String> segmentUris) {
+    LOGGER.info("Start sending segment URIs: {} to locations: {}", segmentUris, _pushLocations);
+    for (String segmentUri : segmentUris) {
+      for (PushLocation pushLocation : _pushLocations) {
+        LOGGER.info("Sending segment URI: {} to location: {}", segmentUri, pushLocation);
+        try {
+          SimpleHttpResponse response = _fileUploadDownloadClient.sendSegmentUri(
+              FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), pushLocation.getPort()),
+              segmentUri);
+          LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while sending segment URI: {} to location: {}", segmentUri, pushLocation, e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _fileUploadDownloadClient.close();
+  }
+}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
index 9f55a1e..c87378a 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
@@ -21,17 +21,17 @@ package org.apache.pinot.hadoop.job;
 public class JobConfigConstants {
   public static final String PATH_TO_INPUT = "path.to.input";
   public static final String PATH_TO_OUTPUT = "path.to.output";
+  public static final String PATH_TO_DEPS_JAR = "path.to.deps.jar";
   public static final String PATH_TO_READER_CONFIG = "path.to.reader.config";
   // Leave this for backward compatibility. We prefer to use the schema fetched from the controller.
   public static final String PATH_TO_SCHEMA = "path.to.schema";
 
-  public static final String TARGZ = ".tar.gz";
+  public static final String SEGMENT_TAR_DIR = "segmentTar";
+  public static final String TAR_GZ_FILE_EXT = ".tar.gz";
 
-  public static final String TIME_COLUMN_NAME = "table.time.column.name";
-  public static final String TIME_COLUMN_TYPE = "table.time.column.type";
-  public static final String TABLE_PUSH_TYPE = "table.push.type";
   public static final String SCHEMA = "data.schema";
   public static final String SEGMENT_TABLE_NAME = "segment.table.name";
+  public static final String SEGMENT_NAME_POSTFIX = "segment.name.postfix";
 
   public static final String PUSH_TO_HOSTS = "push.to.hosts";
   public static final String PUSH_TO_PORT = "push.to.port";
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
index 7570600..7876563 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
@@ -18,15 +18,17 @@
  */
 package org.apache.pinot.hadoop.job;
 
-import java.io.File;
+import com.google.common.base.Preconditions;
+import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.io.InputStream;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -35,169 +37,131 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.data.Schema;
-import org.apache.pinot.hadoop.job.mapper.HadoopSegmentCreationMapReduceJob;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.hadoop.job.mapper.SegmentCreationMapper;
 import org.apache.pinot.hadoop.utils.PushLocation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
-public class SegmentCreationJob extends Configured {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentCreationJob.class);
+public class SegmentCreationJob extends BaseSegmentJob {
+  protected static final String APPEND = "APPEND";
 
-  private static final String PATH_TO_DEPS_JAR = "path.to.deps.jar";
-  private static final String APPEND = "APPEND";
+  protected final Path _inputPattern;
+  protected final Path _outputDir;
+  protected final Path _stagingDir;
+  protected final String _rawTableName;
 
-  private final String _jobName;
-  private final Properties _properties;
+  // Optional
+  protected final Path _depsJarDir;
+  protected final Path _schemaFile;
+  protected final String _defaultPermissionsMask;
+  protected final List<PushLocation> _pushLocations;
 
-  private final String _inputSegmentDir;
-  private final String _stagingDir;
-  private final Schema _dataSchema;
-  private final String _depsJarPath;
-  private final String _outputDir;
-  private final String _tableName;
+  protected FileSystem _fileSystem;
 
-  private final String _readerConfigFile;
+  public SegmentCreationJob(Properties properties) {
+    super(properties);
+    _conf.set("mapreduce.job.user.classpath.first", "true");
 
-  private final String _defaultPermissionsMask;
+    _inputPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_INPUT));
+    _outputDir = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT));
+    _stagingDir = new Path(_outputDir, UUID.randomUUID().toString());
+    _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
 
-  private String[] _hosts;
-  private int _port;
+    // Optional
+    _depsJarDir = getPathFromProperty(JobConfigConstants.PATH_TO_DEPS_JAR);
+    _schemaFile = getPathFromProperty(JobConfigConstants.PATH_TO_SCHEMA);
+    _defaultPermissionsMask = _properties.getProperty(JobConfigConstants.DEFAULT_PERMISSIONS_MASK);
 
-  public SegmentCreationJob(String jobName, Properties properties)
-      throws Exception {
-    super(new Configuration());
-    getConf().set("mapreduce.job.user.classpath.first", "true");
-    _jobName = jobName;
-    _properties = properties;
-
-    _inputSegmentDir = _properties.getProperty(JobConfigConstants.PATH_TO_INPUT);
-    String schemaFilePath = _properties.getProperty(JobConfigConstants.PATH_TO_SCHEMA);
-    _outputDir = getOutputDir();
-    _stagingDir = new File(_outputDir, UUID.randomUUID().toString()).getAbsolutePath();
-    _depsJarPath = _properties.getProperty(PATH_TO_DEPS_JAR, null);
-    _readerConfigFile = _properties.getProperty(JobConfigConstants.PATH_TO_READER_CONFIG);
-    String hostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS);
-    String portString = _properties.getProperty(JobConfigConstants.PUSH_TO_PORT);
-
-    _defaultPermissionsMask = _properties.getProperty(JobConfigConstants.DEFAULT_PERMISSIONS_MASK, null);
-    LOGGER.info("Default permissions mask is {}", _defaultPermissionsMask);
-
-    // For backwards compatibility, we want to allow users to create segments without setting push location parameters
-    // in their creation jobs.
-    if (hostsString != null && portString != null) {
-      _hosts = hostsString.split(",");
-      _port = Integer.parseInt(portString);
-    }
-    _tableName = _properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME);
-
-    Utils.logVersions();
-
-    LOGGER.info("*********************************************************************");
-    LOGGER.info("path.to.input: {}", _inputSegmentDir);
-    LOGGER.info("path.to.deps.jar: {}", _depsJarPath);
-    LOGGER.info("path.to.output: {}", _outputDir);
-    LOGGER.info("path.to.schema: {}", schemaFilePath);
-    if (_readerConfigFile != null) {
-      LOGGER.info("path.to.reader.config: {}", _readerConfigFile);
-    }
-    if (schemaFilePath != null) {
-      _dataSchema = Schema.fromFile(new File(schemaFilePath));
+    // Optional push location and table parameters. If set, will use the table config and schema from the push hosts.
+    String pushHostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS);
+    String pushPortString = _properties.getProperty(JobConfigConstants.PUSH_TO_PORT);
+    if (pushHostsString != null && pushPortString != null) {
+      _pushLocations =
+          PushLocation.getPushLocations(StringUtils.split(pushHostsString, ','), Integer.parseInt(pushPortString));
     } else {
-      _dataSchema = null;
+      _pushLocations = null;
     }
-    LOGGER.info("schema: {}", _dataSchema);
-    LOGGER.info("*********************************************************************");
-  }
-
-  protected String getOutputDir() {
-    return _properties.getProperty(JobConfigConstants.PATH_TO_OUTPUT);
-  }
 
-  protected String getInputDir() {
-    return _inputSegmentDir;
+    _logger.info("*********************************************************************");
+    _logger.info("Input Pattern: {}", _inputPattern);
+    _logger.info("Output Directory: {}", _outputDir);
+    _logger.info("Staging Directory: {}", _stagingDir);
+    _logger.info("Raw Table Name: {}", _rawTableName);
+    _logger.info("Dependencies Directory: {}", _depsJarDir);
+    _logger.info("Schema File: {}", _schemaFile);
+    _logger.info("Default Permissions Mask: {}", _defaultPermissionsMask);
+    _logger.info("Push Locations: {}", _pushLocations);
+    _logger.info("*********************************************************************");
   }
 
-  protected void setOutputPath(Configuration configuration) {
-
+  @Override
+  protected boolean isDataFile(String fileName) {
+    return fileName.endsWith(".avro") || fileName.endsWith(".csv") || fileName.endsWith(".json") || fileName
+        .endsWith(".thrift");
   }
 
   public void run()
       throws Exception {
-    LOGGER.info("Starting {}", getClass().getSimpleName());
-
-    FileSystem fs = FileSystem.get(getConf());
-    Path inputPathPattern = new Path(_inputSegmentDir);
-    Path stagingDir = new Path(_stagingDir);
-    Path outputDir = new Path(_outputDir);
-
-    if (fs.exists(outputDir)) {
-      LOGGER.warn("Found the output folder {}, deleting it", _outputDir);
-      fs.delete(outputDir, true);
-    }
-    fs.mkdirs(outputDir);
-
-    if (fs.exists(stagingDir)) {
-      LOGGER.warn("Found the temp folder {}, deleting it", stagingDir);
-      fs.delete(stagingDir, true);
+    _logger.info("Starting {}", getClass().getSimpleName());
+
+    // Initialize all directories
+    _fileSystem = FileSystem.get(_conf);
+    mkdirs(_outputDir);
+    mkdirs(_stagingDir);
+    Path stagingInputDir = new Path(_stagingDir, "input");
+    mkdirs(stagingInputDir);
+
+    // Gather all data files
+    List<Path> dataFilePaths = getDataFilePaths(_inputPattern);
+    int numDataFiles = dataFilePaths.size();
+    if (numDataFiles == 0) {
+      String errorMessage = "No data file founded with pattern: " + _inputPattern;
+      _logger.error(errorMessage);
+      throw new RuntimeException(errorMessage);
+    } else {
+      _logger.info("Creating segments with data files: {}", dataFilePaths);
+      for (int i = 0; i < numDataFiles; i++) {
+        Path dataFilePath = dataFilePaths.get(i);
+        try (DataOutputStream dataOutputStream = _fileSystem.create(new Path(stagingInputDir, Integer.toString(i)))) {
+          dataOutputStream.write(StringUtil.encodeUtf8(dataFilePath.toString() + " " + i));
+          dataOutputStream.flush();
+        }
+      }
     }
-    fs.mkdirs(stagingDir);
 
-    Path stagingDirInputPath = new Path(_stagingDir + "/input/");
-    fs.mkdirs(stagingDirInputPath);
-    LOGGER.info("Staging dir input path is {}", stagingDirInputPath);
+    // Set up the job
+    Job job = Job.getInstance(_conf);
 
-    if (_defaultPermissionsMask != null) {
-      FsPermission umask = new FsPermission(_defaultPermissionsMask);
-      FsPermission permission = FsPermission.getDirDefault().applyUMask(umask);
-
-      setDirPermission(stagingDir, permission, fs);
-      setDirPermission(stagingDirInputPath, permission, fs);
-      setDirPermission(outputDir, permission, fs);
+    Configuration jobConf = job.getConfiguration();
+    String hadoopTokenFileLocation = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
+    if (hadoopTokenFileLocation != null) {
+      jobConf.set("mapreduce.job.credentials.binary", hadoopTokenFileLocation);
     }
+    jobConf.setInt(JobContext.NUM_MAPS, numDataFiles);
 
-    List<FileStatus> inputDataFiles = new ArrayList<FileStatus>();
-    FileStatus[] fileStatusArr = fs.globStatus(inputPathPattern);
-    for (FileStatus fileStatus : fileStatusArr) {
-      inputDataFiles.addAll(getDataFilesFromPath(fs, fileStatus.getPath()));
-    }
-    if (inputDataFiles.isEmpty()) {
-      LOGGER.error("No Input paths {}", inputPathPattern);
-      throw new RuntimeException("No input files " + inputPathPattern);
+    // Set table config and schema
+    TableConfig tableConfig = getTableConfig();
+    if (tableConfig != null) {
+      validateTableConfig(tableConfig);
+      jobConf.set(JobConfigConstants.TABLE_CONFIG, tableConfig.toJSONConfigString());
     }
+    jobConf.set(JobConfigConstants.SCHEMA, getSchema().toSingleLineJsonString());
 
-    for (int seqId = 0; seqId < inputDataFiles.size(); ++seqId) {
-      FileStatus file = inputDataFiles.get(seqId);
-      String completeFilePath = " " + file.getPath().toString() + " " + seqId;
-      Path newOutPutFile = new Path(
-          (_stagingDir + "/input/" + file.getPath().toString().replace('.', '_').replace('/', '_').replace(':', '_')
-              + ".txt"));
-      FSDataOutputStream stream = fs.create(newOutPutFile);
-      stream.writeUTF(completeFilePath);
-      stream.flush();
-      stream.close();
+    // Set additional configurations
+    for (Map.Entry<Object, Object> entry : _properties.entrySet()) {
+      jobConf.set(entry.getKey().toString(), entry.getValue().toString());
     }
 
-    Job job = Job.getInstance(getConf());
-
-    setAdditionalJobProperties(job);
-
-    job.setJarByClass(SegmentCreationJob.class);
-    job.setJobName(_jobName);
-
-    setMapperClass(job);
-
-    if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
-      job.getConfiguration().set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
-    }
+    job.setMapperClass(getMapperClass());
+    job.setNumReduceTasks(0);
 
     job.setInputFormatClass(TextInputFormat.class);
     job.setOutputFormatClass(TextOutputFormat.class);
@@ -205,158 +169,129 @@ public class SegmentCreationJob extends Configured {
     job.setMapOutputKeyClass(LongWritable.class);
     job.setMapOutputValueClass(Text.class);
 
-    FileInputFormat.addInputPath(job, new Path(_stagingDir + "/input/"));
-    FileOutputFormat.setOutputPath(job, new Path(_stagingDir + "/output/"));
+    FileInputFormat.addInputPath(job, stagingInputDir);
+    FileOutputFormat.setOutputPath(job, new Path(_stagingDir, "output"));
 
-    job.getConfiguration().setInt(JobContext.NUM_MAPS, inputDataFiles.size());
-    if (_dataSchema != null) {
-      job.getConfiguration().set(JobConfigConstants.SCHEMA, _dataSchema.toSingleLineJsonString());
-    }
-    setOutputPath(job.getConfiguration());
-
-    job.setNumReduceTasks(0);
-    for (Object key : _properties.keySet()) {
-      job.getConfiguration().set(key.toString(), _properties.getProperty(key.toString()));
-    }
+    addDepsJarToDistributedCache(job);
+    addAdditionalJobProperties(job);
 
-    if (_depsJarPath != null && _depsJarPath.length() > 0) {
-      addDepsJarToDistributedCache(new Path(_depsJarPath), job);
-    }
-
-    // Submit the job for execution.
+    // Submit the job
     job.waitForCompletion(true);
     if (!job.isSuccessful()) {
-      throw new RuntimeException("Job failed : " + job);
+      throw new RuntimeException("Job failed: " + job);
     }
 
-    moveToOutputDirectory(fs);
+    moveSegmentsToOutputDir();
 
-    // Delete temporary directory.
-    LOGGER.info("Cleanup the working directory.");
-    LOGGER.info("Deleting the dir: {}", _stagingDir);
-    fs.delete(new Path(_stagingDir), true);
+    // Delete the staging directory
+    _logger.info("Deleting the staging directory: {}", _stagingDir);
+    _fileSystem.delete(_stagingDir, true);
   }
 
-  private void setDirPermission(Path directory, FsPermission permission, FileSystem fs)
+  protected void mkdirs(Path dirPath)
       throws IOException {
-    fs.setPermission(directory, permission);
-    LOGGER.info("Setting permissions '{}' for directory: '{}'", permission, directory);
-  }
-
-  protected void setAdditionalJobProperties(Job job)
-      throws Exception {
-    // Check host and port information before set table config dependent properties
-    if (_hosts == null || _port == 0) {
-      LOGGER.warn("Unable to set TableConfig-dependent properties. Please set host {} ({}) and port {} ({})",
-          JobConfigConstants.PUSH_TO_HOSTS, _port, JobConfigConstants.PUSH_TO_PORT, _port);
-      return;
-    }
-
-    // Add push locations
-    List<PushLocation> pushLocations = new ArrayList<>();
-    for (String host : _hosts) {
-      pushLocations.add(new PushLocation.PushLocationBuilder().setHost(host).setPort(_port).build());
+    if (_fileSystem.exists(dirPath)) {
+      _logger.warn("Deleting existing file: {}", dirPath);
+      _fileSystem.delete(dirPath, true);
     }
+    _logger.info("Making directory: {}", dirPath);
+    _fileSystem.mkdirs(dirPath);
+    setDirPermission(dirPath);
+  }
 
-    ControllerRestApi controllerRestApiObject = new ControllerRestApi(pushLocations, _tableName);
-
-    // Fetch table config from controller API
-    TableConfig tableConfig = controllerRestApiObject.getTableConfig();
-    job.getConfiguration().set(JobConfigConstants.TABLE_CONFIG, tableConfig.toJSONConfigString());
-
-    SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
-    if (validationConfig == null) {
-      throw new RuntimeException(
-          "Segment validation config should not be null. Please configure them correctly in the table config");
+  protected void setDirPermission(Path dirPath)
+      throws IOException {
+    if (_defaultPermissionsMask != null) {
+      FsPermission permission = FsPermission.getDirDefault().applyUMask(new FsPermission(_defaultPermissionsMask));
+      _logger.info("Setting permission: {} to directory: {}", permission, dirPath);
+      _fileSystem.setPermission(dirPath, permission);
     }
+  }
 
-    // Check if pushType is set correctly
-    String segmentPushType = validationConfig.getSegmentPushType();
-    if (segmentPushType == null) {
-      throw new RuntimeException("Segment push type is null. Please configure the value correctly in the table config. "
-          + "We support APPEND or REFRESH for push types.");
+  @Nullable
+  protected TableConfig getTableConfig()
+      throws IOException {
+    if (_pushLocations != null) {
+      try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+        return controllerRestApi.getTableConfig();
+      }
+    } else {
+      return null;
     }
+  }
 
-    // Update table push type
-    job.getConfiguration().set(JobConfigConstants.TABLE_PUSH_TYPE, segmentPushType);
-
-    if (segmentPushType.equalsIgnoreCase(APPEND)) {
-      // For append use cases, timeColumnName and timeType must be set
-      String timeColumnName = validationConfig.getTimeColumnName();
-      String timeColumnType = validationConfig.getTimeType();
-      if (timeColumnName == null || timeColumnType == null) {
-        throw new RuntimeException("Time column or time column type is null. Both are required for APPEND use case. "
-            + "Please configure them correctly in the table config.");
+  protected Schema getSchema()
+      throws IOException {
+    if (_pushLocations != null) {
+      try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+        return controllerRestApi.getSchema();
       }
-      LOGGER.info("Time column: {}, time column type: {}", timeColumnName, timeColumnType);
-      job.getConfiguration().set(JobConfigConstants.TIME_COLUMN_NAME, timeColumnName);
-      job.getConfiguration().set(JobConfigConstants.TIME_COLUMN_TYPE, timeColumnType);
     } else {
-      LOGGER.info("Refresh use case. Not setting timeColumnName and timeColumnType for table: " + _tableName);
+      try (InputStream inputStream = _fileSystem.open(_schemaFile)) {
+        return Schema.fromInputSteam(inputStream);
+      }
     }
+  }
 
-    // Fetch schema from controller API and set it to the job configuration
-    String schema = controllerRestApiObject.getSchema();
-    LOGGER.info("Setting schema for tableName {} to {}", _tableName, schema);
-    job.getConfiguration().set(JobConfigConstants.SCHEMA, schema);
+  protected DefaultControllerRestApi getControllerRestApi() {
+    return new DefaultControllerRestApi(_pushLocations, _rawTableName);
   }
 
-  protected void moveToOutputDirectory(FileSystem fs)
-      throws Exception {
-    LOGGER.info("Moving Segment Tar files from {} to: {}", _stagingDir + "/output/segmentTar", _outputDir);
-    FileStatus[] segmentArr = fs.listStatus(new Path(_stagingDir + "/output/segmentTar"));
-    for (FileStatus segment : segmentArr) {
-      fs.rename(segment.getPath(), new Path(_outputDir, segment.getPath().getName()));
+  protected void validateTableConfig(TableConfig tableConfig) {
+    SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+
+    // For APPEND use case, timeColumnName and timeType must be set
+    if (APPEND.equalsIgnoreCase(validationConfig.getSegmentPushType())) {
+      Preconditions.checkState(validationConfig.getTimeColumnName() != null && validationConfig.getTimeType() != null,
+          "For APPEND use case, time column and type must be set");
     }
   }
 
-  protected Job setMapperClass(Job job) {
-    job.setMapperClass(HadoopSegmentCreationMapReduceJob.HadoopSegmentCreationMapper.class);
-    return job;
+  /**
+   * Can be overridden to plug in a custom mapper.
+   */
+  protected Class<? extends Mapper<LongWritable, Text, LongWritable, Text>> getMapperClass() {
+    return SegmentCreationMapper.class;
+  }
+
+  protected void addDepsJarToDistributedCache(Job job)
+      throws IOException {
+    if (_depsJarDir != null) {
+      addDepsJarToDistributedCacheHelper(job, _depsJarDir);
+    }
   }
 
-  private void addDepsJarToDistributedCache(Path path, Job job)
+  protected void addDepsJarToDistributedCacheHelper(Job job, Path depsJarDir)
       throws IOException {
-    LOGGER.info("Trying to add all the deps jar files from directory: {}", path);
-    FileSystem fs = FileSystem.get(getConf());
-    FileStatus[] fileStatusArr = fs.listStatus(path);
-    for (FileStatus fileStatus : fileStatusArr) {
+    FileStatus[] fileStatuses = _fileSystem.listStatus(depsJarDir);
+    for (FileStatus fileStatus : fileStatuses) {
       if (fileStatus.isDirectory()) {
-        addDepsJarToDistributedCache(fileStatus.getPath(), job);
+        addDepsJarToDistributedCacheHelper(job, fileStatus.getPath());
       } else {
         Path depJarPath = fileStatus.getPath();
         if (depJarPath.getName().endsWith(".jar")) {
-          LOGGER.info("Adding deps jar files: {}", path);
-          job.addCacheArchive(path.toUri());
+          _logger.info("Adding deps jar: {} to distributed cache", depJarPath);
+          job.addCacheArchive(depJarPath.toUri());
         }
       }
     }
   }
 
-  private ArrayList<FileStatus> getDataFilesFromPath(FileSystem fs, Path inBaseDir)
+  /**
+   * Can be overridden to set additional job properties.
+   */
+  @SuppressWarnings("unused")
+  protected void addAdditionalJobProperties(Job job) {
+  }
+
+  protected void moveSegmentsToOutputDir()
       throws IOException {
-    ArrayList<FileStatus> dataFileStatusList = new ArrayList<FileStatus>();
-    FileStatus[] fileStatusArr = fs.listStatus(inBaseDir);
-    for (FileStatus fileStatus : fileStatusArr) {
-      if (fileStatus.isDirectory()) {
-        LOGGER.info("Trying to add all the data files from directory: {}", fileStatus.getPath());
-        dataFileStatusList.addAll(getDataFilesFromPath(fs, fileStatus.getPath()));
-      } else {
-        String fileName = fileStatus.getPath().getName();
-        if (fileName.endsWith(".avro")) {
-          LOGGER.info("Adding avro files: {}", fileStatus.getPath());
-          dataFileStatusList.add(fileStatus);
-        }
-        if (fileName.endsWith(".csv")) {
-          LOGGER.info("Adding csv files: {}", fileStatus.getPath());
-          dataFileStatusList.add(fileStatus);
-        }
-        if (fileName.endsWith(".json")) {
-          LOGGER.info("Adding json files: {}", fileStatus.getPath());
-          dataFileStatusList.add(fileStatus);
-        }
-      }
+    Path segmentTarDir = new Path(new Path(_stagingDir, "output"), JobConfigConstants.SEGMENT_TAR_DIR);
+    for (FileStatus segmentTarStatus : _fileSystem.listStatus(segmentTarDir)) {
+      Path segmentTarPath = segmentTarStatus.getPath();
+      Path dest = new Path(_outputDir, segmentTarPath.getName());
+      _logger.info("Moving segment tar file from: {} to: {}", segmentTarPath, dest);
+      _fileSystem.rename(segmentTarPath, dest);
     }
-    return dataFileStatusList;
   }
 }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
index f4890ab..7c71fd8 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
@@ -18,82 +18,41 @@
  */
 package org.apache.pinot.hadoop.job;
 
-import java.io.InputStream;
+import com.google.common.base.Preconditions;
+import java.util.List;
 import java.util.Properties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.common.utils.SimpleHttpResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pinot.hadoop.utils.PushLocation;
 
 
-public class SegmentTarPushJob extends Configured {
+public class SegmentTarPushJob extends BaseSegmentJob {
+  private final Path _segmentPattern;
+  private final List<PushLocation> _pushLocations;
 
-  private String _segmentPath;
-  private String[] _hosts;
-  private int _port;
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentTarPushJob.class);
-
-  public SegmentTarPushJob(String name, Properties properties) {
-    super(new Configuration());
-    _segmentPath = properties.getProperty(JobConfigConstants.PATH_TO_OUTPUT) + "/";
-    _hosts = properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS).split(",");
-    _port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
+  public SegmentTarPushJob(Properties properties) {
+    super(properties);
+    _segmentPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT));
+    String[] hosts = StringUtils.split(properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS), ',');
+    int port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
+    _pushLocations = PushLocation.getPushLocations(hosts, port);
   }
 
-  public void run()
-      throws Exception {
-    Configuration conf = new Configuration();
-    FileSystem fs = FileSystem.get(conf);
-    Path path = new Path(_segmentPath);
-    FileStatus[] fileStatusArr = fs.globStatus(path);
-    for (FileStatus fileStatus : fileStatusArr) {
-      if (fileStatus.isDirectory()) {
-        pushDir(fs, fileStatus.getPath());
-      } else {
-        pushOneTarFile(fs, fileStatus.getPath());
-      }
-    }
+  @Override
+  protected boolean isDataFile(String fileName) {
+    return fileName.endsWith(JobConfigConstants.TAR_GZ_FILE_EXT);
   }
 
-  public void pushDir(FileSystem fs, Path path)
+  public void run()
       throws Exception {
-    LOGGER.info("******** Now uploading segments tar from dir: {}", path);
-    FileStatus[] fileStatusArr = fs.listStatus(new Path(path.toString() + "/"));
-    for (FileStatus fileStatus : fileStatusArr) {
-      if (fileStatus.isDirectory()) {
-        pushDir(fs, fileStatus.getPath());
-      } else {
-        pushOneTarFile(fs, fileStatus.getPath());
-      }
+    FileSystem fileSystem = FileSystem.get(_conf);
+    try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+      controllerRestApi.pushSegments(fileSystem, getDataFilePaths(_segmentPattern));
     }
   }
 
-  public void pushOneTarFile(FileSystem fs, Path path)
-      throws Exception {
-    String fileName = path.getName();
-    if (!fileName.endsWith(".tar.gz")) {
-      return;
-    }
-    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
-      for (String host : _hosts) {
-        try (InputStream inputStream = fs.open(path)) {
-          fileName = fileName.split(JobConfigConstants.TARGZ)[0];
-          LOGGER.info("******** Uploading file: {} to Host: {} and Port: {} *******", fileName, host, _port);
-          SimpleHttpResponse response = fileUploadDownloadClient
-              .uploadSegment(FileUploadDownloadClient.getUploadSegmentHttpURI(host, _port), fileName, inputStream);
-          LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
-        } catch (Exception e) {
-          LOGGER.error("******** Error Uploading file: {} to Host: {} and Port: {}  *******", fileName, host, _port);
-          LOGGER.error("Caught exception during upload", e);
-          throw e;
-        }
-      }
-    }
+  protected ControllerRestApi getControllerRestApi() {
+    return new DefaultControllerRestApi(_pushLocations, null);
   }
 }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentUriPushJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentUriPushJob.java
index 59c66b7..aa81ba8 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentUriPushJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentUriPushJob.java
@@ -18,87 +18,49 @@
  */
 package org.apache.pinot.hadoop.job;
 
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
-import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.common.utils.SimpleHttpResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pinot.hadoop.utils.PushLocation;
 
 
-public class SegmentUriPushJob extends Configured {
+public class SegmentUriPushJob extends BaseSegmentJob {
+  private final String _segmentUriPrefix;
+  private final String _segmentUriSuffix;
+  private final Path _segmentPattern;
+  private final List<PushLocation> _pushLocations;
 
-  private String _pushUriPrefix;
-  private String _pushUriSuffix;
-  private String _segmentPath;
-  private String[] _hosts;
-  private int _port;
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentUriPushJob.class);
-
-  public SegmentUriPushJob(String name, Properties properties) {
-    super(new Configuration());
-    _pushUriPrefix = properties.getProperty("uri.prefix", "");
-    _pushUriSuffix = properties.getProperty("uri.suffix", "");
-    _segmentPath = properties.getProperty(JobConfigConstants.PATH_TO_OUTPUT) + "/";
-    _hosts = properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS).split(",");
-    _port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
+  public SegmentUriPushJob(Properties properties) {
+    super(properties);
+    _segmentUriPrefix = properties.getProperty("uri.prefix", "");
+    _segmentUriSuffix = properties.getProperty("uri.suffix", "");
+    _segmentPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT));
+    String[] hosts = StringUtils.split(properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS), ',');
+    int port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
+    _pushLocations = PushLocation.getPushLocations(hosts, port);
   }
 
-  public void run()
-      throws Exception {
-    Configuration conf = new Configuration();
-    FileSystem fs = FileSystem.get(conf);
-    Path path = new Path(_segmentPath);
-    FileStatus[] fileStatusArr = fs.globStatus(path);
-    for (FileStatus fileStatus : fileStatusArr) {
-      if (fileStatus.isDirectory()) {
-        pushDir(fs, fileStatus.getPath());
-      } else {
-        pushOneTarFile(fs, fileStatus.getPath());
-      }
-    }
+  @Override
+  protected boolean isDataFile(String fileName) {
+    return fileName.endsWith(JobConfigConstants.TAR_GZ_FILE_EXT);
   }
 
-  public void pushDir(FileSystem fs, Path path)
+  public void run()
       throws Exception {
-    LOGGER.info("******** Now uploading segments tar from dir: {}", path);
-    FileStatus[] fileStatusArr = fs.listStatus(new Path(path.toString() + "/"));
-    for (FileStatus fileStatus : fileStatusArr) {
-      if (fileStatus.isDirectory()) {
-        pushDir(fs, fileStatus.getPath());
-      } else {
-        pushOneTarFile(fs, fileStatus.getPath());
+    try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+      List<Path> tarFilePaths = getDataFilePaths(_segmentPattern);
+      List<String> segmentUris = new ArrayList<>(tarFilePaths.size());
+      for (Path tarFilePath : tarFilePaths) {
+        segmentUris.add(_segmentUriPrefix + tarFilePath.toUri().getRawPath() + _segmentUriSuffix);
       }
+      controllerRestApi.sendSegmentUris(segmentUris);
     }
   }
 
-  public void pushOneTarFile(FileSystem fs, Path path)
-      throws Exception {
-    String fileName = path.getName();
-    if (!fileName.endsWith(JobConfigConstants.TARGZ)) {
-      return;
-    }
-    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
-      for (String host : _hosts) {
-        String uri = String.format("%s%s%s", _pushUriPrefix, path.toUri().getRawPath(), _pushUriSuffix);
-        LOGGER
-            .info("******** Uploading file: {} to Host: {} and Port: {} with download uri: {} *******", fileName, host,
-                _port, uri);
-        try {
-          SimpleHttpResponse response = fileUploadDownloadClient
-              .sendSegmentUri(FileUploadDownloadClient.getUploadSegmentHttpURI(host, _port), uri);
-          LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
-        } catch (Exception e) {
-          LOGGER.error("******** Error Uploading file: {} to Host: {} and Port: {}  *******", fileName, host, _port);
-          LOGGER.error("Caught exception during upload", e);
-          throw e;
-        }
-      }
-    }
+  protected ControllerRestApi getControllerRestApi() {
+    return new DefaultControllerRestApi(_pushLocations, null);
   }
 }
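
Similarly, an illustrative sketch for the URI push variant (not part of this commit). Per the run() method above, each pushed URI is built as uri.prefix + the tar file's raw HDFS path + uri.suffix, so with the placeholder prefix below a tar file at /pinot/segments/myTable/myTable_0.tar.gz would be sent as hdfs://namenode:8020/pinot/segments/myTable/myTable_0.tar.gz:

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.pinot.hadoop.job.JobConfigConstants;
    import org.apache.pinot.hadoop.job.SegmentUriPushJob;

    public class SegmentUriPushExample {
      public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty(JobConfigConstants.PATH_TO_OUTPUT, "hdfs:///pinot/segments/myTable");
        properties.setProperty(JobConfigConstants.PUSH_TO_HOSTS, "controller-1");
        properties.setProperty(JobConfigConstants.PUSH_TO_PORT, "9000");
        // Prefix and suffix wrapped around each tar file's raw path (placeholder values)
        properties.setProperty("uri.prefix", "hdfs://namenode:8020");
        properties.setProperty("uri.suffix", "");

        SegmentUriPushJob pushJob = new SegmentUriPushJob(properties);
        pushJob.setConf(new Configuration());
        pushJob.run();
      }
    }
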
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/HadoopSegmentCreationMapReduceJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/HadoopSegmentCreationMapReduceJob.java
deleted file mode 100644
index 128d844..0000000
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/HadoopSegmentCreationMapReduceJob.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.hadoop.job.mapper;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.data.Schema;
-import org.apache.pinot.common.utils.DataSize;
-import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.core.data.readers.CSVRecordReaderConfig;
-import org.apache.pinot.core.data.readers.FileFormat;
-import org.apache.pinot.core.data.readers.RecordReaderConfig;
-import org.apache.pinot.core.data.readers.ThriftRecordReaderConfig;
-import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
-import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.hadoop.job.JobConfigConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class HadoopSegmentCreationMapReduceJob {
-
-  public static class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
-    private static Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentCreationMapper.class);
-
-    private static final String PINOT_HADOOP_TMP = "pinot_hadoop_tmp";
-    private static final String SEGMENT_NAME_POSTFIX = "segment.name.postfix";
-    private static final String SEGMENT_TAR = "/segmentTar";
-
-    private Configuration _properties;
-
-    private String _inputFilePath;
-    private String _outputPath;
-    private String _tableName;
-    private String _postfix;
-    private String _readerConfigFile;
-
-    // Temporary local disk path for current working directory
-    private String _currentDiskWorkDir;
-
-    // Temporary hdfs path for segment tar file
-    private String _localHdfsSegmentTarPath;
-
-    // Temporary local disk path for segment tar file
-    private String _localDiskSegmentTarPath;
-
-    // Temporary local disk path for output segment directory
-    private String _localDiskOutputSegmentDir;
-
-    private TableConfig _tableConfig = null;
-
-    private FileSystem _fileSystem = null;
-
-    @Override
-    public void setup(Context context)
-        throws IOException, InterruptedException {
-      // Compute current working HDFS directory
-      Path currentHdfsWorkDir = FileOutputFormat.getWorkOutputPath(context);
-      _localHdfsSegmentTarPath = currentHdfsWorkDir + SEGMENT_TAR;
-
-      // Compute current working LOCAL DISK directory
-      _currentDiskWorkDir = PINOT_HADOOP_TMP;
-      _localDiskSegmentTarPath = _currentDiskWorkDir + SEGMENT_TAR;
-
-      _fileSystem = FileSystem.get(context.getConfiguration());
-
-      // Create directory
-      new File(_localDiskSegmentTarPath).mkdirs();
-
-      LOGGER.info("*********************************************************************");
-      LOGGER.info("Configurations : {}", context.getConfiguration().toString());
-      LOGGER.info("*********************************************************************");
-      LOGGER.info("Current HDFS working dir(setup) : {}", currentHdfsWorkDir);
-      LOGGER.info("Current DISK working dir(setup) : {}", new File(_currentDiskWorkDir).getAbsolutePath());
-      LOGGER.info("*********************************************************************");
-
-      _properties = context.getConfiguration();
-      _outputPath = _properties.get(JobConfigConstants.PATH_TO_OUTPUT);
-      _tableName = _properties.get(JobConfigConstants.SEGMENT_TABLE_NAME);
-      _postfix = _properties.get(SEGMENT_NAME_POSTFIX, null);
-      _readerConfigFile = _properties.get(JobConfigConstants.PATH_TO_READER_CONFIG);
-      if (_outputPath == null || _tableName == null) {
-        throw new RuntimeException(
-            "Missing configs: " + "\n\toutputPath: " + _properties.get(JobConfigConstants.PATH_TO_OUTPUT)
-                + "\n\ttableName: " + _properties.get(JobConfigConstants.SEGMENT_TABLE_NAME));
-      }
-
-      String tableConfigString = _properties.get(JobConfigConstants.TABLE_CONFIG);
-      if (tableConfigString != null) {
-        try {
-          _tableConfig = TableConfig.fromJsonString(tableConfigString);
-        } catch (IOException e) {
-          // Though we get table config directly from the controller of hosts and port of push location are set,
-          // it is possible for the user to pass in a table config as a parameter
-          LOGGER.error("Exception when parsing table config: {}", tableConfigString);
-        }
-      }
-    }
-
-    protected String getTableName() {
-      return _tableName;
-    }
-
-    @Override
-    public void cleanup(Context context)
-        throws IOException, InterruptedException {
-      File currentDiskWorkDir = new File(_currentDiskWorkDir);
-      LOGGER.info("Clean up directory: {}", currentDiskWorkDir.getAbsolutePath());
-      FileUtils.deleteQuietly(currentDiskWorkDir);
-    }
-
-    @Override
-    protected void map(LongWritable key, Text value, Context context)
-        throws IOException, InterruptedException {
-      String line = value.toString();
-      String[] lineSplits = line.split(" ");
-
-      LOGGER.info("*********************************************************************");
-      LOGGER.info("mapper input : {}", value);
-      LOGGER.info("PATH_TO_OUTPUT : {}", _outputPath);
-      LOGGER.info("TABLE_NAME : {}", _tableName);
-      LOGGER.info("num lines : {}", lineSplits.length);
-
-      for (String split : lineSplits) {
-        LOGGER.info("Command line : {}", split);
-      }
-
-      LOGGER.info("Current DISK working dir(mapper): {}", new File(_currentDiskWorkDir).getAbsolutePath());
-      LOGGER.info("*********************************************************************");
-
-      if (lineSplits.length != 3) {
-        throw new RuntimeException("Input to the mapper is malformed, please contact the pinot team");
-      }
-      _inputFilePath = lineSplits[1].trim();
-
-      String segmentDirectoryName = _tableName + "_" + Integer.parseInt(lineSplits[2]);
-      _localDiskOutputSegmentDir = _currentDiskWorkDir + "/segments/" + segmentDirectoryName;
-
-      // To inherit from from the Hadoop Mapper class, you can't directly throw a general exception.
-      Schema schema;
-      Configuration conf = context.getConfiguration();
-      final FileSystem fs = FileSystem.get(conf);
-      final Path hdfsInputFilePath = new Path(_inputFilePath);
-
-      final File localInputDataDir = new File(_currentDiskWorkDir, "inputData");
-      try {
-        if (localInputDataDir.exists()) {
-          localInputDataDir.delete();
-        }
-        localInputDataDir.mkdir();
-
-        final Path localInputFilePath =
-            new Path(localInputDataDir.getAbsolutePath() + "/" + hdfsInputFilePath.getName());
-        LOGGER.info("Copy from " + hdfsInputFilePath + " to " + localInputFilePath);
-        fs.copyToLocalFile(hdfsInputFilePath, localInputFilePath);
-
-        String schemaString = context.getConfiguration().get("data.schema");
-        try {
-          schema = Schema.fromString(schemaString);
-        } catch (Exception e) {
-          LOGGER.error("Could not get schema from string for value: " + schemaString);
-          throw new RuntimeException(e);
-        }
-      } catch (Exception e) {
-        LOGGER.error("Could not get schema: " + e);
-        throw new RuntimeException(e);
-      }
-
-      LOGGER.info("*********************************************************************");
-      LOGGER.info("input data file path : {}", _inputFilePath);
-      LOGGER.info("local hdfs segment tar path: {}", _localHdfsSegmentTarPath);
-      LOGGER.info("local disk output segment path: {}", _localDiskOutputSegmentDir);
-      LOGGER.info("local disk segment tar path: {}", _localDiskSegmentTarPath);
-      LOGGER.info("data schema: {}", schema);
-      LOGGER.info("*********************************************************************");
-
-      try {
-        String segmentName =
-            createSegment(_inputFilePath, schema, Integer.parseInt(lineSplits[2]), hdfsInputFilePath, localInputDataDir,
-                fs);
-        LOGGER.info(segmentName);
-        LOGGER.info("Finished segment creation job successfully");
-      } catch (Exception e) {
-        LOGGER.error("Got exceptions during creating segments!", e);
-      }
-
-      context.write(new LongWritable(Long.parseLong(lineSplits[2])), new Text(
-          FileSystem.get(_properties).listStatus(new Path(_localHdfsSegmentTarPath + "/"))[0].getPath().getName()));
-      LOGGER.info("Finished the job successfully");
-    }
-
-    protected void setSegmentNameGenerator(SegmentGeneratorConfig segmentGeneratorConfig, Integer seqId,
-        Path hdfsAvroPath, File dataPath) {
-    }
-
-    protected String createSegment(String dataFilePath, Schema schema, Integer seqId, Path hdfsInputFilePath,
-        File localInputDataDir, FileSystem fs)
-        throws Exception {
-      SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, schema);
-
-      segmentGeneratorConfig.setTableName(_tableName);
-      setSegmentNameGenerator(segmentGeneratorConfig, seqId, hdfsInputFilePath, localInputDataDir);
-
-      String inputFilePath = new File(localInputDataDir, hdfsInputFilePath.getName()).getAbsolutePath();
-      LOGGER.info("Create segment input path: {}", inputFilePath);
-      segmentGeneratorConfig.setInputFilePath(inputFilePath);
-
-      FileFormat fileFormat = getFileFormat(dataFilePath);
-      segmentGeneratorConfig.setFormat(fileFormat);
-      segmentGeneratorConfig.setOnHeap(true);
-
-      if (null != _postfix) {
-        segmentGeneratorConfig.setSegmentNamePostfix(String.format("%s-%s", _postfix, seqId));
-      } else {
-        segmentGeneratorConfig.setSequenceId(seqId);
-      }
-      segmentGeneratorConfig.setReaderConfig(getReaderConfig(fileFormat));
-
-      segmentGeneratorConfig.setOutDir(_localDiskOutputSegmentDir);
-
-      // Add the current java package version to the segment metadata
-      // properties file.
-      Package objPackage = this.getClass().getPackage();
-      if (null != objPackage) {
-        String packageVersion = objPackage.getSpecificationVersion();
-        if (null != packageVersion) {
-          LOGGER.info("Pinot Hadoop Package version {}", packageVersion);
-          segmentGeneratorConfig.setCreatorVersion(packageVersion);
-        }
-      }
-
-      SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
-      driver.init(segmentGeneratorConfig);
-      driver.build();
-
-      // Tar the segment directory into file.
-      String segmentName = driver.getSegmentName();
-
-      File localDiskOutputSegmentDir = new File(_localDiskOutputSegmentDir, segmentName);
-      String localDiskOutputSegmentDirAbsolutePath = localDiskOutputSegmentDir.getAbsolutePath();
-      String localDiskSegmentTarFileAbsolutePath =
-          new File(_localDiskSegmentTarPath).getAbsolutePath() + "/" + segmentName + JobConfigConstants.TARGZ;
-
-      LOGGER.info("Trying to tar the segment to: {}", localDiskSegmentTarFileAbsolutePath);
-      TarGzCompressionUtils
-          .createTarGzOfDirectory(localDiskOutputSegmentDirAbsolutePath, localDiskSegmentTarFileAbsolutePath);
-      String hdfsSegmentTarFilePath = _localHdfsSegmentTarPath + "/" + segmentName + JobConfigConstants.TARGZ;
-
-      // Log segment size.
-      long uncompressedSegmentSize = FileUtils.sizeOfDirectory(localDiskOutputSegmentDir);
-      long compressedSegmentSize = new File(localDiskSegmentTarFileAbsolutePath).length();
-      LOGGER.info(String.format("Segment %s uncompressed size: %s, compressed size: %s", segmentName,
-          DataSize.fromBytes(uncompressedSegmentSize), DataSize.fromBytes(compressedSegmentSize)));
-
-      LOGGER.info("*********************************************************************");
-      LOGGER.info("Copy from : {} to {}", localDiskSegmentTarFileAbsolutePath, hdfsSegmentTarFilePath);
-      LOGGER.info("*********************************************************************");
-      fs.copyFromLocalFile(true, true, new Path(localDiskSegmentTarFileAbsolutePath), new Path(hdfsSegmentTarFilePath));
-      return segmentName;
-    }
-
-    private RecordReaderConfig getReaderConfig(FileFormat fileFormat)
-        throws IOException {
-      RecordReaderConfig readerConfig = null;
-      switch (fileFormat) {
-        case CSV:
-          if (_readerConfigFile == null) {
-            readerConfig = new CSVRecordReaderConfig();
-          } else {
-            LOGGER.info("Reading CSV Record Reader Config from: {}", _readerConfigFile);
-            Path readerConfigPath = new Path(_readerConfigFile);
-            try (InputStream inputStream = _fileSystem.open(readerConfigPath)) {
-              readerConfig = JsonUtils.inputStreamToObject(inputStream, CSVRecordReaderConfig.class);
-            }
-            LOGGER.info("CSV Record Reader Config: {}", readerConfig.toString());
-          }
-          break;
-        case AVRO:
-          break;
-        case JSON:
-          break;
-        case THRIFT:
-          readerConfig = new ThriftRecordReaderConfig();
-        default:
-          break;
-      }
-      return readerConfig;
-    }
-
-    private FileFormat getFileFormat(String dataFilePath) {
-      if (dataFilePath.endsWith(".json")) {
-        return FileFormat.JSON;
-      }
-      if (dataFilePath.endsWith(".csv")) {
-        return FileFormat.CSV;
-      }
-      if (dataFilePath.endsWith(".avro")) {
-        return FileFormat.AVRO;
-      }
-      if (dataFilePath.endsWith(".thrift")) {
-        return FileFormat.THRIFT;
-      }
-      throw new RuntimeException("Not support file format - " + dataFilePath);
-    }
-  }
-}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
new file mode 100644
index 0000000..e8a122d
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.mapper;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.DataSize;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.data.readers.CSVRecordReaderConfig;
+import org.apache.pinot.core.data.readers.FileFormat;
+import org.apache.pinot.core.data.readers.RecordReaderConfig;
+import org.apache.pinot.core.data.readers.ThriftRecordReaderConfig;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.hadoop.job.JobConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
+  protected static final String LOCAL_TEMP_DIR = "pinot_hadoop_tmp";
+
+  protected final Logger _logger = LoggerFactory.getLogger(getClass());
+
+  protected Configuration _jobConf;
+  protected String _rawTableName;
+  protected Schema _schema;
+
+  // Optional
+  protected TableConfig _tableConfig;
+  protected String _segmentNamePostfix;
+  protected Path _readerConfigFile;
+
+  // HDFS segment tar directory
+  protected Path _hdfsSegmentTarDir;
+
+  // Temporary local directories
+  protected File _localStagingDir;
+  protected File _localInputDir;
+  protected File _localSegmentDir;
+  protected File _localSegmentTarDir;
+
+  protected FileSystem _fileSystem;
+
+  @Override
+  public void setup(Context context)
+      throws IOException, InterruptedException {
+    _jobConf = context.getConfiguration();
+    _logger.info("*********************************************************************");
+    _logger.info("Job Configurations: {}", _jobConf);
+    _logger.info("*********************************************************************");
+
+    _rawTableName = Preconditions.checkNotNull(_jobConf.get(JobConfigConstants.SEGMENT_TABLE_NAME));
+    _schema = Schema.fromString(_jobConf.get(JobConfigConstants.SCHEMA));
+
+    // Optional
+    String tableConfigString = _jobConf.get(JobConfigConstants.TABLE_CONFIG);
+    if (tableConfigString != null) {
+      _tableConfig = TableConfig.fromJsonString(tableConfigString);
+    }
+    _segmentNamePostfix = _jobConf.get(JobConfigConstants.SEGMENT_NAME_POSTFIX);
+    String readerConfigFile = _jobConf.get(JobConfigConstants.PATH_TO_READER_CONFIG);
+    if (readerConfigFile != null) {
+      _readerConfigFile = new Path(readerConfigFile);
+    }
+
+    // Working directories
+    _hdfsSegmentTarDir = new Path(FileOutputFormat.getWorkOutputPath(context), JobConfigConstants.SEGMENT_TAR_DIR);
+    _localStagingDir = new File(LOCAL_TEMP_DIR);
+    _localInputDir = new File(_localStagingDir, "inputData");
+    _localSegmentDir = new File(_localStagingDir, "segments");
+    _localSegmentTarDir = new File(_localStagingDir, JobConfigConstants.SEGMENT_TAR_DIR);
+
+    if (_localStagingDir.exists()) {
+      _logger.warn("Deleting existing file: {}", _localStagingDir);
+      FileUtils.forceDelete(_localStagingDir);
+    }
+    _logger.info("Making local temporary directories: {}, {}, {}, {}", _localStagingDir, _localInputDir,
+        _localSegmentDir, _localSegmentTarDir);
+    Preconditions.checkState(_localStagingDir.mkdirs());
+    Preconditions.checkState(_localInputDir.mkdir());
+    Preconditions.checkState(_localSegmentDir.mkdir());
+    Preconditions.checkState(_localSegmentTarDir.mkdir());
+
+    _fileSystem = FileSystem.get(context.getConfiguration());
+
+    _logger.info("*********************************************************************");
+    _logger.info("Raw Table Name: {}", _rawTableName);
+    _logger.info("Schema: {}", _schema);
+    _logger.info("Table Config: {}", _tableConfig);
+    _logger.info("Segment Name Postfix: {}", _segmentNamePostfix);
+    _logger.info("Reader Config File: {}", _readerConfigFile);
+    _logger.info("*********************************************************************");
+    _logger.info("HDFS Segment Tar Directory: {}", _hdfsSegmentTarDir);
+    _logger.info("Local Staging Directory: {}", _localStagingDir);
+    _logger.info("Local Input Directory: {}", _localInputDir);
+    _logger.info("Local Segment Tar Directory: {}", _localSegmentTarDir);
+    _logger.info("*********************************************************************");
+  }
+
+  @Override
+  protected void map(LongWritable key, Text value, Context context)
+      throws IOException, InterruptedException {
+    String[] splits = StringUtils.split(value.toString(), ' ');
+    Preconditions.checkState(splits.length == 2, "Illegal input value: %s", value);
+
+    Path hdfsInputFile = new Path(splits[0]);
+    int sequenceId = Integer.parseInt(splits[1]);
+    _logger.info("Generating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile, sequenceId);
+
+    String inputFileName = hdfsInputFile.getName();
+    File localInputFile = new File(_localInputDir, inputFileName);
+    _logger.info("Copying input file from: {} to: {}", hdfsInputFile, localInputFile);
+    _fileSystem.copyToLocalFile(hdfsInputFile, new Path(localInputFile.getAbsolutePath()));
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, _schema);
+    segmentGeneratorConfig.setTableName(_rawTableName);
+    segmentGeneratorConfig.setInputFilePath(localInputFile.getPath());
+    segmentGeneratorConfig.setOutDir(_localSegmentDir.getPath());
+    segmentGeneratorConfig.setSequenceId(sequenceId);
+    if (_segmentNamePostfix != null) {
+      segmentGeneratorConfig.setSegmentNamePostfix(_segmentNamePostfix);
+    }
+    FileFormat fileFormat = getFileFormat(inputFileName);
+    segmentGeneratorConfig.setFormat(fileFormat);
+    segmentGeneratorConfig.setReaderConfig(getReaderConfig(fileFormat));
+    segmentGeneratorConfig.setOnHeap(true);
+
+    addAdditionalSegmentGeneratorConfigs(segmentGeneratorConfig, hdfsInputFile, sequenceId);
+
+    _logger.info("Start creating segment with sequence id: {}", sequenceId);
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    try {
+      driver.init(segmentGeneratorConfig);
+      driver.build();
+    } catch (Exception e) {
+      _logger.error("Caught exception while creating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile,
+          sequenceId, e);
+      throw new RuntimeException(e);
+    }
+    String segmentName = driver.getSegmentName();
+    _logger.info("Finish creating segment: {} with sequence id: {}", segmentName, sequenceId);
+
+    File localSegmentDir = new File(_localSegmentDir, segmentName);
+    String segmentTarFileName = segmentName + JobConfigConstants.TAR_GZ_FILE_EXT;
+    File localSegmentTarFile = new File(_localSegmentTarDir, segmentTarFileName);
+    _logger.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+    TarGzCompressionUtils.createTarGzOfDirectory(localSegmentDir.getPath(), localSegmentTarFile.getPath());
+
+    long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+    long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+    _logger.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+        DataSize.fromBytes(uncompressedSegmentSize), DataSize.fromBytes(compressedSegmentSize));
+
+    Path hdfsSegmentTarFile = new Path(_hdfsSegmentTarDir, segmentTarFileName);
+    _logger.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile);
+    _fileSystem.copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);
+
+    context.write(new LongWritable(sequenceId), new Text(segmentTarFileName));
+    _logger.info("Finish generating segment: {} with HDFS input file: {}, sequence id: {}", segmentName, hdfsInputFile,
+        sequenceId);
+  }
+
+  protected FileFormat getFileFormat(String fileName) {
+    if (fileName.endsWith(".avro")) {
+      return FileFormat.AVRO;
+    }
+    if (fileName.endsWith(".csv")) {
+      return FileFormat.CSV;
+    }
+    if (fileName.endsWith(".json")) {
+      return FileFormat.JSON;
+    }
+    if (fileName.endsWith(".thrift")) {
+      return FileFormat.THRIFT;
+    }
+    throw new IllegalArgumentException("Unsupported file format: {}" + fileName);
+  }
+
+  @Nullable
+  protected RecordReaderConfig getReaderConfig(FileFormat fileFormat)
+      throws IOException {
+    if (_readerConfigFile != null) {
+      if (fileFormat == FileFormat.CSV) {
+        try (InputStream inputStream = _fileSystem.open(_readerConfigFile)) {
+          CSVRecordReaderConfig readerConfig = JsonUtils.inputStreamToObject(inputStream, CSVRecordReaderConfig.class);
+          _logger.info("Using CSV record reader config: {}", readerConfig);
+          return readerConfig;
+        }
+      }
+      if (fileFormat == FileFormat.THRIFT) {
+        try (InputStream inputStream = _fileSystem.open(_readerConfigFile)) {
+          ThriftRecordReaderConfig readerConfig =
+              JsonUtils.inputStreamToObject(inputStream, ThriftRecordReaderConfig.class);
+          _logger.info("Using Thrift record reader config: {}", readerConfig);
+          return readerConfig;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Can be overridden to set additional segment generator configs.
+   */
+  @SuppressWarnings("unused")
+  protected void addAdditionalSegmentGeneratorConfigs(SegmentGeneratorConfig segmentGeneratorConfig, Path hdfsInputFile,
+      int sequenceId) {
+  }
+
+  @Override
+  public void cleanup(Context context) {
+    _logger.info("Deleting local temporary directory: {}", _localStagingDir);
+    FileUtils.deleteQuietly(_localStagingDir);
+  }
+}
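
As a minimal sketch of the new extension hook (hypothetical, not part of this commit): a mapper subclass that overrides addAdditionalSegmentGeneratorConfigs to derive a segment name postfix from the input file name. The subclass name and the postfix convention are made up for illustration; per the commit description, SegmentCreationJob now allows such a mapper class to be plugged in:

    package org.apache.pinot.hadoop.job.mapper;

    import org.apache.hadoop.fs.Path;
    import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;

    public class InputFileNameSegmentCreationMapper extends SegmentCreationMapper {

      @Override
      protected void addAdditionalSegmentGeneratorConfigs(SegmentGeneratorConfig segmentGeneratorConfig,
          Path hdfsInputFile, int sequenceId) {
        // Use the input file's base name (extension stripped) as the segment name postfix
        String inputFileName = hdfsInputFile.getName();
        int dotIndex = inputFileName.lastIndexOf('.');
        String baseName = dotIndex > 0 ? inputFileName.substring(0, dotIndex) : inputFileName;
        segmentGeneratorConfig.setSegmentNamePostfix(baseName);
      }
    }
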
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/PushLocation.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/PushLocation.java
index 579950c..53ad7ed 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/PushLocation.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/PushLocation.java
@@ -18,13 +18,25 @@
  */
 package org.apache.pinot.hadoop.utils;
 
+import java.util.ArrayList;
+import java.util.List;
+
+
 public class PushLocation {
   private final String _host;
   private final int _port;
 
-  private PushLocation(PushLocationBuilder pushLocationBuilder) {
-    _host = pushLocationBuilder._host;
-    _port = pushLocationBuilder._port;
+  public PushLocation(String host, int port) {
+    _host = host;
+    _port = port;
+  }
+
+  public static List<PushLocation> getPushLocations(String[] hosts, int port) {
+    List<PushLocation> pushLocations = new ArrayList<>(hosts.length);
+    for (String host : hosts) {
+      pushLocations.add(new PushLocation(host, port));
+    }
+    return pushLocations;
   }
 
   public String getHost() {
@@ -35,28 +47,6 @@ public class PushLocation {
     return _port;
   }
 
-  public static class PushLocationBuilder {
-    private String _host;
-    private int _port;
-
-    public PushLocationBuilder() {
-    }
-
-    public PushLocationBuilder setHost(String host) {
-      _host = host;
-      return this;
-    }
-
-    public PushLocationBuilder setPort(int port) {
-      _port = port;
-      return this;
-    }
-
-    public PushLocation build() {
-      return new PushLocation(this);
-    }
-  }
-
   @Override
   public String toString() {
     return _host + ":" + _port;
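
For illustration (not part of this commit), the removed builder gives way to the static factory above; a small sketch with placeholder controller hosts:

    import java.util.List;
    import org.apache.pinot.hadoop.utils.PushLocation;

    public class PushLocationExample {
      public static void main(String[] args) {
        // Two placeholder controller hosts sharing one port
        List<PushLocation> pushLocations =
            PushLocation.getPushLocations(new String[]{"controller-1", "controller-2"}, 9000);
        // toString() renders "host:port", e.g. "controller-1:9000"
        pushLocations.forEach(location -> System.out.println(location));
      }
    }
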
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
index e682f85..36e083c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
@@ -155,13 +155,13 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
     properties.setProperty(JobConfigConstants.PUSH_TO_PORT, getDefaultControllerConfiguration().getControllerPort());
 
     // Run segment creation job
-    SegmentCreationJob creationJob = new SegmentCreationJob("TestSegmentCreation", properties);
+    SegmentCreationJob creationJob = new SegmentCreationJob(properties);
     Configuration config = _mrCluster.getConfig();
     creationJob.setConf(config);
     creationJob.run();
 
     // Run segment push job
-    SegmentTarPushJob pushJob = new SegmentTarPushJob("TestSegmentPush", properties);
+    SegmentTarPushJob pushJob = new SegmentTarPushJob(properties);
     pushJob.setConf(_mrCluster.getConfig());
     pushJob.run();
   }

