Posted to commits@pinot.apache.org by xi...@apache.org on 2019/11/06 11:05:53 UTC
[incubator-pinot] 02/02: Initial commit for pinot-spark
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch pinot-spark
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 6fbd9da6d6d16412403cda0cb9ee9a1b9c1056dd
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed Nov 6 03:05:19 2019 -0800
Initial commit for pinot-spark
---
pinot-spark/pom.xml | 52 ++-
.../apache/pinot/spark/PinotSparkJobLauncher.java | 80 ++++
.../apache/pinot/spark/jobs/BaseSegmentJob.java | 137 +++++++
.../apache/pinot/spark/jobs/ControllerRestApi.java | 42 ++
.../pinot/spark/jobs/DefaultControllerRestApi.java | 192 +++++++++
.../pinot/spark/jobs/JobConfigConstants.java | 65 ++++
.../pinot/spark/jobs/SegmentCreationJob.java | 427 +++++++++++++++++++++
.../pinot/spark/jobs/SegmentCreationMapper.java | 323 ++++++++++++++++
.../apache/pinot/spark/jobs/SegmentTarPushJob.java | 116 ++++++
.../apache/pinot/spark/jobs/SegmentUriPushJob.java | 68 ++++
.../pinot/spark/utils/JobPreparationHelper.java | 70 ++++
.../org/apache/pinot/spark/utils/PushLocation.java | 54 +++
pom.xml | 11 +
13 files changed, 1628 insertions(+), 9 deletions(-)
diff --git a/pinot-spark/pom.xml b/pinot-spark/pom.xml
index 615b21a..6c56c38 100644
--- a/pinot-spark/pom.xml
+++ b/pinot-spark/pom.xml
@@ -71,7 +71,7 @@
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>org.apache.pinot.hadoop.PinotHadoopJobLauncher</mainClass>
+ <mainClass>org.apache.pinot.spark.PinotSparkJobLauncher</mainClass>
</transformer>
</transformers>
</configuration>
@@ -95,6 +95,10 @@
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-common</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.thoughtworks.paranamer</groupId>
+ <artifactId>paranamer</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -123,17 +127,34 @@
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-core</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_${scala.binary.version}</artifactId>
- <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.activation</groupId>
+ <artifactId>activation</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
@@ -149,11 +170,24 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-mapred</artifactId>
- <classifier>hadoop2</classifier>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <!--Test-->
+ <dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/PinotSparkJobLauncher.java b/pinot-spark/src/main/java/org/apache/pinot/spark/PinotSparkJobLauncher.java
new file mode 100644
index 0000000..6ed9422
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/PinotSparkJobLauncher.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark;
+
+import java.io.FileInputStream;
+import java.util.Arrays;
+import java.util.Properties;
+import org.apache.pinot.spark.jobs.SegmentCreationJob;
+import org.apache.pinot.spark.jobs.SegmentTarPushJob;
+
+
+public class PinotSparkJobLauncher {
+
+ enum PinotSparkJobType {
+ SegmentCreation, SegmentTarPush, SegmentCreationAndTarPush
+ }
+
+ private static final String USAGE = "usage: [job_type] [job.properties]";
+ private static final String SUPPORT_JOB_TYPES =
+ "\tsupport job types: " + Arrays.toString(PinotSparkJobType.values());
+
+ private static void usage() {
+ System.err.println(USAGE);
+ System.err.println(SUPPORT_JOB_TYPES);
+ }
+
+ private static void kickOffPinotSparkJob(PinotSparkJobType jobType, Properties jobConf)
+ throws Exception {
+ switch (jobType) {
+ case SegmentCreation:
+ new SegmentCreationJob(jobConf).run();
+ break;
+ case SegmentTarPush:
+ new SegmentTarPushJob(jobConf).run();
+ break;
+ case SegmentCreationAndTarPush:
+ new SegmentCreationJob(jobConf).run();
+ new SegmentTarPushJob(jobConf).run();
+ break;
+ default:
+ throw new RuntimeException("Not a valid jobType - " + jobType);
+ }
+ }
+
+
+ public static void main(String[] args)
+ throws Exception {
+ if (args.length != 2) {
+ usage();
+ System.exit(1);
+ }
+ PinotSparkJobType jobType = null;
+ Properties jobConf = null;
+ try {
+ jobType = PinotSparkJobType.valueOf(args[0]);
+ jobConf = new Properties();
+ jobConf.load(new FileInputStream(args[1]));
+ } catch (Exception e) {
+ usage();
+ System.exit(1);
+ }
+ kickOffPinotSparkJob(jobType, jobConf);
+ }
+}
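For illustration only (not part of this commit): a minimal sketch of driving the same jobs programmatically instead of through PinotSparkJobLauncher.main(). The property keys come from JobConfigConstants further down in this commit; the paths, table name, controller host and port are hypothetical placeholders.

    import java.util.Properties;
    import org.apache.pinot.spark.jobs.SegmentCreationJob;
    import org.apache.pinot.spark.jobs.SegmentTarPushJob;

    public class ExampleSegmentCreationAndPush {  // hypothetical helper class, not part of this commit
      public static void main(String[] args) throws Exception {
        Properties jobConf = new Properties();
        jobConf.setProperty("path.to.input", "hdfs:///data/myTable/rawdata/*.avro"); // PATH_TO_INPUT (hypothetical path)
        jobConf.setProperty("path.to.output", "hdfs:///data/myTable/segments");      // PATH_TO_OUTPUT (hypothetical path)
        jobConf.setProperty("segment.table.name", "myTable");                        // SEGMENT_TABLE_NAME (hypothetical table)
        jobConf.setProperty("push.to.hosts", "controller-host");                     // PUSH_TO_HOSTS (hypothetical host)
        jobConf.setProperty("push.to.port", "9000");                                 // PUSH_TO_PORT (hypothetical port)
        // Equivalent to running the launcher with job type SegmentCreationAndTarPush.
        new SegmentCreationJob(jobConf).run();
        new SegmentTarPushJob(jobConf).run();
      }
    }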
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/BaseSegmentJob.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/BaseSegmentJob.java
new file mode 100644
index 0000000..3285899
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/BaseSegmentJob.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.spark.utils.PushLocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class BaseSegmentJob implements Serializable {
+ protected final Logger _logger = LoggerFactory.getLogger(getClass());
+ protected final Properties _properties;
+ protected final List<PushLocation> _pushLocations;
+ protected final String _rawTableName;
+
+ protected BaseSegmentJob(Properties properties) {
+ _properties = properties;
+ Utils.logVersions();
+ logProperties();
+
+ // Optional push location and table parameters. If set, will use the table config and schema from the push hosts.
+ String pushHostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS);
+ String pushPortString = _properties.getProperty(JobConfigConstants.PUSH_TO_PORT);
+ if (pushHostsString != null && pushPortString != null) {
+ _pushLocations =
+ PushLocation.getPushLocations(StringUtils.split(pushHostsString, ','), Integer.parseInt(pushPortString));
+ } else {
+ _pushLocations = null;
+ }
+
+ _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
+ }
+
+ @Nullable
+ protected TableConfig getTableConfig()
+ throws IOException {
+ try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+ return controllerRestApi != null ? controllerRestApi.getTableConfig() : null;
+ }
+ }
+
+ /**
+ * This method is currently implemented in SegmentCreationJob and SegmentPreprocessingJob due to a dependency on
+ * the hadoop filesystem, which we can only get once the job begins to run.
+ * We return null here to make it clear that for now, all implementations of this method have to support
+ * reading from a schema file. In the future, we hope to deprecate reading the schema from the schema file in favor
+ * of mandating that a schema is pushed to the controller.
+ */
+ @Nullable
+ protected org.apache.pinot.common.data.Schema getSchema() throws IOException {
+ return null;
+ }
+
+ /**
+ * Can be overridden to provide custom controller Rest API.
+ */
+ @Nullable
+ protected ControllerRestApi getControllerRestApi() {
+ return _pushLocations != null ? new DefaultControllerRestApi(_pushLocations, _rawTableName) : null;
+ }
+
+ protected void logProperties() {
+ _logger.info("*********************************************************************");
+ _logger.info("Job Properties: {}", _properties);
+ _logger.info("*********************************************************************");
+ }
+
+ @Nullable
+ protected Path getPathFromProperty(String key) {
+ String value = _properties.getProperty(key);
+ return value != null ? new Path(value) : null;
+ }
+
+ protected List<Path> getDataFilePaths(Path pathPattern)
+ throws IOException {
+ List<Path> tarFilePaths = new ArrayList<>();
+ FileSystem fileSystem = FileSystem.get(pathPattern.toUri(), new Configuration());
+ _logger.info("Using filesystem: {}", fileSystem);
+ FileStatus[] fileStatuses = fileSystem.globStatus(pathPattern);
+ if (fileStatuses == null) {
+ _logger.warn("Unable to match file status from file path pattern: {}", pathPattern);
+ } else {
+ getDataFilePathsHelper(fileSystem, fileStatuses, tarFilePaths);
+ }
+ return tarFilePaths;
+ }
+
+ protected void getDataFilePathsHelper(FileSystem fileSystem, FileStatus[] fileStatuses, List<Path> tarFilePaths)
+ throws IOException {
+ for (FileStatus fileStatus : fileStatuses) {
+ Path path = fileStatus.getPath();
+ if (fileStatus.isDirectory()) {
+ getDataFilePathsHelper(fileSystem, fileSystem.listStatus(path), tarFilePaths);
+ } else {
+ // Skip temp files generated by computation frameworks like Hadoop/Spark.
+ if (path.getName().startsWith("_") || path.getName().startsWith(".")) {
+ continue;
+ }
+ if (isDataFile(path.getName())) {
+ tarFilePaths.add(path);
+ }
+ }
+ }
+ }
+
+ protected abstract boolean isDataFile(String fileName);
+}
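For illustration only (not part of this commit): BaseSegmentJob leaves isDataFile() abstract, so each concrete job decides which of the files found by getDataFilePaths() count as input data (temp files starting with "_" or "." are already skipped). A minimal hypothetical subclass that only accepts Avro files, assuming the supplied properties contain segment.table.name as the constructor requires, could look like:

    import java.util.Properties;
    import org.apache.pinot.spark.jobs.BaseSegmentJob;

    // Hypothetical subclass for illustration; SegmentCreationJob below is the real implementation in this commit.
    public class AvroOnlySegmentJob extends BaseSegmentJob {

      public AvroOnlySegmentJob(Properties properties) {
        super(properties); // properties must include segment.table.name
      }

      @Override
      protected boolean isDataFile(String fileName) {
        // Treat only Avro files as data files; everything else is ignored by getDataFilePaths().
        return fileName.endsWith(".avro");
      }
    }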
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/ControllerRestApi.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/ControllerRestApi.java
new file mode 100644
index 0000000..4d0391c
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/ControllerRestApi.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import java.io.Closeable;
+import java.util.List;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.data.Schema;
+
+
+public interface ControllerRestApi extends Closeable {
+
+ TableConfig getTableConfig();
+
+ Schema getSchema();
+
+ void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths);
+
+ void sendSegmentUris(List<String> segmentUris);
+
+ void deleteSegmentUris(List<String> segmentUris);
+
+ List<String> getAllSegments(String tableType);
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/DefaultControllerRestApi.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/DefaultControllerRestApi.java
new file mode 100644
index 0000000..c9444a9
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/DefaultControllerRestApi.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.spark.utils.PushLocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DefaultControllerRestApi implements ControllerRestApi {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DefaultControllerRestApi.class);
+
+ private final List<PushLocation> _pushLocations;
+ private final String _rawTableName;
+ private final FileUploadDownloadClient _fileUploadDownloadClient = new FileUploadDownloadClient();
+
+ private static final String OFFLINE = "OFFLINE";
+
+ public DefaultControllerRestApi(List<PushLocation> pushLocations, String rawTableName) {
+ LOGGER.info("Push locations are: {} for table: {}", pushLocations, rawTableName);
+ _pushLocations = pushLocations;
+ _rawTableName = rawTableName;
+ }
+
+ @Override
+ public TableConfig getTableConfig() {
+ for (PushLocation pushLocation : _pushLocations) {
+ try {
+ SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(FileUploadDownloadClient
+ .getRetrieveTableConfigHttpURI(pushLocation.getHost(), pushLocation.getPort(), _rawTableName));
+ JsonNode offlineJsonTableConfig = JsonUtils.stringToJsonNode(response.getResponse()).get(OFFLINE);
+ if (offlineJsonTableConfig != null) {
+ TableConfig offlineTableConfig = TableConfig.fromJsonConfig(offlineJsonTableConfig);
+ LOGGER.info("Got table config: {}", offlineTableConfig);
+ return offlineTableConfig;
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while fetching table config for table: {} from push location: {}", _rawTableName,
+ pushLocation, e);
+ }
+ }
+ String errorMessage = String
+ .format("Failed to get table config from push locations: %s for table: %s", _pushLocations, _rawTableName);
+ LOGGER.error(errorMessage);
+ throw new RuntimeException(errorMessage);
+ }
+
+ @Override
+ public Schema getSchema() {
+ for (PushLocation pushLocation : _pushLocations) {
+ try {
+ SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(FileUploadDownloadClient
+ .getRetrieveSchemaHttpURI(pushLocation.getHost(), pushLocation.getPort(), _rawTableName));
+ Schema schema = Schema.fromString(response.getResponse());
+ LOGGER.info("Got schema: {}", schema);
+ return schema;
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while fetching schema for table: {} from push location: {}", _rawTableName,
+ pushLocation, e);
+ }
+ }
+ String errorMessage =
+ String.format("Failed to get schema from push locations: %s for table: %s", _pushLocations, _rawTableName);
+ LOGGER.error(errorMessage);
+ throw new RuntimeException(errorMessage);
+ }
+
+ @Override
+ public void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths) {
+ LOGGER.info("Start pushing segments: {} to locations: {}", tarFilePaths, _pushLocations);
+ for (Path tarFilePath : tarFilePaths) {
+ String fileName = tarFilePath.getName();
+ Preconditions.checkArgument(fileName.endsWith(JobConfigConstants.TAR_GZ_FILE_EXT));
+ String segmentName = fileName.substring(0, fileName.length() - JobConfigConstants.TAR_GZ_FILE_EXT.length());
+ for (PushLocation pushLocation : _pushLocations) {
+ LOGGER.info("Pushing segment: {} to location: {}", segmentName, pushLocation);
+ try (InputStream inputStream = fileSystem.open(tarFilePath)) {
+ SimpleHttpResponse response = _fileUploadDownloadClient.uploadSegment(
+ FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), pushLocation.getPort()),
+ segmentName, inputStream, _rawTableName);
+ LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while pushing segment: {} to location: {}", segmentName, pushLocation, e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void sendSegmentUris(List<String> segmentUris) {
+ LOGGER.info("Start sending segment URIs: {} to locations: {}", segmentUris, _pushLocations);
+ for (String segmentUri : segmentUris) {
+ for (PushLocation pushLocation : _pushLocations) {
+ LOGGER.info("Sending segment URI: {} to location: {}", segmentUri, pushLocation);
+ try {
+ SimpleHttpResponse response = _fileUploadDownloadClient.sendSegmentUri(
+ FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), pushLocation.getPort()),
+ segmentUri, _rawTableName);
+ LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while sending segment URI: {} to location: {}", segmentUri, pushLocation, e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void deleteSegmentUris(List<String> segmentUris) {
+ LOGGER.info("Start deleting segment URIs: {} to locations: {}", segmentUris, _pushLocations);
+ for (String segmentUri : segmentUris) {
+ for (PushLocation pushLocation : _pushLocations) {
+ LOGGER.info("Sending deleting segment URI: {} to location: {}", segmentUri, pushLocation);
+ try {
+ SimpleHttpResponse response = _fileUploadDownloadClient.sendDeleteRequest(
+ FileUploadDownloadClient.getDeleteSegmentHttpUri(pushLocation.getHost(), pushLocation.getPort(), _rawTableName,
+ segmentUri, "OFFLINE"));
+ LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while deleting segment URI: {} to location: {}", segmentUri, pushLocation, e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public List<String> getAllSegments(String tableType) {
+ LOGGER.info("Getting all segments of table {}", _rawTableName);
+ ObjectMapper objectMapper = new ObjectMapper();
+ for (PushLocation pushLocation : _pushLocations) {
+ try {
+ SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(
+ FileUploadDownloadClient.getRetrieveAllSegmentWithTableTypeHttpUri(pushLocation.getHost(), pushLocation.getPort(),
+ _rawTableName, tableType));
+ JsonNode segmentList = getSegmentsFromJsonSegmentAPI(response.getResponse(), tableType);
+ return objectMapper.convertValue(segmentList, ArrayList.class);
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while getting all {} segments for table: {} from push location: {}", tableType, _rawTableName,
+ pushLocation, e);
+ }
+ }
+ String errorMessage =
+ String.format("Failed to get a list of all segments from push locations: %s for table: %s", _pushLocations,
+ _rawTableName);
+ LOGGER.error(errorMessage);
+ throw new RuntimeException(errorMessage);
+
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ _fileUploadDownloadClient.close();
+ }
+
+ private JsonNode getSegmentsFromJsonSegmentAPI(String json, String tableType)
+ throws Exception {
+ return JsonUtils.stringToJsonNode(json).get(0).get(tableType);
+ }
+}
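For illustration only (not part of this commit): a minimal sketch of how the push locations are turned into controller calls via DefaultControllerRestApi; the controller host, port and table name are hypothetical placeholders.

    import java.util.List;
    import org.apache.pinot.common.config.TableConfig;
    import org.apache.pinot.common.data.Schema;
    import org.apache.pinot.spark.jobs.ControllerRestApi;
    import org.apache.pinot.spark.jobs.DefaultControllerRestApi;
    import org.apache.pinot.spark.utils.PushLocation;

    public class ExampleControllerLookup {  // hypothetical helper class, not part of this commit
      public static void main(String[] args) throws Exception {
        List<PushLocation> pushLocations =
            PushLocation.getPushLocations(new String[]{"controller-host"}, 9000); // hypothetical host and port
        try (ControllerRestApi restApi = new DefaultControllerRestApi(pushLocations, "myTable")) { // hypothetical table
          TableConfig tableConfig = restApi.getTableConfig(); // OFFLINE config from the first reachable controller
          Schema schema = restApi.getSchema();
          System.out.println("Table config: " + tableConfig + ", schema: " + schema);
        }
      }
    }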
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/JobConfigConstants.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/JobConfigConstants.java
new file mode 100644
index 0000000..f7636d3
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/JobConfigConstants.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+public class JobConfigConstants {
+ public static final String PATH_TO_INPUT = "path.to.input";
+ public static final String PATH_TO_OUTPUT = "path.to.output";
+ public static final String PREPROCESS_PATH_TO_OUTPUT = "preprocess.path.to.output";
+ public static final String PATH_TO_DEPS_JAR = "path.to.deps.jar";
+ public static final String PATH_TO_READER_CONFIG = "path.to.reader.config";
+ // Leave this for backward compatibility. We prefer to use the schema fetched from the controller.
+ public static final String PATH_TO_SCHEMA = "path.to.schema";
+
+ public static final String SEGMENT_TAR_DIR = "segmentTar";
+ public static final String TAR_GZ_FILE_EXT = ".tar.gz";
+
+ public static final String SEGMENT_TABLE_NAME = "segment.table.name";
+ public static final String TABLE_CONFIG = "table.config";
+ public static final String SCHEMA = "data.schema";
+
+ public static final String SEGMENT_NAME_GENERATOR_TYPE = "segment.name.generator.type";
+ public static final String SIMPLE_SEGMENT_NAME_GENERATOR = "simple";
+ public static final String NORMALIZED_DATE_SEGMENT_NAME_GENERATOR = "normalizedDate";
+ public static final String DEFAULT_SEGMENT_NAME_GENERATOR = SIMPLE_SEGMENT_NAME_GENERATOR;
+
+ // For SimpleSegmentNameGenerator
+ public static final String SEGMENT_NAME_POSTFIX = "segment.name.postfix";
+
+ // For NormalizedDateSegmentNameGenerator
+ public static final String SEGMENT_NAME_PREFIX = "segment.name.prefix";
+ public static final String EXCLUDE_SEQUENCE_ID = "exclude.sequence.id";
+
+ public static final String PUSH_TO_HOSTS = "push.to.hosts";
+ public static final String PUSH_TO_PORT = "push.to.port";
+
+ public static final String DEFAULT_PERMISSIONS_MASK = "fs.permissions.umask-mode";
+
+ // The path to the record reader to be configured
+ public static final String RECORD_READER_PATH = "record.reader.path";
+
+ public static final String ENABLE_PREPROCESSING = "enable.preprocessing";
+
+ // This setting should be used if you will generate fewer segments after the
+ // push. In preprocessing, this is likely because we resize segments.
+ public static final String DELETE_EXTRA_SEGMENTS = "delete.extra.segments";
+
+ // This setting is used to make the output segment directory hierarchy match the input file hierarchy.
+ public static final String USE_RELATIVE_PATH = "use.relative.path";
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationJob.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationJob.java
new file mode 100644
index 0000000..74f09ee
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationJob.java
@@ -0,0 +1,427 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.google.common.base.Preconditions;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.data.TimeFieldSpec;
+import org.apache.pinot.common.utils.DataSize;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.data.readers.CSVRecordReaderConfig;
+import org.apache.pinot.core.data.readers.FileFormat;
+import org.apache.pinot.core.data.readers.RecordReaderConfig;
+import org.apache.pinot.core.data.readers.ThriftRecordReaderConfig;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.spark.utils.JobPreparationHelper;
+import org.apache.pinot.spark.utils.PushLocation;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentCreationJob extends BaseSegmentJob {
+ protected static final String APPEND = "APPEND";
+ protected static final String LOCAL_TEMP_DIR = "pinot_spark_tmp";
+ protected static final Logger LOGGER = LoggerFactory.getLogger(SegmentCreationJob.class);
+ protected final String _rawTableName;
+
+ protected final String _inputPattern;
+ protected final String _outputDir;
+ protected final String _stagingDir;
+ // Optional
+ protected final String _depsJarDir;
+ protected final String _schemaFile;
+ protected final String _defaultPermissionsMask;
+ protected final List<PushLocation> _pushLocations;
+
+ public SegmentCreationJob(Properties properties) {
+ super(properties);
+ new Configuration().set("mapreduce.job.user.classpath.first", "true");
+
+ _inputPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_INPUT)).toString();
+ _outputDir = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT)).toString();
+ _stagingDir = new Path(_outputDir, UUID.randomUUID().toString()).toString();
+ _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
+
+ // Optional
+ _depsJarDir = properties.getProperty(JobConfigConstants.PATH_TO_DEPS_JAR);
+ _schemaFile = properties.getProperty(JobConfigConstants.PATH_TO_SCHEMA);
+ _defaultPermissionsMask = _properties.getProperty(JobConfigConstants.DEFAULT_PERMISSIONS_MASK);
+
+ // Optional push location and table parameters. If set, will use the table config and schema from the push hosts.
+ String pushHostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS);
+ String pushPortString = _properties.getProperty(JobConfigConstants.PUSH_TO_PORT);
+ if (pushHostsString != null && pushPortString != null) {
+ _pushLocations =
+ PushLocation.getPushLocations(StringUtils.split(pushHostsString, ','), Integer.parseInt(pushPortString));
+ } else {
+ _pushLocations = null;
+ }
+
+ LOGGER.info("*********************************************************************");
+ LOGGER.info("Input Pattern: {}", _inputPattern);
+ LOGGER.info("Output Directory: {}", _outputDir);
+ LOGGER.info("Staging Directory: {}", _stagingDir);
+ LOGGER.info("Raw Table Name: {}", _rawTableName);
+ LOGGER.info("Dependencies Directory: {}", _depsJarDir);
+ LOGGER.info("Schema File: {}", _schemaFile);
+ LOGGER.info("Default Permissions Mask: {}", _defaultPermissionsMask);
+ LOGGER.info("Push Locations: {}", _pushLocations);
+ LOGGER.info("*********************************************************************");
+ }
+
+ /**
+ * Generate a relative output directory path when `useRelativePath` flag is on.
+ * This method will compute the relative path based on `inputFile` and `baseInputDir`,
+ * then apply only the directory part of relative path to `outputDir`.
+ * E.g.
+ * baseInputDir = "/path/to/input"
+ * inputFile = "/path/to/input/a/b/c/d.avro"
+ * outputDir = "/path/to/output"
+ * getRelativeOutputPath(baseInputDir, inputFile, outputDir) = /path/to/output/a/b/c
+ */
+ protected static Path getRelativeOutputPath(URI baseInputDir, URI inputFile, Path outputDir) {
+ URI relativePath = baseInputDir.relativize(inputFile);
+ Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile),
+ "Unable to extract out the relative path based on base input path: " + baseInputDir);
+ return new Path(outputDir, relativePath.getPath()).getParent();
+ }
+
+ protected static void createSingleSegment(String inputFile, Long seqId, Configuration conf, String stagingDir)
+ throws IOException {
+ Path hdfsInputFile = new Path(inputFile);
+ int sequenceId = seqId.intValue();
+ LOGGER.info("Generating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile, sequenceId);
+
+ String rawTableName = conf.get(JobConfigConstants.SEGMENT_TABLE_NAME);
+ Schema schema = Schema.fromString(conf.get(JobConfigConstants.SCHEMA));
+ SegmentNameGenerator segmentNameGenerator;
+ boolean useRelativePath = conf.getBoolean(JobConfigConstants.USE_RELATIVE_PATH, false);
+
+ // Optional
+ TableConfig tableConfig = null;
+ String recordReaderPath;
+ Path readerConfigFile = null;
+
+ String tableConfigString = conf.get(JobConfigConstants.TABLE_CONFIG);
+ if (tableConfigString != null) {
+ tableConfig = TableConfig.fromJsonString(tableConfigString);
+ }
+ String readerConfigFileStr = conf.get(JobConfigConstants.PATH_TO_READER_CONFIG);
+ if (readerConfigFileStr != null) {
+ readerConfigFile = new Path(readerConfigFileStr);
+ }
+ recordReaderPath = conf.get(JobConfigConstants.RECORD_READER_PATH);
+
+ // HDFS segment tar directory
+ Path hdfsSegmentTarDir = new Path(new Path(stagingDir, "output"), JobConfigConstants.SEGMENT_TAR_DIR);
+
+ // Set up segment name generator
+ String segmentNameGeneratorType =
+ conf.get(JobConfigConstants.SEGMENT_NAME_GENERATOR_TYPE, JobConfigConstants.DEFAULT_SEGMENT_NAME_GENERATOR);
+ switch (segmentNameGeneratorType) {
+ case JobConfigConstants.SIMPLE_SEGMENT_NAME_GENERATOR:
+ segmentNameGenerator =
+ new SimpleSegmentNameGenerator(rawTableName, conf.get(JobConfigConstants.SEGMENT_NAME_POSTFIX));
+ break;
+ case JobConfigConstants.NORMALIZED_DATE_SEGMENT_NAME_GENERATOR:
+ Preconditions.checkState(tableConfig != null,
+ "In order to use NormalizedDateSegmentNameGenerator, table config must be provided");
+ SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+ String timeFormat = null;
+ TimeFieldSpec timeFieldSpec = schema.getTimeFieldSpec();
+ if (timeFieldSpec != null) {
+ timeFormat = timeFieldSpec.getOutgoingGranularitySpec().getTimeFormat();
+ }
+ segmentNameGenerator =
+ new NormalizedDateSegmentNameGenerator(rawTableName, conf.get(JobConfigConstants.SEGMENT_NAME_PREFIX),
+ conf.getBoolean(JobConfigConstants.EXCLUDE_SEQUENCE_ID, false), validationConfig.getSegmentPushType(),
+ validationConfig.getSegmentPushFrequency(), validationConfig.getTimeType(), timeFormat);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported segment name generator type: " + segmentNameGeneratorType);
+ }
+
+ // Temporary local directories
+ File localStagingDir = new File(LOCAL_TEMP_DIR);
+ File localInputDir = new File(localStagingDir, "inputData");
+ File localSegmentsDir = new File(localStagingDir, "segments");
+ File localSegmentTarDir = new File(localStagingDir, JobConfigConstants.SEGMENT_TAR_DIR);
+
+ String inputFileName = hdfsInputFile.getName();
+ File localInputFile = new File(localInputDir, inputFileName);
+ LOGGER.info("Copying input file from: {} to: {}", hdfsInputFile, localInputFile);
+ FileSystem.get(hdfsInputFile.toUri(), new Configuration())
+ .copyToLocalFile(hdfsInputFile, new Path(localInputFile.getAbsolutePath()));
+
+ SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+ segmentGeneratorConfig.setTableName(rawTableName);
+ segmentGeneratorConfig.setInputFilePath(localInputFile.getPath());
+ segmentGeneratorConfig.setOutDir(localSegmentsDir.getPath());
+ segmentGeneratorConfig.setSegmentNameGenerator(segmentNameGenerator);
+ segmentGeneratorConfig.setSequenceId(sequenceId);
+ if (recordReaderPath != null) {
+ segmentGeneratorConfig.setRecordReaderPath(recordReaderPath);
+ segmentGeneratorConfig.setFormat(FileFormat.OTHER);
+ } else {
+ FileFormat fileFormat = getFileFormat(inputFileName);
+ segmentGeneratorConfig.setFormat(fileFormat);
+ segmentGeneratorConfig.setReaderConfig(getReaderConfig(conf, readerConfigFile, fileFormat));
+ }
+ segmentGeneratorConfig.setOnHeap(true);
+
+ addAdditionalSegmentGeneratorConfigs(segmentGeneratorConfig, hdfsInputFile, sequenceId);
+
+ LOGGER.info("Start creating segment with sequence id: {}", sequenceId);
+ SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+ try {
+ driver.init(segmentGeneratorConfig);
+ driver.build();
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while creating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile,
+ sequenceId, e);
+ throw new RuntimeException(e);
+ }
+ String segmentName = driver.getSegmentName();
+ LOGGER.info("Finish creating segment: {} with sequence id: {}", segmentName, sequenceId);
+
+ File localSegmentDir = new File(localSegmentsDir, segmentName);
+ String segmentTarFileName = segmentName + JobConfigConstants.TAR_GZ_FILE_EXT;
+ File localSegmentTarFile = new File(localSegmentTarDir, segmentTarFileName);
+ LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+ TarGzCompressionUtils.createTarGzOfDirectory(localSegmentDir.getPath(), localSegmentTarFile.getPath());
+
+ long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+ long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+ LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+ DataSize.fromBytes(uncompressedSegmentSize), DataSize.fromBytes(compressedSegmentSize));
+
+ Path hdfsSegmentTarFile = new Path(hdfsSegmentTarDir, segmentTarFileName);
+ if (useRelativePath) {
+ Path relativeOutputPath =
+ getRelativeOutputPath(new Path(conf.get(JobConfigConstants.PATH_TO_INPUT)).toUri(), hdfsInputFile.toUri(),
+ hdfsSegmentTarDir);
+ hdfsSegmentTarFile = new Path(relativeOutputPath, segmentTarFileName);
+ }
+ LOGGER.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile);
+ FileSystem.get(hdfsSegmentTarFile.toUri(), new Configuration())
+ .copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);
+
+ LOGGER.info("Finish generating segment: {} with HDFS input file: {}, sequence id: {}", segmentName, hdfsInputFile,
+ sequenceId);
+ }
+
+ protected static FileFormat getFileFormat(String fileName) {
+ if (fileName.endsWith(".avro")) {
+ return FileFormat.AVRO;
+ }
+ if (fileName.endsWith(".csv")) {
+ return FileFormat.CSV;
+ }
+ if (fileName.endsWith(".json")) {
+ return FileFormat.JSON;
+ }
+ if (fileName.endsWith(".thrift")) {
+ return FileFormat.THRIFT;
+ }
+ throw new IllegalArgumentException("Unsupported file format: " + fileName);
+ }
+
+ @Nullable
+ protected static RecordReaderConfig getReaderConfig(Configuration conf, Path readerConfigFile, FileFormat fileFormat)
+ throws IOException {
+ if (readerConfigFile != null) {
+ if (fileFormat == FileFormat.CSV) {
+ try (InputStream inputStream = FileSystem.get(readerConfigFile.toUri(), conf).open(readerConfigFile)) {
+ CSVRecordReaderConfig readerConfig = JsonUtils.inputStreamToObject(inputStream, CSVRecordReaderConfig.class);
+ LOGGER.info("Using CSV record reader config: {}", readerConfig);
+ return readerConfig;
+ }
+ }
+ if (fileFormat == FileFormat.THRIFT) {
+ try (InputStream inputStream = FileSystem.get(readerConfigFile.toUri(), conf).open(readerConfigFile)) {
+ ThriftRecordReaderConfig readerConfig =
+ JsonUtils.inputStreamToObject(inputStream, ThriftRecordReaderConfig.class);
+ LOGGER.info("Using Thrift record reader config: {}", readerConfig);
+ return readerConfig;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Can be overridden to set additional segment generator configs.
+ */
+ @SuppressWarnings("unused")
+ protected static void addAdditionalSegmentGeneratorConfigs(SegmentGeneratorConfig segmentGeneratorConfig,
+ Path hdfsInputFile, int sequenceId) {
+ }
+
+ @Override
+ protected boolean isDataFile(String fileName) {
+ // For custom record reader, treat all files as data file
+ if (_properties.getProperty(JobConfigConstants.RECORD_READER_PATH) != null) {
+ return true;
+ }
+ return fileName.endsWith(".avro") || fileName.endsWith(".csv") || fileName.endsWith(".json") || fileName
+ .endsWith(".thrift");
+ }
+
+ public void run()
+ throws Exception {
+ LOGGER.info("Starting {}", getClass().getSimpleName());
+
+ Path inputPattern = new Path(_inputPattern);
+ Path outputDir = new Path(_outputDir);
+ Path stagingDir = new Path(_stagingDir);
+
+ // Initialize all directories
+ FileSystem outputDirFileSystem = FileSystem.get(outputDir.toUri(), new Configuration());
+ JobPreparationHelper.mkdirs(outputDirFileSystem, outputDir, _defaultPermissionsMask);
+ JobPreparationHelper.mkdirs(outputDirFileSystem, stagingDir, _defaultPermissionsMask);
+ Path stagingInputDir = new Path(stagingDir, "input");
+ JobPreparationHelper.mkdirs(outputDirFileSystem, stagingInputDir, _defaultPermissionsMask);
+
+ // Gather all data files
+ List<Path> dataFilePaths = getDataFilePaths(inputPattern);
+ int numDataFiles = dataFilePaths.size();
+ if (numDataFiles == 0) {
+ String errorMessage = "No data file founded with pattern: " + inputPattern;
+ LOGGER.error(errorMessage);
+ throw new RuntimeException(errorMessage);
+ } else {
+ LOGGER.info("Creating segments with data files: {}", dataFilePaths);
+ for (int i = 0; i < numDataFiles; i++) {
+ Path dataFilePath = dataFilePaths.get(i);
+ try (DataOutputStream dataOutputStream = outputDirFileSystem
+ .create(new Path(stagingInputDir, Integer.toString(i)))) {
+ dataOutputStream.write(StringUtil.encodeUtf8(dataFilePath.toString() + " " + i));
+ dataOutputStream.flush();
+ }
+ }
+ }
+
+ // Set up the job
+ List<String> dataFilePathStrs = new ArrayList<>();
+ for (Path dataFilePath : dataFilePaths) {
+ dataFilePathStrs.add(dataFilePath.toString());
+ }
+
+ // Set table config and schema
+ TableConfig tableConfig = getTableConfig();
+ if (tableConfig != null) {
+ validateTableConfig(tableConfig);
+ _properties.put(JobConfigConstants.TABLE_CONFIG, tableConfig.toJsonConfigString());
+ }
+ _properties.put(JobConfigConstants.SCHEMA, getSchema().toSingleLineJsonString());
+
+ JavaSparkContext sparkContext = new JavaSparkContext();
+ addDepsJarToDistributedCache(sparkContext);
+ JavaRDD<String> pathRDD = sparkContext.parallelize(dataFilePathStrs, numDataFiles);
+ pathRDD.zipWithIndex().foreach(tuple2 -> {
+ SegmentCreationMapper segmentCreationMapper =
+ new SegmentCreationMapper(_properties, new Path(_stagingDir, "output").toString());
+ segmentCreationMapper.run(tuple2._1, tuple2._2);
+ segmentCreationMapper.cleanup();
+ });
+
+ moveSegmentsToOutputDir(outputDirFileSystem, _stagingDir, _outputDir);
+
+ // Delete the staging directory
+ LOGGER.info("Deleting the staging directory: {}", stagingDir);
+ outputDirFileSystem.delete(stagingDir, true);
+ }
+
+ @Override
+ protected Schema getSchema()
+ throws IOException {
+ try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+ if (controllerRestApi != null) {
+ return controllerRestApi.getSchema();
+ } else {
+ // Schema file could be stored locally or remotely.
+ Path schemaFilePath = new Path(_schemaFile);
+ try (InputStream inputStream = FileSystem.get(schemaFilePath.toUri(), new Configuration())
+ .open(schemaFilePath)) {
+ return Schema.fromInputSteam(inputStream);
+ }
+ }
+ }
+ }
+
+ protected void validateTableConfig(TableConfig tableConfig) {
+ SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+
+ // For APPEND use case, timeColumnName and timeType must be set
+ if (APPEND.equalsIgnoreCase(validationConfig.getSegmentPushType())) {
+ Preconditions.checkState(validationConfig.getTimeColumnName() != null && validationConfig.getTimeType() != null,
+ "For APPEND use case, time column and type must be set");
+ }
+ }
+
+ protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext)
+ throws IOException {
+ if (_depsJarDir != null) {
+ Path depsJarPath = new Path(_depsJarDir);
+ JobPreparationHelper
+ .addDepsJarToDistributedCacheHelper(FileSystem.get(depsJarPath.toUri(), new Configuration()), sparkContext,
+ depsJarPath);
+ }
+ }
+
+ protected void moveSegmentsToOutputDir(FileSystem outputDirFileSystem, String stagingDir, String outputDir)
+ throws IOException {
+ Path segmentTarDir = new Path(new Path(stagingDir, "output"), JobConfigConstants.SEGMENT_TAR_DIR);
+ for (FileStatus segmentTarStatus : outputDirFileSystem.listStatus(segmentTarDir)) {
+ Path segmentTarPath = segmentTarStatus.getPath();
+ Path dest = new Path(outputDir, segmentTarPath.getName());
+ LOGGER.info("Moving segment tar file from: {} to: {}", segmentTarPath, dest);
+ outputDirFileSystem.rename(segmentTarPath, dest);
+ }
+ }
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationMapper.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationMapper.java
new file mode 100644
index 0000000..f9e771b
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationMapper.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Map;
+import java.util.Properties;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.data.TimeFieldSpec;
+import org.apache.pinot.common.utils.DataSize;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.data.readers.CSVRecordReaderConfig;
+import org.apache.pinot.core.data.readers.FileFormat;
+import org.apache.pinot.core.data.readers.RecordReaderConfig;
+import org.apache.pinot.core.data.readers.ThriftRecordReaderConfig;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentCreationMapper implements Serializable {
+ protected static final String LOCAL_TEMP_DIR = "pinot_spark_tmp";
+
+ protected final Logger _logger = LoggerFactory.getLogger(getClass());
+
+ protected Configuration _jobConf;
+ protected String _rawTableName;
+ protected Schema _schema;
+ protected SegmentNameGenerator _segmentNameGenerator;
+ protected boolean _useRelativePath;
+
+ // Optional
+ protected TableConfig _tableConfig;
+ protected String _recordReaderPath;
+ protected Path _readerConfigFile;
+
+ // HDFS segment tar directory
+ protected Path _hdfsSegmentTarDir;
+
+ // Temporary local directories
+ protected File _localStagingDir;
+ protected File _localInputDir;
+ protected File _localSegmentDir;
+ protected File _localSegmentTarDir;
+
+ public SegmentCreationMapper(Properties properties, String workerOutputPath)
+ throws IOException {
+ _jobConf = new Configuration();
+ for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+ _jobConf.set(entry.getKey().toString(), entry.getValue().toString());
+ }
+
+ logConfigurations();
+
+ _useRelativePath = _jobConf.getBoolean(JobConfigConstants.USE_RELATIVE_PATH, false);
+ _rawTableName = _jobConf.get(JobConfigConstants.SEGMENT_TABLE_NAME);
+ _schema = Schema.fromString(_jobConf.get(JobConfigConstants.SCHEMA));
+
+ // Optional
+ String tableConfigString = _jobConf.get(JobConfigConstants.TABLE_CONFIG);
+ if (tableConfigString != null) {
+ _tableConfig = TableConfig.fromJsonString(tableConfigString);
+ }
+ String readerConfigFile = _jobConf.get(JobConfigConstants.PATH_TO_READER_CONFIG);
+ if (readerConfigFile != null) {
+ _readerConfigFile = new Path(readerConfigFile);
+ }
+ _recordReaderPath = _jobConf.get(JobConfigConstants.RECORD_READER_PATH);
+
+ // Set up segment name generator
+ String segmentNameGeneratorType =
+ _jobConf.get(JobConfigConstants.SEGMENT_NAME_GENERATOR_TYPE, JobConfigConstants.DEFAULT_SEGMENT_NAME_GENERATOR);
+ switch (segmentNameGeneratorType) {
+ case JobConfigConstants.SIMPLE_SEGMENT_NAME_GENERATOR:
+ _segmentNameGenerator =
+ new SimpleSegmentNameGenerator(_rawTableName, _jobConf.get(JobConfigConstants.SEGMENT_NAME_POSTFIX));
+ break;
+ case JobConfigConstants.NORMALIZED_DATE_SEGMENT_NAME_GENERATOR:
+ Preconditions.checkState(_tableConfig != null,
+ "In order to use NormalizedDateSegmentNameGenerator, table config must be provided");
+ SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
+ String timeFormat = null;
+ TimeFieldSpec timeFieldSpec = _schema.getTimeFieldSpec();
+ if (timeFieldSpec != null) {
+ timeFormat = timeFieldSpec.getOutgoingGranularitySpec().getTimeFormat();
+ }
+ _segmentNameGenerator =
+ new NormalizedDateSegmentNameGenerator(_rawTableName, _jobConf.get(JobConfigConstants.SEGMENT_NAME_PREFIX),
+ _jobConf.getBoolean(JobConfigConstants.EXCLUDE_SEQUENCE_ID, false),
+ validationConfig.getSegmentPushType(), validationConfig.getSegmentPushFrequency(),
+ validationConfig.getTimeType(), timeFormat);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported segment name generator type: " + segmentNameGeneratorType);
+ }
+
+ // Working directories
+ _hdfsSegmentTarDir = new Path(workerOutputPath, JobConfigConstants.SEGMENT_TAR_DIR);
+ _localStagingDir = new File(LOCAL_TEMP_DIR);
+ _localInputDir = new File(_localStagingDir, "inputData");
+ _localSegmentDir = new File(_localStagingDir, "segments");
+ _localSegmentTarDir = new File(_localStagingDir, JobConfigConstants.SEGMENT_TAR_DIR);
+
+ if (_localStagingDir.exists()) {
+ _logger.warn("Deleting existing file: {}", _localStagingDir);
+ FileUtils.forceDelete(_localStagingDir);
+ }
+ _logger
+ .info("Making local temporary directories: {}, {}, {}", _localStagingDir, _localInputDir, _localSegmentTarDir);
+ Preconditions.checkState(_localStagingDir.mkdirs());
+ Preconditions.checkState(_localInputDir.mkdir());
+ Preconditions.checkState(_localSegmentDir.mkdir());
+ Preconditions.checkState(_localSegmentTarDir.mkdir());
+
+ _logger.info("*********************************************************************");
+ _logger.info("Raw Table Name: {}", _rawTableName);
+ _logger.info("Schema: {}", _schema);
+ _logger.info("Segment Name Generator: {}", _segmentNameGenerator);
+ _logger.info("Table Config: {}", _tableConfig);
+ _logger.info("Reader Config File: {}", _readerConfigFile);
+ _logger.info("*********************************************************************");
+ _logger.info("HDFS Segment Tar Directory: {}", _hdfsSegmentTarDir);
+ _logger.info("Local Staging Directory: {}", _localStagingDir);
+ _logger.info("Local Input Directory: {}", _localInputDir);
+ _logger.info("Local Segment Tar Directory: {}", _localSegmentTarDir);
+ _logger.info("*********************************************************************");
+ }
+
+ /**
+ * Generate a relative output directory path when `useRelativePath` flag is on.
+ * This method will compute the relative path based on `inputFile` and `baseInputDir`,
+ * then apply only the directory part of relative path to `outputDir`.
+ * E.g.
+ * baseInputDir = "/path/to/input"
+ * inputFile = "/path/to/input/a/b/c/d.avro"
+ * outputDir = "/path/to/output"
+ * getRelativeOutputPath(baseInputDir, inputFile, outputDir) = /path/to/output/a/b/c
+ */
+ protected static Path getRelativeOutputPath(URI baseInputDir, URI inputFile, Path outputDir) {
+ URI relativePath = baseInputDir.relativize(inputFile);
+ Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile),
+ "Unable to extract out the relative path based on base input path: " + baseInputDir);
+ return new Path(outputDir, relativePath.getPath()).getParent();
+ }
+
+ protected void logConfigurations() {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append('{');
+ boolean firstEntry = true;
+ for (Map.Entry<String, String> entry : _jobConf) {
+ if (!firstEntry) {
+ stringBuilder.append(", ");
+ } else {
+ firstEntry = false;
+ }
+
+ stringBuilder.append(entry.getKey());
+ stringBuilder.append('=');
+ stringBuilder.append(entry.getValue());
+ }
+ stringBuilder.append('}');
+
+ _logger.info("*********************************************************************");
+ _logger.info("Job Configurations: {}", stringBuilder.toString());
+ _logger.info("*********************************************************************");
+ }
+
+ protected void run(String hdfsInputFileString, Long seqId)
+ throws IOException, InterruptedException {
+ Path hdfsInputFile = new Path(hdfsInputFileString);
+ int sequenceId = seqId.intValue();
+ _logger.info("Generating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile, sequenceId);
+
+ String inputFileName = hdfsInputFile.getName();
+ File localInputFile = new File(_localInputDir, inputFileName);
+ _logger.info("Copying input file from: {} to: {}", hdfsInputFile, localInputFile);
+ FileSystem.get(hdfsInputFile.toUri(), _jobConf)
+ .copyToLocalFile(hdfsInputFile, new Path(localInputFile.getAbsolutePath()));
+
+ SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, _schema);
+ segmentGeneratorConfig.setTableName(_rawTableName);
+ segmentGeneratorConfig.setInputFilePath(localInputFile.getPath());
+ segmentGeneratorConfig.setOutDir(_localSegmentDir.getPath());
+ segmentGeneratorConfig.setSegmentNameGenerator(_segmentNameGenerator);
+ segmentGeneratorConfig.setSequenceId(sequenceId);
+ if (_recordReaderPath != null) {
+ segmentGeneratorConfig.setRecordReaderPath(_recordReaderPath);
+ segmentGeneratorConfig.setFormat(FileFormat.OTHER);
+ } else {
+ FileFormat fileFormat = getFileFormat(inputFileName);
+ segmentGeneratorConfig.setFormat(fileFormat);
+ segmentGeneratorConfig.setReaderConfig(getReaderConfig(fileFormat));
+ }
+ segmentGeneratorConfig.setOnHeap(true);
+
+ addAdditionalSegmentGeneratorConfigs(segmentGeneratorConfig, hdfsInputFile, sequenceId);
+
+ _logger.info("Start creating segment with sequence id: {}", sequenceId);
+ SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+
+ try {
+ driver.init(segmentGeneratorConfig);
+ driver.build();
+ } catch (Exception e) {
+ _logger.error("Caught exception while creating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile,
+ sequenceId, e);
+ throw new RuntimeException(e);
+ }
+ String segmentName = driver.getSegmentName();
+ _logger.info("Finish creating segment: {} with sequence id: {}", segmentName, sequenceId);
+
+ File localSegmentDir = new File(_localSegmentDir, segmentName);
+ String segmentTarFileName = segmentName + JobConfigConstants.TAR_GZ_FILE_EXT;
+ File localSegmentTarFile = new File(_localSegmentTarDir, segmentTarFileName);
+ _logger.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+ TarGzCompressionUtils.createTarGzOfDirectory(localSegmentDir.getPath(), localSegmentTarFile.getPath());
+
+ long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+ long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+ _logger.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+ DataSize.fromBytes(uncompressedSegmentSize), DataSize.fromBytes(compressedSegmentSize));
+
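+    // Copy the segment tar file to HDFS, mirroring the input directory structure when relative paths are enabled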
+ Path hdfsSegmentTarFile = new Path(_hdfsSegmentTarDir, segmentTarFileName);
+ if (_useRelativePath) {
+ Path relativeOutputPath =
+ getRelativeOutputPath(new Path(_jobConf.get(JobConfigConstants.PATH_TO_INPUT)).toUri(), hdfsInputFile.toUri(),
+ _hdfsSegmentTarDir);
+ hdfsSegmentTarFile = new Path(relativeOutputPath, segmentTarFileName);
+ }
+ _logger.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile);
+ FileSystem.get(hdfsSegmentTarFile.toUri(), _jobConf)
+ .copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);
+
+ _logger.info("Finish generating segment: {} with HDFS input file: {}, sequence id: {}", segmentName, hdfsInputFile,
+ sequenceId);
+ }
+
+ protected FileFormat getFileFormat(String fileName) {
+ if (fileName.endsWith(".avro")) {
+ return FileFormat.AVRO;
+ }
+ if (fileName.endsWith(".csv")) {
+ return FileFormat.CSV;
+ }
+ if (fileName.endsWith(".json")) {
+ return FileFormat.JSON;
+ }
+ if (fileName.endsWith(".thrift")) {
+ return FileFormat.THRIFT;
+ }
+    throw new IllegalArgumentException("Unsupported file format: " + fileName);
+ }
+
+ @Nullable
+ protected RecordReaderConfig getReaderConfig(FileFormat fileFormat)
+ throws IOException {
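+    // Load an optional record reader config (JSON) for CSV or Thrift inputs from the configured reader config file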
+ if (_readerConfigFile != null) {
+ if (fileFormat == FileFormat.CSV) {
+ try (InputStream inputStream = FileSystem.get(_readerConfigFile.toUri(), _jobConf).open(_readerConfigFile)) {
+ CSVRecordReaderConfig readerConfig = JsonUtils.inputStreamToObject(inputStream, CSVRecordReaderConfig.class);
+ _logger.info("Using CSV record reader config: {}", readerConfig);
+ return readerConfig;
+ }
+ }
+ if (fileFormat == FileFormat.THRIFT) {
+ try (InputStream inputStream = FileSystem.get(_readerConfigFile.toUri(), _jobConf).open(_readerConfigFile)) {
+ ThriftRecordReaderConfig readerConfig =
+ JsonUtils.inputStreamToObject(inputStream, ThriftRecordReaderConfig.class);
+ _logger.info("Using Thrift record reader config: {}", readerConfig);
+ return readerConfig;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Can be overridden to set additional segment generator configs.
+ */
+ @SuppressWarnings("unused")
+ protected void addAdditionalSegmentGeneratorConfigs(SegmentGeneratorConfig segmentGeneratorConfig, Path hdfsInputFile,
+ int sequenceId) {
+ }
+
+ public void cleanup() {
+ _logger.info("Deleting local temporary directory: {}", _localStagingDir);
+ FileUtils.deleteQuietly(_localStagingDir);
+ }
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentTarPushJob.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentTarPushJob.java
new file mode 100644
index 0000000..259e95b
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentTarPushJob.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.spark.utils.PushLocation;
+
+
+public class SegmentTarPushJob extends BaseSegmentJob {
+ private final Path _segmentPattern;
+ private final List<PushLocation> _pushLocations;
+ private final String _rawTableName;
+ private final boolean _deleteExtraSegments;
+
+ public SegmentTarPushJob(Properties properties) {
+ super(properties);
+ _segmentPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT));
+ String[] hosts = StringUtils.split(properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS), ',');
+ int port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
+ _pushLocations = PushLocation.getPushLocations(hosts, port);
+ _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
+ _deleteExtraSegments = Boolean.parseBoolean(properties.getProperty(JobConfigConstants.DELETE_EXTRA_SEGMENTS, "false"));
+ }
+
+ @Override
+ protected boolean isDataFile(String fileName) {
+ return fileName.endsWith(JobConfigConstants.TAR_GZ_FILE_EXT);
+ }
+
+ public void run()
+ throws Exception {
+ FileSystem fileSystem = FileSystem.get(_segmentPattern.toUri(), new Configuration());
+ List<Path> segmentsToPush = getDataFilePaths(_segmentPattern);
+ try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+ // TODO: Deal with invalid prefixes in the future
+
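+      // Fetch the segments currently on the controller so that stale ones can be deleted after the push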
+ List<String> currentSegments = controllerRestApi.getAllSegments("OFFLINE");
+
+ controllerRestApi.pushSegments(fileSystem, segmentsToPush);
+
+ if (_deleteExtraSegments) {
+ controllerRestApi.deleteSegmentUris(getSegmentsToDelete(currentSegments, segmentsToPush));
+ }
+ }
+ }
+
+  /**
+   * Returns the names of the segments that should be deleted from the controller after the push, i.e. segments
+   * whose name prefix matches one of the segments being pushed but which are not part of the push themselves.
+   * @param allSegments all segments currently on the controller for the table
+   * @param segmentsToPush segments that will be pushed to the controller
+   * @return names of the extra segments to delete after the push
+   */
+ public List<String> getSegmentsToDelete(List<String> allSegments, List<Path> segmentsToPush) {
+ Set<String> uniqueSegmentPrefixes = new HashSet<>();
+
+ // Get all relevant segment prefixes that we are planning on pushing
+    List<String> segmentNamesToPush = segmentsToPush.stream().map(Path::getName).collect(Collectors.toList());
+ for (String segmentName : segmentNamesToPush) {
+ String segmentNamePrefix = removeSequenceId(segmentName);
+ uniqueSegmentPrefixes.add(segmentNamePrefix);
+ }
+
+ List<String> relevantSegments = new ArrayList<>();
+ // Get relevant segments already pushed that we are planning on refreshing
+ for (String segmentName : allSegments) {
+ if (uniqueSegmentPrefixes.contains(removeSequenceId(segmentName))) {
+ relevantSegments.add(segmentName);
+ }
+ }
+
+ relevantSegments.removeAll(segmentNamesToPush);
+ return relevantSegments;
+ }
+
+  /**
+   * Removes the trailing sequence id from a segment name.
+   * E.g. "mytable_12" returns "mytable_", and "mytable_20190809_20190809_12" returns "mytable_20190809_20190809_".
+   * @param segmentName segment name, optionally ending with a sequence id
+   * @return segment name with the trailing sequence id removed
+   */
+ private String removeSequenceId(String segmentName) {
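+    // Strip the trailing digits (the sequence id) from the segment name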
+ return segmentName.replaceAll("\\d*$", "");
+ }
+
+ protected ControllerRestApi getControllerRestApi() {
+ return new DefaultControllerRestApi(_pushLocations, _rawTableName);
+ }
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentUriPushJob.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentUriPushJob.java
new file mode 100644
index 0000000..910fae3
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentUriPushJob.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.spark.utils.PushLocation;
+
+
+public class SegmentUriPushJob extends BaseSegmentJob {
+ private final String _segmentUriPrefix;
+ private final String _segmentUriSuffix;
+ private final Path _segmentPattern;
+ private final List<PushLocation> _pushLocations;
+ private final String _rawTableName;
+
+ public SegmentUriPushJob(Properties properties) {
+ super(properties);
+ _segmentUriPrefix = properties.getProperty("uri.prefix", "");
+ _segmentUriSuffix = properties.getProperty("uri.suffix", "");
+ _segmentPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT));
+ String[] hosts = StringUtils.split(properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS), ',');
+ int port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
+ _pushLocations = PushLocation.getPushLocations(hosts, port);
+ _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
+ }
+
+ @Override
+ protected boolean isDataFile(String fileName) {
+ return fileName.endsWith(JobConfigConstants.TAR_GZ_FILE_EXT);
+ }
+
+ public void run()
+ throws Exception {
+ try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+ List<Path> tarFilePaths = getDataFilePaths(_segmentPattern);
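+      // Construct a download URI for each segment tar file and send the URIs to the controller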
+ List<String> segmentUris = new ArrayList<>(tarFilePaths.size());
+ for (Path tarFilePath : tarFilePaths) {
+ segmentUris.add(_segmentUriPrefix + tarFilePath.toUri().getRawPath() + _segmentUriSuffix);
+ }
+ controllerRestApi.sendSegmentUris(segmentUris);
+ }
+ }
+
+ protected ControllerRestApi getControllerRestApi() {
+ return new DefaultControllerRestApi(_pushLocations, _rawTableName);
+ }
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/utils/JobPreparationHelper.java b/pinot-spark/src/main/java/org/apache/pinot/spark/utils/JobPreparationHelper.java
new file mode 100644
index 0000000..92bc990
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/utils/JobPreparationHelper.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.utils;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JobPreparationHelper {
+ private static final Logger _logger = LoggerFactory.getLogger(JobPreparationHelper.class);
+
+ public static void mkdirs(FileSystem fileSystem, Path dirPath, String defaultPermissionsMask)
+ throws IOException {
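+    // Recreate the directory from scratch, then apply the permission mask if one is configured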
+ if (fileSystem.exists(dirPath)) {
+      _logger.warn("Deleting existing directory: {}", dirPath);
+ fileSystem.delete(dirPath, true);
+ }
+ _logger.info("Making directory: {}", dirPath);
+ fileSystem.mkdirs(dirPath);
+ JobPreparationHelper.setDirPermission(fileSystem, dirPath, defaultPermissionsMask);
+ }
+
+ public static void addDepsJarToDistributedCacheHelper(FileSystem fileSystem, JavaSparkContext sparkContext,
+ Path depsJarDir)
+ throws IOException {
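+    // Recursively walk the deps jar directory and register every jar with the Spark context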
+ FileStatus[] fileStatuses = fileSystem.listStatus(depsJarDir);
+ for (FileStatus fileStatus : fileStatuses) {
+ if (fileStatus.isDirectory()) {
+ addDepsJarToDistributedCacheHelper(fileSystem, sparkContext, fileStatus.getPath());
+ } else {
+ Path depJarPath = fileStatus.getPath();
+ if (depJarPath.getName().endsWith(".jar")) {
+ _logger.info("Adding deps jar: {} to distributed cache", depJarPath);
+ sparkContext.addJar(depJarPath.toUri().getPath());
+ }
+ }
+ }
+ }
+
+ public static void setDirPermission(FileSystem fileSystem, Path dirPath, String defaultPermissionsMask)
+ throws IOException {
+ if (defaultPermissionsMask != null) {
+ FsPermission permission = FsPermission.getDirDefault().applyUMask(new FsPermission(defaultPermissionsMask));
+ _logger.info("Setting permission: {} to directory: {}", permission, dirPath);
+ fileSystem.setPermission(dirPath, permission);
+ }
+ }
+}
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/utils/PushLocation.java b/pinot-spark/src/main/java/org/apache/pinot/spark/utils/PushLocation.java
new file mode 100644
index 0000000..8793a1a
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/utils/PushLocation.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class PushLocation {
+ private final String _host;
+ private final int _port;
+
+ public PushLocation(String host, int port) {
+ _host = host;
+ _port = port;
+ }
+
+ public static List<PushLocation> getPushLocations(String[] hosts, int port) {
+ List<PushLocation> pushLocations = new ArrayList<>(hosts.length);
+ for (String host : hosts) {
+ pushLocations.add(new PushLocation(host, port));
+ }
+ return pushLocations;
+ }
+
+ public String getHost() {
+ return _host;
+ }
+
+ public int getPort() {
+ return _port;
+ }
+
+ @Override
+ public String toString() {
+ return _host + ":" + _port;
+ }
+}
diff --git a/pom.xml b/pom.xml
index e6dc7f1..6ceee81 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,6 +128,7 @@
<hadoop.version>2.7.0</hadoop.version>
<spark.version>2.2.0</spark.version>
<scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.11</scala.version>
<antlr.version>4.6</antlr.version>
<calcite.version>1.19.0</calcite.version>
<!-- commons-configuration, hadoop-common, hadoop-client use commons-lang -->
@@ -597,6 +598,16 @@
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
<!-- Hadoop -->
<dependency>