You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/12/18 05:14:57 UTC

[GitHub] [incubator-pinot] kishoreg commented on a change in pull request #6354: Ingestion resource with APIs for ingestion via file/URI

kishoreg commented on a change in pull request #6354:
URL: https://github.com/apache/incubator-pinot/pull/6354#discussion_r545561004



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.net.URI;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.FileIngestionHelper;
+import org.apache.pinot.controller.util.FileIngestionHelper.DataPayload;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.glassfish.jersey.server.ManagedAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * APIs related to ingestion
+ */
+@Api(tags = Constants.TABLE_TAG)
+@Path("/")
+public class PinotIngestionRestletResource {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotIngestionRestletResource.class);
+
+  @Inject
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @Inject
+  ControllerConf _controllerConf;
+
+  /**
+   * API to upload a file and ingest it into a Pinot table
+   * @param tableName Name of the table to upload to
+   * @param batchConfigMapStr Batch config Map as a string. Provide the
+   *                          input format (inputFormat)
+   *                          record reader configs (recordReader.prop.<property>),
+   *                          fs class name (input.fs.className)
+   *                          fs configs (input.fs.prop.<property>)
+   * @param fileUpload file to upload as a multipart

Review comment:
       Missing Javadoc for the asyncResponse parameter.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.net.URI;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.FileIngestionHelper;
+import org.apache.pinot.controller.util.FileIngestionHelper.DataPayload;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.glassfish.jersey.server.ManagedAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * APIs related to ingestion

Review comment:
       Add sample calls here. It would be great if they could show up as part of Swagger.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
##########
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.net.URI;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.api.resources.SuccessResponse;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A driver for the ingestion process of the provided file.
+ * Responsible for copying the file locally, building a segment and uploading it to the controller.
+ */
+public class FileIngestionHelper {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FileIngestionHelper.class);
+
+  private static final String WORKING_DIR_PREFIX = "working_dir";
+  private static final String INPUT_DATA_DIR = "input_data_dir";
+  private static final String OUTPUT_SEGMENT_DIR = "output_segment_dir";
+  private static final String SEGMENT_TAR_DIR = "segment_tar_dir";
+  private static final String DATA_FILE_PREFIX = "data";
+
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final BatchConfig _batchConfig;
+  private final ControllerConf _controllerConf;
+
+  /**
+   * Creates a helper for ingesting a file into a table.
+   * The four arguments are stored as-is in the corresponding fields; no copying or validation is done here.
+   *
+   * @param tableConfig table config, kept in {@code _tableConfig}
+   * @param schema schema, kept in {@code _schema}
+   * @param batchConfig batch config, kept in {@code _batchConfig}
+   * @param controllerConf controller configuration, kept in {@code _controllerConf}
+   */
+  public FileIngestionHelper(TableConfig tableConfig, Schema schema,
+      BatchConfig batchConfig, ControllerConf controllerConf) {
+    _tableConfig = tableConfig;
+    _schema = schema;
+    _batchConfig = batchConfig;
+    _controllerConf = controllerConf;
+  }
+
+  /**
+   * Creates a segment using the provided data file/URI and uploads to Pinot
+   */
+  public SuccessResponse buildSegmentAndPush(DataPayload payload)
+      throws Exception {
+    String tableNameWithType = _tableConfig.getTableName();
+
+    // Setup working dir
+    File workingDir = new File(FileUtils.getTempDirectory(),

Review comment:
       It would be good to be able to pass the file.upload.dir into this class as a configurable input. This is useful for testing, and the controller can also initialize it with a directory under the data.dir.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
##########
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.net.URI;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.api.resources.SuccessResponse;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A driver for the ingestion process of the provided file.
+ * Responsible for copying the file locally, building a segment and uploading it to the controller.
+ */
+public class FileIngestionHelper {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FileIngestionHelper.class);
+
+  private static final String WORKING_DIR_PREFIX = "working_dir";
+  private static final String INPUT_DATA_DIR = "input_data_dir";
+  private static final String OUTPUT_SEGMENT_DIR = "output_segment_dir";
+  private static final String SEGMENT_TAR_DIR = "segment_tar_dir";
+  private static final String DATA_FILE_PREFIX = "data";
+
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final BatchConfig _batchConfig;
+  private final ControllerConf _controllerConf;
+
+  /**
+   * Creates a helper for ingesting a file into a table.
+   * The four arguments are stored as-is in the corresponding fields; no copying or validation is done here.
+   *
+   * @param tableConfig table config, kept in {@code _tableConfig}
+   * @param schema schema, kept in {@code _schema}
+   * @param batchConfig batch config, kept in {@code _batchConfig}
+   * @param controllerConf controller configuration, kept in {@code _controllerConf}
+   */
+  public FileIngestionHelper(TableConfig tableConfig, Schema schema,
+      BatchConfig batchConfig, ControllerConf controllerConf) {
+    _tableConfig = tableConfig;
+    _schema = schema;
+    _batchConfig = batchConfig;
+    _controllerConf = controllerConf;
+  }
+
+  /**
+   * Creates a segment using the provided data file/URI and uploads to Pinot.
+   *
+   * <p>Steps: set up a per-call working directory under the JVM temp directory, copy the payload (URI or
+   * multipart upload) into it, build a segment from that local file, tar the segment and upload the tar
+   * using the controller host and port from the controller conf. The working directory is deleted before
+   * this method returns, whether it succeeds or fails.
+   *
+   * @param payload the data to ingest; its data source decides whether the URI or the multipart content is copied
+   * @return a {@link SuccessResponse} naming the table and the segment that was created
+   * @throws Exception if any step fails; the failure is logged and rethrown unchanged
+   */
+  public SuccessResponse buildSegmentAndPush(DataPayload payload)
+      throws Exception {
+    String tableNameWithType = _tableConfig.getTableName();
+
+    // Setup working dir
+    File workingDir = new File(FileUtils.getTempDirectory(),
+        String.format("%s_%s_%d", WORKING_DIR_PREFIX, tableNameWithType, System.currentTimeMillis()));
+    File inputDir = new File(workingDir, INPUT_DATA_DIR);
+    File outputDir = new File(workingDir, OUTPUT_SEGMENT_DIR);
+    File segmentTarDir = new File(workingDir, SEGMENT_TAR_DIR);
+    try {
+      Preconditions
+          .checkState(inputDir.mkdirs(), "Could not create directory for downloading input file locally: %s", inputDir);
+      // Report the directory that actually failed (the message previously named inputDir)
+      Preconditions
+          .checkState(segmentTarDir.mkdirs(), "Could not create directory for segment tar file: %s", segmentTarDir);
+
+      // Copy file to local working dir.
+      // Locale.ROOT keeps the file extension independent of the JVM default locale.
+      File inputFile = new File(inputDir, String.format("%s.%s", DATA_FILE_PREFIX,
+          _batchConfig.getInputFormat().toString().toLowerCase(java.util.Locale.ROOT)));
+      if (payload._dataSource.equals(DataSource.URI)) {
+        FileIngestionUtils.copyURIToLocal(_batchConfig, payload._uri, inputFile);
+      } else {
+        FileIngestionUtils.copyMultipartToLocal(payload._multiPart, inputFile);
+      }
+
+      // Build segment
+      SegmentGeneratorConfig segmentGeneratorConfig =
+          FileIngestionUtils.generateSegmentGeneratorConfig(_tableConfig, _batchConfig, _schema, inputFile, outputDir);
+      String segmentName = FileIngestionUtils.buildSegment(segmentGeneratorConfig);
+
+      // Tar and push segment
+      File segmentTarFile =
+          new File(segmentTarDir, segmentName + org.apache.pinot.spi.ingestion.batch.spec.Constants.TAR_GZ_FILE_EXT);
+      TarGzCompressionUtils.createTarGzFile(new File(outputDir, segmentName), segmentTarFile);
+      FileIngestionUtils
+          .uploadSegment(tableNameWithType, Lists.newArrayList(segmentTarFile), _controllerConf.getControllerHost(),
+              Integer.parseInt(_controllerConf.getControllerPort()));
+
+      return new SuccessResponse(
+          "Successfully ingested file into table: " + tableNameWithType + " as segment: " + segmentName);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception when ingesting file to table: {}", tableNameWithType, e);
+      throw e;
+    } finally {
+      // Always clean up the local copy, the built segment and the tar
+      FileUtils.deleteQuietly(workingDir);
+    }
+  }
+
+  /**
+   * Enum to identify the source of ingestion file
+   */
+  private enum DataSource {

Review comment:
       Why is this called DataSource, and why do we need it?
   

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.net.URI;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.FileIngestionHelper;
+import org.apache.pinot.controller.util.FileIngestionHelper.DataPayload;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.glassfish.jersey.server.ManagedAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * APIs related to ingestion
+ */
+@Api(tags = Constants.TABLE_TAG)
+@Path("/")
+public class PinotIngestionRestletResource {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotIngestionRestletResource.class);
+
+  @Inject
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @Inject
+  ControllerConf _controllerConf;
+
+  /**
+   * API to upload a file and ingest it into a Pinot table
+   * @param tableName Name of the table to upload to

Review comment:
       This should be tableNameWithType, right?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
##########
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.net.URI;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.api.resources.SuccessResponse;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A driver for the ingestion process of the provided file.
+ * Responsible for copying the file locally, building a segment and uploading it to the controller.
+ */
+public class FileIngestionHelper {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FileIngestionHelper.class);
+
+  private static final String WORKING_DIR_PREFIX = "working_dir";
+  private static final String INPUT_DATA_DIR = "input_data_dir";
+  private static final String OUTPUT_SEGMENT_DIR = "output_segment_dir";
+  private static final String SEGMENT_TAR_DIR = "segment_tar_dir";
+  private static final String DATA_FILE_PREFIX = "data";
+
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final BatchConfig _batchConfig;
+  private final ControllerConf _controllerConf;
+
+  public FileIngestionHelper(TableConfig tableConfig, Schema schema,

Review comment:
       This is a good class. Can we take only the info needed from ControllerConf? Then it can be used in other places.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.net.URI;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.FileIngestionHelper;
+import org.apache.pinot.controller.util.FileIngestionHelper.DataPayload;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.glassfish.jersey.server.ManagedAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * APIs related to ingestion
+ */
+@Api(tags = Constants.TABLE_TAG)
+@Path("/")
+public class PinotIngestionRestletResource {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotIngestionRestletResource.class);
+
+  @Inject
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @Inject
+  ControllerConf _controllerConf;
+
+  /**
+   * API to upload a file and ingest it into a Pinot table
+   * @param tableName Name of the table to upload to
+   * @param batchConfigMapStr Batch config Map as a string. Provide the
+   *                          input format (inputFormat)
+   *                          record reader configs (recordReader.prop.<property>),
+   *                          fs class name (input.fs.className)
+   *                          fs configs (input.fs.prop.<property>)
+   * @param fileUpload file to upload as a multipart
+   * @param asyncResponse suspended response; resumed with the result of the ingestion on success, or with a
+   *                      ControllerApplicationException (INTERNAL_SERVER_ERROR) if any exception is caught
+   */
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  @Path("/ingestFromFile")
+  @ApiOperation(value = "Ingest a file", notes = "Creates a segment using given file and pushes it to Pinot")
+  public void ingestFromFile(
+      @ApiParam(value = "Name of the table to upload the file to", required = true) @QueryParam("tableName") String tableName,
+      @ApiParam(value = "Batch config map as string", required = true) @QueryParam("batchConfigMapStr") String batchConfigMapStr,
+      FormDataMultiPart fileUpload,
+      @Suspended final AsyncResponse asyncResponse) {
+    try {
+      asyncResponse.resume(ingestData(tableName, batchConfigMapStr, new DataPayload(fileUpload)));
+    } catch (Exception e) {
+      asyncResponse.resume(new ControllerApplicationException(LOGGER,
+          String.format("Caught exception when ingesting file into table: %s. %s", tableName, e.getMessage()),
+          Response.Status.INTERNAL_SERVER_ERROR, e));
+    }
+  }
+
+  /**
+   * API to ingest a file into Pinot from a URI
+   * @param tableName Name of the table to upload to
+   * @param batchConfigMapStr Batch config Map as a string. Provide the
+   *                          input format (inputFormat)
+   *                          record reader configs (recordReader.prop.<property>),
+   *                          fs class name (input.fs.className)
+   *                          fs configs (input.fs.prop.<property>)
+   * @param sourceURIStr URI for input file to ingest
+   * @param asyncResponse suspended response; resumed with the result of the ingestion on success, or with a
+   *                      ControllerApplicationException (INTERNAL_SERVER_ERROR) if any exception is caught,
+   *                      including a sourceURIStr that cannot be parsed as a URI
+   */
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  @Path("/ingestFromURI")
+  @ApiOperation(value = "Ingest from the given URI", notes = "Creates a segment using file at the given URI and pushes it to Pinot")
+  public void ingestFromURI(
+      @ApiParam(value = "Name of the table to upload the file to", required = true) @QueryParam("tableName") String tableName,
+      @ApiParam(value = "Batch config map as string", required = true) @QueryParam("batchConfigMapStr") String batchConfigMapStr,
+      @ApiParam(value = "URI", required = true) @QueryParam("sourceURIStr") String sourceURIStr,
+      @Suspended final AsyncResponse asyncResponse) {
+    try {
+      asyncResponse.resume(ingestData(tableName, batchConfigMapStr, new DataPayload(new URI(sourceURIStr))));
+    } catch (Exception e) {
+      asyncResponse.resume(new ControllerApplicationException(LOGGER,
+          String.format("Caught exception when ingesting file into table: %s. %s", tableName, e.getMessage()),
+          Response.Status.INTERNAL_SERVER_ERROR, e));
+    }
+  }
+
+  private SuccessResponse ingestData(String tableName, String batchConfigMapStr, DataPayload payload)
+      throws Exception {
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+    Preconditions
+        .checkState(TableType.REALTIME != tableType, "Cannot ingest file into REALTIME table: %s", tableName);
+    String tableNameWithType = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);

Review comment:
       I thought the input was already the table name with type?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.net.URI;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.FileIngestionHelper;
+import org.apache.pinot.controller.util.FileIngestionHelper.DataPayload;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.glassfish.jersey.server.ManagedAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * APIs related to ingestion
+ */
+@Api(tags = Constants.TABLE_TAG)
+@Path("/")
+public class PinotIngestionRestletResource {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotIngestionRestletResource.class);
+
+  @Inject
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @Inject
+  ControllerConf _controllerConf;
+
+  /**
+   * API to upload a file and ingest it into a Pinot table
+   * @param tableName Name of the table to upload to
+   * @param batchConfigMapStr Batch config Map as a string. Provide the
+   *                          input format (inputFormat)
+   *                          record reader configs (recordReader.prop.<property>),
+   *                          fs class name (input.fs.className)
+   *                          fs configs (input.fs.prop.<property>)
+   * @param fileUpload file to upload as a multipart
+   */
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  @Path("/ingestFromFile")
+  @ApiOperation(value = "Ingest a file", notes = "Creates a segment using given file and pushes it to Pinot")
+  public void ingestFromFile(
+      @ApiParam(value = "Name of the table to upload the file to", required = true) @QueryParam("tableName") String tableName,
+      @ApiParam(value = "Batch config map as string", required = true) @QueryParam("batchConfigMapStr") String batchConfigMapStr,
+      FormDataMultiPart fileUpload,
+      @Suspended final AsyncResponse asyncResponse) {
+    try {
+      asyncResponse.resume(ingestData(tableName, batchConfigMapStr, new DataPayload(fileUpload)));

Review comment:
       It's probably better to copy the data locally for both URI-based and multipart uploads, right?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.net.URI;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.FileIngestionHelper;
+import org.apache.pinot.controller.util.FileIngestionHelper.DataPayload;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.glassfish.jersey.server.ManagedAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * APIs related to ingestion
+ */
+@Api(tags = Constants.TABLE_TAG)
+@Path("/")
+public class PinotIngestionRestletResource {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotIngestionRestletResource.class);
+
+  @Inject
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @Inject
+  ControllerConf _controllerConf;
+
+  /**
+   * API to upload a file and ingest it into a Pinot table
+   * @param tableName Name of the table to upload to
+   * @param batchConfigMapStr Batch config Map as a string. Provide the
+   *                          input format (inputFormat)
+   *                          record reader configs (recordReader.prop.<property>),
+   *                          fs class name (input.fs.className)
+   *                          fs configs (input.fs.prop.<property>)
+   * @param fileUpload file to upload as a multipart
+   */
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  @Path("/ingestFromFile")
+  @ApiOperation(value = "Ingest a file", notes = "Creates a segment using given file and pushes it to Pinot")
+  public void ingestFromFile(
+      @ApiParam(value = "Name of the table to upload the file to", required = true) @QueryParam("tableName") String tableName,
+      @ApiParam(value = "Batch config map as string", required = true) @QueryParam("batchConfigMapStr") String batchConfigMapStr,
+      FormDataMultiPart fileUpload,
+      @Suspended final AsyncResponse asyncResponse) {
+    try {
+      asyncResponse.resume(ingestData(tableName, batchConfigMapStr, new DataPayload(fileUpload)));
+    } catch (Exception e) {
+      asyncResponse.resume(new ControllerApplicationException(LOGGER,
+          String.format("Caught exception when ingesting file into table: %s. %s", tableName, e.getMessage()),
+          Response.Status.INTERNAL_SERVER_ERROR, e));
+    }
+  }
+
+  /**
+   * API to ingest a file into Pinot from a URI
+   * @param tableName Name of the table to upload to
+   * @param batchConfigMapStr Batch config Map as a string. Provide the
+   *                          input format (inputFormat)
+   *                          record reader configs (recordReader.prop.<property>),
+   *                          fs class name (input.fs.className)
+   *                          fs configs (input.fs.prop.<property>)
+   * @param sourceURIStr URI for input file to ingest
+   */
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  @Path("/ingestFromURI")
+  @ApiOperation(value = "Ingest from the given URI", notes = "Creates a segment using file at the given URI and pushes it to Pinot")
+  public void ingestFromURI(
+      @ApiParam(value = "Name of the table to upload the file to", required = true) @QueryParam("tableName") String tableName,
+      @ApiParam(value = "Batch config map as string", required = true) @QueryParam("batchConfigMapStr") String batchConfigMapStr,
+      @ApiParam(value = "URI", required = true) @QueryParam("sourceURIStr") String sourceURIStr,
+      @Suspended final AsyncResponse asyncResponse) {
+    try {
+      asyncResponse.resume(ingestData(tableName, batchConfigMapStr, new DataPayload(new URI(sourceURIStr))));
+    } catch (Exception e) {
+      asyncResponse.resume(new ControllerApplicationException(LOGGER,
+          String.format("Caught exception when ingesting file into table: %s. %s", tableName, e.getMessage()),
+          Response.Status.INTERNAL_SERVER_ERROR, e));
+    }
+  }
+
+  private SuccessResponse ingestData(String tableName, String batchConfigMapStr, DataPayload payload)

Review comment:
       tableName -> tableNameWithType
   

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.net.URI;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.FileIngestionHelper;
+import org.apache.pinot.controller.util.FileIngestionHelper.DataPayload;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.glassfish.jersey.server.ManagedAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * APIs related to ingestion
+ */
+@Api(tags = Constants.TABLE_TAG)
+@Path("/")
+public class PinotIngestionRestletResource {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotIngestionRestletResource.class);
+
+  @Inject
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @Inject
+  ControllerConf _controllerConf;
+
+  /**
+   * API to upload a file and ingest it into a Pinot table

Review comment:
       Add some documentation on what the caller should expect. Will this call return something immediately, or wait until the operation is done?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
##########
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.net.URI;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.api.resources.SuccessResponse;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A driver for the ingestion process of the provided file.
+ * Responsible for copying the file locally, building a segment and uploading it to the controller.
+ */
+public class FileIngestionHelper {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FileIngestionHelper.class);
+
+  private static final String WORKING_DIR_PREFIX = "working_dir";
+  private static final String INPUT_DATA_DIR = "input_data_dir";
+  private static final String OUTPUT_SEGMENT_DIR = "output_segment_dir";
+  private static final String SEGMENT_TAR_DIR = "segment_tar_dir";
+  private static final String DATA_FILE_PREFIX = "data";
+
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final BatchConfig _batchConfig;
+  private final ControllerConf _controllerConf;
+
+  public FileIngestionHelper(TableConfig tableConfig, Schema schema,
+      BatchConfig batchConfig, ControllerConf controllerConf) {
+    _tableConfig = tableConfig;
+    _schema = schema;
+    _batchConfig = batchConfig;
+    _controllerConf = controllerConf;
+  }
+
+  /**
+   * Creates a segment using the provided data file/URI and uploads to Pinot
+   */
+  public SuccessResponse buildSegmentAndPush(DataPayload payload)
+      throws Exception {
+    String tableNameWithType = _tableConfig.getTableName();
+
+    // Setup working dir
+    File workingDir = new File(FileUtils.getTempDirectory(),
+        String.format("%s_%s_%d", WORKING_DIR_PREFIX, tableNameWithType, System.currentTimeMillis()));
+    File inputDir = new File(workingDir, INPUT_DATA_DIR);
+    File outputDir = new File(workingDir, OUTPUT_SEGMENT_DIR);
+    File segmentTarDir = new File(workingDir, SEGMENT_TAR_DIR);
+    try {
+      Preconditions
+          .checkState(inputDir.mkdirs(), "Could not create directory for downloading input file locally: %s", inputDir);
+      Preconditions.checkState(segmentTarDir.mkdirs(), "Could not create directory for segment tar file: %s", inputDir);
+
+      // Copy file to local working dir
+      File inputFile =
+          new File(inputDir, String.format("%s.%s", DATA_FILE_PREFIX, _batchConfig.getInputFormat().toString().toLowerCase()));
+      if (payload._dataSource.equals(DataSource.URI)) {
+        FileIngestionUtils.copyURIToLocal(_batchConfig, payload._uri, inputFile);
+      } else {
+        FileIngestionUtils.copyMultipartToLocal(payload._multiPart, inputFile);
+      }
+
+      // Build segment
+      SegmentGeneratorConfig segmentGeneratorConfig =

Review comment:
       Better to log this somewhere for debugging purposes.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
##########
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.net.URI;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.api.resources.SuccessResponse;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A driver for the ingestion process of the provided file.
+ * Responsible for copying the file locally, building a segment and uploading it to the controller.
+ */
+public class FileIngestionHelper {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FileIngestionHelper.class);
+
+  private static final String WORKING_DIR_PREFIX = "working_dir";
+  private static final String INPUT_DATA_DIR = "input_data_dir";
+  private static final String OUTPUT_SEGMENT_DIR = "output_segment_dir";
+  private static final String SEGMENT_TAR_DIR = "segment_tar_dir";
+  private static final String DATA_FILE_PREFIX = "data";
+
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final BatchConfig _batchConfig;
+  private final ControllerConf _controllerConf;
+
+  public FileIngestionHelper(TableConfig tableConfig, Schema schema,
+      BatchConfig batchConfig, ControllerConf controllerConf) {
+    _tableConfig = tableConfig;
+    _schema = schema;
+    _batchConfig = batchConfig;
+    _controllerConf = controllerConf;
+  }
+
+  /**
+   * Creates a segment using the provided data file/URI and uploads to Pinot
+   */
+  public SuccessResponse buildSegmentAndPush(DataPayload payload)
+      throws Exception {
+    String tableNameWithType = _tableConfig.getTableName();
+
+    // Setup working dir
+    File workingDir = new File(FileUtils.getTempDirectory(),
+        String.format("%s_%s_%d", WORKING_DIR_PREFIX, tableNameWithType, System.currentTimeMillis()));
+    File inputDir = new File(workingDir, INPUT_DATA_DIR);
+    File outputDir = new File(workingDir, OUTPUT_SEGMENT_DIR);
+    File segmentTarDir = new File(workingDir, SEGMENT_TAR_DIR);
+    try {
+      Preconditions
+          .checkState(inputDir.mkdirs(), "Could not create directory for downloading input file locally: %s", inputDir);
+      Preconditions.checkState(segmentTarDir.mkdirs(), "Could not create directory for segment tar file: %s", inputDir);
+
+      // Copy file to local working dir
+      File inputFile =
+          new File(inputDir, String.format("%s.%s", DATA_FILE_PREFIX, _batchConfig.getInputFormat().toString().toLowerCase()));
+      if (payload._dataSource.equals(DataSource.URI)) {
+        FileIngestionUtils.copyURIToLocal(_batchConfig, payload._uri, inputFile);
+      } else {
+        FileIngestionUtils.copyMultipartToLocal(payload._multiPart, inputFile);
+      }
+
+      // Build segment
+      SegmentGeneratorConfig segmentGeneratorConfig =
+          FileIngestionUtils.generateSegmentGeneratorConfig(_tableConfig, _batchConfig, _schema, inputFile, outputDir);
+      String segmentName = FileIngestionUtils.buildSegment(segmentGeneratorConfig);
+
+      // Tar and push segment
+      File segmentTarFile =
+          new File(segmentTarDir, segmentName + org.apache.pinot.spi.ingestion.batch.spec.Constants.TAR_GZ_FILE_EXT);
+      TarGzCompressionUtils.createTarGzFile(new File(outputDir, segmentName), segmentTarFile);
+      FileIngestionUtils

Review comment:
       What happens if the segment already exists? Can we say replace if it already exists?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org