Posted to commits@pinot.apache.org by xi...@apache.org on 2019/11/06 11:10:35 UTC

[incubator-pinot] branch pinot-spark updated (6fbd9da -> 7d8925b)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch pinot-spark
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


 discard 6fbd9da  Initial commit for pinot-spark
     new 7d8925b  Initial commit for pinot-spark

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (6fbd9da)
            \
             N -- N -- N   refs/heads/pinot-spark (7d8925b)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pinot-spark/README.md                               | 21 ++++++++++++++++++---
 .../apache/pinot/spark/PinotSparkJobLauncher.java   | 20 +++++++++++++-------
 2 files changed, 31 insertions(+), 10 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Initial commit for pinot-spark

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch pinot-spark
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 7d8925bd5c61a77efd7a6f921984311dff61d175
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed Nov 6 03:05:19 2019 -0800

    Initial commit for pinot-spark
---
 pinot-spark/README.md                              |  21 +-
 pinot-spark/pom.xml                                |  52 ++-
 .../apache/pinot/spark/PinotSparkJobLauncher.java  |  86 +++++
 .../apache/pinot/spark/jobs/BaseSegmentJob.java    | 137 +++++++
 .../apache/pinot/spark/jobs/ControllerRestApi.java |  42 ++
 .../pinot/spark/jobs/DefaultControllerRestApi.java | 192 +++++++++
 .../pinot/spark/jobs/JobConfigConstants.java       |  65 ++++
 .../pinot/spark/jobs/SegmentCreationJob.java       | 427 +++++++++++++++++++++
 .../pinot/spark/jobs/SegmentCreationMapper.java    | 323 ++++++++++++++++
 .../apache/pinot/spark/jobs/SegmentTarPushJob.java | 116 ++++++
 .../apache/pinot/spark/jobs/SegmentUriPushJob.java |  68 ++++
 .../pinot/spark/utils/JobPreparationHelper.java    |  70 ++++
 .../org/apache/pinot/spark/utils/PushLocation.java |  54 +++
 pom.xml                                            |  11 +
 14 files changed, 1652 insertions(+), 12 deletions(-)

diff --git a/pinot-spark/README.md b/pinot-spark/README.md
index 069cada..dec27c0 100644
--- a/pinot-spark/README.md
+++ b/pinot-spark/README.md
@@ -60,18 +60,33 @@ The `org.apache.pinot.spark.PinotSparkJobLauncher` class (the main class of the
 
 ```
 # Segment creation
-    spark jar  pinot-spark-1.0-SNAPSHOT.jar SegmentCreation job.properties
+    spark-submit \
+      --class org.apache.pinot.spark.PinotSparkJobLauncher \
+      --master <master-url> \
+      --deploy-mode <deploy-mode> \
+      pinot-spark-0.2.0-SNAPSHOT-shaded.jar \
+      SegmentCreation job.properties
   
 At this point, we have built the data segments from the raw data files.
 The next step is to push them to the Pinot controller.
 
 # Segment tar push
-    spark jar  pinot-spark-1.0-SNAPSHOT.jar SegmentTarPush job.properties
+    spark-submit \
+      --class org.apache.pinot.spark.PinotSparkJobLauncher \
+      --master <master-url> \
+      --deploy-mode <deploy-mode> \
+      pinot-spark-0.2.0-SNAPSHOT-shaded.jar \
+      SegmentTarPush job.properties
 
 There is also a job that combines the two jobs into a single run.
 
 # Segment creation and tar push
-    spark jar  pinot-spark-1.0-SNAPSHOT.jar SegmentCreationAndTarPush job.properties
+    spark-submit \
+      --class org.apache.pinot.spark.PinotSparkJobLauncher \
+      --master <master-url> \
+      --deploy-mode <deploy-mode> \
+      pinot-spark-0.2.0-SNAPSHOT-shaded.jar \
+      SegmentCreationAndTarPush job.properties
 ```
 
 
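For context, the job.properties consumed by these commands is a plain Java properties file. As a hypothetical illustration (not part of this commit), the sketch below drives the same SegmentCreationAndTarPush flow programmatically; the property keys match the constants defined in JobConfigConstants.java further down in this patch, while the paths, host, and port are placeholder assumptions.

```java
import java.util.Properties;
import org.apache.pinot.spark.jobs.SegmentCreationJob;
import org.apache.pinot.spark.jobs.SegmentTarPushJob;

public class SegmentCreationAndPushExample {
  public static void main(String[] args) throws Exception {
    // Keys match JobConfigConstants; the values are illustrative placeholders.
    Properties jobConf = new Properties();
    jobConf.setProperty("path.to.input", "hdfs:///data/myTable/rawdata/*.avro");
    jobConf.setProperty("path.to.output", "hdfs:///data/myTable/segments");
    jobConf.setProperty("segment.table.name", "myTable");
    // With push hosts set, table config and schema are fetched from the controller.
    jobConf.setProperty("push.to.hosts", "controller-host");
    jobConf.setProperty("push.to.port", "9000");

    // Same sequence the launcher runs for the SegmentCreationAndTarPush job type.
    new SegmentCreationJob(jobConf).run();
    new SegmentTarPushJob(jobConf).run();
  }
}
```

Note that SegmentCreationJob.run() creates its own JavaSparkContext, so this still needs to execute inside a Spark application, e.g. submitted via spark-submit as shown above.
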
diff --git a/pinot-spark/pom.xml b/pinot-spark/pom.xml
index 615b21a..6c56c38 100644
--- a/pinot-spark/pom.xml
+++ b/pinot-spark/pom.xml
@@ -71,7 +71,7 @@
                   <transformers>
                     <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                      <mainClass>org.apache.pinot.hadoop.PinotHadoopJobLauncher</mainClass>
+                      <mainClass>org.apache.pinot.spark.PinotSparkJobLauncher</mainClass>
                     </transformer>
                   </transformers>
                 </configuration>
@@ -95,6 +95,10 @@
           <groupId>org.apache.pinot</groupId>
           <artifactId>pinot-common</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.thoughtworks.paranamer</groupId>
+          <artifactId>paranamer</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -123,17 +127,34 @@
           <groupId>org.apache.pinot</groupId>
           <artifactId>pinot-core</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_${scala.binary.version}</artifactId>
-      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scala-reflect</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scala-library</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.activation</groupId>
+          <artifactId>activation</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>commons-logging</groupId>
@@ -149,11 +170,24 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro-mapred</artifactId>
-      <classifier>hadoop2</classifier>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-reflect</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <!--Test-->
+    <dependency>
       <groupId>org.testng</groupId>
       <artifactId>testng</artifactId>
       <scope>test</scope>
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/PinotSparkJobLauncher.java b/pinot-spark/src/main/java/org/apache/pinot/spark/PinotSparkJobLauncher.java
new file mode 100644
index 0000000..dad1fd2
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/PinotSparkJobLauncher.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark;
+
+import java.io.FileInputStream;
+import java.util.Arrays;
+import java.util.Properties;
+import org.apache.pinot.spark.jobs.SegmentCreationJob;
+import org.apache.pinot.spark.jobs.SegmentTarPushJob;
+import org.apache.pinot.spark.jobs.SegmentUriPushJob;
+
+
+public class PinotSparkJobLauncher {
+
+  private static final String USAGE = "usage: [job_type] [job.properties]";
+  private static final String SUPPORT_JOB_TYPES = "\tsupport job types: " + Arrays.toString(PinotSparkJobType.values());
+
+  private static void usage() {
+    System.err.println(USAGE);
+    System.err.println(SUPPORT_JOB_TYPES);
+  }
+
+  private static void kickOffPinotSparkJob(PinotSparkJobType jobType, Properties jobConf)
+      throws Exception {
+    switch (jobType) {
+      case SegmentCreation:
+        new SegmentCreationJob(jobConf).run();
+        break;
+      case SegmentTarPush:
+        new SegmentTarPushJob(jobConf).run();
+        break;
+      case SegmentUriPush:
+        new SegmentUriPushJob(jobConf).run();
+        break;
+      case SegmentCreationAndTarPush:
+        new SegmentCreationJob(jobConf).run();
+        new SegmentTarPushJob(jobConf).run();
+        break;
+      case SegmentCreationAndUriPush:
+        new SegmentCreationJob(jobConf).run();
+        new SegmentUriPushJob(jobConf).run();
+        break;
+      default:
+        throw new RuntimeException("Not a valid jobType - " + jobType);
+    }
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+    if (args.length != 2) {
+      usage();
+      System.exit(1);
+    }
+    PinotSparkJobType jobType = null;
+    Properties jobConf = null;
+    try {
+      jobType = PinotSparkJobType.valueOf(args[0]);
+      jobConf = new Properties();
+      jobConf.load(new FileInputStream(args[1]));
+    } catch (Exception e) {
+      usage();
+      System.exit(1);
+    }
+    kickOffPinotSparkJob(jobType, jobConf);
+  }
+
+  enum PinotSparkJobType {
+    SegmentCreation, SegmentTarPush, SegmentUriPush, SegmentCreationAndTarPush, SegmentCreationAndUriPush,
+  }
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/BaseSegmentJob.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/BaseSegmentJob.java
new file mode 100644
index 0000000..3285899
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/BaseSegmentJob.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.spark.utils.PushLocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class BaseSegmentJob implements Serializable {
+  protected final Logger _logger = LoggerFactory.getLogger(getClass());
+  protected final Properties _properties;
+  protected final List<PushLocation> _pushLocations;
+  protected final String _rawTableName;
+
+  protected BaseSegmentJob(Properties properties) {
+    _properties = properties;
+    Utils.logVersions();
+    logProperties();
+
+    // Optional push location and table parameters. If set, will use the table config and schema from the push hosts.
+    String pushHostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS);
+    String pushPortString = _properties.getProperty(JobConfigConstants.PUSH_TO_PORT);
+    if (pushHostsString != null && pushPortString != null) {
+      _pushLocations =
+          PushLocation.getPushLocations(StringUtils.split(pushHostsString, ','), Integer.parseInt(pushPortString));
+    } else {
+      _pushLocations = null;
+    }
+
+    _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
+  }
+
+  @Nullable
+  protected TableConfig getTableConfig()
+      throws IOException {
+    try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+      return controllerRestApi != null ? controllerRestApi.getTableConfig() : null;
+    }
+  }
+
+  /**
+   * This method is currently implemented in SegmentCreationJob and SegmentPreprocessingJob due to a dependency on
+   * the hadoop filesystem, which we can only get once the job begins to run.
+   * We return null here to make it clear that for now, all implementations of this method have to support
+   * reading from a schema file. In the future, we hope to deprecate reading the schema from the schema file in favor
+   * of mandating that a schema is pushed to the controller.
+   */
+  @Nullable
+  protected org.apache.pinot.common.data.Schema getSchema() throws IOException {
+    return null;
+  }
+
+  /**
+   * Can be overridden to provide custom controller Rest API.
+   */
+  @Nullable
+  protected ControllerRestApi getControllerRestApi() {
+    return _pushLocations != null ? new DefaultControllerRestApi(_pushLocations, _rawTableName) : null;
+  }
+
+  protected void logProperties() {
+    _logger.info("*********************************************************************");
+    _logger.info("Job Properties: {}", _properties);
+    _logger.info("*********************************************************************");
+  }
+
+  @Nullable
+  protected Path getPathFromProperty(String key) {
+    String value = _properties.getProperty(key);
+    return value != null ? new Path(value) : null;
+  }
+
+  protected List<Path> getDataFilePaths(Path pathPattern)
+      throws IOException {
+    List<Path> tarFilePaths = new ArrayList<>();
+    FileSystem fileSystem = FileSystem.get(pathPattern.toUri(), new Configuration());
+    _logger.info("Using filesystem: {}", fileSystem);
+    FileStatus[] fileStatuses = fileSystem.globStatus(pathPattern);
+    if (fileStatuses == null) {
+      _logger.warn("Unable to match file status from file path pattern: {}", pathPattern);
+    } else {
+      getDataFilePathsHelper(fileSystem, fileStatuses, tarFilePaths);
+    }
+    return tarFilePaths;
+  }
+
+  protected void getDataFilePathsHelper(FileSystem fileSystem, FileStatus[] fileStatuses, List<Path> tarFilePaths)
+      throws IOException {
+    for (FileStatus fileStatus : fileStatuses) {
+      Path path = fileStatus.getPath();
+      if (fileStatus.isDirectory()) {
+        getDataFilePathsHelper(fileSystem, fileSystem.listStatus(path), tarFilePaths);
+      } else {
+        // Skip temp files generated by computation frameworks like Hadoop/Spark.
+        if (path.getName().startsWith("_") || path.getName().startsWith(".")) {
+          continue;
+        }
+        if (isDataFile(path.getName())) {
+          tarFilePaths.add(path);
+        }
+      }
+    }
+  }
+
+  protected abstract boolean isDataFile(String fileName);
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/ControllerRestApi.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/ControllerRestApi.java
new file mode 100644
index 0000000..4d0391c
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/ControllerRestApi.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import java.io.Closeable;
+import java.util.List;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.data.Schema;
+
+
+public interface ControllerRestApi extends Closeable {
+
+  TableConfig getTableConfig();
+
+  Schema getSchema();
+
+  void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths);
+
+  void sendSegmentUris(List<String> segmentUris);
+
+  void deleteSegmentUris(List<String> segmentUris);
+
+  List<String> getAllSegments(String tableType);
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/DefaultControllerRestApi.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/DefaultControllerRestApi.java
new file mode 100644
index 0000000..c9444a9
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/DefaultControllerRestApi.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.spark.utils.PushLocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DefaultControllerRestApi implements ControllerRestApi {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultControllerRestApi.class);
+
+  private final List<PushLocation> _pushLocations;
+  private final String _rawTableName;
+  private final FileUploadDownloadClient _fileUploadDownloadClient = new FileUploadDownloadClient();
+
+  private static final String OFFLINE = "OFFLINE";
+
+  public DefaultControllerRestApi(List<PushLocation> pushLocations, String rawTableName) {
+    LOGGER.info("Push locations are: {} for table: {}", pushLocations, rawTableName);
+    _pushLocations = pushLocations;
+    _rawTableName = rawTableName;
+  }
+
+  @Override
+  public TableConfig getTableConfig() {
+    for (PushLocation pushLocation : _pushLocations) {
+      try {
+        SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(FileUploadDownloadClient
+            .getRetrieveTableConfigHttpURI(pushLocation.getHost(), pushLocation.getPort(), _rawTableName));
+        JsonNode offlineJsonTableConfig = JsonUtils.stringToJsonNode(response.getResponse()).get(OFFLINE);
+        if (offlineJsonTableConfig != null) {
+          TableConfig offlineTableConfig = TableConfig.fromJsonConfig(offlineJsonTableConfig);
+          LOGGER.info("Got table config: {}", offlineTableConfig);
+          return offlineTableConfig;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while fetching table config for table: {} from push location: {}", _rawTableName,
+            pushLocation, e);
+      }
+    }
+    String errorMessage = String
+        .format("Failed to get table config from push locations: %s for table: %s", _pushLocations, _rawTableName);
+    LOGGER.error(errorMessage);
+    throw new RuntimeException(errorMessage);
+  }
+
+  @Override
+  public Schema getSchema() {
+    for (PushLocation pushLocation : _pushLocations) {
+      try {
+        SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(FileUploadDownloadClient
+            .getRetrieveSchemaHttpURI(pushLocation.getHost(), pushLocation.getPort(), _rawTableName));
+        Schema schema = Schema.fromString(response.getResponse());
+        LOGGER.info("Got schema: {}", schema);
+        return schema;
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while fetching schema for table: {} from push location: {}", _rawTableName,
+            pushLocation, e);
+      }
+    }
+    String errorMessage =
+        String.format("Failed to get schema from push locations: %s for table: %s", _pushLocations, _rawTableName);
+    LOGGER.error(errorMessage);
+    throw new RuntimeException(errorMessage);
+  }
+
+  @Override
+  public void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths) {
+    LOGGER.info("Start pushing segments: {} to locations: {}", tarFilePaths, _pushLocations);
+    for (Path tarFilePath : tarFilePaths) {
+      String fileName = tarFilePath.getName();
+      Preconditions.checkArgument(fileName.endsWith(JobConfigConstants.TAR_GZ_FILE_EXT));
+      String segmentName = fileName.substring(0, fileName.length() - JobConfigConstants.TAR_GZ_FILE_EXT.length());
+      for (PushLocation pushLocation : _pushLocations) {
+        LOGGER.info("Pushing segment: {} to location: {}", segmentName, pushLocation);
+        try (InputStream inputStream = fileSystem.open(tarFilePath)) {
+          SimpleHttpResponse response = _fileUploadDownloadClient.uploadSegment(
+              FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), pushLocation.getPort()),
+              segmentName, inputStream, _rawTableName);
+          LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while pushing segment: {} to location: {}", segmentName, pushLocation, e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void sendSegmentUris(List<String> segmentUris) {
+    LOGGER.info("Start sending segment URIs: {} to locations: {}", segmentUris, _pushLocations);
+    for (String segmentUri : segmentUris) {
+      for (PushLocation pushLocation : _pushLocations) {
+        LOGGER.info("Sending segment URI: {} to location: {}", segmentUri, pushLocation);
+        try {
+          SimpleHttpResponse response = _fileUploadDownloadClient.sendSegmentUri(
+              FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), pushLocation.getPort()),
+              segmentUri, _rawTableName);
+          LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while sending segment URI: {} to location: {}", segmentUri, pushLocation, e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void deleteSegmentUris(List<String> segmentUris) {
+    LOGGER.info("Start deleting segment URIs: {} to locations: {}", segmentUris, _pushLocations);
+    for (String segmentUri : segmentUris) {
+      for (PushLocation pushLocation : _pushLocations) {
+        LOGGER.info("Sending deleting segment URI: {} to location: {}", segmentUri, pushLocation);
+        try {
+          SimpleHttpResponse response = _fileUploadDownloadClient.sendDeleteRequest(
+              FileUploadDownloadClient.getDeleteSegmentHttpUri(pushLocation.getHost(), pushLocation.getPort(), _rawTableName,
+              segmentUri, "OFFLINE"));
+          LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while deleting segment URI: {} to location: {}", segmentUri, pushLocation, e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public List<String> getAllSegments(String tableType) {
+    LOGGER.info("Getting all segments of table {}", _rawTableName);
+    ObjectMapper objectMapper = new ObjectMapper();
+    for (PushLocation pushLocation : _pushLocations) {
+      try {
+        SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(
+            FileUploadDownloadClient.getRetrieveAllSegmentWithTableTypeHttpUri(pushLocation.getHost(), pushLocation.getPort(),
+                _rawTableName, tableType));
+        JsonNode segmentList = getSegmentsFromJsonSegmentAPI(response.getResponse(), tableType);
+        return objectMapper.convertValue(segmentList, ArrayList.class);
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while getting all {} segments for table: {} from push location: {}", tableType, _rawTableName,
+            pushLocation, e);
+      }
+    }
+    String errorMessage =
+        String.format("Failed to get a list of all segments from push locations: %s for table: %s", _pushLocations,
+            _rawTableName);
+    LOGGER.error(errorMessage);
+    throw new RuntimeException(errorMessage);
+
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _fileUploadDownloadClient.close();
+  }
+
+  private JsonNode getSegmentsFromJsonSegmentAPI(String json, String tableType)
+      throws Exception {
+    return JsonUtils.stringToJsonNode(json).get(0).get(tableType);
+  }
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/JobConfigConstants.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/JobConfigConstants.java
new file mode 100644
index 0000000..f7636d3
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/JobConfigConstants.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+public class JobConfigConstants {
+  public static final String PATH_TO_INPUT = "path.to.input";
+  public static final String PATH_TO_OUTPUT = "path.to.output";
+  public static final String PREPROCESS_PATH_TO_OUTPUT = "preprocess.path.to.output";
+  public static final String PATH_TO_DEPS_JAR = "path.to.deps.jar";
+  public static final String PATH_TO_READER_CONFIG = "path.to.reader.config";
+  // Leave this for backward compatibility. We prefer to use the schema fetched from the controller.
+  public static final String PATH_TO_SCHEMA = "path.to.schema";
+
+  public static final String SEGMENT_TAR_DIR = "segmentTar";
+  public static final String TAR_GZ_FILE_EXT = ".tar.gz";
+
+  public static final String SEGMENT_TABLE_NAME = "segment.table.name";
+  public static final String TABLE_CONFIG = "table.config";
+  public static final String SCHEMA = "data.schema";
+
+  public static final String SEGMENT_NAME_GENERATOR_TYPE = "segment.name.generator.type";
+  public static final String SIMPLE_SEGMENT_NAME_GENERATOR = "simple";
+  public static final String NORMALIZED_DATE_SEGMENT_NAME_GENERATOR = "normalizedDate";
+  public static final String DEFAULT_SEGMENT_NAME_GENERATOR = SIMPLE_SEGMENT_NAME_GENERATOR;
+
+  // For SimpleSegmentNameGenerator
+  public static final String SEGMENT_NAME_POSTFIX = "segment.name.postfix";
+
+  // For NormalizedDateSegmentNameGenerator
+  public static final String SEGMENT_NAME_PREFIX = "segment.name.prefix";
+  public static final String EXCLUDE_SEQUENCE_ID = "exclude.sequence.id";
+
+  public static final String PUSH_TO_HOSTS = "push.to.hosts";
+  public static final String PUSH_TO_PORT = "push.to.port";
+
+  public static final String DEFAULT_PERMISSIONS_MASK = "fs.permissions.umask-mode";
+
+  // The path to the record reader to be configured
+  public static final String RECORD_READER_PATH = "record.reader.path";
+
+  public static final String ENABLE_PREPROCESSING = "enable.preprocessing";
+
+  // This setting should be used if you expect to generate fewer segments after the
+  // push. In preprocessing, this is likely because we resize segments.
+  public static final String DELETE_EXTRA_SEGMENTS = "delete.extra.segments";
+
+  // This setting is used to match output segments hierarchy along with input file hierarchy.
+  public static final String USE_RELATIVE_PATH = "use.relative.path";
+}
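
To make these keys concrete, here is a hedged sketch (values are assumptions, not from this commit) of a job configuration that switches from the default "simple" segment name generator to "normalizedDate". As SegmentCreationJob below shows, that generator also requires a table config, which is fetched from the push hosts when push.to.hosts and push.to.port are set.

```java
import java.util.Properties;
import org.apache.pinot.spark.jobs.JobConfigConstants;

public class NormalizedDateJobConfigExample {
  public static void main(String[] args) {
    Properties jobConf = new Properties();
    // Required keys (placeholder values).
    jobConf.setProperty(JobConfigConstants.PATH_TO_INPUT, "hdfs:///data/myTable/rawdata/*.avro");
    jobConf.setProperty(JobConfigConstants.PATH_TO_OUTPUT, "hdfs:///data/myTable/segments");
    jobConf.setProperty(JobConfigConstants.SEGMENT_TABLE_NAME, "myTable");
    // With push hosts set, the table config and schema come from the controller.
    jobConf.setProperty(JobConfigConstants.PUSH_TO_HOSTS, "controller-host");
    jobConf.setProperty(JobConfigConstants.PUSH_TO_PORT, "9000");
    // Optional: use the normalizedDate generator instead of the default "simple" one.
    jobConf.setProperty(JobConfigConstants.SEGMENT_NAME_GENERATOR_TYPE,
        JobConfigConstants.NORMALIZED_DATE_SEGMENT_NAME_GENERATOR);
    jobConf.setProperty(JobConfigConstants.SEGMENT_NAME_PREFIX, "myTable");
    jobConf.setProperty(JobConfigConstants.EXCLUDE_SEQUENCE_ID, "true");
    jobConf.list(System.out);
  }
}
```
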
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationJob.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationJob.java
new file mode 100644
index 0000000..74f09ee
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationJob.java
@@ -0,0 +1,427 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.google.common.base.Preconditions;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.data.TimeFieldSpec;
+import org.apache.pinot.common.utils.DataSize;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.data.readers.CSVRecordReaderConfig;
+import org.apache.pinot.core.data.readers.FileFormat;
+import org.apache.pinot.core.data.readers.RecordReaderConfig;
+import org.apache.pinot.core.data.readers.ThriftRecordReaderConfig;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.spark.utils.JobPreparationHelper;
+import org.apache.pinot.spark.utils.PushLocation;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentCreationJob extends BaseSegmentJob {
+  protected static final String APPEND = "APPEND";
+  protected static final String LOCAL_TEMP_DIR = "pinot_spark_tmp";
+  protected static final Logger LOGGER = LoggerFactory.getLogger(SegmentCreationJob.class);
+  protected final String _rawTableName;
+
+  protected final String _inputPattern;
+  protected final String _outputDir;
+  protected final String _stagingDir;
+  // Optional
+  protected final String _depsJarDir;
+  protected final String _schemaFile;
+  protected final String _defaultPermissionsMask;
+  protected final List<PushLocation> _pushLocations;
+
+  public SegmentCreationJob(Properties properties) {
+    super(properties);
+    new Configuration().set("mapreduce.job.user.classpath.first", "true");
+
+    _inputPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_INPUT)).toString();
+    _outputDir = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT)).toString();
+    _stagingDir = new Path(_outputDir, UUID.randomUUID().toString()).toString();
+    _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
+
+    // Optional
+    _depsJarDir = properties.getProperty(JobConfigConstants.PATH_TO_DEPS_JAR);
+    _schemaFile = properties.getProperty(JobConfigConstants.PATH_TO_SCHEMA);
+    _defaultPermissionsMask = _properties.getProperty(JobConfigConstants.DEFAULT_PERMISSIONS_MASK);
+
+    // Optional push location and table parameters. If set, will use the table config and schema from the push hosts.
+    String pushHostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS);
+    String pushPortString = _properties.getProperty(JobConfigConstants.PUSH_TO_PORT);
+    if (pushHostsString != null && pushPortString != null) {
+      _pushLocations =
+          PushLocation.getPushLocations(StringUtils.split(pushHostsString, ','), Integer.parseInt(pushPortString));
+    } else {
+      _pushLocations = null;
+    }
+
+    LOGGER.info("*********************************************************************");
+    LOGGER.info("Input Pattern: {}", _inputPattern);
+    LOGGER.info("Output Directory: {}", _outputDir);
+    LOGGER.info("Staging Directory: {}", _stagingDir);
+    LOGGER.info("Raw Table Name: {}", _rawTableName);
+    LOGGER.info("Dependencies Directory: {}", _depsJarDir);
+    LOGGER.info("Schema File: {}", _schemaFile);
+    LOGGER.info("Default Permissions Mask: {}", _defaultPermissionsMask);
+    LOGGER.info("Push Locations: {}", _pushLocations);
+    LOGGER.info("*********************************************************************");
+  }
+
+  /**
+   * Generate a relative output directory path when `useRelativePath` flag is on.
+   * This method will compute the relative path based on `inputFile` and `baseInputDir`,
+   * then apply only the directory part of relative path to `outputDir`.
+   * E.g.
+   *    baseInputDir = "/path/to/input"
+   *    inputFile = "/path/to/input/a/b/c/d.avro"
+   *    outputDir = "/path/to/output"
+   *    getRelativeOutputPath(baseInputDir, inputFile, outputDir) = /path/to/output/a/b/c
+   */
+  protected static Path getRelativeOutputPath(URI baseInputDir, URI inputFile, Path outputDir) {
+    URI relativePath = baseInputDir.relativize(inputFile);
+    Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile),
+        "Unable to extract out the relative path based on base input path: " + baseInputDir);
+    return new Path(outputDir, relativePath.getPath()).getParent();
+  }
+
+  protected static void createSingleSegment(String inputFile, Long seqId, Configuration conf, String stagingDir)
+      throws IOException {
+    Path hdfsInputFile = new Path(inputFile);
+    int sequenceId = seqId.intValue();
+    LOGGER.info("Generating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile, sequenceId);
+
+    String rawTableName = conf.get(JobConfigConstants.SEGMENT_TABLE_NAME);
+    Schema schema = Schema.fromString(conf.get(JobConfigConstants.SCHEMA));
+    SegmentNameGenerator segmentNameGenerator;
+    boolean useRelativePath = conf.getBoolean(JobConfigConstants.USE_RELATIVE_PATH, false);
+
+    // Optional
+    TableConfig tableConfig = null;
+    String recordReaderPath;
+    Path readerConfigFile = null;
+
+    String tableConfigString = conf.get(JobConfigConstants.TABLE_CONFIG);
+    if (tableConfigString != null) {
+      tableConfig = TableConfig.fromJsonString(tableConfigString);
+    }
+    String readerConfigFileStr = conf.get(JobConfigConstants.PATH_TO_READER_CONFIG);
+    if (readerConfigFileStr != null) {
+      readerConfigFile = new Path(readerConfigFileStr);
+    }
+    recordReaderPath = conf.get(JobConfigConstants.RECORD_READER_PATH);
+
+    // HDFS segment tar directory
+    Path hdfsSegmentTarDir = new Path(new Path(stagingDir, "output"), JobConfigConstants.SEGMENT_TAR_DIR);
+
+    // Set up segment name generator
+    String segmentNameGeneratorType =
+        conf.get(JobConfigConstants.SEGMENT_NAME_GENERATOR_TYPE, JobConfigConstants.DEFAULT_SEGMENT_NAME_GENERATOR);
+    switch (segmentNameGeneratorType) {
+      case JobConfigConstants.SIMPLE_SEGMENT_NAME_GENERATOR:
+        segmentNameGenerator =
+            new SimpleSegmentNameGenerator(rawTableName, conf.get(JobConfigConstants.SEGMENT_NAME_POSTFIX));
+        break;
+      case JobConfigConstants.NORMALIZED_DATE_SEGMENT_NAME_GENERATOR:
+        Preconditions.checkState(tableConfig != null,
+            "In order to use NormalizedDateSegmentNameGenerator, table config must be provided");
+        SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+        String timeFormat = null;
+        TimeFieldSpec timeFieldSpec = schema.getTimeFieldSpec();
+        if (timeFieldSpec != null) {
+          timeFormat = timeFieldSpec.getOutgoingGranularitySpec().getTimeFormat();
+        }
+        segmentNameGenerator =
+            new NormalizedDateSegmentNameGenerator(rawTableName, conf.get(JobConfigConstants.SEGMENT_NAME_PREFIX),
+                conf.getBoolean(JobConfigConstants.EXCLUDE_SEQUENCE_ID, false), validationConfig.getSegmentPushType(),
+                validationConfig.getSegmentPushFrequency(), validationConfig.getTimeType(), timeFormat);
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported segment name generator type: " + segmentNameGeneratorType);
+    }
+
+    // Temporary local directories
+    File localStagingDir = new File(LOCAL_TEMP_DIR);
+    File localInputDir = new File(localStagingDir, "inputData");
+    File localSegmentsDir = new File(localStagingDir, "segments");
+    File localSegmentTarDir = new File(localStagingDir, JobConfigConstants.SEGMENT_TAR_DIR);
+
+    String inputFileName = hdfsInputFile.getName();
+    File localInputFile = new File(localInputDir, inputFileName);
+    LOGGER.info("Copying input file from: {} to: {}", hdfsInputFile, localInputFile);
+    FileSystem.get(hdfsInputFile.toUri(), new Configuration())
+        .copyToLocalFile(hdfsInputFile, new Path(localInputFile.getAbsolutePath()));
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setTableName(rawTableName);
+    segmentGeneratorConfig.setInputFilePath(localInputFile.getPath());
+    segmentGeneratorConfig.setOutDir(localSegmentsDir.getPath());
+    segmentGeneratorConfig.setSegmentNameGenerator(segmentNameGenerator);
+    segmentGeneratorConfig.setSequenceId(sequenceId);
+    if (recordReaderPath != null) {
+      segmentGeneratorConfig.setRecordReaderPath(recordReaderPath);
+      segmentGeneratorConfig.setFormat(FileFormat.OTHER);
+    } else {
+      FileFormat fileFormat = getFileFormat(inputFileName);
+      segmentGeneratorConfig.setFormat(fileFormat);
+      segmentGeneratorConfig.setReaderConfig(getReaderConfig(conf, readerConfigFile, fileFormat));
+    }
+    segmentGeneratorConfig.setOnHeap(true);
+
+    addAdditionalSegmentGeneratorConfigs(segmentGeneratorConfig, hdfsInputFile, sequenceId);
+
+    LOGGER.info("Start creating segment with sequence id: {}", sequenceId);
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    try {
+      driver.init(segmentGeneratorConfig);
+      driver.build();
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while creating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile,
+          sequenceId, e);
+      throw new RuntimeException(e);
+    }
+    String segmentName = driver.getSegmentName();
+    LOGGER.info("Finish creating segment: {} with sequence id: {}", segmentName, sequenceId);
+
+    File localSegmentDir = new File(localSegmentsDir, segmentName);
+    String segmentTarFileName = segmentName + JobConfigConstants.TAR_GZ_FILE_EXT;
+    File localSegmentTarFile = new File(localSegmentTarDir, segmentTarFileName);
+    LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+    TarGzCompressionUtils.createTarGzOfDirectory(localSegmentDir.getPath(), localSegmentTarFile.getPath());
+
+    long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+    long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+    LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+        DataSize.fromBytes(uncompressedSegmentSize), DataSize.fromBytes(compressedSegmentSize));
+
+    Path hdfsSegmentTarFile = new Path(hdfsSegmentTarDir, segmentTarFileName);
+    if (useRelativePath) {
+      Path relativeOutputPath =
+          getRelativeOutputPath(new Path(conf.get(JobConfigConstants.PATH_TO_INPUT)).toUri(), hdfsInputFile.toUri(),
+              hdfsSegmentTarDir);
+      hdfsSegmentTarFile = new Path(relativeOutputPath, segmentTarFileName);
+    }
+    LOGGER.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile);
+    FileSystem.get(hdfsSegmentTarFile.toUri(), new Configuration())
+        .copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);
+
+    LOGGER.info("Finish generating segment: {} with HDFS input file: {}, sequence id: {}", segmentName, hdfsInputFile,
+        sequenceId);
+  }
+
+  protected static FileFormat getFileFormat(String fileName) {
+    if (fileName.endsWith(".avro")) {
+      return FileFormat.AVRO;
+    }
+    if (fileName.endsWith(".csv")) {
+      return FileFormat.CSV;
+    }
+    if (fileName.endsWith(".json")) {
+      return FileFormat.JSON;
+    }
+    if (fileName.endsWith(".thrift")) {
+      return FileFormat.THRIFT;
+    }
+    throw new IllegalArgumentException("Unsupported file format: " + fileName);
+  }
+
+  @Nullable
+  protected static RecordReaderConfig getReaderConfig(Configuration conf, Path readerConfigFile, FileFormat fileFormat)
+      throws IOException {
+    if (readerConfigFile != null) {
+      if (fileFormat == FileFormat.CSV) {
+        try (InputStream inputStream = FileSystem.get(readerConfigFile.toUri(), conf).open(readerConfigFile)) {
+          CSVRecordReaderConfig readerConfig = JsonUtils.inputStreamToObject(inputStream, CSVRecordReaderConfig.class);
+          LOGGER.info("Using CSV record reader config: {}", readerConfig);
+          return readerConfig;
+        }
+      }
+      if (fileFormat == FileFormat.THRIFT) {
+        try (InputStream inputStream = FileSystem.get(readerConfigFile.toUri(), conf).open(readerConfigFile)) {
+          ThriftRecordReaderConfig readerConfig =
+              JsonUtils.inputStreamToObject(inputStream, ThriftRecordReaderConfig.class);
+          LOGGER.info("Using Thrift record reader config: {}", readerConfig);
+          return readerConfig;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Can be overridden to set additional segment generator configs.
+   */
+  @SuppressWarnings("unused")
+  protected static void addAdditionalSegmentGeneratorConfigs(SegmentGeneratorConfig segmentGeneratorConfig,
+      Path hdfsInputFile, int sequenceId) {
+  }
+
+  @Override
+  protected boolean isDataFile(String fileName) {
+    // For custom record reader, treat all files as data file
+    if (_properties.getProperty(JobConfigConstants.RECORD_READER_PATH) != null) {
+      return true;
+    }
+    return fileName.endsWith(".avro") || fileName.endsWith(".csv") || fileName.endsWith(".json") || fileName
+        .endsWith(".thrift");
+  }
+
+  public void run()
+      throws Exception {
+    LOGGER.info("Starting {}", getClass().getSimpleName());
+
+    Path inputPattern = new Path(_inputPattern);
+    Path outputDir = new Path(_outputDir);
+    Path stagingDir = new Path(_stagingDir);
+
+    // Initialize all directories
+    FileSystem outputDirFileSystem = FileSystem.get(outputDir.toUri(), new Configuration());
+    JobPreparationHelper.mkdirs(outputDirFileSystem, outputDir, _defaultPermissionsMask);
+    JobPreparationHelper.mkdirs(outputDirFileSystem, stagingDir, _defaultPermissionsMask);
+    Path stagingInputDir = new Path(stagingDir, "input");
+    JobPreparationHelper.mkdirs(outputDirFileSystem, stagingInputDir, _defaultPermissionsMask);
+
+    // Gather all data files
+    List<Path> dataFilePaths = getDataFilePaths(inputPattern);
+    int numDataFiles = dataFilePaths.size();
+    if (numDataFiles == 0) {
+      String errorMessage = "No data file founded with pattern: " + inputPattern;
+      LOGGER.error(errorMessage);
+      throw new RuntimeException(errorMessage);
+    } else {
+      LOGGER.info("Creating segments with data files: {}", dataFilePaths);
+      for (int i = 0; i < numDataFiles; i++) {
+        Path dataFilePath = dataFilePaths.get(i);
+        try (DataOutputStream dataOutputStream = outputDirFileSystem
+            .create(new Path(stagingInputDir, Integer.toString(i)))) {
+          dataOutputStream.write(StringUtil.encodeUtf8(dataFilePath.toString() + " " + i));
+          dataOutputStream.flush();
+        }
+      }
+    }
+
+    // Set up the job
+    List<String> dataFilePathStrs = new ArrayList<>();
+    for (Path dataFilePath : dataFilePaths) {
+      dataFilePathStrs.add(dataFilePath.toString());
+    }
+
+    // Set table config and schema
+    TableConfig tableConfig = getTableConfig();
+    if (tableConfig != null) {
+      validateTableConfig(tableConfig);
+      _properties.put(JobConfigConstants.TABLE_CONFIG, tableConfig.toJsonConfigString());
+    }
+    _properties.put(JobConfigConstants.SCHEMA, getSchema().toSingleLineJsonString());
+
+    JavaSparkContext sparkContext = new JavaSparkContext();
+    addDepsJarToDistributedCache(sparkContext);
+    JavaRDD<String> pathRDD = sparkContext.parallelize(dataFilePathStrs, numDataFiles);
+    pathRDD.zipWithIndex().foreach(tuple2 -> {
+      SegmentCreationMapper segmentCreationMapper =
+          new SegmentCreationMapper(_properties, new Path(_stagingDir, "output").toString());
+      segmentCreationMapper.run(tuple2._1, tuple2._2);
+      segmentCreationMapper.cleanup();
+    });
+
+    moveSegmentsToOutputDir(outputDirFileSystem, _stagingDir, _outputDir);
+
+    // Delete the staging directory
+    LOGGER.info("Deleting the staging directory: {}", stagingDir);
+    outputDirFileSystem.delete(stagingDir, true);
+  }
+
+  @Override
+  protected Schema getSchema()
+      throws IOException {
+    try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+      if (controllerRestApi != null) {
+        return controllerRestApi.getSchema();
+      } else {
+        // Schema file could be stored locally or remotely.
+        Path schemaFilePath = new Path(_schemaFile);
+        try (InputStream inputStream = FileSystem.get(schemaFilePath.toUri(), new Configuration())
+            .open(schemaFilePath)) {
+          return Schema.fromInputSteam(inputStream);
+        }
+      }
+    }
+  }
+
+  protected void validateTableConfig(TableConfig tableConfig) {
+    SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+
+    // For APPEND use case, timeColumnName and timeType must be set
+    if (APPEND.equalsIgnoreCase(validationConfig.getSegmentPushType())) {
+      Preconditions.checkState(validationConfig.getTimeColumnName() != null && validationConfig.getTimeType() != null,
+          "For APPEND use case, time column and type must be set");
+    }
+  }
+
+  protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext)
+      throws IOException {
+    if (_depsJarDir != null) {
+      Path depsJarPath = new Path(_depsJarDir);
+      JobPreparationHelper
+          .addDepsJarToDistributedCacheHelper(FileSystem.get(depsJarPath.toUri(), new Configuration()), sparkContext,
+              depsJarPath);
+    }
+  }
+
+  protected void moveSegmentsToOutputDir(FileSystem outputDirFileSystem, String stagingDir, String outputDir)
+      throws IOException {
+    Path segmentTarDir = new Path(new Path(stagingDir, "output"), JobConfigConstants.SEGMENT_TAR_DIR);
+    for (FileStatus segmentTarStatus : outputDirFileSystem.listStatus(segmentTarDir)) {
+      Path segmentTarPath = segmentTarStatus.getPath();
+      Path dest = new Path(outputDir, segmentTarPath.getName());
+      LOGGER.info("Moving segment tar file from: {} to: {}", segmentTarPath, dest);
+      outputDirFileSystem.rename(segmentTarPath, dest);
+    }
+  }
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationMapper.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationMapper.java
new file mode 100644
index 0000000..f9e771b
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationMapper.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Map;
+import java.util.Properties;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.data.TimeFieldSpec;
+import org.apache.pinot.common.utils.DataSize;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.data.readers.CSVRecordReaderConfig;
+import org.apache.pinot.core.data.readers.FileFormat;
+import org.apache.pinot.core.data.readers.RecordReaderConfig;
+import org.apache.pinot.core.data.readers.ThriftRecordReaderConfig;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentCreationMapper implements Serializable {
+  protected static final String LOCAL_TEMP_DIR = "pinot_hadoop_tmp";
+
+  protected final Logger _logger = LoggerFactory.getLogger(getClass());
+
+  protected Configuration _jobConf;
+  protected String _rawTableName;
+  protected Schema _schema;
+  protected SegmentNameGenerator _segmentNameGenerator;
+  protected boolean _useRelativePath;
+
+  // Optional
+  protected TableConfig _tableConfig;
+  protected String _recordReaderPath;
+  protected Path _readerConfigFile;
+
+  // HDFS segment tar directory
+  protected Path _hdfsSegmentTarDir;
+
+  // Temporary local directories
+  protected File _localStagingDir;
+  protected File _localInputDir;
+  protected File _localSegmentDir;
+  protected File _localSegmentTarDir;
+
+  public SegmentCreationMapper(Properties properties, String workerOutputPath)
+      throws IOException {
+    _jobConf = new Configuration();
+    for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+      _jobConf.set(entry.getKey().toString(), entry.getValue().toString());
+    }
+
+    logConfigurations();
+
+    _useRelativePath = _jobConf.getBoolean(JobConfigConstants.USE_RELATIVE_PATH, false);
+    _rawTableName = _jobConf.get(JobConfigConstants.SEGMENT_TABLE_NAME);
+    _schema = Schema.fromString(_jobConf.get(JobConfigConstants.SCHEMA));
+
+    // Optional
+    String tableConfigString = _jobConf.get(JobConfigConstants.TABLE_CONFIG);
+    if (tableConfigString != null) {
+      _tableConfig = TableConfig.fromJsonString(tableConfigString);
+    }
+    String readerConfigFile = _jobConf.get(JobConfigConstants.PATH_TO_READER_CONFIG);
+    if (readerConfigFile != null) {
+      _readerConfigFile = new Path(readerConfigFile);
+    }
+    _recordReaderPath = _jobConf.get(JobConfigConstants.RECORD_READER_PATH);
+
+    // Set up segment name generator
+    String segmentNameGeneratorType =
+        _jobConf.get(JobConfigConstants.SEGMENT_NAME_GENERATOR_TYPE, JobConfigConstants.DEFAULT_SEGMENT_NAME_GENERATOR);
+    switch (segmentNameGeneratorType) {
+      case JobConfigConstants.SIMPLE_SEGMENT_NAME_GENERATOR:
+        _segmentNameGenerator =
+            new SimpleSegmentNameGenerator(_rawTableName, _jobConf.get(JobConfigConstants.SEGMENT_NAME_POSTFIX));
+        break;
+      case JobConfigConstants.NORMALIZED_DATE_SEGMENT_NAME_GENERATOR:
+        Preconditions.checkState(_tableConfig != null,
+            "In order to use NormalizedDateSegmentNameGenerator, table config must be provided");
+        SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
+        String timeFormat = null;
+        TimeFieldSpec timeFieldSpec = _schema.getTimeFieldSpec();
+        if (timeFieldSpec != null) {
+          timeFormat = timeFieldSpec.getOutgoingGranularitySpec().getTimeFormat();
+        }
+        _segmentNameGenerator =
+            new NormalizedDateSegmentNameGenerator(_rawTableName, _jobConf.get(JobConfigConstants.SEGMENT_NAME_PREFIX),
+                _jobConf.getBoolean(JobConfigConstants.EXCLUDE_SEQUENCE_ID, false),
+                validationConfig.getSegmentPushType(), validationConfig.getSegmentPushFrequency(),
+                validationConfig.getTimeType(), timeFormat);
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported segment name generator type: " + segmentNameGeneratorType);
+    }
+
+    // Working directories
+    _hdfsSegmentTarDir = new Path(workerOutputPath, JobConfigConstants.SEGMENT_TAR_DIR);
+    _localStagingDir = new File(LOCAL_TEMP_DIR);
+    _localInputDir = new File(_localStagingDir, "inputData");
+    _localSegmentDir = new File(_localStagingDir, "segments");
+    _localSegmentTarDir = new File(_localStagingDir, JobConfigConstants.SEGMENT_TAR_DIR);
+
+    if (_localStagingDir.exists()) {
+      _logger.warn("Deleting existing file: {}", _localStagingDir);
+      FileUtils.forceDelete(_localStagingDir);
+    }
+    _logger.info("Making local temporary directories: {}, {}, {}, {}", _localStagingDir, _localInputDir,
+        _localSegmentDir, _localSegmentTarDir);
+    Preconditions.checkState(_localStagingDir.mkdirs());
+    Preconditions.checkState(_localInputDir.mkdir());
+    Preconditions.checkState(_localSegmentDir.mkdir());
+    Preconditions.checkState(_localSegmentTarDir.mkdir());
+
+    _logger.info("*********************************************************************");
+    _logger.info("Raw Table Name: {}", _rawTableName);
+    _logger.info("Schema: {}", _schema);
+    _logger.info("Segment Name Generator: {}", _segmentNameGenerator);
+    _logger.info("Table Config: {}", _tableConfig);
+    _logger.info("Reader Config File: {}", _readerConfigFile);
+    _logger.info("*********************************************************************");
+    _logger.info("HDFS Segment Tar Directory: {}", _hdfsSegmentTarDir);
+    _logger.info("Local Staging Directory: {}", _localStagingDir);
+    _logger.info("Local Input Directory: {}", _localInputDir);
+    _logger.info("Local Segment Tar Directory: {}", _localSegmentTarDir);
+    _logger.info("*********************************************************************");
+  }
+
+  /**
+   * Generates a relative output directory path when the `useRelativePath` flag is on.
+   * This method computes the relative path of `inputFile` with respect to `baseInputDir`,
+   * then applies only the directory part of that relative path to `outputDir`.
+   * E.g.
+   *    baseInputDir = "/path/to/input"
+   *    inputFile = "/path/to/input/a/b/c/d.avro"
+   *    outputDir = "/path/to/output"
+   *    getRelativeOutputPath(baseInputDir, inputFile, outputDir) = /path/to/output/a/b/c
+   */
+  protected static Path getRelativeOutputPath(URI baseInputDir, URI inputFile, Path outputDir) {
+    URI relativePath = baseInputDir.relativize(inputFile);
+    Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile),
+        "Unable to extract out the relative path based on base input path: " + baseInputDir);
+    return new Path(outputDir, relativePath.getPath()).getParent();
+  }
+
+  protected void logConfigurations() {
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append('{');
+    boolean firstEntry = true;
+    for (Map.Entry<String, String> entry : _jobConf) {
+      if (!firstEntry) {
+        stringBuilder.append(", ");
+      } else {
+        firstEntry = false;
+      }
+
+      stringBuilder.append(entry.getKey());
+      stringBuilder.append('=');
+      stringBuilder.append(entry.getValue());
+    }
+    stringBuilder.append('}');
+
+    _logger.info("*********************************************************************");
+    _logger.info("Job Configurations: {}", stringBuilder.toString());
+    _logger.info("*********************************************************************");
+  }
+
+  protected void run(String hdfsInputFileString, Long seqId)
+      throws IOException, InterruptedException {
+    Path hdfsInputFile = new Path(hdfsInputFileString);
+    int sequenceId = seqId.intValue();
+    _logger.info("Generating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile, sequenceId);
+
+    String inputFileName = hdfsInputFile.getName();
+    File localInputFile = new File(_localInputDir, inputFileName);
+    _logger.info("Copying input file from: {} to: {}", hdfsInputFile, localInputFile);
+    FileSystem.get(hdfsInputFile.toUri(), _jobConf)
+        .copyToLocalFile(hdfsInputFile, new Path(localInputFile.getAbsolutePath()));
+
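+    // Configure segment generation against the local copy of the input file, using the table config
+    // and schema passed in through the job configuration.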
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, _schema);
+    segmentGeneratorConfig.setTableName(_rawTableName);
+    segmentGeneratorConfig.setInputFilePath(localInputFile.getPath());
+    segmentGeneratorConfig.setOutDir(_localSegmentDir.getPath());
+    segmentGeneratorConfig.setSegmentNameGenerator(_segmentNameGenerator);
+    segmentGeneratorConfig.setSequenceId(sequenceId);
+    if (_recordReaderPath != null) {
+      segmentGeneratorConfig.setRecordReaderPath(_recordReaderPath);
+      segmentGeneratorConfig.setFormat(FileFormat.OTHER);
+    } else {
+      FileFormat fileFormat = getFileFormat(inputFileName);
+      segmentGeneratorConfig.setFormat(fileFormat);
+      segmentGeneratorConfig.setReaderConfig(getReaderConfig(fileFormat));
+    }
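+    // Build the segment with on-heap indexes; this is typically faster at the cost of more memory
+    // on the worker.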
+    segmentGeneratorConfig.setOnHeap(true);
+
+    addAdditionalSegmentGeneratorConfigs(segmentGeneratorConfig, hdfsInputFile, sequenceId);
+
+    _logger.info("Start creating segment with sequence id: {}", sequenceId);
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+
+    try {
+      driver.init(segmentGeneratorConfig);
+      driver.build();
+    } catch (Exception e) {
+      _logger.error("Caught exception while creating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile,
+          sequenceId, e);
+      throw new RuntimeException(e);
+    }
+    String segmentName = driver.getSegmentName();
+    _logger.info("Finish creating segment: {} with sequence id: {}", segmentName, sequenceId);
+
+    File localSegmentDir = new File(_localSegmentDir, segmentName);
+    String segmentTarFileName = segmentName + JobConfigConstants.TAR_GZ_FILE_EXT;
+    File localSegmentTarFile = new File(_localSegmentTarDir, segmentTarFileName);
+    _logger.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+    TarGzCompressionUtils.createTarGzOfDirectory(localSegmentDir.getPath(), localSegmentTarFile.getPath());
+
+    long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+    long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+    _logger.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+        DataSize.fromBytes(uncompressedSegmentSize), DataSize.fromBytes(compressedSegmentSize));
+
+    Path hdfsSegmentTarFile = new Path(_hdfsSegmentTarDir, segmentTarFileName);
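+    // When relative paths are enabled, mirror the input directory layout under the segment tar
+    // output directory instead of writing all tar files directly into it.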
+    if (_useRelativePath) {
+      Path relativeOutputPath =
+          getRelativeOutputPath(new Path(_jobConf.get(JobConfigConstants.PATH_TO_INPUT)).toUri(), hdfsInputFile.toUri(),
+              _hdfsSegmentTarDir);
+      hdfsSegmentTarFile = new Path(relativeOutputPath, segmentTarFileName);
+    }
+    _logger.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile);
+    FileSystem.get(hdfsSegmentTarFile.toUri(), _jobConf)
+        .copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);
+
+    _logger.info("Finish generating segment: {} with HDFS input file: {}, sequence id: {}", segmentName, hdfsInputFile,
+        sequenceId);
+  }
+
+  protected FileFormat getFileFormat(String fileName) {
+    if (fileName.endsWith(".avro")) {
+      return FileFormat.AVRO;
+    }
+    if (fileName.endsWith(".csv")) {
+      return FileFormat.CSV;
+    }
+    if (fileName.endsWith(".json")) {
+      return FileFormat.JSON;
+    }
+    if (fileName.endsWith(".thrift")) {
+      return FileFormat.THRIFT;
+    }
+    throw new IllegalArgumentException("Unsupported file format: " + fileName);
+  }
+
+  @Nullable
+  protected RecordReaderConfig getReaderConfig(FileFormat fileFormat)
+      throws IOException {
+    if (_readerConfigFile != null) {
+      if (fileFormat == FileFormat.CSV) {
+        try (InputStream inputStream = FileSystem.get(_readerConfigFile.toUri(), _jobConf).open(_readerConfigFile)) {
+          CSVRecordReaderConfig readerConfig = JsonUtils.inputStreamToObject(inputStream, CSVRecordReaderConfig.class);
+          _logger.info("Using CSV record reader config: {}", readerConfig);
+          return readerConfig;
+        }
+      }
+      if (fileFormat == FileFormat.THRIFT) {
+        try (InputStream inputStream = FileSystem.get(_readerConfigFile.toUri(), _jobConf).open(_readerConfigFile)) {
+          ThriftRecordReaderConfig readerConfig =
+              JsonUtils.inputStreamToObject(inputStream, ThriftRecordReaderConfig.class);
+          _logger.info("Using Thrift record reader config: {}", readerConfig);
+          return readerConfig;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Can be overridden to set additional segment generator configs.
+   */
+  @SuppressWarnings("unused")
+  protected void addAdditionalSegmentGeneratorConfigs(SegmentGeneratorConfig segmentGeneratorConfig, Path hdfsInputFile,
+      int sequenceId) {
+  }
+
+  public void cleanup() {
+    _logger.info("Deleting local temporary directory: {}", _localStagingDir);
+    FileUtils.deleteQuietly(_localStagingDir);
+  }
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentTarPushJob.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentTarPushJob.java
new file mode 100644
index 0000000..259e95b
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentTarPushJob.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.spark.utils.PushLocation;
+
+
+public class SegmentTarPushJob extends BaseSegmentJob {
+  private final Path _segmentPattern;
+  private final List<PushLocation> _pushLocations;
+  private final String _rawTableName;
+  private final boolean _deleteExtraSegments;
+
+  public SegmentTarPushJob(Properties properties) {
+    super(properties);
+    _segmentPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT));
+    String[] hosts = StringUtils.split(properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS), ',');
+    int port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
+    _pushLocations = PushLocation.getPushLocations(hosts, port);
+    _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
+    _deleteExtraSegments = Boolean.parseBoolean(properties.getProperty(JobConfigConstants.DELETE_EXTRA_SEGMENTS, "false"));
+  }
+
+  @Override
+  protected boolean isDataFile(String fileName) {
+    return fileName.endsWith(JobConfigConstants.TAR_GZ_FILE_EXT);
+  }
+
+  public void run()
+      throws Exception {
+    FileSystem fileSystem = FileSystem.get(_segmentPattern.toUri(), new Configuration());
+    List<Path> segmentsToPush = getDataFilePaths(_segmentPattern);
+    try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+      // TODO: Deal with invalid prefixes in the future
+
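+      // Snapshot the segments currently on the controller so that, if _deleteExtraSegments is set,
+      // stale segments with the same name prefix can be removed after this push.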
+      List<String> currentSegments = controllerRestApi.getAllSegments("OFFLINE");
+
+      controllerRestApi.pushSegments(fileSystem, segmentsToPush);
+
+      if (_deleteExtraSegments) {
+        controllerRestApi.deleteSegmentUris(getSegmentsToDelete(currentSegments, segmentsToPush));
+      }
+    }
+  }
+
+  /**
+   * Returns the segments that should be deleted after the push: segments already on the controller
+   * whose name prefix (segment name without the trailing sequence id) matches one of the pushed
+   * segments, but which are not part of this push.
+   * E.g. if the controller has mytable_20190809_20190809_0 and mytable_20190809_20190809_1, and only
+   * mytable_20190809_20190809_0 is pushed, then mytable_20190809_20190809_1 is returned for deletion.
+   * @param allSegments all segments on the controller for the table
+   * @param segmentsToPush segments that will be pushed to the controller
+   * @return names of the stale segments to delete
+   */
+  public List<String> getSegmentsToDelete(List<String> allSegments, List<Path> segmentsToPush) {
+    Set<String> uniqueSegmentPrefixes = new HashSet<>();
+
+    // Get all relevant segment prefixes that we are planning on pushing
+    List<String> segmentNamesToPush = segmentsToPush.stream().map(s -> s.getName()).collect(Collectors.toList());
+    for (String segmentName : segmentNamesToPush) {
+      String segmentNamePrefix = removeSequenceId(segmentName);
+      uniqueSegmentPrefixes.add(segmentNamePrefix);
+    }
+
+    List<String> relevantSegments = new ArrayList<>();
+    // Get relevant segments already pushed that we are planning on refreshing
+    for (String segmentName : allSegments) {
+      if (uniqueSegmentPrefixes.contains(removeSequenceId(segmentName))) {
+        relevantSegments.add(segmentName);
+      }
+    }
+
+    relevantSegments.removeAll(segmentNamesToPush);
+    return relevantSegments;
+  }
+
+  /**
+   * Removes the trailing sequence id from a segment name.
+   * E.g. "mytable_12" becomes "mytable_", and "mytable_20190809_20190809_12" becomes
+   * "mytable_20190809_20190809_".
+   * @param segmentName segment name with an optional trailing sequence id
+   * @return segment name without the trailing sequence id
+   */
+  private String removeSequenceId(String segmentName) {
+    return segmentName.replaceAll("\\d*$", "");
+  }
+
+  protected ControllerRestApi getControllerRestApi() {
+    return new DefaultControllerRestApi(_pushLocations, _rawTableName);
+  }
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentUriPushJob.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentUriPushJob.java
new file mode 100644
index 0000000..910fae3
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentUriPushJob.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.spark.utils.PushLocation;
+
+
+public class SegmentUriPushJob extends BaseSegmentJob {
+  private final String _segmentUriPrefix;
+  private final String _segmentUriSuffix;
+  private final Path _segmentPattern;
+  private final List<PushLocation> _pushLocations;
+  private final String _rawTableName;
+
+  public SegmentUriPushJob(Properties properties) {
+    super(properties);
+    _segmentUriPrefix = properties.getProperty("uri.prefix", "");
+    _segmentUriSuffix = properties.getProperty("uri.suffix", "");
+    _segmentPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT));
+    String[] hosts = StringUtils.split(properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS), ',');
+    int port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
+    _pushLocations = PushLocation.getPushLocations(hosts, port);
+    _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
+  }
+
+  @Override
+  protected boolean isDataFile(String fileName) {
+    return fileName.endsWith(JobConfigConstants.TAR_GZ_FILE_EXT);
+  }
+
+  public void run()
+      throws Exception {
+    try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+      List<Path> tarFilePaths = getDataFilePaths(_segmentPattern);
+      List<String> segmentUris = new ArrayList<>(tarFilePaths.size());
+      for (Path tarFilePath : tarFilePaths) {
+        segmentUris.add(_segmentUriPrefix + tarFilePath.toUri().getRawPath() + _segmentUriSuffix);
+      }
+      controllerRestApi.sendSegmentUris(segmentUris);
+    }
+  }
+
+  protected ControllerRestApi getControllerRestApi() {
+    return new DefaultControllerRestApi(_pushLocations, _rawTableName);
+  }
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/utils/JobPreparationHelper.java b/pinot-spark/src/main/java/org/apache/pinot/spark/utils/JobPreparationHelper.java
new file mode 100644
index 0000000..92bc990
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/utils/JobPreparationHelper.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.utils;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JobPreparationHelper {
+  private static final Logger _logger = LoggerFactory.getLogger(JobPreparationHelper.class);
+
+  public static void mkdirs(FileSystem fileSystem, Path dirPath, String defaultPermissionsMask)
+      throws IOException {
+    if (fileSystem.exists(dirPath)) {
+      _logger.warn("Deleting existing file: {}", dirPath);
+      fileSystem.delete(dirPath, true);
+    }
+    _logger.info("Making directory: {}", dirPath);
+    fileSystem.mkdirs(dirPath);
+    JobPreparationHelper.setDirPermission(fileSystem, dirPath, defaultPermissionsMask);
+  }
+
+  public static void addDepsJarToDistributedCacheHelper(FileSystem fileSystem, JavaSparkContext sparkContext,
+      Path depsJarDir)
+      throws IOException {
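+    // Recursively walk depsJarDir and register every jar with the Spark context so that it gets
+    // shipped to the executors.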
+    FileStatus[] fileStatuses = fileSystem.listStatus(depsJarDir);
+    for (FileStatus fileStatus : fileStatuses) {
+      if (fileStatus.isDirectory()) {
+        addDepsJarToDistributedCacheHelper(fileSystem, sparkContext, fileStatus.getPath());
+      } else {
+        Path depJarPath = fileStatus.getPath();
+        if (depJarPath.getName().endsWith(".jar")) {
+          _logger.info("Adding deps jar: {} to distributed cache", depJarPath);
+          sparkContext.addJar(depJarPath.toUri().getPath());
+        }
+      }
+    }
+  }
+
+  public static void setDirPermission(FileSystem fileSystem, Path dirPath, String defaultPermissionsMask)
+      throws IOException {
+    if (defaultPermissionsMask != null) {
+      FsPermission permission = FsPermission.getDirDefault().applyUMask(new FsPermission(defaultPermissionsMask));
+      _logger.info("Setting permission: {} to directory: {}", permission, dirPath);
+      fileSystem.setPermission(dirPath, permission);
+    }
+  }
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/utils/PushLocation.java b/pinot-spark/src/main/java/org/apache/pinot/spark/utils/PushLocation.java
new file mode 100644
index 0000000..8793a1a
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/utils/PushLocation.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class PushLocation {
+  private final String _host;
+  private final int _port;
+
+  public PushLocation(String host, int port) {
+    _host = host;
+    _port = port;
+  }
+
+  public static List<PushLocation> getPushLocations(String[] hosts, int port) {
+    List<PushLocation> pushLocations = new ArrayList<>(hosts.length);
+    for (String host : hosts) {
+      pushLocations.add(new PushLocation(host, port));
+    }
+    return pushLocations;
+  }
+
+  public String getHost() {
+    return _host;
+  }
+
+  public int getPort() {
+    return _port;
+  }
+
+  @Override
+  public String toString() {
+    return _host + ":" + _port;
+  }
+}
diff --git a/pom.xml b/pom.xml
index e6dc7f1..6ceee81 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,6 +128,7 @@
     <hadoop.version>2.7.0</hadoop.version>
     <spark.version>2.2.0</spark.version>
     <scala.binary.version>2.11</scala.binary.version>
+    <scala.version>2.11.11</scala.version>
     <antlr.version>4.6</antlr.version>
     <calcite.version>1.19.0</calcite.version>
     <!-- commons-configuration, hadoop-common, hadoop-client use commons-lang -->
@@ -597,6 +598,16 @@
         <artifactId>spark-sql_${scala.binary.version}</artifactId>
         <version>${spark.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.scala-lang</groupId>
+        <artifactId>scala-library</artifactId>
+        <version>${scala.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.scala-lang</groupId>
+        <artifactId>scala-reflect</artifactId>
+        <version>${scala.version}</version>
+      </dependency>
 
       <!-- Hadoop  -->
       <dependency>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org