You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2019/11/06 11:05:53 UTC

[incubator-pinot] 02/02: Initial commit for pinot-spark

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch pinot-spark
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 6fbd9da6d6d16412403cda0cb9ee9a1b9c1056dd
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed Nov 6 03:05:19 2019 -0800

    Initial commit for pinot-spark
---
 pinot-spark/pom.xml                                |  52 ++-
 .../apache/pinot/spark/PinotSparkJobLauncher.java  |  80 ++++
 .../apache/pinot/spark/jobs/BaseSegmentJob.java    | 137 +++++++
 .../apache/pinot/spark/jobs/ControllerRestApi.java |  42 ++
 .../pinot/spark/jobs/DefaultControllerRestApi.java | 192 +++++++++
 .../pinot/spark/jobs/JobConfigConstants.java       |  65 ++++
 .../pinot/spark/jobs/SegmentCreationJob.java       | 427 +++++++++++++++++++++
 .../pinot/spark/jobs/SegmentCreationMapper.java    | 323 ++++++++++++++++
 .../apache/pinot/spark/jobs/SegmentTarPushJob.java | 116 ++++++
 .../apache/pinot/spark/jobs/SegmentUriPushJob.java |  68 ++++
 .../pinot/spark/utils/JobPreparationHelper.java    |  70 ++++
 .../org/apache/pinot/spark/utils/PushLocation.java |  54 +++
 pom.xml                                            |  11 +
 13 files changed, 1628 insertions(+), 9 deletions(-)

diff --git a/pinot-spark/pom.xml b/pinot-spark/pom.xml
index 615b21a..6c56c38 100644
--- a/pinot-spark/pom.xml
+++ b/pinot-spark/pom.xml
@@ -71,7 +71,7 @@
                   <transformers>
                     <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                      <mainClass>org.apache.pinot.hadoop.PinotHadoopJobLauncher</mainClass>
+                      <mainClass>org.apache.pinot.spark.PinotSparkJobLauncher</mainClass>
                     </transformer>
                   </transformers>
                 </configuration>
@@ -95,6 +95,10 @@
           <groupId>org.apache.pinot</groupId>
           <artifactId>pinot-common</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.thoughtworks.paranamer</groupId>
+          <artifactId>paranamer</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -123,17 +127,34 @@
           <groupId>org.apache.pinot</groupId>
           <artifactId>pinot-core</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_${scala.binary.version}</artifactId>
-      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scala-reflect</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scala-library</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.activation</groupId>
+          <artifactId>activation</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>commons-logging</groupId>
@@ -149,11 +170,24 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro-mapred</artifactId>
-      <classifier>hadoop2</classifier>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-reflect</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <!--Test-->
+    <dependency>
       <groupId>org.testng</groupId>
       <artifactId>testng</artifactId>
       <scope>test</scope>
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/PinotSparkJobLauncher.java b/pinot-spark/src/main/java/org/apache/pinot/spark/PinotSparkJobLauncher.java
new file mode 100644
index 0000000..6ed9422
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/PinotSparkJobLauncher.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark;
+
+import java.io.FileInputStream;
+import java.util.Arrays;
+import java.util.Properties;
+import org.apache.pinot.spark.jobs.SegmentCreationJob;
+import org.apache.pinot.spark.jobs.SegmentTarPushJob;
+
+
+/** Command-line launcher for the Pinot Spark jobs; expects a job type and the path of a job properties file. */
+public class PinotSparkJobLauncher {
+  enum PinotSparkJobType {
+    SegmentCreation, SegmentTarPush, SegmentCreationAndTarPush
+  }
+
+  private static final String USAGE = "usage: [job_type] [job.properties]";
+  private static final String SUPPORT_JOB_TYPES =
+      "\tsupport job types: " + Arrays.toString(PinotSparkJobType.values());
+
+  /** Prints the usage line and the supported job types to stderr. */
+  private static void usage() {
+    System.err.println(USAGE);
+    System.err.println(SUPPORT_JOB_TYPES);
+  }
+
+  /** Runs the job(s) for {@code jobType}; the combined type creates the segments first and then pushes them. */
+  private static void kickOffPinotSparkJob(PinotSparkJobType jobType, Properties jobConf) throws Exception {
+    switch (jobType) {
+      case SegmentCreation:
+        new SegmentCreationJob(jobConf).run();
+        break;
+      case SegmentTarPush:
+        new SegmentTarPushJob(jobConf).run();
+        break;
+      case SegmentCreationAndTarPush:
+        new SegmentCreationJob(jobConf).run();
+        new SegmentTarPushJob(jobConf).run();
+        break;
+      default:
+        throw new RuntimeException("Not a valid jobType - " + jobType);
+    }
+  }
+
+  /** Validates the arguments, loads the properties file and runs the job; exits with status 1 on bad input. */
+  public static void main(String[] args) throws Exception {
+    if (args.length != 2) {
+      usage();
+      System.exit(1);
+    }
+    PinotSparkJobType jobType = null;
+    Properties jobConf = new Properties();
+    try (FileInputStream propertiesStream = new FileInputStream(args[1])) { // closed even when load() fails
+      jobType = PinotSparkJobType.valueOf(args[0]);
+      jobConf.load(propertiesStream);
+    } catch (Exception e) {
+      System.err.println("Invalid arguments: " + e); // say why, instead of only repeating the usage
+      usage();
+      System.exit(1);
+    }
+    kickOffPinotSparkJob(jobType, jobConf);
+  }
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/BaseSegmentJob.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/BaseSegmentJob.java
new file mode 100644
index 0000000..3285899
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/BaseSegmentJob.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.spark.utils.PushLocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/** Shared base of the Spark segment jobs: holds the job properties, optional push locations and the table name. */
+public abstract class BaseSegmentJob implements Serializable {
+  protected final Logger _logger = LoggerFactory.getLogger(getClass());
+  protected final Properties _properties;
+  protected final List<PushLocation> _pushLocations;
+  protected final String _rawTableName;
+
+  /** Stores and logs the properties, then reads the optional push hosts/port and the mandatory table name. */
+  protected BaseSegmentJob(Properties properties) {
+    _properties = properties;
+    Utils.logVersions();
+    logProperties();
+
+    // Push hosts and port are optional. When both are set, the table config and schema of the push hosts are used.
+    String hosts = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS);
+    String port = _properties.getProperty(JobConfigConstants.PUSH_TO_PORT);
+    if (hosts == null || port == null) {
+      _pushLocations = null;
+    } else {
+      _pushLocations = PushLocation.getPushLocations(StringUtils.split(hosts, ','), Integer.parseInt(port));
+    }
+
+    _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
+  }
+
+  /** Returns the table config from the controller Rest API, or null when no Rest API is configured. */
+  @Nullable
+  protected TableConfig getTableConfig() throws IOException {
+    try (ControllerRestApi restApi = getControllerRestApi()) {
+      return restApi == null ? null : restApi.getTableConfig();
+    }
+  }
+
+  /**
+   * Returns null in this base class. Jobs override it because the Hadoop filesystem only exists once the job runs;
+   * for now every override must support reading a schema file, pending a move to controller-pushed schemas.
+   */
+  @Nullable
+  protected org.apache.pinot.common.data.Schema getSchema() throws IOException {
+    return null;
+  }
+
+  /** Creates the default controller Rest API when push locations exist; override to supply a custom one. */
+  @Nullable
+  protected ControllerRestApi getControllerRestApi() {
+    if (_pushLocations == null) {
+      return null;
+    }
+    return new DefaultControllerRestApi(_pushLocations, _rawTableName);
+  }
+
+  /** Logs the job properties framed by two separator lines. */
+  protected void logProperties() {
+    String separator = "*********************************************************************";
+    _logger.info(separator);
+    _logger.info("Job Properties: {}", _properties);
+    _logger.info(separator);
+  }
+
+  /** Returns the value of property {@code key} as a Path, or null when the property is not set. */
+  @Nullable
+  protected Path getPathFromProperty(String key) {
+    String value = _properties.getProperty(key);
+    return value == null ? null : new Path(value);
+  }
+
+  /** Collects the data files matched by the glob pattern, descending into matched directories. */
+  protected List<Path> getDataFilePaths(Path pathPattern) throws IOException {
+    List<Path> dataFilePaths = new ArrayList<>();
+    FileSystem fs = FileSystem.get(pathPattern.toUri(), new Configuration());
+    _logger.info("Using filesystem: {}", fs);
+    FileStatus[] matches = fs.globStatus(pathPattern);
+    if (matches != null) {
+      getDataFilePathsHelper(fs, matches, dataFilePaths);
+    } else {
+      _logger.warn("Unable to match file status from file path pattern: {}", pathPattern);
+    }
+    return dataFilePaths;
+  }
+
+  /** Recursively adds every data file found under the given statuses to {@code tarFilePaths}. */
+  protected void getDataFilePathsHelper(FileSystem fileSystem, FileStatus[] fileStatuses, List<Path> tarFilePaths)
+      throws IOException {
+    for (FileStatus status : fileStatuses) {
+      Path path = status.getPath();
+      String name = path.getName();
+      // Files whose name starts with "_" or "." are temp files of frameworks like Hadoop/Spark and are skipped.
+      if (status.isDirectory()) {
+        getDataFilePathsHelper(fileSystem, fileSystem.listStatus(path), tarFilePaths);
+      } else if (!name.startsWith("_") && !name.startsWith(".") && isDataFile(name)) {
+        tarFilePaths.add(path);
+      }
+    }
+  }
+
+  /** Tells whether a file with the given name is an input data file for the concrete job. */
+  protected abstract boolean isDataFile(String fileName);
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/ControllerRestApi.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/ControllerRestApi.java
new file mode 100644
index 0000000..4d0391c
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/ControllerRestApi.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import java.io.Closeable;
+import java.util.List;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.data.Schema;
+
+/** Controller operations used by the Spark segment jobs; Closeable so implementations can release resources. */
+public interface ControllerRestApi extends Closeable {
+  /** Returns the table config; DefaultControllerRestApi throws a RuntimeException when none can be fetched. */
+  TableConfig getTableConfig();
+  /** Returns the table schema; DefaultControllerRestApi throws a RuntimeException when none can be fetched. */
+  Schema getSchema();
+  /** Pushes the given segment tar files, which are opened through {@code fileSystem}. */
+  void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths);
+  /** Sends the given segment URIs (instead of the segment files themselves). */
+  void sendSegmentUris(List<String> segmentUris);
+  /** Requests deletion for each entry; DefaultControllerRestApi issues one delete per entry and push location. */
+  void deleteSegmentUris(List<String> segmentUris);
+  /** Returns the segments of the table for the given table type. */
+  List<String> getAllSegments(String tableType);
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/DefaultControllerRestApi.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/DefaultControllerRestApi.java
new file mode 100644
index 0000000..c9444a9
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/DefaultControllerRestApi.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.spark.utils.PushLocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/** {@link ControllerRestApi} that sends HTTP requests for one raw table to the push locations given at creation. */
+public class DefaultControllerRestApi implements ControllerRestApi {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultControllerRestApi.class);
+  private static final String OFFLINE = "OFFLINE";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // shared; avoids one mapper per call
+
+  private final List<PushLocation> _pushLocations;
+  private final String _rawTableName;
+  private final FileUploadDownloadClient _fileUploadDownloadClient = new FileUploadDownloadClient();
+
+  public DefaultControllerRestApi(List<PushLocation> pushLocations, String rawTableName) {
+    LOGGER.info("Push locations are: {} for table: {}", pushLocations, rawTableName);
+    _pushLocations = pushLocations;
+    _rawTableName = rawTableName;
+  }
+
+  /** Returns the OFFLINE table config of the first push location that has one; RuntimeException if none does. */
+  @Override
+  public TableConfig getTableConfig() {
+    for (PushLocation pushLocation : _pushLocations) {
+      try {
+        SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(FileUploadDownloadClient
+            .getRetrieveTableConfigHttpURI(pushLocation.getHost(), pushLocation.getPort(), _rawTableName));
+        JsonNode offlineJsonTableConfig = JsonUtils.stringToJsonNode(response.getResponse()).get(OFFLINE);
+        if (offlineJsonTableConfig == null) {
+          LOGGER.warn("No OFFLINE table config for table: {} at push location: {}", _rawTableName, pushLocation);
+          continue;
+        }
+        TableConfig offlineTableConfig = TableConfig.fromJsonConfig(offlineJsonTableConfig);
+        LOGGER.info("Got table config: {}", offlineTableConfig);
+        return offlineTableConfig;
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while fetching table config for table: {} from push location: {}", _rawTableName,
+            pushLocation, e);
+      }
+    }
+    throw allPushLocationsFailed("table config");
+  }
+
+  /** Returns the schema from the first push location that answers; RuntimeException if every location fails. */
+  @Override
+  public Schema getSchema() {
+    for (PushLocation pushLocation : _pushLocations) {
+      try {
+        SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(FileUploadDownloadClient
+            .getRetrieveSchemaHttpURI(pushLocation.getHost(), pushLocation.getPort(), _rawTableName));
+        Schema schema = Schema.fromString(response.getResponse());
+        LOGGER.info("Got schema: {}", schema);
+        return schema;
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while fetching schema for table: {} from push location: {}", _rawTableName,
+            pushLocation, e);
+      }
+    }
+    throw allPushLocationsFailed("schema");
+  }
+
+  /** Uploads each ".tar.gz" segment file to every push location; the first failure aborts with RuntimeException. */
+  @Override
+  public void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths) {
+    LOGGER.info("Start pushing segments: {} to locations: {}", tarFilePaths, _pushLocations);
+    for (Path tarFilePath : tarFilePaths) {
+      String fileName = tarFilePath.getName();
+      Preconditions.checkArgument(fileName.endsWith(JobConfigConstants.TAR_GZ_FILE_EXT));
+      String segmentName = fileName.substring(0, fileName.length() - JobConfigConstants.TAR_GZ_FILE_EXT.length());
+      for (PushLocation pushLocation : _pushLocations) {
+        LOGGER.info("Pushing segment: {} to location: {}", segmentName, pushLocation);
+        try (InputStream inputStream = fileSystem.open(tarFilePath)) {
+          SimpleHttpResponse response = _fileUploadDownloadClient.uploadSegment(
+              FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), pushLocation.getPort()),
+              segmentName, inputStream, _rawTableName);
+          LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while pushing segment: {} to location: {}", segmentName, pushLocation, e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  /** Sends each segment URI to every push location; the first failure aborts with a RuntimeException. */
+  @Override
+  public void sendSegmentUris(List<String> segmentUris) {
+    LOGGER.info("Start sending segment URIs: {} to locations: {}", segmentUris, _pushLocations);
+    for (String segmentUri : segmentUris) {
+      for (PushLocation pushLocation : _pushLocations) {
+        LOGGER.info("Sending segment URI: {} to location: {}", segmentUri, pushLocation);
+        try {
+          SimpleHttpResponse response = _fileUploadDownloadClient.sendSegmentUri(
+              FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), pushLocation.getPort()),
+              segmentUri, _rawTableName);
+          LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while sending segment URI: {} to location: {}", segmentUri, pushLocation, e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  /** Sends an OFFLINE delete request per entry and push location; the first failure aborts with RuntimeException. */
+  @Override
+  public void deleteSegmentUris(List<String> segmentUris) {
+    LOGGER.info("Start deleting segment URIs: {} to locations: {}", segmentUris, _pushLocations);
+    for (String segmentUri : segmentUris) {
+      for (PushLocation pushLocation : _pushLocations) {
+        LOGGER.info("Sending deleting segment URI: {} to location: {}", segmentUri, pushLocation);
+        try {
+          SimpleHttpResponse response = _fileUploadDownloadClient.sendDeleteRequest(FileUploadDownloadClient
+              .getDeleteSegmentHttpUri(pushLocation.getHost(), pushLocation.getPort(), _rawTableName, segmentUri,
+                  OFFLINE));
+          LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while deleting segment URI: {} to location: {}", segmentUri, pushLocation, e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  /** Returns the segment list of the first push location that answers; RuntimeException if every location fails. */
+  @Override
+  public List<String> getAllSegments(String tableType) {
+    LOGGER.info("Getting all segments of table {}", _rawTableName);
+    for (PushLocation pushLocation : _pushLocations) {
+      try {
+        SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(FileUploadDownloadClient
+            .getRetrieveAllSegmentWithTableTypeHttpUri(pushLocation.getHost(), pushLocation.getPort(), _rawTableName,
+                tableType));
+        JsonNode segmentList = JsonUtils.stringToJsonNode(response.getResponse()).get(0).get(tableType);
+        return OBJECT_MAPPER.convertValue(segmentList, ArrayList.class);
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while getting all {} segments for table: {} from push location: {}", tableType,
+            _rawTableName, pushLocation, e);
+      }
+    }
+    throw allPushLocationsFailed("a list of all segments");
+  }
+
+  /** Closes the underlying FileUploadDownloadClient. */
+  @Override
+  public void close() throws IOException {
+    _fileUploadDownloadClient.close();
+  }
+
+  /** Logs and returns the exception to throw when no push location could provide {@code what}. */
+  private RuntimeException allPushLocationsFailed(String what) {
+    String errorMessage =
+        String.format("Failed to get %s from push locations: %s for table: %s", what, _pushLocations, _rawTableName);
+    LOGGER.error(errorMessage);
+    return new RuntimeException(errorMessage);
+  }
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/JobConfigConstants.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/JobConfigConstants.java
new file mode 100644
index 0000000..f7636d3
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/JobConfigConstants.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+/** Property keys and fixed names shared by the pinot-spark segment jobs. */
+public class JobConfigConstants {
+  public static final String PATH_TO_INPUT = "path.to.input";
+  public static final String PATH_TO_OUTPUT = "path.to.output";
+  public static final String PREPROCESS_PATH_TO_OUTPUT = "preprocess.path.to.output";
+  public static final String PATH_TO_DEPS_JAR = "path.to.deps.jar";
+  public static final String PATH_TO_READER_CONFIG = "path.to.reader.config";
+  // Leave this for backward compatibility. We prefer to use the schema fetched from the controller.
+  public static final String PATH_TO_SCHEMA = "path.to.schema";
+  // TAR_GZ_FILE_EXT is the suffix DefaultControllerRestApi strips from a tar file name to get the segment name.
+  public static final String SEGMENT_TAR_DIR = "segmentTar";
+  public static final String TAR_GZ_FILE_EXT = ".tar.gz";
+  // SEGMENT_TABLE_NAME is mandatory: BaseSegmentJob rejects a missing value with a null check.
+  public static final String SEGMENT_TABLE_NAME = "segment.table.name";
+  public static final String TABLE_CONFIG = "table.config";
+  public static final String SCHEMA = "data.schema";
+  // Segment name generator selection; the default is the simple generator.
+  public static final String SEGMENT_NAME_GENERATOR_TYPE = "segment.name.generator.type";
+  public static final String SIMPLE_SEGMENT_NAME_GENERATOR = "simple";
+  public static final String NORMALIZED_DATE_SEGMENT_NAME_GENERATOR = "normalizedDate";
+  public static final String DEFAULT_SEGMENT_NAME_GENERATOR = SIMPLE_SEGMENT_NAME_GENERATOR;
+
+  // For SimpleSegmentNameGenerator
+  public static final String SEGMENT_NAME_POSTFIX = "segment.name.postfix";
+
+  // For NormalizedDateSegmentNameGenerator
+  public static final String SEGMENT_NAME_PREFIX = "segment.name.prefix";
+  public static final String EXCLUDE_SEQUENCE_ID = "exclude.sequence.id";
+  // Comma-separated controller hosts and their port; BaseSegmentJob builds push locations only when both are set.
+  public static final String PUSH_TO_HOSTS = "push.to.hosts";
+  public static final String PUSH_TO_PORT = "push.to.port";
+  // NOTE(review): this looks like the Hadoop umask configuration key rather than a Pinot one - confirm at use site.
+  public static final String DEFAULT_PERMISSIONS_MASK = "fs.permissions.umask-mode";
+
+  // The path to the record reader to be configured
+  public static final String RECORD_READER_PATH = "record.reader.path";
+
+  public static final String ENABLE_PREPROCESSING = "enable.preprocessing";
+
+  // This setting should be used if fewer segments will be generated after the
+  // push. In preprocessing, this is likely because we resize segments.
+  public static final String DELETE_EXTRA_SEGMENTS = "delete.extra.segments";
+
+  // This setting is used to match the output segment hierarchy with the input file hierarchy.
+  public static final String USE_RELATIVE_PATH = "use.relative.path";
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationJob.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationJob.java
new file mode 100644
index 0000000..74f09ee
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationJob.java
@@ -0,0 +1,427 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.google.common.base.Preconditions;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.data.TimeFieldSpec;
+import org.apache.pinot.common.utils.DataSize;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.data.readers.CSVRecordReaderConfig;
+import org.apache.pinot.core.data.readers.FileFormat;
+import org.apache.pinot.core.data.readers.RecordReaderConfig;
+import org.apache.pinot.core.data.readers.ThriftRecordReaderConfig;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.spark.utils.JobPreparationHelper;
+import org.apache.pinot.spark.utils.PushLocation;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentCreationJob extends BaseSegmentJob {
+  protected static final String APPEND = "APPEND";
+  protected static final String LOCAL_TEMP_DIR = "pinot_spark_tmp";
+  protected static final Logger LOGGER = LoggerFactory.getLogger(SegmentCreationJob.class);
+  protected final String _rawTableName;
+
+  protected final String _inputPattern;
+  protected final String _outputDir;
+  protected final String _stagingDir;
+  // Optional
+  protected final String _depsJarDir;
+  protected final String _schemaFile;
+  protected final String _defaultPermissionsMask;
+  protected final List<PushLocation> _pushLocations;
+
+  /**
+   * Creates the segment creation job from the given job properties.
+   *
+   * Required settings are the input path ({@link JobConfigConstants#PATH_TO_INPUT}), the output path
+   * ({@link JobConfigConstants#PATH_TO_OUTPUT}) and the raw table name
+   * ({@link JobConfigConstants#SEGMENT_TABLE_NAME}). The staging directory is a child of the output directory named
+   * with a random UUID. Push locations are only built when both push hosts and push port are configured.
+   *
+   * @param properties job properties, also passed to the base class constructor
+   * @throws NullPointerException if one of the required settings is missing
+   * @throws NumberFormatException if the push port is configured but is not an integer
+   */
+  public SegmentCreationJob(Properties properties) {
+    super(properties);
+
+    _inputPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_INPUT)).toString();
+    _outputDir = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT)).toString();
+    _stagingDir = new Path(_outputDir, UUID.randomUUID().toString()).toString();
+    _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
+
+    // Optional
+    _depsJarDir = properties.getProperty(JobConfigConstants.PATH_TO_DEPS_JAR);
+    _schemaFile = properties.getProperty(JobConfigConstants.PATH_TO_SCHEMA);
+    _defaultPermissionsMask = _properties.getProperty(JobConfigConstants.DEFAULT_PERMISSIONS_MASK);
+
+    // Optional push location and table parameters. If set, will use the table config and schema from the push hosts.
+    String pushHostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS);
+    String pushPortString = _properties.getProperty(JobConfigConstants.PUSH_TO_PORT);
+    if (pushHostsString != null && pushPortString != null) {
+      _pushLocations =
+          PushLocation.getPushLocations(StringUtils.split(pushHostsString, ','), Integer.parseInt(pushPortString));
+    } else {
+      _pushLocations = null;
+    }
+
+    LOGGER.info("*********************************************************************");
+    LOGGER.info("Input Pattern: {}", _inputPattern);
+    LOGGER.info("Output Directory: {}", _outputDir);
+    LOGGER.info("Staging Directory: {}", _stagingDir);
+    LOGGER.info("Raw Table Name: {}", _rawTableName);
+    LOGGER.info("Dependencies Directory: {}", _depsJarDir);
+    LOGGER.info("Schema File: {}", _schemaFile);
+    LOGGER.info("Default Permissions Mask: {}", _defaultPermissionsMask);
+    LOGGER.info("Push Locations: {}", _pushLocations);
+    LOGGER.info("*********************************************************************");
+  }
+
+  /**
+   * Computes the output directory that mirrors the position of an input file below the base input directory.
+   * Used when the `useRelativePath` flag is on: the input file is relativized against `baseInputDir`, and the
+   * directory part of that relative path is appended to `outputDir`.
+   * E.g.
+   *    baseInputDir = "/path/to/input"
+   *    inputFile = "/path/to/input/a/b/c/d.avro"
+   *    outputDir = "/path/to/output"
+   *    getRelativeOutputPath(baseInputDir, inputFile, outputDir) = /path/to/output/a/b/c
+   *
+   * @throws IllegalStateException if the input file cannot be relativized against the base input directory
+   */
+  protected static Path getRelativeOutputPath(URI baseInputDir, URI inputFile, Path outputDir) {
+    URI relativized = baseInputDir.relativize(inputFile);
+    // relativize() hands back the given URI unchanged when it is not below the base, hence the equality check
+    boolean isBelowBase = relativized.getPath().length() > 0 && !relativized.equals(inputFile);
+    Preconditions.checkState(isBelowBase,
+        "Unable to extract out the relative path based on base input path: " + baseInputDir);
+    Path mirroredFile = new Path(outputDir, relativized.getPath());
+    return mirroredFile.getParent();
+  }
+
+  /**
+   * Generates one segment from a single input file and copies the resulting tar file below the staging directory.
+   *
+   * The method reads the table name, schema and optional table/reader settings from {@code conf}, picks the segment
+   * name generator, copies the input file to a local directory, builds the segment, tars it and finally copies the
+   * tar file to the segment tar directory under {@code stagingDir}/output (or to a sub-directory of it when relative
+   * paths are enabled).
+   *
+   * NOTE(review): this method does not explicitly create or remove the local directories under LOCAL_TEMP_DIR -
+   * confirm whether callers are expected to prepare and clean them.
+   *
+   * @param inputFile path of the input data file
+   * @param seqId sequence id of the segment; converted with {@code intValue()}
+   * @param conf configuration holding the table name, schema and the optional settings
+   * @param stagingDir staging directory under which the segment tar file is written
+   * @throws IOException on I/O failure
+   * @throws RuntimeException wrapping any exception raised while initializing or building the segment
+   */
+  protected static void createSingleSegment(String inputFile, Long seqId, Configuration conf, String stagingDir)
+      throws IOException {
+    Path hdfsInputFile = new Path(inputFile);
+    int sequenceId = seqId.intValue();
+    LOGGER.info("Generating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile, sequenceId);
+
+    String rawTableName = conf.get(JobConfigConstants.SEGMENT_TABLE_NAME);
+    Schema schema = Schema.fromString(conf.get(JobConfigConstants.SCHEMA));
+    SegmentNameGenerator segmentNameGenerator;
+    boolean useRelativePath = conf.getBoolean(JobConfigConstants.USE_RELATIVE_PATH, false);
+
+    // Optional
+    TableConfig tableConfig = null;
+    String recordReaderPath;
+    Path readerConfigFile = null;
+
+    String tableConfigString = conf.get(JobConfigConstants.TABLE_CONFIG);
+    if (tableConfigString != null) {
+      tableConfig = TableConfig.fromJsonString(tableConfigString);
+    }
+    String readerConfigFileStr = conf.get(JobConfigConstants.PATH_TO_READER_CONFIG);
+    if (readerConfigFileStr != null) {
+      readerConfigFile = new Path(readerConfigFileStr);
+    }
+    recordReaderPath = conf.get(JobConfigConstants.RECORD_READER_PATH);
+
+    // HDFS segment tar directory
+    Path hdfsSegmentTarDir = new Path(new Path(stagingDir, "output"), JobConfigConstants.SEGMENT_TAR_DIR);
+
+    // Set up segment name generator
+    String segmentNameGeneratorType =
+        conf.get(JobConfigConstants.SEGMENT_NAME_GENERATOR_TYPE, JobConfigConstants.DEFAULT_SEGMENT_NAME_GENERATOR);
+    switch (segmentNameGeneratorType) {
+      case JobConfigConstants.SIMPLE_SEGMENT_NAME_GENERATOR:
+        segmentNameGenerator =
+            new SimpleSegmentNameGenerator(rawTableName, conf.get(JobConfigConstants.SEGMENT_NAME_POSTFIX));
+        break;
+      case JobConfigConstants.NORMALIZED_DATE_SEGMENT_NAME_GENERATOR:
+        // This generator reads push type, push frequency and time type from the table config
+        Preconditions.checkState(tableConfig != null,
+            "In order to use NormalizedDateSegmentNameGenerator, table config must be provided");
+        SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+        // The time format stays null when the schema has no time field
+        String timeFormat = null;
+        TimeFieldSpec timeFieldSpec = schema.getTimeFieldSpec();
+        if (timeFieldSpec != null) {
+          timeFormat = timeFieldSpec.getOutgoingGranularitySpec().getTimeFormat();
+        }
+        segmentNameGenerator =
+            new NormalizedDateSegmentNameGenerator(rawTableName, conf.get(JobConfigConstants.SEGMENT_NAME_PREFIX),
+                conf.getBoolean(JobConfigConstants.EXCLUDE_SEQUENCE_ID, false), validationConfig.getSegmentPushType(),
+                validationConfig.getSegmentPushFrequency(), validationConfig.getTimeType(), timeFormat);
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported segment name generator type: " + segmentNameGeneratorType);
+    }
+
+    // Temporary local directories
+    File localStagingDir = new File(LOCAL_TEMP_DIR);
+    File localInputDir = new File(localStagingDir, "inputData");
+    File localSegmentsDir = new File(localStagingDir, "segments");
+    File localSegmentTarDir = new File(localStagingDir, JobConfigConstants.SEGMENT_TAR_DIR);
+
+    // Copy the input file to the local input directory, keeping its file name
+    String inputFileName = hdfsInputFile.getName();
+    File localInputFile = new File(localInputDir, inputFileName);
+    LOGGER.info("Copying input file from: {} to: {}", hdfsInputFile, localInputFile);
+    // NOTE(review): a fresh Configuration is used here instead of the conf argument - confirm this is intended
+    FileSystem.get(hdfsInputFile.toUri(), new Configuration())
+        .copyToLocalFile(hdfsInputFile, new Path(localInputFile.getAbsolutePath()));
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setTableName(rawTableName);
+    segmentGeneratorConfig.setInputFilePath(localInputFile.getPath());
+    segmentGeneratorConfig.setOutDir(localSegmentsDir.getPath());
+    segmentGeneratorConfig.setSegmentNameGenerator(segmentNameGenerator);
+    segmentGeneratorConfig.setSequenceId(sequenceId);
+    if (recordReaderPath != null) {
+      // A custom record reader is configured: the file format is not derived from the file name
+      segmentGeneratorConfig.setRecordReaderPath(recordReaderPath);
+      segmentGeneratorConfig.setFormat(FileFormat.OTHER);
+    } else {
+      // Derive the format from the file extension and load the matching reader config, if any
+      FileFormat fileFormat = getFileFormat(inputFileName);
+      segmentGeneratorConfig.setFormat(fileFormat);
+      segmentGeneratorConfig.setReaderConfig(getReaderConfig(conf, readerConfigFile, fileFormat));
+    }
+    segmentGeneratorConfig.setOnHeap(true);
+
+    addAdditionalSegmentGeneratorConfigs(segmentGeneratorConfig, hdfsInputFile, sequenceId);
+
+    LOGGER.info("Start creating segment with sequence id: {}", sequenceId);
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    try {
+      driver.init(segmentGeneratorConfig);
+      driver.build();
+    } catch (Exception e) {
+      // Log with the input file and sequence id for context, then rethrow unchecked with the cause preserved
+      LOGGER.error("Caught exception while creating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile,
+          sequenceId, e);
+      throw new RuntimeException(e);
+    }
+    String segmentName = driver.getSegmentName();
+    LOGGER.info("Finish creating segment: {} with sequence id: {}", segmentName, sequenceId);
+
+    // Tar the generated segment directory into the local segment tar directory
+    File localSegmentDir = new File(localSegmentsDir, segmentName);
+    String segmentTarFileName = segmentName + JobConfigConstants.TAR_GZ_FILE_EXT;
+    File localSegmentTarFile = new File(localSegmentTarDir, segmentTarFileName);
+    LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+    TarGzCompressionUtils.createTarGzOfDirectory(localSegmentDir.getPath(), localSegmentTarFile.getPath());
+
+    long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+    long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+    LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+        DataSize.fromBytes(uncompressedSegmentSize), DataSize.fromBytes(compressedSegmentSize));
+
+    // Default destination is directly in the segment tar directory; with relative paths enabled the destination
+    // mirrors the input file's position below the configured input path
+    Path hdfsSegmentTarFile = new Path(hdfsSegmentTarDir, segmentTarFileName);
+    if (useRelativePath) {
+      Path relativeOutputPath =
+          getRelativeOutputPath(new Path(conf.get(JobConfigConstants.PATH_TO_INPUT)).toUri(), hdfsInputFile.toUri(),
+              hdfsSegmentTarDir);
+      hdfsSegmentTarFile = new Path(relativeOutputPath, segmentTarFileName);
+    }
+    LOGGER.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile);
+    // copyFromLocalFile(delSrc = true, overwrite = true, ...): the local tar file is removed after the copy
+    FileSystem.get(hdfsSegmentTarFile.toUri(), new Configuration())
+        .copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);
+
+    LOGGER.info("Finish generating segment: {} with HDFS input file: {}, sequence id: {}", segmentName, hdfsInputFile,
+        sequenceId);
+  }
+
+  /**
+   * Resolves the {@link FileFormat} of a data file from its file name extension.
+   *
+   * @param fileName name of the data file; supported extensions are .avro, .csv, .json and .thrift
+   * @return the file format matching the extension
+   * @throws IllegalArgumentException if the file name has none of the supported extensions
+   */
+  protected static FileFormat getFileFormat(String fileName) {
+    if (fileName.endsWith(".avro")) {
+      return FileFormat.AVRO;
+    }
+    if (fileName.endsWith(".csv")) {
+      return FileFormat.CSV;
+    }
+    if (fileName.endsWith(".json")) {
+      return FileFormat.JSON;
+    }
+    if (fileName.endsWith(".thrift")) {
+      return FileFormat.THRIFT;
+    }
+    // Exception messages are plain strings: a logger-style "{}" placeholder would be printed literally
+    throw new IllegalArgumentException("Unsupported file format: " + fileName);
+  }
+
+  /**
+   * Loads the record reader config for the given file format from the reader config file.
+   *
+   * Only the CSV and Thrift formats are handled; the file content is parsed as JSON into the matching config class.
+   *
+   * @param conf configuration used to obtain the file system of the reader config file
+   * @param readerConfigFile path of the reader config file, may be {@code null}
+   * @param fileFormat format of the data file
+   * @return the parsed reader config, or {@code null} when no reader config file is given or the format is neither
+   *         CSV nor Thrift
+   * @throws IOException if the reader config file cannot be opened or parsed
+   */
+  @Nullable
+  protected static RecordReaderConfig getReaderConfig(Configuration conf, Path readerConfigFile, FileFormat fileFormat)
+      throws IOException {
+    if (readerConfigFile == null) {
+      return null;
+    }
+    boolean csvFormat = fileFormat == FileFormat.CSV;
+    if (!csvFormat && fileFormat != FileFormat.THRIFT) {
+      // No reader config exists for the other formats, so the file is not even opened
+      return null;
+    }
+    FileSystem readerConfigFileSystem = FileSystem.get(readerConfigFile.toUri(), conf);
+    try (InputStream configStream = readerConfigFileSystem.open(readerConfigFile)) {
+      if (csvFormat) {
+        CSVRecordReaderConfig csvReaderConfig =
+            JsonUtils.inputStreamToObject(configStream, CSVRecordReaderConfig.class);
+        LOGGER.info("Using CSV record reader config: {}", csvReaderConfig);
+        return csvReaderConfig;
+      }
+      ThriftRecordReaderConfig thriftReaderConfig =
+          JsonUtils.inputStreamToObject(configStream, ThriftRecordReaderConfig.class);
+      LOGGER.info("Using Thrift record reader config: {}", thriftReaderConfig);
+      return thriftReaderConfig;
+    }
+  }
+
+  /**
+   * Hook for setting additional segment generator configs; this implementation does nothing.
+   *
+   * NOTE(review): the method is static, so a subclass can only hide it, not override it; a call made through this
+   * class always resolves to this empty implementation. Confirm whether an instance method was intended.
+   *
+   * @param segmentGeneratorConfig segment generator config to extend
+   * @param hdfsInputFile input file the segment is generated from
+   * @param sequenceId sequence id of the segment
+   */
+  @SuppressWarnings("unused")
+  protected static void addAdditionalSegmentGeneratorConfigs(SegmentGeneratorConfig segmentGeneratorConfig,
+      Path hdfsInputFile, int sequenceId) {
+  }
+
+  /**
+   * Tells whether a file should be treated as a data file.
+   *
+   * @param fileName name of the file to check
+   * @return {@code true} for every file when a custom record reader is configured, otherwise {@code true} only for
+   *         files ending in .avro, .csv, .json or .thrift
+   */
+  @Override
+  protected boolean isDataFile(String fileName) {
+    // A custom record reader may read any kind of file, so nothing is filtered out in that case
+    boolean hasCustomRecordReader = _properties.getProperty(JobConfigConstants.RECORD_READER_PATH) != null;
+    if (hasCustomRecordReader) {
+      return true;
+    }
+    String[] supportedExtensions = {".avro", ".csv", ".json", ".thrift"};
+    for (String extension : supportedExtensions) {
+      if (fileName.endsWith(extension)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Runs the segment creation job end to end.
+   *
+   * The job creates the staging directories, gathers the data files matching the input pattern, publishes the table
+   * config and schema through the job properties, creates one segment per data file on Spark, moves the segment tar
+   * files to the output directory and finally deletes the staging directory.
+   *
+   * @throws RuntimeException if no data file matches the input pattern
+   * @throws Exception if any step of the job fails
+   */
+  public void run()
+      throws Exception {
+    LOGGER.info("Starting {}", getClass().getSimpleName());
+
+    Path inputPattern = new Path(_inputPattern);
+    // NOTE(review): outputDir is built from _stagingDir rather than _outputDir, so the first two mkdirs calls below
+    // target the same path - confirm whether _outputDir was intended before changing it.
+    Path outputDir = new Path(_stagingDir);
+    Path stagingDir = new Path(_stagingDir);
+
+    // Initialize all directories
+    FileSystem outputDirFileSystem = FileSystem.get(outputDir.toUri(), new Configuration());
+    JobPreparationHelper.mkdirs(outputDirFileSystem, outputDir, _defaultPermissionsMask);
+    JobPreparationHelper.mkdirs(outputDirFileSystem, stagingDir, _defaultPermissionsMask);
+    Path stagingInputDir = new Path(stagingDir, "input");
+    JobPreparationHelper.mkdirs(outputDirFileSystem, stagingInputDir, _defaultPermissionsMask);
+
+    // Gather all data files
+    List<Path> dataFilePaths = getDataFilePaths(inputPattern);
+    int numDataFiles = dataFilePaths.size();
+    if (numDataFiles == 0) {
+      String errorMessage = "No data file founded with pattern: " + inputPattern;
+      LOGGER.error(errorMessage);
+      throw new RuntimeException(errorMessage);
+    } else {
+      LOGGER.info("Creating segments with data files: {}", dataFilePaths);
+      // Write one "<data file path> <sequence id>" file per data file into the staging input directory
+      for (int i = 0; i < numDataFiles; i++) {
+        Path dataFilePath = dataFilePaths.get(i);
+        try (DataOutputStream dataOutputStream = outputDirFileSystem
+            .create(new Path(stagingInputDir, Integer.toString(i)))) {
+          dataOutputStream.write(StringUtil.encodeUtf8(dataFilePath.toString() + " " + i));
+          dataOutputStream.flush();
+        }
+      }
+    }
+
+    // Set up the job
+    List<String> dataFilePathStrs = new ArrayList<>();
+    for (Path dataFilePath : dataFilePaths) {
+      dataFilePathStrs.add(dataFilePath.toString());
+    }
+
+    // Set table config and schema
+    TableConfig tableConfig = getTableConfig();
+    if (tableConfig != null) {
+      validateTableConfig(tableConfig);
+      _properties.put(JobConfigConstants.TABLE_CONFIG, tableConfig.toJsonConfigString());
+    }
+    _properties.put(JobConfigConstants.SCHEMA, getSchema().toSingleLineJsonString());
+
+    // JavaSparkContext is Closeable: closing it stops the Spark context created here once segment creation has
+    // finished or failed, instead of leaving it running.
+    try (JavaSparkContext sparkContext = new JavaSparkContext()) {
+      addDepsJarToDistributedCache(sparkContext);
+      // One partition per data file; zipWithIndex() supplies the sequence id of each file
+      JavaRDD<String> pathRDD = sparkContext.parallelize(dataFilePathStrs, numDataFiles);
+      pathRDD.zipWithIndex().foreach(tuple2 -> {
+        SegmentCreationMapper segmentCreationMapper =
+            new SegmentCreationMapper(_properties, new Path(_stagingDir, "output").toString());
+        try {
+          segmentCreationMapper.run(tuple2._1, tuple2._2);
+        } finally {
+          // Run the mapper cleanup even when segment generation fails
+          segmentCreationMapper.cleanup();
+        }
+      });
+    }
+
+    moveSegmentsToOutputDir(outputDirFileSystem, _stagingDir, _outputDir);
+
+    // Delete the staging directory
+    LOGGER.info("Deleting the staging directory: {}", stagingDir);
+    outputDirFileSystem.delete(stagingDir, true);
+  }
+
+  /**
+   * Loads the schema, preferring the controller over the schema file.
+   *
+   * @return the schema returned by the controller REST API when one is available, otherwise the schema read from
+   *         the configured schema file
+   * @throws IOException if the schema cannot be fetched or read
+   */
+  @Override
+  protected Schema getSchema()
+      throws IOException {
+    try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+      if (controllerRestApi != null) {
+        return controllerRestApi.getSchema();
+      }
+      // Schema file could be stored local or remotely.
+      Path schemaFilePath = new Path(_schemaFile);
+      FileSystem schemaFileSystem = FileSystem.get(schemaFilePath.toUri(), new Configuration());
+      try (InputStream schemaStream = schemaFileSystem.open(schemaFilePath)) {
+        return Schema.fromInputSteam(schemaStream);
+      }
+    }
+  }
+
+  /**
+   * Validates the parts of the table config this job depends on.
+   *
+   * @param tableConfig table config to validate
+   * @throws IllegalStateException if the segment push type is APPEND (case-insensitive) and the time column name or
+   *         time type is not set
+   */
+  protected void validateTableConfig(TableConfig tableConfig) {
+    SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+    if (!APPEND.equalsIgnoreCase(validationConfig.getSegmentPushType())) {
+      // Only the APPEND use case has additional requirements
+      return;
+    }
+
+    // For APPEND use case, timeColumnName and timeType must be set
+    boolean hasTimeColumn = validationConfig.getTimeColumnName() != null;
+    boolean hasTimeInfo = hasTimeColumn && validationConfig.getTimeType() != null;
+    Preconditions.checkState(hasTimeInfo, "For APPEND use case, time column and type must be set");
+  }
+
+  /**
+   * Hands the dependency jar directory to {@link JobPreparationHelper} for the given Spark context.
+   * Does nothing when no dependency jar directory is configured.
+   *
+   * @param sparkContext Spark context passed on to the helper
+   * @throws IOException if the file system of the dependency jar directory cannot be accessed
+   */
+  protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext)
+      throws IOException {
+    if (_depsJarDir == null) {
+      return;
+    }
+    Path depsJarPath = new Path(_depsJarDir);
+    FileSystem depsJarFileSystem = FileSystem.get(depsJarPath.toUri(), new Configuration());
+    JobPreparationHelper.addDepsJarToDistributedCacheHelper(depsJarFileSystem, sparkContext, depsJarPath);
+  }
+
+  /**
+   * Moves every entry listed in the segment tar directory under {@code stagingDir}/output into the output directory.
+   *
+   * @param outputDirFileSystem file system used to list and rename the segment tar files
+   * @param stagingDir staging directory the segment tar files were written under
+   * @param outputDir destination directory
+   * @throws IOException if the segment tar directory cannot be listed or a segment tar file cannot be moved
+   */
+  protected void moveSegmentsToOutputDir(FileSystem outputDirFileSystem, String stagingDir, String outputDir)
+      throws IOException {
+    Path segmentTarDir = new Path(new Path(stagingDir, "output"), JobConfigConstants.SEGMENT_TAR_DIR);
+    for (FileStatus segmentTarStatus : outputDirFileSystem.listStatus(segmentTarDir)) {
+      Path segmentTarPath = segmentTarStatus.getPath();
+      Path dest = new Path(outputDir, segmentTarPath.getName());
+      LOGGER.info("Moving segment tar file from: {} to: {}", segmentTarPath, dest);
+      // rename() reports failure through its return value. The staging directory is deleted once the segments have
+      // been moved, so a move that failed silently would lose the segment.
+      if (!outputDirFileSystem.rename(segmentTarPath, dest)) {
+        throw new IOException("Failed to move segment tar file from: " + segmentTarPath + " to: " + dest);
+      }
+    }
+  }
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationMapper.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationMapper.java
new file mode 100644
index 0000000..f9e771b
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationMapper.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Map;
+import java.util.Properties;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.data.TimeFieldSpec;
+import org.apache.pinot.common.utils.DataSize;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.data.readers.CSVRecordReaderConfig;
+import org.apache.pinot.core.data.readers.FileFormat;
+import org.apache.pinot.core.data.readers.RecordReaderConfig;
+import org.apache.pinot.core.data.readers.ThriftRecordReaderConfig;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentCreationMapper implements Serializable {
+  protected static final String LOCAL_TEMP_DIR = "pinot_hadoop_tmp";
+
+  protected final Logger _logger = LoggerFactory.getLogger(getClass());
+
+  protected Configuration _jobConf;
+  protected String _rawTableName;
+  protected Schema _schema;
+  protected SegmentNameGenerator _segmentNameGenerator;
+  protected boolean _useRelativePath;
+
+  // Optional
+  protected TableConfig _tableConfig;
+  protected String _recordReaderPath;
+  protected Path _readerConfigFile;
+
+  // HDFS segment tar directory
+  protected Path _hdfsSegmentTarDir;
+
+  // Temporary local directories
+  protected File _localStagingDir;
+  protected File _localInputDir;
+  protected File _localSegmentDir;
+  protected File _localSegmentTarDir;
+
+  /**
+   * Sets up a mapper from the job properties.
+   *
+   * The constructor copies every property into a new {@link Configuration}, reads the table name, schema and the
+   * optional table config, reader config file and record reader path from it, chooses the segment name generator
+   * and re-creates the local working directories.
+   *
+   * @param properties job properties; each key and value is copied into the job configuration via toString()
+   * @param workerOutputPath directory under which the segment tar directory is placed
+   * @throws IOException on I/O failure, e.g. when an existing local staging directory cannot be deleted
+   * @throws IllegalStateException if the normalized date name generator is selected without a table config, or a
+   *         local working directory cannot be created
+   * @throws UnsupportedOperationException if the configured segment name generator type is unknown
+   */
+  public SegmentCreationMapper(Properties properties, String workerOutputPath)
+      throws IOException {
+    _jobConf = new Configuration();
+    for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+      _jobConf.set(entry.getKey().toString(), entry.getValue().toString());
+    }
+
+    logConfigurations();
+
+    _useRelativePath = _jobConf.getBoolean(JobConfigConstants.USE_RELATIVE_PATH, false);
+    _rawTableName = _jobConf.get(JobConfigConstants.SEGMENT_TABLE_NAME);
+    _schema = Schema.fromString(_jobConf.get(JobConfigConstants.SCHEMA));
+
+    // Optional
+    String tableConfigString = _jobConf.get(JobConfigConstants.TABLE_CONFIG);
+    if (tableConfigString != null) {
+      _tableConfig = TableConfig.fromJsonString(tableConfigString);
+    }
+    String readerConfigFile = _jobConf.get(JobConfigConstants.PATH_TO_READER_CONFIG);
+    if (readerConfigFile != null) {
+      _readerConfigFile = new Path(readerConfigFile);
+    }
+    _recordReaderPath = _jobConf.get(JobConfigConstants.RECORD_READER_PATH);
+
+    // Set up segment name generator
+    String segmentNameGeneratorType =
+        _jobConf.get(JobConfigConstants.SEGMENT_NAME_GENERATOR_TYPE, JobConfigConstants.DEFAULT_SEGMENT_NAME_GENERATOR);
+    switch (segmentNameGeneratorType) {
+      case JobConfigConstants.SIMPLE_SEGMENT_NAME_GENERATOR:
+        _segmentNameGenerator =
+            new SimpleSegmentNameGenerator(_rawTableName, _jobConf.get(JobConfigConstants.SEGMENT_NAME_POSTFIX));
+        break;
+      case JobConfigConstants.NORMALIZED_DATE_SEGMENT_NAME_GENERATOR:
+        // This generator reads push type, push frequency and time type from the table config
+        Preconditions.checkState(_tableConfig != null,
+            "In order to use NormalizedDateSegmentNameGenerator, table config must be provided");
+        SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
+        // The time format stays null when the schema has no time field
+        String timeFormat = null;
+        TimeFieldSpec timeFieldSpec = _schema.getTimeFieldSpec();
+        if (timeFieldSpec != null) {
+          timeFormat = timeFieldSpec.getOutgoingGranularitySpec().getTimeFormat();
+        }
+        _segmentNameGenerator =
+            new NormalizedDateSegmentNameGenerator(_rawTableName, _jobConf.get(JobConfigConstants.SEGMENT_NAME_PREFIX),
+                _jobConf.getBoolean(JobConfigConstants.EXCLUDE_SEQUENCE_ID, false),
+                validationConfig.getSegmentPushType(), validationConfig.getSegmentPushFrequency(),
+                validationConfig.getTimeType(), timeFormat);
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported segment name generator type: " + segmentNameGeneratorType);
+    }
+
+    // Working directories
+    _hdfsSegmentTarDir = new Path(workerOutputPath, JobConfigConstants.SEGMENT_TAR_DIR);
+    _localStagingDir = new File(LOCAL_TEMP_DIR);
+    _localInputDir = new File(_localStagingDir, "inputData");
+    _localSegmentDir = new File(_localStagingDir, "segments");
+    _localSegmentTarDir = new File(_localStagingDir, JobConfigConstants.SEGMENT_TAR_DIR);
+
+    // Start from an empty local staging directory: remove whatever is left at that path first
+    if (_localStagingDir.exists()) {
+      _logger.warn("Deleting existing file: {}", _localStagingDir);
+      FileUtils.forceDelete(_localStagingDir);
+    }
+    _logger
+        .info("Making local temporary directories: {}, {}, {}", _localStagingDir, _localInputDir, _localSegmentTarDir);
+    // Each creation must succeed; checkState fails the constructor otherwise
+    Preconditions.checkState(_localStagingDir.mkdirs());
+    Preconditions.checkState(_localInputDir.mkdir());
+    Preconditions.checkState(_localSegmentDir.mkdir());
+    Preconditions.checkState(_localSegmentTarDir.mkdir());
+
+    _logger.info("*********************************************************************");
+    _logger.info("Raw Table Name: {}", _rawTableName);
+    _logger.info("Schema: {}", _schema);
+    _logger.info("Segment Name Generator: {}", _segmentNameGenerator);
+    _logger.info("Table Config: {}", _tableConfig);
+    _logger.info("Reader Config File: {}", _readerConfigFile);
+    _logger.info("*********************************************************************");
+    _logger.info("HDFS Segment Tar Directory: {}", _hdfsSegmentTarDir);
+    _logger.info("Local Staging Directory: {}", _localStagingDir);
+    _logger.info("Local Input Directory: {}", _localInputDir);
+    _logger.info("Local Segment Tar Directory: {}", _localSegmentTarDir);
+    _logger.info("*********************************************************************");
+  }
+
+  /**
+   * Computes the output directory that mirrors the position of an input file below the base input directory.
+   * Used when the `useRelativePath` flag is on: the input file is relativized against `baseInputDir`, and the
+   * directory part of that relative path is appended to `outputDir`.
+   * E.g.
+   *    baseInputDir = "/path/to/input"
+   *    inputFile = "/path/to/input/a/b/c/d.avro"
+   *    outputDir = "/path/to/output"
+   *    getRelativeOutputPath(baseInputDir, inputFile, outputDir) = /path/to/output/a/b/c
+   *
+   * @throws IllegalStateException if the input file cannot be relativized against the base input directory
+   */
+  protected static Path getRelativeOutputPath(URI baseInputDir, URI inputFile, Path outputDir) {
+    URI relativeUri = baseInputDir.relativize(inputFile);
+    // relativize() hands back the given URI unchanged when it is not below the base, hence the equality check
+    boolean relativized = relativeUri.getPath().length() > 0 && !relativeUri.equals(inputFile);
+    Preconditions.checkState(relativized,
+        "Unable to extract out the relative path based on base input path: " + baseInputDir);
+    Path outputFile = new Path(outputDir, relativeUri.getPath());
+    return outputFile.getParent();
+  }
+
+  /**
+   * Logs every entry of the job configuration on a single line, formatted as {key=value, key=value, ...}.
+   */
+  protected void logConfigurations() {
+    StringBuilder configDump = new StringBuilder();
+    configDump.append('{');
+    // Empty before the first entry, ", " before every following one
+    String separator = "";
+    for (Map.Entry<String, String> configEntry : _jobConf) {
+      configDump.append(separator).append(configEntry.getKey()).append('=').append(configEntry.getValue());
+      separator = ", ";
+    }
+    configDump.append('}');
+
+    _logger.info("*********************************************************************");
+    _logger.info("Job Configurations: {}", configDump.toString());
+    _logger.info("*********************************************************************");
+  }
+
+  /**
+   * Generates one segment from a single input file and copies the resulting tar file to the segment tar directory.
+   *
+   * The input file is copied to the local input directory, the segment is built into the local segment directory,
+   * tarred into the local segment tar directory and the tar file is then copied to the HDFS segment tar directory
+   * (or to a sub-directory of it when relative paths are enabled).
+   *
+   * @param hdfsInputFileString path of the input data file
+   * @param seqId sequence id of the segment; converted with {@code intValue()}
+   * @throws IOException on I/O failure
+   * @throws InterruptedException declared by the signature; no statement in this method raises it directly
+   * @throws RuntimeException wrapping any exception raised while initializing or building the segment
+   */
+  protected void run(String hdfsInputFileString, Long seqId)
+      throws IOException, InterruptedException {
+    Path hdfsInputFile = new Path(hdfsInputFileString);
+    int sequenceId = seqId.intValue();
+    _logger.info("Generating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile, sequenceId);
+
+    // Copy the input file to the local input directory, keeping its file name
+    String inputFileName = hdfsInputFile.getName();
+    File localInputFile = new File(_localInputDir, inputFileName);
+    _logger.info("Copying input file from: {} to: {}", hdfsInputFile, localInputFile);
+    FileSystem.get(hdfsInputFile.toUri(), _jobConf)
+        .copyToLocalFile(hdfsInputFile, new Path(localInputFile.getAbsolutePath()));
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, _schema);
+    segmentGeneratorConfig.setTableName(_rawTableName);
+    segmentGeneratorConfig.setInputFilePath(localInputFile.getPath());
+    segmentGeneratorConfig.setOutDir(_localSegmentDir.getPath());
+    segmentGeneratorConfig.setSegmentNameGenerator(_segmentNameGenerator);
+    segmentGeneratorConfig.setSequenceId(sequenceId);
+    if (_recordReaderPath != null) {
+      // A custom record reader is configured: the file format is not derived from the file name
+      segmentGeneratorConfig.setRecordReaderPath(_recordReaderPath);
+      segmentGeneratorConfig.setFormat(FileFormat.OTHER);
+    } else {
+      // Derive the format from the file name and load the matching reader config, if any
+      FileFormat fileFormat = getFileFormat(inputFileName);
+      segmentGeneratorConfig.setFormat(fileFormat);
+      segmentGeneratorConfig.setReaderConfig(getReaderConfig(fileFormat));
+    }
+    segmentGeneratorConfig.setOnHeap(true);
+
+    addAdditionalSegmentGeneratorConfigs(segmentGeneratorConfig, hdfsInputFile, sequenceId);
+
+    _logger.info("Start creating segment with sequence id: {}", sequenceId);
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+
+    try {
+      driver.init(segmentGeneratorConfig);
+      driver.build();
+    } catch (Exception e) {
+      // Log with the input file and sequence id for context, then rethrow unchecked with the cause preserved
+      _logger.error("Caught exception while creating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile,
+          sequenceId, e);
+      throw new RuntimeException(e);
+    }
+    String segmentName = driver.getSegmentName();
+    _logger.info("Finish creating segment: {} with sequence id: {}", segmentName, sequenceId);
+
+    // Tar the generated segment directory into the local segment tar directory
+    File localSegmentDir = new File(_localSegmentDir, segmentName);
+    String segmentTarFileName = segmentName + JobConfigConstants.TAR_GZ_FILE_EXT;
+    File localSegmentTarFile = new File(_localSegmentTarDir, segmentTarFileName);
+    _logger.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+    TarGzCompressionUtils.createTarGzOfDirectory(localSegmentDir.getPath(), localSegmentTarFile.getPath());
+
+    long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+    long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+    _logger.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+        DataSize.fromBytes(uncompressedSegmentSize), DataSize.fromBytes(compressedSegmentSize));
+
+    // Default destination is directly in the segment tar directory; with relative paths enabled the destination
+    // mirrors the input file's position below the configured input path
+    Path hdfsSegmentTarFile = new Path(_hdfsSegmentTarDir, segmentTarFileName);
+    if (_useRelativePath) {
+      Path relativeOutputPath =
+          getRelativeOutputPath(new Path(_jobConf.get(JobConfigConstants.PATH_TO_INPUT)).toUri(), hdfsInputFile.toUri(),
+              _hdfsSegmentTarDir);
+      hdfsSegmentTarFile = new Path(relativeOutputPath, segmentTarFileName);
+    }
+    _logger.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile);
+    // copyFromLocalFile(delSrc = true, overwrite = true, ...): the local tar file is removed after the copy
+    FileSystem.get(hdfsSegmentTarFile.toUri(), _jobConf)
+        .copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);
+
+    _logger.info("Finish generating segment: {} with HDFS input file: {}, sequence id: {}", segmentName, hdfsInputFile,
+        sequenceId);
+  }
+
+  /**
+   * Infers the input file format from the file name extension.
+   *
+   * <p>The match is case-sensitive and only recognizes {@code .avro}, {@code .csv}, {@code .json} and
+   * {@code .thrift}.
+   *
+   * @param fileName name of the input file
+   * @return the {@link FileFormat} matching the extension
+   * @throws IllegalArgumentException if the extension is not one of the recognized ones
+   */
+  protected FileFormat getFileFormat(String fileName) {
+    if (fileName.endsWith(".avro")) {
+      return FileFormat.AVRO;
+    }
+    if (fileName.endsWith(".csv")) {
+      return FileFormat.CSV;
+    }
+    if (fileName.endsWith(".json")) {
+      return FileFormat.JSON;
+    }
+    if (fileName.endsWith(".thrift")) {
+      return FileFormat.THRIFT;
+    }
+    // Plain concatenation: exception messages do not expand logger-style "{}" placeholders.
+    throw new IllegalArgumentException("Unsupported file format: " + fileName);
+  }
+
+  /**
+   * Loads the record reader config for the given format from the configured reader config file.
+   *
+   * <p>Only CSV and Thrift inputs have a reader config; the file is opened and parsed as JSON only for those two
+   * formats.
+   *
+   * @param fileFormat format of the input file
+   * @return the parsed config, or {@code null} when no reader config file is set or the format is neither CSV nor
+   *     Thrift
+   * @throws IOException if the reader config file cannot be opened or parsed
+   */
+  @Nullable
+  protected RecordReaderConfig getReaderConfig(FileFormat fileFormat)
+      throws IOException {
+    if (_readerConfigFile == null) {
+      return null;
+    }
+    if (fileFormat == FileFormat.CSV) {
+      FileSystem configFileSystem = FileSystem.get(_readerConfigFile.toUri(), _jobConf);
+      try (InputStream configStream = configFileSystem.open(_readerConfigFile)) {
+        CSVRecordReaderConfig csvConfig = JsonUtils.inputStreamToObject(configStream, CSVRecordReaderConfig.class);
+        _logger.info("Using CSV record reader config: {}", csvConfig);
+        return csvConfig;
+      }
+    }
+    if (fileFormat == FileFormat.THRIFT) {
+      FileSystem configFileSystem = FileSystem.get(_readerConfigFile.toUri(), _jobConf);
+      try (InputStream configStream = configFileSystem.open(_readerConfigFile)) {
+        ThriftRecordReaderConfig thriftConfig =
+            JsonUtils.inputStreamToObject(configStream, ThriftRecordReaderConfig.class);
+        _logger.info("Using Thrift record reader config: {}", thriftConfig);
+        return thriftConfig;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Extension hook for setting additional segment generator configs; the default implementation does nothing.
+   *
+   * <p>{@code run} calls this once per input file, after it has applied its own settings to the config and before
+   * the segment creation driver is initialized with it.
+   *
+   * @param segmentGeneratorConfig config that will be used to build the segment; may be modified in place
+   * @param hdfsInputFile path of the input file the segment is generated from
+   * @param sequenceId sequence id of the segment being generated
+   */
+  @SuppressWarnings("unused")
+  protected void addAdditionalSegmentGeneratorConfigs(SegmentGeneratorConfig segmentGeneratorConfig, Path hdfsInputFile,
+      int sequenceId) {
+  }
+
+  /**
+   * Deletes the local staging directory.
+   *
+   * <p>The deletion is best-effort: {@code FileUtils.deleteQuietly} does not throw when the directory cannot be
+   * removed.
+   */
+  public void cleanup() {
+    _logger.info("Deleting local temporary directory: {}", _localStagingDir);
+    FileUtils.deleteQuietly(_localStagingDir);
+  }
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentTarPushJob.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentTarPushJob.java
new file mode 100644
index 0000000..259e95b
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentTarPushJob.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.spark.utils.PushLocation;
+
+
+/**
+ * Pushes the segment tar files found under the job output path to the controller and, when
+ * {@code JobConfigConstants.DELETE_EXTRA_SEGMENTS} is enabled, asks the controller to delete previously pushed
+ * segments that share a name prefix with the pushed ones but are not part of this push.
+ */
+public class SegmentTarPushJob extends BaseSegmentJob {
+  private final Path _segmentPattern;
+  private final List<PushLocation> _pushLocations;
+  private final String _rawTableName;
+  private final boolean _deleteExtraSegments;
+
+  /**
+   * Reads the push settings from the job properties.
+   *
+   * @param properties job properties; the output path and the table name must be present, the push port must be
+   *     an integer, and extra-segment deletion defaults to {@code false}
+   */
+  public SegmentTarPushJob(Properties properties) {
+    super(properties);
+    _segmentPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT));
+    String[] hosts = StringUtils.split(properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS), ',');
+    int port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
+    _pushLocations = PushLocation.getPushLocations(hosts, port);
+    _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
+    _deleteExtraSegments =
+        Boolean.parseBoolean(properties.getProperty(JobConfigConstants.DELETE_EXTRA_SEGMENTS, "false"));
+  }
+
+  /**
+   * Only files carrying the segment tar extension are treated as data files.
+   */
+  @Override
+  protected boolean isDataFile(String fileName) {
+    return fileName.endsWith(JobConfigConstants.TAR_GZ_FILE_EXT);
+  }
+
+  /**
+   * Pushes all segment tar files under the output path and optionally deletes the extra segments.
+   *
+   * @throws Exception if listing the files, talking to the controller or closing the REST client fails
+   */
+  public void run()
+      throws Exception {
+    FileSystem fileSystem = FileSystem.get(_segmentPattern.toUri(), new Configuration());
+    List<Path> segmentsToPush = getDataFilePaths(_segmentPattern);
+    try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+      // TODO: Deal with invalid prefixes in the future
+
+      // Fetched before the push so that the delete candidates are computed from the pre-push segment list.
+      List<String> currentSegments = controllerRestApi.getAllSegments("OFFLINE");
+
+      controllerRestApi.pushSegments(fileSystem, segmentsToPush);
+
+      if (_deleteExtraSegments) {
+        controllerRestApi.deleteSegmentUris(getSegmentsToDelete(currentSegments, segmentsToPush));
+      }
+    }
+  }
+
+  /**
+   * Computes which segments should be deleted after a push. This method only computes the list; it deletes nothing.
+   *
+   * <p>A segment is returned when it is on the controller, shares its name prefix (the name without the trailing
+   * sequence id) with at least one pushed segment, and is not itself being pushed. Pushed paths point at tar files,
+   * so the tar extension is stripped from their file names before they are compared with controller segment names.
+   *
+   * @param allSegments all segments on the controller for the table
+   * @param segmentsToPush paths of the segment tar files that are pushed to the controller
+   * @return the segments to delete, in the order they appear in {@code allSegments}
+   */
+  public List<String> getSegmentsToDelete(List<String> allSegments, List<Path> segmentsToPush) {
+    // File names end with the tar extension while controller segment names do not; without stripping it, neither
+    // the prefix lookup nor the "is being pushed" check below can ever match.
+    Set<String> segmentNamesToPush = segmentsToPush.stream()
+        .map(segmentPath -> StringUtils.removeEnd(segmentPath.getName(), JobConfigConstants.TAR_GZ_FILE_EXT))
+        .collect(Collectors.toSet());
+
+    // Get all relevant segment prefixes that we are planning on pushing
+    Set<String> uniqueSegmentPrefixes = new HashSet<>();
+    for (String segmentName : segmentNamesToPush) {
+      uniqueSegmentPrefixes.add(removeSequenceId(segmentName));
+    }
+
+    // Keep the already pushed segments with a matching prefix that this push does not refresh
+    List<String> segmentsToDelete = new ArrayList<>();
+    for (String segmentName : allSegments) {
+      if (!segmentNamesToPush.contains(segmentName)
+          && uniqueSegmentPrefixes.contains(removeSequenceId(segmentName))) {
+        segmentsToDelete.add(segmentName);
+      }
+    }
+    return segmentsToDelete;
+  }
+
+  /**
+   * Remove trailing sequence id
+   * eg: If segment name is mytable_12, it will return mytable_
+   * If segment name is mytable_20190809_20190809_12, it will return mytable_20190809_20190809_
+   * @param segmentName segment name without file extension
+   * @return the segment name with all trailing digits removed
+   */
+  private String removeSequenceId(String segmentName) {
+    return segmentName.replaceAll("\\d*$", "");
+  }
+
+  /**
+   * Creates the REST client used to talk to the controller; can be overridden to supply a different client.
+   */
+  protected ControllerRestApi getControllerRestApi() {
+    return new DefaultControllerRestApi(_pushLocations, _rawTableName);
+  }
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentUriPushJob.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentUriPushJob.java
new file mode 100644
index 0000000..910fae3
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentUriPushJob.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.spark.utils.PushLocation;
+
+
+/**
+ * Sends the URIs of the segment tar files found under the job output path to the controller. Each URI is the raw
+ * path of the tar file wrapped in the configured {@code uri.prefix} and {@code uri.suffix}.
+ */
+public class SegmentUriPushJob extends BaseSegmentJob {
+  private final String _segmentUriPrefix;
+  private final String _segmentUriSuffix;
+  private final Path _segmentPattern;
+  private final List<PushLocation> _pushLocations;
+  private final String _rawTableName;
+
+  /**
+   * Reads the URI decoration and push settings from the job properties.
+   *
+   * @param properties job properties; prefix and suffix default to the empty string, the output path and the table
+   *     name must be present, and the push port must be an integer
+   */
+  public SegmentUriPushJob(Properties properties) {
+    super(properties);
+    _segmentUriPrefix = properties.getProperty("uri.prefix", "");
+    _segmentUriSuffix = properties.getProperty("uri.suffix", "");
+    _segmentPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT));
+    String[] controllerHosts = StringUtils.split(properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS), ',');
+    int controllerPort = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
+    _pushLocations = PushLocation.getPushLocations(controllerHosts, controllerPort);
+    _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
+  }
+
+  /**
+   * Only files carrying the segment tar extension are treated as data files.
+   */
+  @Override
+  protected boolean isDataFile(String fileName) {
+    return fileName.endsWith(JobConfigConstants.TAR_GZ_FILE_EXT);
+  }
+
+  /**
+   * Builds one URI per segment tar file and sends the whole list to the controller in a single call.
+   *
+   * @throws Exception if listing the files, talking to the controller or closing the REST client fails
+   */
+  public void run()
+      throws Exception {
+    try (ControllerRestApi restApi = getControllerRestApi()) {
+      List<Path> segmentTarPaths = getDataFilePaths(_segmentPattern);
+      List<String> uris = new ArrayList<>(segmentTarPaths.size());
+      for (Path segmentTarPath : segmentTarPaths) {
+        String rawPath = segmentTarPath.toUri().getRawPath();
+        uris.add(_segmentUriPrefix + rawPath + _segmentUriSuffix);
+      }
+      restApi.sendSegmentUris(uris);
+    }
+  }
+
+  /**
+   * Creates the REST client used to talk to the controller; can be overridden to supply a different client.
+   */
+  protected ControllerRestApi getControllerRestApi() {
+    return new DefaultControllerRestApi(_pushLocations, _rawTableName);
+  }
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/utils/JobPreparationHelper.java b/pinot-spark/src/main/java/org/apache/pinot/spark/utils/JobPreparationHelper.java
new file mode 100644
index 0000000..92bc990
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/utils/JobPreparationHelper.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.utils;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Static helpers for preparing a job: creating directories, applying directory permissions and registering
+ * dependency jars with the Spark context.
+ */
+public class JobPreparationHelper {
+  private static final Logger _logger = LoggerFactory.getLogger(JobPreparationHelper.class);
+
+  /**
+   * Creates {@code dirPath} as a fresh directory and applies the permission mask to it.
+   *
+   * <p>Anything that already exists at {@code dirPath} is deleted recursively first.
+   *
+   * @param fileSystem file system that owns {@code dirPath}
+   * @param dirPath directory to (re)create
+   * @param defaultPermissionsMask umask to apply to the directory, or {@code null} to leave permissions untouched
+   * @throws IOException if a file system operation fails
+   */
+  public static void mkdirs(FileSystem fileSystem, Path dirPath, String defaultPermissionsMask)
+      throws IOException {
+    if (fileSystem.exists(dirPath)) {
+      _logger.warn("Deleting existing file: {}", dirPath);
+      fileSystem.delete(dirPath, true);
+    }
+    _logger.info("Making directory: {}", dirPath);
+    fileSystem.mkdirs(dirPath);
+    JobPreparationHelper.setDirPermission(fileSystem, dirPath, defaultPermissionsMask);
+  }
+
+  /**
+   * Recursively registers every {@code .jar} file under {@code depsJarDir} with the Spark context.
+   *
+   * @param fileSystem file system that owns {@code depsJarDir}
+   * @param sparkContext Spark context the jars are added to
+   * @param depsJarDir directory to scan; sub-directories are scanned as well
+   * @throws IOException if listing a directory fails
+   */
+  public static void addDepsJarToDistributedCacheHelper(FileSystem fileSystem, JavaSparkContext sparkContext,
+      Path depsJarDir)
+      throws IOException {
+    FileStatus[] fileStatuses = fileSystem.listStatus(depsJarDir);
+    for (FileStatus fileStatus : fileStatuses) {
+      Path depJarPath = fileStatus.getPath();
+      if (fileStatus.isDirectory()) {
+        addDepsJarToDistributedCacheHelper(fileSystem, sparkContext, depJarPath);
+      } else if (depJarPath.getName().endsWith(".jar")) {
+        _logger.info("Adding deps jar: {} to distributed cache", depJarPath);
+        // Hand Spark the complete URI (scheme and authority included). With only the path component, Spark treats
+        // the jar as a file on the driver's local disk, which is wrong for jars that live on a remote file system.
+        sparkContext.addJar(depJarPath.toUri().toString());
+      }
+    }
+  }
+
+  /**
+   * Sets the permission of {@code dirPath} to the default directory permission with the given umask applied.
+   *
+   * @param fileSystem file system that owns {@code dirPath}
+   * @param dirPath directory whose permission is changed
+   * @param defaultPermissionsMask umask to apply; when {@code null} this method does nothing
+   * @throws IOException if setting the permission fails
+   */
+  public static void setDirPermission(FileSystem fileSystem, Path dirPath, String defaultPermissionsMask)
+      throws IOException {
+    if (defaultPermissionsMask != null) {
+      FsPermission permission = FsPermission.getDirDefault().applyUMask(new FsPermission(defaultPermissionsMask));
+      _logger.info("Setting permission: {} to directory: {}", permission, dirPath);
+      fileSystem.setPermission(dirPath, permission);
+    }
+  }
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/utils/PushLocation.java b/pinot-spark/src/main/java/org/apache/pinot/spark/utils/PushLocation.java
new file mode 100644
index 0000000..8793a1a
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/utils/PushLocation.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class PushLocation {
+  private final String _host;
+  private final int _port;
+
+  public PushLocation(String host, int port) {
+    _host = host;
+    _port = port;
+  }
+
+  public static List<PushLocation> getPushLocations(String[] hosts, int port) {
+    List<PushLocation> pushLocations = new ArrayList<>(hosts.length);
+    for (String host : hosts) {
+      pushLocations.add(new PushLocation(host, port));
+    }
+    return pushLocations;
+  }
+
+  public String getHost() {
+    return _host;
+  }
+
+  public int getPort() {
+    return _port;
+  }
+
+  @Override
+  public String toString() {
+    return _host + ":" + _port;
+  }
+}
diff --git a/pom.xml b/pom.xml
index e6dc7f1..6ceee81 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,6 +128,7 @@
     <hadoop.version>2.7.0</hadoop.version>
     <spark.version>2.2.0</spark.version>
     <scala.binary.version>2.11</scala.binary.version>
+    <scala.version>2.11.11</scala.version>
     <antlr.version>4.6</antlr.version>
     <calcite.version>1.19.0</calcite.version>
     <!-- commons-configuration, hadoop-common, hadoop-client use commons-lang -->
@@ -597,6 +598,16 @@
         <artifactId>spark-sql_${scala.binary.version}</artifactId>
         <version>${spark.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.scala-lang</groupId>
+        <artifactId>scala-library</artifactId>
+        <version>${scala.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.scala-lang</groupId>
+        <artifactId>scala-reflect</artifactId>
+        <version>${scala.version}</version>
+      </dependency>
 
       <!-- Hadoop  -->
       <dependency>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org