Posted to commits@pinot.apache.org by ja...@apache.org on 2019/02/12 05:12:22 UTC
[incubator-pinot] branch master updated: Refactor Hadoop Jobs (#3813)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new eacb020 Refactor Hadoop Jobs (#3813)
eacb020 is described below
commit eacb0200d811374b097e504fee2870d9ff0b2b64
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Feb 11 21:12:16 2019 -0800
Refactor Hadoop Jobs (#3813)
- Make Hadoop Jobs easier to extend
- Add interface ControllerRestApi to provide APIs for config fetching and segment pushing
- Add BaseSegmentJob abstract class for common segment-related jobs
- In SegmentCreationJob, allow pluggable mapper class and additional job properties (see the sketch after these notes)
- In SegmentCreationMapper, allow additional segment generator configs
- Tested with Hadoop integration test
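
As an illustration of the new extension hooks (getMapperClass() and addAdditionalJobProperties() in SegmentCreationJob, both visible in the diff below), a custom creation job might look like the following minimal sketch. The subclass, custom mapper, and property names are hypothetical:

import java.util.Properties;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.pinot.hadoop.job.SegmentCreationJob;
import org.apache.pinot.hadoop.job.mapper.SegmentCreationMapper;

// Hypothetical subclass demonstrating the two extension hooks added in this change.
public class MySegmentCreationJob extends SegmentCreationJob {

  public MySegmentCreationJob(Properties properties) {
    super(properties);
  }

  // Plug in a custom mapper class (assumes SegmentCreationMapper is extensible).
  @Override
  protected Class<? extends Mapper<LongWritable, Text, LongWritable, Text>> getMapperClass() {
    return MySegmentCreationMapper.class;
  }

  // Pass extra per-job configuration through to the mappers.
  @Override
  protected void addAdditionalJobProperties(Job job) {
    job.getConfiguration().set("my.custom.property", "value");
  }

  public static class MySegmentCreationMapper extends SegmentCreationMapper {
    // Override the mapper's hooks here as needed, e.g. to add segment
    // generator configs (the mapper-side hook names are not shown in this diff).
  }
}

Note that PinotHadoopJobLauncher instantiates only the built-in jobs, so a custom subclass like this would need its own entry point.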
---
.../common/utils/FileUploadDownloadClient.java | 32 +-
.../data/readers/ThriftRecordReaderConfig.java | 2 +-
.../generator/SegmentGeneratorConfig.java | 2 +-
.../pinot/hadoop/PinotHadoopJobLauncher.java | 26 +-
.../apache/pinot/hadoop/io/PinotRecordWriter.java | 5 +-
.../apache/pinot/hadoop/job/BaseSegmentJob.java | 84 ++++
.../apache/pinot/hadoop/job/ControllerRestApi.java | 84 +---
.../pinot/hadoop/job/DefaultControllerRestApi.java | 142 +++++++
.../pinot/hadoop/job/JobConfigConstants.java | 8 +-
.../pinot/hadoop/job/SegmentCreationJob.java | 427 +++++++++------------
.../apache/pinot/hadoop/job/SegmentTarPushJob.java | 85 ++--
.../apache/pinot/hadoop/job/SegmentUriPushJob.java | 98 ++---
.../mapper/HadoopSegmentCreationMapReduceJob.java | 333 ----------------
.../hadoop/job/mapper/SegmentCreationMapper.java | 248 ++++++++++++
.../apache/pinot/hadoop/utils/PushLocation.java | 40 +-
...mentBuildPushOfflineClusterIntegrationTest.java | 4 +-
16 files changed, 754 insertions(+), 866 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 75017cf..4a37dcf 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -97,7 +97,6 @@ public class FileUploadDownloadClient implements Closeable {
private static final String SEGMENT_METADATA_PATH = "/segmentmetadata";
private static final String TABLES_PATH = "/tables";
private static final String TYPE_DELIMITER = "?type=";
- private static final String SLASH = "/";
private final CloseableHttpClient _httpClient;
@@ -122,16 +121,14 @@ public class FileUploadDownloadClient implements Closeable {
return new URI(scheme, null, host, port, path, null, null);
}
- public static URI getRetrieveTableConfigURI(String host, int port, String tableName)
+ public static URI getRetrieveTableConfigHttpURI(String host, int port, String rawTableName)
throws URISyntaxException {
- String path = TABLES_PATH + SLASH + tableName;
- return getURI(HTTP, host, port, path);
+ return getURI(HTTP, host, port, TABLES_PATH + "/" + rawTableName);
}
- public static URI getRetrieveSchemaHttpURI(String host, int port, String tableName)
+ public static URI getRetrieveSchemaHttpURI(String host, int port, String schemaName)
throws URISyntaxException {
- String path = SCHEMA_PATH + SLASH + tableName;
- return getURI(HTTP, host, port, path);
+ return getURI(HTTP, host, port, SCHEMA_PATH + "/" + schemaName);
}
public static URI getUploadSchemaHttpURI(String host, int port)
@@ -336,26 +333,7 @@ public class FileUploadDownloadClient implements Closeable {
return errorMessage;
}
- /**
- * Get request to retrieve table config
- * @param uri URI
- * @return Response
- * @throws IOException
- * @throws HttpErrorStatusException
- */
- public SimpleHttpResponse getTableConfig(URI uri)
- throws IOException, HttpErrorStatusException {
- return sendRequest(constructGetRequest(uri));
- }
-
- /**
- * Get request to retrieve schema
- * @param uri URI
- * @return Response
- * @throws IOException
- * @throws HttpErrorStatusException
- */
- public SimpleHttpResponse getSchema(URI uri)
+ public SimpleHttpResponse sendGetRequest(URI uri)
throws IOException, HttpErrorStatusException {
return sendRequest(constructGetRequest(uri));
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReaderConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReaderConfig.java
index 2f48f98..a2e532b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReaderConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReaderConfig.java
@@ -31,7 +31,7 @@ public class ThriftRecordReaderConfig implements RecordReaderConfig {
}
public void setThriftClass(String thriftClass) {
- this._thriftClass = thriftClass;
+ _thriftClass = thriftClass;
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index 5b41e31..61cfbac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -162,7 +162,7 @@ public class SegmentGeneratorConfig {
* @param tableConfig
* @param schema
*/
- public SegmentGeneratorConfig(TableConfig tableConfig, Schema schema) {
+ public SegmentGeneratorConfig(@Nullable TableConfig tableConfig, Schema schema) {
Preconditions.checkNotNull(schema);
_schema = schema;
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/PinotHadoopJobLauncher.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/PinotHadoopJobLauncher.java
index bf84a0e..2c7b0da 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/PinotHadoopJobLauncher.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/PinotHadoopJobLauncher.java
@@ -21,7 +21,6 @@ package org.apache.pinot.hadoop;
import java.io.FileInputStream;
import java.util.Arrays;
import java.util.Properties;
-import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.hadoop.job.SegmentCreationJob;
import org.apache.pinot.hadoop.job.SegmentTarPushJob;
import org.apache.pinot.hadoop.job.SegmentUriPushJob;
@@ -36,13 +35,6 @@ public class PinotHadoopJobLauncher {
private static final String USAGE = "usage: [job_type] [job.properties]";
private static final String SUPPORT_JOB_TYPES =
"\tsupport job types: " + Arrays.toString(PinotHadoopJobType.values());
- private static final String SEGMENT_CREATION_JOB_NAME = PinotHadoopJobType.SegmentCreation.toString();
- private static final String SEGMENT_PUSH_TAR_JOB_NAME = PinotHadoopJobType.SegmentTarPush.toString();
- private static final String SEGMENT_PUSH_URI_JOB_NAME = PinotHadoopJobType.SegmentUriPush.toString();
- private static final String SEGMENT_CREATION_AND_TAR_PUSH_JOB_NAME =
- PinotHadoopJobType.SegmentCreationAndTarPush.toString();
- private static final String SEGMENT_CREATION_AND_URI_PUSH_JOB_NAME =
- PinotHadoopJobType.SegmentCreationAndUriPush.toString();
private static void usage() {
System.err.println(USAGE);
@@ -53,25 +45,21 @@ public class PinotHadoopJobLauncher {
throws Exception {
switch (jobType) {
case SegmentCreation:
- new SegmentCreationJob(SEGMENT_CREATION_JOB_NAME, jobConf).run();
+ new SegmentCreationJob(jobConf).run();
break;
case SegmentTarPush:
- new SegmentTarPushJob(SEGMENT_PUSH_TAR_JOB_NAME, jobConf).run();
+ new SegmentTarPushJob(jobConf).run();
break;
case SegmentUriPush:
- new SegmentUriPushJob(SEGMENT_PUSH_URI_JOB_NAME, jobConf).run();
+ new SegmentUriPushJob(jobConf).run();
break;
case SegmentCreationAndTarPush:
- new SegmentCreationJob(StringUtil.join(":", SEGMENT_CREATION_JOB_NAME, SEGMENT_CREATION_AND_TAR_PUSH_JOB_NAME),
- jobConf).run();
- new SegmentTarPushJob(StringUtil.join(":", SEGMENT_PUSH_TAR_JOB_NAME, SEGMENT_CREATION_AND_TAR_PUSH_JOB_NAME),
- jobConf).run();
+ new SegmentCreationJob(jobConf).run();
+ new SegmentTarPushJob(jobConf).run();
break;
case SegmentCreationAndUriPush:
- new SegmentCreationJob(StringUtil.join(":", SEGMENT_CREATION_JOB_NAME, SEGMENT_CREATION_AND_URI_PUSH_JOB_NAME),
- jobConf).run();
- new SegmentUriPushJob(StringUtil.join(":", SEGMENT_PUSH_TAR_JOB_NAME, SEGMENT_CREATION_AND_URI_PUSH_JOB_NAME),
- jobConf).run();
+ new SegmentCreationJob(jobConf).run();
+ new SegmentUriPushJob(jobConf).run();
break;
default:
throw new RuntimeException("Not a valid jobType - " + jobType);
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordWriter.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordWriter.java
index 08468d9..6585295 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordWriter.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordWriter.java
@@ -88,7 +88,8 @@ public class PinotRecordWriter<K, V> extends RecordWriter<K, V> {
String localTarPath = getLocalTarFile(PinotOutputFormat.getTempSegmentDir(context));
LOGGER.info("Trying to tar the segment to: {}", localTarPath);
TarGzCompressionUtils.createTarGzOfDirectory(localSegmentPath, localTarPath);
- String hdfsTarPath = _workDir + "/segmentTar/" + _segmentConfig.getSegmentName() + JobConfigConstants.TARGZ;
+ String hdfsTarPath =
+ _workDir + "/segmentTar/" + _segmentConfig.getSegmentName() + JobConfigConstants.TAR_GZ_FILE_EXT;
LOGGER.info("*********************************************************************");
LOGGER.info("Copy from : {} to {}", localTarPath, hdfsTarPath);
@@ -113,7 +114,7 @@ public class PinotRecordWriter<K, V> extends RecordWriter<K, V> {
if (!f.exists()) {
f.mkdirs();
}
- return localTarDir + "/" + _segmentConfig.getSegmentName() + JobConfigConstants.TARGZ;
+ return localTarDir + "/" + _segmentConfig.getSegmentName() + JobConfigConstants.TAR_GZ_FILE_EXT;
}
/**
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java
new file mode 100644
index 0000000..488c587
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import javax.annotation.Nullable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class BaseSegmentJob extends Configured {
+ protected final Logger _logger = LoggerFactory.getLogger(getClass());
+ protected final Properties _properties;
+ protected final Configuration _conf;
+
+ protected BaseSegmentJob(Properties properties) {
+ _properties = properties;
+ _conf = new Configuration();
+ setConf(_conf);
+ Utils.logVersions();
+ logProperties();
+ }
+
+ protected void logProperties() {
+ _logger.info("*********************************************************************");
+ _logger.info("Job Properties: {}", _properties);
+ _logger.info("*********************************************************************");
+ }
+
+ @Nullable
+ protected Path getPathFromProperty(String key) {
+ String value = _properties.getProperty(key);
+ return value != null ? new Path(value) : null;
+ }
+
+ protected List<Path> getDataFilePaths(Path pathPattern)
+ throws IOException {
+ List<Path> tarFilePaths = new ArrayList<>();
+ FileSystem fileSystem = FileSystem.get(_conf);
+ getDataFilePathsHelper(fileSystem, fileSystem.globStatus(pathPattern), tarFilePaths);
+ return tarFilePaths;
+ }
+
+ protected void getDataFilePathsHelper(FileSystem fileSystem, FileStatus[] fileStatuses, List<Path> tarFilePaths)
+ throws IOException {
+ for (FileStatus fileStatus : fileStatuses) {
+ Path path = fileStatus.getPath();
+ if (fileStatus.isDirectory()) {
+ getDataFilePathsHelper(fileSystem, fileSystem.listStatus(path), tarFilePaths);
+ } else {
+ if (isDataFile(path.getName())) {
+ tarFilePaths.add(path);
+ }
+ }
+ }
+ }
+
+ protected abstract boolean isDataFile(String fileName);
+}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
index 1568710..319b55b 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
@@ -18,87 +18,21 @@
*/
package org.apache.pinot.hadoop.job;
-import com.fasterxml.jackson.databind.JsonNode;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
+import java.io.Closeable;
import java.util.List;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.common.utils.SimpleHttpResponse;
-import org.apache.pinot.hadoop.utils.PushLocation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pinot.common.data.Schema;
-public class ControllerRestApi {
- private static final Logger LOGGER = LoggerFactory.getLogger(ControllerRestApi.class);
- private final List<PushLocation> _pushLocations;
- private final String _tableName;
+public interface ControllerRestApi extends Closeable {
- private static final String OFFLINE = "OFFLINE";
+ TableConfig getTableConfig();
- public ControllerRestApi(List<PushLocation> pushLocations, String tableName) {
- LOGGER.info("Push Locations are: " + pushLocations);
- _pushLocations = pushLocations;
- _tableName = tableName;
- }
+ Schema getSchema();
- public TableConfig getTableConfig() {
- FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient();
- List<URI> tableConfigURIs = new ArrayList<>();
- try {
- for (PushLocation pushLocation : _pushLocations) {
- tableConfigURIs.add(FileUploadDownloadClient
- .getRetrieveTableConfigURI(pushLocation.getHost(), pushLocation.getPort(), _tableName));
- }
- } catch (URISyntaxException e) {
- LOGGER.error("Could not construct table config URI for table {}", _tableName);
- throw new RuntimeException(e);
- }
+ void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths);
- // Return the first table config it can retrieve
- for (URI uri : tableConfigURIs) {
- try {
- SimpleHttpResponse response = fileUploadDownloadClient.getTableConfig(uri);
- JsonNode offlineTableConfig = JsonUtils.stringToJsonNode(response.getResponse()).get(OFFLINE);
- if (offlineTableConfig != null) {
- LOGGER.info("Got table config {}", offlineTableConfig);
- return TableConfig.fromJSONConfig(offlineTableConfig);
- }
- } catch (Exception e) {
- LOGGER.warn("Caught exception while trying to get table config for " + _tableName + " " + e);
- }
- }
- LOGGER.error("Could not get table configs from any push locations provided for " + _tableName);
- throw new RuntimeException("Could not get table config for table " + _tableName);
- }
-
- public String getSchema() {
- FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient();
- List<URI> schemaURIs = new ArrayList<>();
- try {
- for (PushLocation pushLocation : _pushLocations) {
- schemaURIs.add(FileUploadDownloadClient
- .getRetrieveSchemaHttpURI(pushLocation.getHost(), pushLocation.getPort(), _tableName));
- }
- } catch (URISyntaxException e) {
- LOGGER.error("Could not construct schema URI for table {}", _tableName);
- throw new RuntimeException(e);
- }
-
- for (URI schemaURI : schemaURIs) {
- try {
- SimpleHttpResponse response = fileUploadDownloadClient.getSchema(schemaURI);
- if (response != null) {
- return response.getResponse();
- }
- } catch (Exception e) {
- LOGGER.warn("Caught exception while trying to get schema for " + _tableName + " " + e);
- }
- }
- LOGGER.error("Could not get schema configs for any push locations provided for " + _tableName);
- throw new RuntimeException("Could not get schema for table " + _tableName);
- }
+ void sendSegmentUris(List<String> segmentUris);
}
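
The interface above is small enough that a job can swap in a non-HTTP implementation, e.g. for testing. A minimal sketch follows; the class name is hypothetical and the push methods are left as no-ops:

import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.hadoop.job.ControllerRestApi;

// Hypothetical stand-in that serves a fixed table config and schema instead of
// querying a live controller; handy when unit-testing segment jobs.
public class StaticControllerRestApi implements ControllerRestApi {
  private final TableConfig _tableConfig;
  private final Schema _schema;

  public StaticControllerRestApi(TableConfig tableConfig, Schema schema) {
    _tableConfig = tableConfig;
    _schema = schema;
  }

  @Override
  public TableConfig getTableConfig() {
    return _tableConfig;
  }

  @Override
  public Schema getSchema() {
    return _schema;
  }

  @Override
  public void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths) {
    // No-op in this sketch; a real implementation would upload each tar file.
  }

  @Override
  public void sendSegmentUris(List<String> segmentUris) {
    // No-op in this sketch; a real implementation would send each URI.
  }

  @Override
  public void close() {
    // Nothing to release.
  }
}

DefaultControllerRestApi (added below) is the HTTP-backed implementation the jobs use by default, obtained through the overridable getControllerRestApi() hook.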
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
new file mode 100644
index 0000000..efc4896
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.hadoop.utils.PushLocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DefaultControllerRestApi implements ControllerRestApi {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DefaultControllerRestApi.class);
+
+ private final List<PushLocation> _pushLocations;
+ private final String _rawTableName;
+ private final FileUploadDownloadClient _fileUploadDownloadClient = new FileUploadDownloadClient();
+
+ private static final String OFFLINE = "OFFLINE";
+
+ public DefaultControllerRestApi(List<PushLocation> pushLocations, String rawTableName) {
+ LOGGER.info("Push locations are: {} for table: {}", pushLocations, rawTableName);
+ _pushLocations = pushLocations;
+ _rawTableName = rawTableName;
+ }
+
+ @Override
+ public TableConfig getTableConfig() {
+ for (PushLocation pushLocation : _pushLocations) {
+ try {
+ SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(FileUploadDownloadClient
+ .getRetrieveTableConfigHttpURI(pushLocation.getHost(), pushLocation.getPort(), _rawTableName));
+ JsonNode offlineJsonTableConfig = JsonUtils.stringToJsonNode(response.getResponse()).get(OFFLINE);
+ if (offlineJsonTableConfig != null) {
+ TableConfig offlineTableConfig = TableConfig.fromJSONConfig(offlineJsonTableConfig);
+ LOGGER.info("Got table config: {}", offlineTableConfig);
+ return offlineTableConfig;
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while fetching table config for table: {} from push location: {}", _rawTableName,
+ pushLocation, e);
+ }
+ }
+ String errorMessage = String
+ .format("Failed to get table config from push locations: %s for table: %s", _pushLocations, _rawTableName);
+ LOGGER.error(errorMessage);
+ throw new RuntimeException(errorMessage);
+ }
+
+ @Override
+ public Schema getSchema() {
+ for (PushLocation pushLocation : _pushLocations) {
+ try {
+ SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(FileUploadDownloadClient
+ .getRetrieveSchemaHttpURI(pushLocation.getHost(), pushLocation.getPort(), _rawTableName));
+ Schema schema = Schema.fromString(response.getResponse());
+ LOGGER.info("Got schema: {}", schema);
+ return schema;
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while fetching schema for table: {} from push location: {}", _rawTableName,
+ pushLocation, e);
+ }
+ }
+ String errorMessage =
+ String.format("Failed to get schema from push locations: %s for table: %s", _pushLocations, _rawTableName);
+ LOGGER.error(errorMessage);
+ throw new RuntimeException(errorMessage);
+ }
+
+ @Override
+ public void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths) {
+ LOGGER.info("Start pushing segments: {} to locations: {}", tarFilePaths, _pushLocations);
+ for (Path tarFilePath : tarFilePaths) {
+ String fileName = tarFilePath.getName();
+ Preconditions.checkArgument(fileName.endsWith(JobConfigConstants.TAR_GZ_FILE_EXT));
+ String segmentName = fileName.substring(0, fileName.length() - JobConfigConstants.TAR_GZ_FILE_EXT.length());
+ for (PushLocation pushLocation : _pushLocations) {
+ LOGGER.info("Pushing segment: {} to location: {}", segmentName, pushLocation);
+ try (InputStream inputStream = fileSystem.open(tarFilePath)) {
+ SimpleHttpResponse response = _fileUploadDownloadClient.uploadSegment(
+ FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), pushLocation.getPort()),
+ segmentName, inputStream);
+ LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while pushing segment: {} to location: {}", segmentName, pushLocation, e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void sendSegmentUris(List<String> segmentUris) {
+ LOGGER.info("Start sending segment URIs: {} to locations: {}", segmentUris, _pushLocations);
+ for (String segmentUri : segmentUris) {
+ for (PushLocation pushLocation : _pushLocations) {
+ LOGGER.info("Sending segment URI: {} to location: {}", segmentUri, pushLocation);
+ try {
+ SimpleHttpResponse response = _fileUploadDownloadClient.sendSegmentUri(
+ FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), pushLocation.getPort()),
+ segmentUri);
+ LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while sending segment URI: {} to location: {}", segmentUri, pushLocation, e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ _fileUploadDownloadClient.close();
+ }
+}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
index 9f55a1e..c87378a 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
@@ -21,17 +21,17 @@ package org.apache.pinot.hadoop.job;
public class JobConfigConstants {
public static final String PATH_TO_INPUT = "path.to.input";
public static final String PATH_TO_OUTPUT = "path.to.output";
+ public static final String PATH_TO_DEPS_JAR = "path.to.deps.jar";
public static final String PATH_TO_READER_CONFIG = "path.to.reader.config";
// Leave this for backward compatibility. We prefer to use the schema fetched from the controller.
public static final String PATH_TO_SCHEMA = "path.to.schema";
- public static final String TARGZ = ".tar.gz";
+ public static final String SEGMENT_TAR_DIR = "segmentTar";
+ public static final String TAR_GZ_FILE_EXT = ".tar.gz";
- public static final String TIME_COLUMN_NAME = "table.time.column.name";
- public static final String TIME_COLUMN_TYPE = "table.time.column.type";
- public static final String TABLE_PUSH_TYPE = "table.push.type";
public static final String SCHEMA = "data.schema";
public static final String SEGMENT_TABLE_NAME = "segment.table.name";
+ public static final String SEGMENT_NAME_POSTFIX = "segment.name.postfix";
public static final String PUSH_TO_HOSTS = "push.to.hosts";
public static final String PUSH_TO_PORT = "push.to.port";
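
Taken together with the launcher usage string ("usage: [job_type] [job.properties]"), a job.properties file wired to these keys might look like the following; all paths and hosts are hypothetical:

# Hypothetical properties for a SegmentCreationAndTarPush run.
path.to.input=/user/pinot/input/*.avro
path.to.output=/user/pinot/output
path.to.deps.jar=/user/pinot/deps
segment.table.name=myTable
push.to.hosts=controller-host1,controller-host2
push.to.port=9000

When no push hosts are configured, path.to.schema can be set instead for backward compatibility, as noted in the comment above.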
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
index 7570600..7876563 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
@@ -18,15 +18,17 @@
*/
package org.apache.pinot.hadoop.job;
-import java.io.File;
+import com.google.common.base.Preconditions;
+import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.ArrayList;
+import java.io.InputStream;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -35,169 +37,131 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.pinot.common.Utils;
import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.data.Schema;
-import org.apache.pinot.hadoop.job.mapper.HadoopSegmentCreationMapReduceJob;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.hadoop.job.mapper.SegmentCreationMapper;
import org.apache.pinot.hadoop.utils.PushLocation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class SegmentCreationJob extends Configured {
- private static final Logger LOGGER = LoggerFactory.getLogger(SegmentCreationJob.class);
+public class SegmentCreationJob extends BaseSegmentJob {
+ protected static final String APPEND = "APPEND";
- private static final String PATH_TO_DEPS_JAR = "path.to.deps.jar";
- private static final String APPEND = "APPEND";
+ protected final Path _inputPattern;
+ protected final Path _outputDir;
+ protected final Path _stagingDir;
+ protected final String _rawTableName;
- private final String _jobName;
- private final Properties _properties;
+ // Optional
+ protected final Path _depsJarDir;
+ protected final Path _schemaFile;
+ protected final String _defaultPermissionsMask;
+ protected final List<PushLocation> _pushLocations;
- private final String _inputSegmentDir;
- private final String _stagingDir;
- private final Schema _dataSchema;
- private final String _depsJarPath;
- private final String _outputDir;
- private final String _tableName;
+ protected FileSystem _fileSystem;
- private final String _readerConfigFile;
+ public SegmentCreationJob(Properties properties) {
+ super(properties);
+ _conf.set("mapreduce.job.user.classpath.first", "true");
- private final String _defaultPermissionsMask;
+ _inputPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_INPUT));
+ _outputDir = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT));
+ _stagingDir = new Path(_outputDir, UUID.randomUUID().toString());
+ _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
- private String[] _hosts;
- private int _port;
+ // Optional
+ _depsJarDir = getPathFromProperty(JobConfigConstants.PATH_TO_DEPS_JAR);
+ _schemaFile = getPathFromProperty(JobConfigConstants.PATH_TO_SCHEMA);
+ _defaultPermissionsMask = _properties.getProperty(JobConfigConstants.DEFAULT_PERMISSIONS_MASK);
- public SegmentCreationJob(String jobName, Properties properties)
- throws Exception {
- super(new Configuration());
- getConf().set("mapreduce.job.user.classpath.first", "true");
- _jobName = jobName;
- _properties = properties;
-
- _inputSegmentDir = _properties.getProperty(JobConfigConstants.PATH_TO_INPUT);
- String schemaFilePath = _properties.getProperty(JobConfigConstants.PATH_TO_SCHEMA);
- _outputDir = getOutputDir();
- _stagingDir = new File(_outputDir, UUID.randomUUID().toString()).getAbsolutePath();
- _depsJarPath = _properties.getProperty(PATH_TO_DEPS_JAR, null);
- _readerConfigFile = _properties.getProperty(JobConfigConstants.PATH_TO_READER_CONFIG);
- String hostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS);
- String portString = _properties.getProperty(JobConfigConstants.PUSH_TO_PORT);
-
- _defaultPermissionsMask = _properties.getProperty(JobConfigConstants.DEFAULT_PERMISSIONS_MASK, null);
- LOGGER.info("Default permissions mask is {}", _defaultPermissionsMask);
-
- // For backwards compatibility, we want to allow users to create segments without setting push location parameters
- // in their creation jobs.
- if (hostsString != null && portString != null) {
- _hosts = hostsString.split(",");
- _port = Integer.parseInt(portString);
- }
- _tableName = _properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME);
-
- Utils.logVersions();
-
- LOGGER.info("*********************************************************************");
- LOGGER.info("path.to.input: {}", _inputSegmentDir);
- LOGGER.info("path.to.deps.jar: {}", _depsJarPath);
- LOGGER.info("path.to.output: {}", _outputDir);
- LOGGER.info("path.to.schema: {}", schemaFilePath);
- if (_readerConfigFile != null) {
- LOGGER.info("path.to.reader.config: {}", _readerConfigFile);
- }
- if (schemaFilePath != null) {
- _dataSchema = Schema.fromFile(new File(schemaFilePath));
+ // Optional push location and table parameters. If set, will use the table config and schema from the push hosts.
+ String pushHostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS);
+ String pushPortString = _properties.getProperty(JobConfigConstants.PUSH_TO_PORT);
+ if (pushHostsString != null && pushPortString != null) {
+ _pushLocations =
+ PushLocation.getPushLocations(StringUtils.split(pushHostsString, ','), Integer.parseInt(pushPortString));
} else {
- _dataSchema = null;
+ _pushLocations = null;
}
- LOGGER.info("schema: {}", _dataSchema);
- LOGGER.info("*********************************************************************");
- }
-
- protected String getOutputDir() {
- return _properties.getProperty(JobConfigConstants.PATH_TO_OUTPUT);
- }
- protected String getInputDir() {
- return _inputSegmentDir;
+ _logger.info("*********************************************************************");
+ _logger.info("Input Pattern: {}", _inputPattern);
+ _logger.info("Output Directory: {}", _outputDir);
+ _logger.info("Staging Directory: {}", _stagingDir);
+ _logger.info("Raw Table Name: {}", _rawTableName);
+ _logger.info("Dependencies Directory: {}", _depsJarDir);
+ _logger.info("Schema File: {}", _schemaFile);
+ _logger.info("Default Permissions Mask: {}", _defaultPermissionsMask);
+ _logger.info("Push Locations: {}", _pushLocations);
+ _logger.info("*********************************************************************");
}
- protected void setOutputPath(Configuration configuration) {
-
+ @Override
+ protected boolean isDataFile(String fileName) {
+ return fileName.endsWith(".avro") || fileName.endsWith(".csv") || fileName.endsWith(".json") || fileName
+ .endsWith(".thrift");
}
public void run()
throws Exception {
- LOGGER.info("Starting {}", getClass().getSimpleName());
-
- FileSystem fs = FileSystem.get(getConf());
- Path inputPathPattern = new Path(_inputSegmentDir);
- Path stagingDir = new Path(_stagingDir);
- Path outputDir = new Path(_outputDir);
-
- if (fs.exists(outputDir)) {
- LOGGER.warn("Found the output folder {}, deleting it", _outputDir);
- fs.delete(outputDir, true);
- }
- fs.mkdirs(outputDir);
-
- if (fs.exists(stagingDir)) {
- LOGGER.warn("Found the temp folder {}, deleting it", stagingDir);
- fs.delete(stagingDir, true);
+ _logger.info("Starting {}", getClass().getSimpleName());
+
+ // Initialize all directories
+ _fileSystem = FileSystem.get(_conf);
+ mkdirs(_outputDir);
+ mkdirs(_stagingDir);
+ Path stagingInputDir = new Path(_stagingDir, "input");
+ mkdirs(stagingInputDir);
+
+ // Gather all data files
+ List<Path> dataFilePaths = getDataFilePaths(_inputPattern);
+ int numDataFiles = dataFilePaths.size();
+ if (numDataFiles == 0) {
+ String errorMessage = "No data file founded with pattern: " + _inputPattern;
+ _logger.error(errorMessage);
+ throw new RuntimeException(errorMessage);
+ } else {
+ _logger.info("Creating segments with data files: {}", dataFilePaths);
+ for (int i = 0; i < numDataFiles; i++) {
+ Path dataFilePath = dataFilePaths.get(i);
+ try (DataOutputStream dataOutputStream = _fileSystem.create(new Path(stagingInputDir, Integer.toString(i)))) {
+ dataOutputStream.write(StringUtil.encodeUtf8(dataFilePath.toString() + " " + i));
+ dataOutputStream.flush();
+ }
+ }
}
- fs.mkdirs(stagingDir);
- Path stagingDirInputPath = new Path(_stagingDir + "/input/");
- fs.mkdirs(stagingDirInputPath);
- LOGGER.info("Staging dir input path is {}", stagingDirInputPath);
+ // Set up the job
+ Job job = Job.getInstance(_conf);
- if (_defaultPermissionsMask != null) {
- FsPermission umask = new FsPermission(_defaultPermissionsMask);
- FsPermission permission = FsPermission.getDirDefault().applyUMask(umask);
-
- setDirPermission(stagingDir, permission, fs);
- setDirPermission(stagingDirInputPath, permission, fs);
- setDirPermission(outputDir, permission, fs);
+ Configuration jobConf = job.getConfiguration();
+ String hadoopTokenFileLocation = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
+ if (hadoopTokenFileLocation != null) {
+ jobConf.set("mapreduce.job.credentials.binary", hadoopTokenFileLocation);
}
+ jobConf.setInt(JobContext.NUM_MAPS, numDataFiles);
- List<FileStatus> inputDataFiles = new ArrayList<FileStatus>();
- FileStatus[] fileStatusArr = fs.globStatus(inputPathPattern);
- for (FileStatus fileStatus : fileStatusArr) {
- inputDataFiles.addAll(getDataFilesFromPath(fs, fileStatus.getPath()));
- }
- if (inputDataFiles.isEmpty()) {
- LOGGER.error("No Input paths {}", inputPathPattern);
- throw new RuntimeException("No input files " + inputPathPattern);
+ // Set table config and schema
+ TableConfig tableConfig = getTableConfig();
+ if (tableConfig != null) {
+ validateTableConfig(tableConfig);
+ jobConf.set(JobConfigConstants.TABLE_CONFIG, tableConfig.toJSONConfigString());
}
+ jobConf.set(JobConfigConstants.SCHEMA, getSchema().toSingleLineJsonString());
- for (int seqId = 0; seqId < inputDataFiles.size(); ++seqId) {
- FileStatus file = inputDataFiles.get(seqId);
- String completeFilePath = " " + file.getPath().toString() + " " + seqId;
- Path newOutPutFile = new Path(
- (_stagingDir + "/input/" + file.getPath().toString().replace('.', '_').replace('/', '_').replace(':', '_')
- + ".txt"));
- FSDataOutputStream stream = fs.create(newOutPutFile);
- stream.writeUTF(completeFilePath);
- stream.flush();
- stream.close();
+ // Set additional configurations
+ for (Map.Entry<Object, Object> entry : _properties.entrySet()) {
+ jobConf.set(entry.getKey().toString(), entry.getValue().toString());
}
- Job job = Job.getInstance(getConf());
-
- setAdditionalJobProperties(job);
-
- job.setJarByClass(SegmentCreationJob.class);
- job.setJobName(_jobName);
-
- setMapperClass(job);
-
- if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
- job.getConfiguration().set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
- }
+ job.setMapperClass(getMapperClass());
+ job.setNumReduceTasks(0);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
@@ -205,158 +169,129 @@ public class SegmentCreationJob extends Configured {
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
- FileInputFormat.addInputPath(job, new Path(_stagingDir + "/input/"));
- FileOutputFormat.setOutputPath(job, new Path(_stagingDir + "/output/"));
+ FileInputFormat.addInputPath(job, stagingInputDir);
+ FileOutputFormat.setOutputPath(job, new Path(_stagingDir, "output"));
- job.getConfiguration().setInt(JobContext.NUM_MAPS, inputDataFiles.size());
- if (_dataSchema != null) {
- job.getConfiguration().set(JobConfigConstants.SCHEMA, _dataSchema.toSingleLineJsonString());
- }
- setOutputPath(job.getConfiguration());
-
- job.setNumReduceTasks(0);
- for (Object key : _properties.keySet()) {
- job.getConfiguration().set(key.toString(), _properties.getProperty(key.toString()));
- }
+ addDepsJarToDistributedCache(job);
+ addAdditionalJobProperties(job);
- if (_depsJarPath != null && _depsJarPath.length() > 0) {
- addDepsJarToDistributedCache(new Path(_depsJarPath), job);
- }
-
- // Submit the job for execution.
+ // Submit the job
job.waitForCompletion(true);
if (!job.isSuccessful()) {
- throw new RuntimeException("Job failed : " + job);
+ throw new RuntimeException("Job failed: " + job);
}
- moveToOutputDirectory(fs);
+ moveSegmentsToOutputDir();
- // Delete temporary directory.
- LOGGER.info("Cleanup the working directory.");
- LOGGER.info("Deleting the dir: {}", _stagingDir);
- fs.delete(new Path(_stagingDir), true);
+ // Delete the staging directory
+ _logger.info("Deleting the staging directory: {}", _stagingDir);
+ _fileSystem.delete(_stagingDir, true);
}
- private void setDirPermission(Path directory, FsPermission permission, FileSystem fs)
+ protected void mkdirs(Path dirPath)
throws IOException {
- fs.setPermission(directory, permission);
- LOGGER.info("Setting permissions '{}' for directory: '{}'", permission, directory);
- }
-
- protected void setAdditionalJobProperties(Job job)
- throws Exception {
- // Check host and port information before set table config dependent properties
- if (_hosts == null || _port == 0) {
- LOGGER.warn("Unable to set TableConfig-dependent properties. Please set host {} ({}) and port {} ({})",
- JobConfigConstants.PUSH_TO_HOSTS, _port, JobConfigConstants.PUSH_TO_PORT, _port);
- return;
- }
-
- // Add push locations
- List<PushLocation> pushLocations = new ArrayList<>();
- for (String host : _hosts) {
- pushLocations.add(new PushLocation.PushLocationBuilder().setHost(host).setPort(_port).build());
+ if (_fileSystem.exists(dirPath)) {
+ _logger.warn("Deleting existing file: {}", dirPath);
+ _fileSystem.delete(dirPath, true);
}
+ _logger.info("Making directory: {}", dirPath);
+ _fileSystem.mkdirs(dirPath);
+ setDirPermission(dirPath);
+ }
- ControllerRestApi controllerRestApiObject = new ControllerRestApi(pushLocations, _tableName);
-
- // Fetch table config from controller API
- TableConfig tableConfig = controllerRestApiObject.getTableConfig();
- job.getConfiguration().set(JobConfigConstants.TABLE_CONFIG, tableConfig.toJSONConfigString());
-
- SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
- if (validationConfig == null) {
- throw new RuntimeException(
- "Segment validation config should not be null. Please configure them correctly in the table config");
+ protected void setDirPermission(Path dirPath)
+ throws IOException {
+ if (_defaultPermissionsMask != null) {
+ FsPermission permission = FsPermission.getDirDefault().applyUMask(new FsPermission(_defaultPermissionsMask));
+ _logger.info("Setting permission: {} to directory: {}", permission, dirPath);
+ _fileSystem.setPermission(dirPath, permission);
}
+ }
- // Check if pushType is set correctly
- String segmentPushType = validationConfig.getSegmentPushType();
- if (segmentPushType == null) {
- throw new RuntimeException("Segment push type is null. Please configure the value correctly in the table config. "
- + "We support APPEND or REFRESH for push types.");
+ @Nullable
+ protected TableConfig getTableConfig()
+ throws IOException {
+ if (_pushLocations != null) {
+ try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+ return controllerRestApi.getTableConfig();
+ }
+ } else {
+ return null;
}
+ }
- // Update table push type
- job.getConfiguration().set(JobConfigConstants.TABLE_PUSH_TYPE, segmentPushType);
-
- if (segmentPushType.equalsIgnoreCase(APPEND)) {
- // For append use cases, timeColumnName and timeType must be set
- String timeColumnName = validationConfig.getTimeColumnName();
- String timeColumnType = validationConfig.getTimeType();
- if (timeColumnName == null || timeColumnType == null) {
- throw new RuntimeException("Time column or time column type is null. Both are required for APPEND use case. "
- + "Please configure them correctly in the table config.");
+ protected Schema getSchema()
+ throws IOException {
+ if (_pushLocations != null) {
+ try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+ return controllerRestApi.getSchema();
}
- LOGGER.info("Time column: {}, time column type: {}", timeColumnName, timeColumnType);
- job.getConfiguration().set(JobConfigConstants.TIME_COLUMN_NAME, timeColumnName);
- job.getConfiguration().set(JobConfigConstants.TIME_COLUMN_TYPE, timeColumnType);
} else {
- LOGGER.info("Refresh use case. Not setting timeColumnName and timeColumnType for table: " + _tableName);
+ try (InputStream inputStream = _fileSystem.open(_schemaFile)) {
+ return Schema.fromInputSteam(inputStream);
+ }
}
+ }
- // Fetch schema from controller API and set it to the job configuration
- String schema = controllerRestApiObject.getSchema();
- LOGGER.info("Setting schema for tableName {} to {}", _tableName, schema);
- job.getConfiguration().set(JobConfigConstants.SCHEMA, schema);
+ protected DefaultControllerRestApi getControllerRestApi() {
+ return new DefaultControllerRestApi(_pushLocations, _rawTableName);
}
- protected void moveToOutputDirectory(FileSystem fs)
- throws Exception {
- LOGGER.info("Moving Segment Tar files from {} to: {}", _stagingDir + "/output/segmentTar", _outputDir);
- FileStatus[] segmentArr = fs.listStatus(new Path(_stagingDir + "/output/segmentTar"));
- for (FileStatus segment : segmentArr) {
- fs.rename(segment.getPath(), new Path(_outputDir, segment.getPath().getName()));
+ protected void validateTableConfig(TableConfig tableConfig) {
+ SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+
+ // For APPEND use case, timeColumnName and timeType must be set
+ if (APPEND.equalsIgnoreCase(validationConfig.getSegmentPushType())) {
+ Preconditions.checkState(validationConfig.getTimeColumnName() != null && validationConfig.getTimeType() != null,
+ "For APPEND use case, time column and type must be set");
}
}
- protected Job setMapperClass(Job job) {
- job.setMapperClass(HadoopSegmentCreationMapReduceJob.HadoopSegmentCreationMapper.class);
- return job;
+ /**
+ * Can be overridden to plug in a custom mapper.
+ */
+ protected Class<? extends Mapper<LongWritable, Text, LongWritable, Text>> getMapperClass() {
+ return SegmentCreationMapper.class;
+ }
+
+ protected void addDepsJarToDistributedCache(Job job)
+ throws IOException {
+ if (_depsJarDir != null) {
+ addDepsJarToDistributedCacheHelper(job, _depsJarDir);
+ }
}
- private void addDepsJarToDistributedCache(Path path, Job job)
+ protected void addDepsJarToDistributedCacheHelper(Job job, Path depsJarDir)
throws IOException {
- LOGGER.info("Trying to add all the deps jar files from directory: {}", path);
- FileSystem fs = FileSystem.get(getConf());
- FileStatus[] fileStatusArr = fs.listStatus(path);
- for (FileStatus fileStatus : fileStatusArr) {
+ FileStatus[] fileStatuses = _fileSystem.listStatus(depsJarDir);
+ for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
- addDepsJarToDistributedCache(fileStatus.getPath(), job);
+ addDepsJarToDistributedCacheHelper(job, fileStatus.getPath());
} else {
Path depJarPath = fileStatus.getPath();
if (depJarPath.getName().endsWith(".jar")) {
- LOGGER.info("Adding deps jar files: {}", path);
- job.addCacheArchive(path.toUri());
+ _logger.info("Adding deps jar: {} to distributed cache", depJarPath);
+ job.addCacheArchive(depJarPath.toUri());
}
}
}
}
- private ArrayList<FileStatus> getDataFilesFromPath(FileSystem fs, Path inBaseDir)
+ /**
+ * Can be overridden to set additional job properties.
+ */
+ @SuppressWarnings("unused")
+ protected void addAdditionalJobProperties(Job job) {
+ }
+
+ protected void moveSegmentsToOutputDir()
throws IOException {
- ArrayList<FileStatus> dataFileStatusList = new ArrayList<FileStatus>();
- FileStatus[] fileStatusArr = fs.listStatus(inBaseDir);
- for (FileStatus fileStatus : fileStatusArr) {
- if (fileStatus.isDirectory()) {
- LOGGER.info("Trying to add all the data files from directory: {}", fileStatus.getPath());
- dataFileStatusList.addAll(getDataFilesFromPath(fs, fileStatus.getPath()));
- } else {
- String fileName = fileStatus.getPath().getName();
- if (fileName.endsWith(".avro")) {
- LOGGER.info("Adding avro files: {}", fileStatus.getPath());
- dataFileStatusList.add(fileStatus);
- }
- if (fileName.endsWith(".csv")) {
- LOGGER.info("Adding csv files: {}", fileStatus.getPath());
- dataFileStatusList.add(fileStatus);
- }
- if (fileName.endsWith(".json")) {
- LOGGER.info("Adding json files: {}", fileStatus.getPath());
- dataFileStatusList.add(fileStatus);
- }
- }
+ Path segmentTarDir = new Path(new Path(_stagingDir, "output"), JobConfigConstants.SEGMENT_TAR_DIR);
+ for (FileStatus segmentTarStatus : _fileSystem.listStatus(segmentTarDir)) {
+ Path segmentTarPath = segmentTarStatus.getPath();
+ Path dest = new Path(_outputDir, segmentTarPath.getName());
+ _logger.info("Moving segment tar file from: {} to: {}", segmentTarPath, dest);
+ _fileSystem.rename(segmentTarPath, dest);
}
- return dataFileStatusList;
}
}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
index f4890ab..7c71fd8 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
@@ -18,82 +18,41 @@
*/
package org.apache.pinot.hadoop.job;
-import java.io.InputStream;
+import com.google.common.base.Preconditions;
+import java.util.List;
import java.util.Properties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.common.utils.SimpleHttpResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pinot.hadoop.utils.PushLocation;
-public class SegmentTarPushJob extends Configured {
+public class SegmentTarPushJob extends BaseSegmentJob {
+ private final Path _segmentPattern;
+ private final List<PushLocation> _pushLocations;
- private String _segmentPath;
- private String[] _hosts;
- private int _port;
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SegmentTarPushJob.class);
-
- public SegmentTarPushJob(String name, Properties properties) {
- super(new Configuration());
- _segmentPath = properties.getProperty(JobConfigConstants.PATH_TO_OUTPUT) + "/";
- _hosts = properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS).split(",");
- _port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
+ public SegmentTarPushJob(Properties properties) {
+ super(properties);
+ _segmentPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT));
+ String[] hosts = StringUtils.split(properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS), ',');
+ int port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
+ _pushLocations = PushLocation.getPushLocations(hosts, port);
}
- public void run()
- throws Exception {
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(conf);
- Path path = new Path(_segmentPath);
- FileStatus[] fileStatusArr = fs.globStatus(path);
- for (FileStatus fileStatus : fileStatusArr) {
- if (fileStatus.isDirectory()) {
- pushDir(fs, fileStatus.getPath());
- } else {
- pushOneTarFile(fs, fileStatus.getPath());
- }
- }
+ @Override
+ protected boolean isDataFile(String fileName) {
+ return fileName.endsWith(JobConfigConstants.TAR_GZ_FILE_EXT);
}
- public void pushDir(FileSystem fs, Path path)
+ public void run()
throws Exception {
- LOGGER.info("******** Now uploading segments tar from dir: {}", path);
- FileStatus[] fileStatusArr = fs.listStatus(new Path(path.toString() + "/"));
- for (FileStatus fileStatus : fileStatusArr) {
- if (fileStatus.isDirectory()) {
- pushDir(fs, fileStatus.getPath());
- } else {
- pushOneTarFile(fs, fileStatus.getPath());
- }
+ FileSystem fileSystem = FileSystem.get(_conf);
+ try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+ controllerRestApi.pushSegments(fileSystem, getDataFilePaths(_segmentPattern));
}
}
- public void pushOneTarFile(FileSystem fs, Path path)
- throws Exception {
- String fileName = path.getName();
- if (!fileName.endsWith(".tar.gz")) {
- return;
- }
- try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
- for (String host : _hosts) {
- try (InputStream inputStream = fs.open(path)) {
- fileName = fileName.split(JobConfigConstants.TARGZ)[0];
- LOGGER.info("******** Uploading file: {} to Host: {} and Port: {} *******", fileName, host, _port);
- SimpleHttpResponse response = fileUploadDownloadClient
- .uploadSegment(FileUploadDownloadClient.getUploadSegmentHttpURI(host, _port), fileName, inputStream);
- LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
- } catch (Exception e) {
- LOGGER.error("******** Error Uploading file: {} to Host: {} and Port: {} *******", fileName, host, _port);
- LOGGER.error("Caught exception during upload", e);
- throw e;
- }
- }
- }
+ protected ControllerRestApi getControllerRestApi() {
+ return new DefaultControllerRestApi(_pushLocations, null);
}
}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentUriPushJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentUriPushJob.java
index 59c66b7..aa81ba8 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentUriPushJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentUriPushJob.java
@@ -18,87 +18,49 @@
*/
package org.apache.pinot.hadoop.job;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Properties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
-import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.common.utils.SimpleHttpResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pinot.hadoop.utils.PushLocation;
-public class SegmentUriPushJob extends Configured {
+public class SegmentUriPushJob extends BaseSegmentJob {
+ private final String _segmentUriPrefix;
+ private final String _segmentUriSuffix;
+ private final Path _segmentPattern;
+ private final List<PushLocation> _pushLocations;
- private String _pushUriPrefix;
- private String _pushUriSuffix;
- private String _segmentPath;
- private String[] _hosts;
- private int _port;
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SegmentUriPushJob.class);
-
- public SegmentUriPushJob(String name, Properties properties) {
- super(new Configuration());
- _pushUriPrefix = properties.getProperty("uri.prefix", "");
- _pushUriSuffix = properties.getProperty("uri.suffix", "");
- _segmentPath = properties.getProperty(JobConfigConstants.PATH_TO_OUTPUT) + "/";
- _hosts = properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS).split(",");
- _port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
+ public SegmentUriPushJob(Properties properties) {
+ super(properties);
+ _segmentUriPrefix = properties.getProperty("uri.prefix", "");
+ _segmentUriSuffix = properties.getProperty("uri.suffix", "");
+ _segmentPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT));
+ String[] hosts = StringUtils.split(properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS), ',');
+ int port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
+ _pushLocations = PushLocation.getPushLocations(hosts, port);
}
- public void run()
- throws Exception {
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(conf);
- Path path = new Path(_segmentPath);
- FileStatus[] fileStatusArr = fs.globStatus(path);
- for (FileStatus fileStatus : fileStatusArr) {
- if (fileStatus.isDirectory()) {
- pushDir(fs, fileStatus.getPath());
- } else {
- pushOneTarFile(fs, fileStatus.getPath());
- }
- }
+ @Override
+ protected boolean isDataFile(String fileName) {
+ return fileName.endsWith(JobConfigConstants.TAR_GZ_FILE_EXT);
}
- public void pushDir(FileSystem fs, Path path)
+ public void run()
throws Exception {
- LOGGER.info("******** Now uploading segments tar from dir: {}", path);
- FileStatus[] fileStatusArr = fs.listStatus(new Path(path.toString() + "/"));
- for (FileStatus fileStatus : fileStatusArr) {
- if (fileStatus.isDirectory()) {
- pushDir(fs, fileStatus.getPath());
- } else {
- pushOneTarFile(fs, fileStatus.getPath());
+ try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+ List<Path> tarFilePaths = getDataFilePaths(_segmentPattern);
+ List<String> segmentUris = new ArrayList<>(tarFilePaths.size());
+ for (Path tarFilePath : tarFilePaths) {
+ segmentUris.add(_segmentUriPrefix + tarFilePath.toUri().getRawPath() + _segmentUriSuffix);
}
+ controllerRestApi.sendSegmentUris(segmentUris);
}
}
- public void pushOneTarFile(FileSystem fs, Path path)
- throws Exception {
- String fileName = path.getName();
- if (!fileName.endsWith(JobConfigConstants.TARGZ)) {
- return;
- }
- try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
- for (String host : _hosts) {
- String uri = String.format("%s%s%s", _pushUriPrefix, path.toUri().getRawPath(), _pushUriSuffix);
- LOGGER
- .info("******** Uploading file: {} to Host: {} and Port: {} with download uri: {} *******", fileName, host,
- _port, uri);
- try {
- SimpleHttpResponse response = fileUploadDownloadClient
- .sendSegmentUri(FileUploadDownloadClient.getUploadSegmentHttpURI(host, _port), uri);
- LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
- } catch (Exception e) {
- LOGGER.error("******** Error Uploading file: {} to Host: {} and Port: {} *******", fileName, host, _port);
- LOGGER.error("Caught exception during upload", e);
- throw e;
- }
- }
- }
+ protected ControllerRestApi getControllerRestApi() {
+ return new DefaultControllerRestApi(_pushLocations, null);
}
}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/HadoopSegmentCreationMapReduceJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/HadoopSegmentCreationMapReduceJob.java
deleted file mode 100644
index 128d844..0000000
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/HadoopSegmentCreationMapReduceJob.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.hadoop.job.mapper;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.data.Schema;
-import org.apache.pinot.common.utils.DataSize;
-import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.core.data.readers.CSVRecordReaderConfig;
-import org.apache.pinot.core.data.readers.FileFormat;
-import org.apache.pinot.core.data.readers.RecordReaderConfig;
-import org.apache.pinot.core.data.readers.ThriftRecordReaderConfig;
-import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
-import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.hadoop.job.JobConfigConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class HadoopSegmentCreationMapReduceJob {
-
- public static class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
- private static Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentCreationMapper.class);
-
- private static final String PINOT_HADOOP_TMP = "pinot_hadoop_tmp";
- private static final String SEGMENT_NAME_POSTFIX = "segment.name.postfix";
- private static final String SEGMENT_TAR = "/segmentTar";
-
- private Configuration _properties;
-
- private String _inputFilePath;
- private String _outputPath;
- private String _tableName;
- private String _postfix;
- private String _readerConfigFile;
-
- // Temporary local disk path for current working directory
- private String _currentDiskWorkDir;
-
- // Temporary hdfs path for segment tar file
- private String _localHdfsSegmentTarPath;
-
- // Temporary local disk path for segment tar file
- private String _localDiskSegmentTarPath;
-
- // Temporary local disk path for output segment directory
- private String _localDiskOutputSegmentDir;
-
- private TableConfig _tableConfig = null;
-
- private FileSystem _fileSystem = null;
-
- @Override
- public void setup(Context context)
- throws IOException, InterruptedException {
- // Compute current working HDFS directory
- Path currentHdfsWorkDir = FileOutputFormat.getWorkOutputPath(context);
- _localHdfsSegmentTarPath = currentHdfsWorkDir + SEGMENT_TAR;
-
- // Compute current working LOCAL DISK directory
- _currentDiskWorkDir = PINOT_HADOOP_TMP;
- _localDiskSegmentTarPath = _currentDiskWorkDir + SEGMENT_TAR;
-
- _fileSystem = FileSystem.get(context.getConfiguration());
-
- // Create directory
- new File(_localDiskSegmentTarPath).mkdirs();
-
- LOGGER.info("*********************************************************************");
- LOGGER.info("Configurations : {}", context.getConfiguration().toString());
- LOGGER.info("*********************************************************************");
- LOGGER.info("Current HDFS working dir(setup) : {}", currentHdfsWorkDir);
- LOGGER.info("Current DISK working dir(setup) : {}", new File(_currentDiskWorkDir).getAbsolutePath());
- LOGGER.info("*********************************************************************");
-
- _properties = context.getConfiguration();
- _outputPath = _properties.get(JobConfigConstants.PATH_TO_OUTPUT);
- _tableName = _properties.get(JobConfigConstants.SEGMENT_TABLE_NAME);
- _postfix = _properties.get(SEGMENT_NAME_POSTFIX, null);
- _readerConfigFile = _properties.get(JobConfigConstants.PATH_TO_READER_CONFIG);
- if (_outputPath == null || _tableName == null) {
- throw new RuntimeException(
- "Missing configs: " + "\n\toutputPath: " + _properties.get(JobConfigConstants.PATH_TO_OUTPUT)
- + "\n\ttableName: " + _properties.get(JobConfigConstants.SEGMENT_TABLE_NAME));
- }
-
- String tableConfigString = _properties.get(JobConfigConstants.TABLE_CONFIG);
- if (tableConfigString != null) {
- try {
- _tableConfig = TableConfig.fromJsonString(tableConfigString);
- } catch (IOException e) {
- // Though we get the table config directly from the controller if the hosts and port of the push location are set,
- // it is possible for the user to pass in a table config as a parameter
- LOGGER.error("Exception when parsing table config: {}", tableConfigString);
- }
- }
- }
-
- protected String getTableName() {
- return _tableName;
- }
-
- @Override
- public void cleanup(Context context)
- throws IOException, InterruptedException {
- File currentDiskWorkDir = new File(_currentDiskWorkDir);
- LOGGER.info("Clean up directory: {}", currentDiskWorkDir.getAbsolutePath());
- FileUtils.deleteQuietly(currentDiskWorkDir);
- }
-
- @Override
- protected void map(LongWritable key, Text value, Context context)
- throws IOException, InterruptedException {
- String line = value.toString();
- String[] lineSplits = line.split(" ");
-
- LOGGER.info("*********************************************************************");
- LOGGER.info("mapper input : {}", value);
- LOGGER.info("PATH_TO_OUTPUT : {}", _outputPath);
- LOGGER.info("TABLE_NAME : {}", _tableName);
- LOGGER.info("num lines : {}", lineSplits.length);
-
- for (String split : lineSplits) {
- LOGGER.info("Command line : {}", split);
- }
-
- LOGGER.info("Current DISK working dir(mapper): {}", new File(_currentDiskWorkDir).getAbsolutePath());
- LOGGER.info("*********************************************************************");
-
- if (lineSplits.length != 3) {
- throw new RuntimeException("Input to the mapper is malformed, please contact the pinot team");
- }
- _inputFilePath = lineSplits[1].trim();
-
- String segmentDirectoryName = _tableName + "_" + Integer.parseInt(lineSplits[2]);
- _localDiskOutputSegmentDir = _currentDiskWorkDir + "/segments/" + segmentDirectoryName;
-
- // To inherit from the Hadoop Mapper class, you can't directly throw a general exception.
- Schema schema;
- Configuration conf = context.getConfiguration();
- final FileSystem fs = FileSystem.get(conf);
- final Path hdfsInputFilePath = new Path(_inputFilePath);
-
- final File localInputDataDir = new File(_currentDiskWorkDir, "inputData");
- try {
- if (localInputDataDir.exists()) {
- localInputDataDir.delete();
- }
- localInputDataDir.mkdir();
-
- final Path localInputFilePath =
- new Path(localInputDataDir.getAbsolutePath() + "/" + hdfsInputFilePath.getName());
- LOGGER.info("Copy from " + hdfsInputFilePath + " to " + localInputFilePath);
- fs.copyToLocalFile(hdfsInputFilePath, localInputFilePath);
-
- String schemaString = context.getConfiguration().get("data.schema");
- try {
- schema = Schema.fromString(schemaString);
- } catch (Exception e) {
- LOGGER.error("Could not get schema from string for value: " + schemaString);
- throw new RuntimeException(e);
- }
- } catch (Exception e) {
- LOGGER.error("Could not get schema: " + e);
- throw new RuntimeException(e);
- }
-
- LOGGER.info("*********************************************************************");
- LOGGER.info("input data file path : {}", _inputFilePath);
- LOGGER.info("local hdfs segment tar path: {}", _localHdfsSegmentTarPath);
- LOGGER.info("local disk output segment path: {}", _localDiskOutputSegmentDir);
- LOGGER.info("local disk segment tar path: {}", _localDiskSegmentTarPath);
- LOGGER.info("data schema: {}", schema);
- LOGGER.info("*********************************************************************");
-
- try {
- String segmentName =
- createSegment(_inputFilePath, schema, Integer.parseInt(lineSplits[2]), hdfsInputFilePath, localInputDataDir,
- fs);
- LOGGER.info(segmentName);
- LOGGER.info("Finished segment creation job successfully");
- } catch (Exception e) {
- LOGGER.error("Got exceptions during creating segments!", e);
- }
-
- context.write(new LongWritable(Long.parseLong(lineSplits[2])), new Text(
- FileSystem.get(_properties).listStatus(new Path(_localHdfsSegmentTarPath + "/"))[0].getPath().getName()));
- LOGGER.info("Finished the job successfully");
- }
-
- protected void setSegmentNameGenerator(SegmentGeneratorConfig segmentGeneratorConfig, Integer seqId,
- Path hdfsAvroPath, File dataPath) {
- }
-
- protected String createSegment(String dataFilePath, Schema schema, Integer seqId, Path hdfsInputFilePath,
- File localInputDataDir, FileSystem fs)
- throws Exception {
- SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, schema);
-
- segmentGeneratorConfig.setTableName(_tableName);
- setSegmentNameGenerator(segmentGeneratorConfig, seqId, hdfsInputFilePath, localInputDataDir);
-
- String inputFilePath = new File(localInputDataDir, hdfsInputFilePath.getName()).getAbsolutePath();
- LOGGER.info("Create segment input path: {}", inputFilePath);
- segmentGeneratorConfig.setInputFilePath(inputFilePath);
-
- FileFormat fileFormat = getFileFormat(dataFilePath);
- segmentGeneratorConfig.setFormat(fileFormat);
- segmentGeneratorConfig.setOnHeap(true);
-
- if (null != _postfix) {
- segmentGeneratorConfig.setSegmentNamePostfix(String.format("%s-%s", _postfix, seqId));
- } else {
- segmentGeneratorConfig.setSequenceId(seqId);
- }
- segmentGeneratorConfig.setReaderConfig(getReaderConfig(fileFormat));
-
- segmentGeneratorConfig.setOutDir(_localDiskOutputSegmentDir);
-
- // Add the current java package version to the segment metadata
- // properties file.
- Package objPackage = this.getClass().getPackage();
- if (null != objPackage) {
- String packageVersion = objPackage.getSpecificationVersion();
- if (null != packageVersion) {
- LOGGER.info("Pinot Hadoop Package version {}", packageVersion);
- segmentGeneratorConfig.setCreatorVersion(packageVersion);
- }
- }
-
- SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
- driver.init(segmentGeneratorConfig);
- driver.build();
-
- // Tar the segment directory into file.
- String segmentName = driver.getSegmentName();
-
- File localDiskOutputSegmentDir = new File(_localDiskOutputSegmentDir, segmentName);
- String localDiskOutputSegmentDirAbsolutePath = localDiskOutputSegmentDir.getAbsolutePath();
- String localDiskSegmentTarFileAbsolutePath =
- new File(_localDiskSegmentTarPath).getAbsolutePath() + "/" + segmentName + JobConfigConstants.TARGZ;
-
- LOGGER.info("Trying to tar the segment to: {}", localDiskSegmentTarFileAbsolutePath);
- TarGzCompressionUtils
- .createTarGzOfDirectory(localDiskOutputSegmentDirAbsolutePath, localDiskSegmentTarFileAbsolutePath);
- String hdfsSegmentTarFilePath = _localHdfsSegmentTarPath + "/" + segmentName + JobConfigConstants.TARGZ;
-
- // Log segment size.
- long uncompressedSegmentSize = FileUtils.sizeOfDirectory(localDiskOutputSegmentDir);
- long compressedSegmentSize = new File(localDiskSegmentTarFileAbsolutePath).length();
- LOGGER.info(String.format("Segment %s uncompressed size: %s, compressed size: %s", segmentName,
- DataSize.fromBytes(uncompressedSegmentSize), DataSize.fromBytes(compressedSegmentSize)));
-
- LOGGER.info("*********************************************************************");
- LOGGER.info("Copy from : {} to {}", localDiskSegmentTarFileAbsolutePath, hdfsSegmentTarFilePath);
- LOGGER.info("*********************************************************************");
- fs.copyFromLocalFile(true, true, new Path(localDiskSegmentTarFileAbsolutePath), new Path(hdfsSegmentTarFilePath));
- return segmentName;
- }
-
- private RecordReaderConfig getReaderConfig(FileFormat fileFormat)
- throws IOException {
- RecordReaderConfig readerConfig = null;
- switch (fileFormat) {
- case CSV:
- if (_readerConfigFile == null) {
- readerConfig = new CSVRecordReaderConfig();
- } else {
- LOGGER.info("Reading CSV Record Reader Config from: {}", _readerConfigFile);
- Path readerConfigPath = new Path(_readerConfigFile);
- try (InputStream inputStream = _fileSystem.open(readerConfigPath)) {
- readerConfig = JsonUtils.inputStreamToObject(inputStream, CSVRecordReaderConfig.class);
- }
- LOGGER.info("CSV Record Reader Config: {}", readerConfig.toString());
- }
- break;
- case AVRO:
- break;
- case JSON:
- break;
- case THRIFT:
- readerConfig = new ThriftRecordReaderConfig();
- default:
- break;
- }
- return readerConfig;
- }
-
- private FileFormat getFileFormat(String dataFilePath) {
- if (dataFilePath.endsWith(".json")) {
- return FileFormat.JSON;
- }
- if (dataFilePath.endsWith(".csv")) {
- return FileFormat.CSV;
- }
- if (dataFilePath.endsWith(".avro")) {
- return FileFormat.AVRO;
- }
- if (dataFilePath.endsWith(".thrift")) {
- return FileFormat.THRIFT;
- }
- throw new RuntimeException("Not support file format - " + dataFilePath);
- }
- }
-}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
new file mode 100644
index 0000000..e8a122d
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.mapper;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.DataSize;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.data.readers.CSVRecordReaderConfig;
+import org.apache.pinot.core.data.readers.FileFormat;
+import org.apache.pinot.core.data.readers.RecordReaderConfig;
+import org.apache.pinot.core.data.readers.ThriftRecordReaderConfig;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.hadoop.job.JobConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
+ protected static final String LOCAL_TEMP_DIR = "pinot_hadoop_tmp";
+
+ protected final Logger _logger = LoggerFactory.getLogger(getClass());
+
+ protected Configuration _jobConf;
+ protected String _rawTableName;
+ protected Schema _schema;
+
+ // Optional
+ protected TableConfig _tableConfig;
+ protected String _segmentNamePostfix;
+ protected Path _readerConfigFile;
+
+ // HDFS segment tar directory
+ protected Path _hdfsSegmentTarDir;
+
+ // Temporary local directories
+ protected File _localStagingDir;
+ protected File _localInputDir;
+ protected File _localSegmentDir;
+ protected File _localSegmentTarDir;
+
+ protected FileSystem _fileSystem;
+
+ @Override
+ public void setup(Context context)
+ throws IOException, InterruptedException {
+ _jobConf = context.getConfiguration();
+ _logger.info("*********************************************************************");
+ _logger.info("Job Configurations: {}", _jobConf);
+ _logger.info("*********************************************************************");
+
+ _rawTableName = Preconditions.checkNotNull(_jobConf.get(JobConfigConstants.SEGMENT_TABLE_NAME));
+ _schema = Schema.fromString(_jobConf.get(JobConfigConstants.SCHEMA));
+
+ // Optional
+ String tableConfigString = _jobConf.get(JobConfigConstants.TABLE_CONFIG);
+ if (tableConfigString != null) {
+ _tableConfig = TableConfig.fromJsonString(tableConfigString);
+ }
+ _segmentNamePostfix = _jobConf.get(JobConfigConstants.SEGMENT_NAME_POSTFIX);
+ String readerConfigFile = _jobConf.get(JobConfigConstants.PATH_TO_READER_CONFIG);
+ if (readerConfigFile != null) {
+ _readerConfigFile = new Path(readerConfigFile);
+ }
+
+ // Working directories
+ _hdfsSegmentTarDir = new Path(FileOutputFormat.getWorkOutputPath(context), JobConfigConstants.SEGMENT_TAR_DIR);
+ _localStagingDir = new File(LOCAL_TEMP_DIR);
+ _localInputDir = new File(_localStagingDir, "inputData");
+ _localSegmentDir = new File(_localStagingDir, "segments");
+ _localSegmentTarDir = new File(_localStagingDir, JobConfigConstants.SEGMENT_TAR_DIR);
+
+ if (_localStagingDir.exists()) {
+ _logger.warn("Deleting existing file: {}", _localStagingDir);
+ FileUtils.forceDelete(_localStagingDir);
+ }
+ _logger.info("Making local temporary directories: {}, {}, {}, {}", _localStagingDir, _localInputDir,
+ _localSegmentDir, _localSegmentTarDir);
+ Preconditions.checkState(_localStagingDir.mkdirs());
+ Preconditions.checkState(_localInputDir.mkdir());
+ Preconditions.checkState(_localSegmentDir.mkdir());
+ Preconditions.checkState(_localSegmentTarDir.mkdir());
+
+ _fileSystem = FileSystem.get(context.getConfiguration());
+
+ _logger.info("*********************************************************************");
+ _logger.info("Raw Table Name: {}", _rawTableName);
+ _logger.info("Schema: {}", _schema);
+ _logger.info("Table Config: {}", _tableConfig);
+ _logger.info("Segment Name Postfix: {}", _segmentNamePostfix);
+ _logger.info("Reader Config File: {}", _readerConfigFile);
+ _logger.info("*********************************************************************");
+ _logger.info("HDFS Segment Tar Directory: {}", _hdfsSegmentTarDir);
+ _logger.info("Local Staging Directory: {}", _localStagingDir);
+ _logger.info("Local Input Directory: {}", _localInputDir);
+ _logger.info("Local Segment Tar Directory: {}", _localSegmentTarDir);
+ _logger.info("*********************************************************************");
+ }
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ String[] splits = StringUtils.split(value.toString(), ' ');
+ Preconditions.checkState(splits.length == 2, "Illegal input value: %s", value);
+
+ Path hdfsInputFile = new Path(splits[0]);
+ int sequenceId = Integer.parseInt(splits[1]);
+ _logger.info("Generating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile, sequenceId);
+
+ String inputFileName = hdfsInputFile.getName();
+ File localInputFile = new File(_localInputDir, inputFileName);
+ _logger.info("Copying input file from: {} to: {}", hdfsInputFile, localInputFile);
+ _fileSystem.copyToLocalFile(hdfsInputFile, new Path(localInputFile.getAbsolutePath()));
+
+ SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, _schema);
+ segmentGeneratorConfig.setTableName(_rawTableName);
+ segmentGeneratorConfig.setInputFilePath(localInputFile.getPath());
+ segmentGeneratorConfig.setOutDir(_localSegmentDir.getPath());
+ segmentGeneratorConfig.setSequenceId(sequenceId);
+ if (_segmentNamePostfix != null) {
+ segmentGeneratorConfig.setSegmentNamePostfix(_segmentNamePostfix);
+ }
+ FileFormat fileFormat = getFileFormat(inputFileName);
+ segmentGeneratorConfig.setFormat(fileFormat);
+ segmentGeneratorConfig.setReaderConfig(getReaderConfig(fileFormat));
+ segmentGeneratorConfig.setOnHeap(true);
+
+ addAdditionalSegmentGeneratorConfigs(segmentGeneratorConfig, hdfsInputFile, sequenceId);
+
+ _logger.info("Start creating segment with sequence id: {}", sequenceId);
+ SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+ try {
+ driver.init(segmentGeneratorConfig);
+ driver.build();
+ } catch (Exception e) {
+ _logger.error("Caught exception while creating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile,
+ sequenceId, e);
+ throw new RuntimeException(e);
+ }
+ String segmentName = driver.getSegmentName();
+ _logger.info("Finish creating segment: {} with sequence id: {}", segmentName, sequenceId);
+
+ File localSegmentDir = new File(_localSegmentDir, segmentName);
+ String segmentTarFileName = segmentName + JobConfigConstants.TAR_GZ_FILE_EXT;
+ File localSegmentTarFile = new File(_localSegmentTarDir, segmentTarFileName);
+ _logger.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+ TarGzCompressionUtils.createTarGzOfDirectory(localSegmentDir.getPath(), localSegmentTarFile.getPath());
+
+ long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+ long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+ _logger.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+ DataSize.fromBytes(uncompressedSegmentSize), DataSize.fromBytes(compressedSegmentSize));
+
+ Path hdfsSegmentTarFile = new Path(_hdfsSegmentTarDir, segmentTarFileName);
+ _logger.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile);
+ _fileSystem.copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);
+
+ context.write(new LongWritable(sequenceId), new Text(segmentTarFileName));
+ _logger.info("Finish generating segment: {} with HDFS input file: {}, sequence id: {}", segmentName, hdfsInputFile,
+ sequenceId);
+ }
+
+ protected FileFormat getFileFormat(String fileName) {
+ if (fileName.endsWith(".avro")) {
+ return FileFormat.AVRO;
+ }
+ if (fileName.endsWith(".csv")) {
+ return FileFormat.CSV;
+ }
+ if (fileName.endsWith(".json")) {
+ return FileFormat.JSON;
+ }
+ if (fileName.endsWith(".thrift")) {
+ return FileFormat.THRIFT;
+ }
+ throw new IllegalArgumentException("Unsupported file format: " + fileName);
+ }
+
+ @Nullable
+ protected RecordReaderConfig getReaderConfig(FileFormat fileFormat)
+ throws IOException {
+ if (_readerConfigFile != null) {
+ if (fileFormat == FileFormat.CSV) {
+ try (InputStream inputStream = _fileSystem.open(_readerConfigFile)) {
+ CSVRecordReaderConfig readerConfig = JsonUtils.inputStreamToObject(inputStream, CSVRecordReaderConfig.class);
+ _logger.info("Using CSV record reader config: {}", readerConfig);
+ return readerConfig;
+ }
+ }
+ if (fileFormat == FileFormat.THRIFT) {
+ try (InputStream inputStream = _fileSystem.open(_readerConfigFile)) {
+ ThriftRecordReaderConfig readerConfig =
+ JsonUtils.inputStreamToObject(inputStream, ThriftRecordReaderConfig.class);
+ _logger.info("Using Thrift record reader config: {}", readerConfig);
+ return readerConfig;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Can be overridden to set additional segment generator configs.
+ */
+ @SuppressWarnings("unused")
+ protected void addAdditionalSegmentGeneratorConfigs(SegmentGeneratorConfig segmentGeneratorConfig, Path hdfsInputFile,
+ int sequenceId) {
+ }
+
+ @Override
+ public void cleanup(Context context) {
+ _logger.info("Deleting local temporary directory: {}", _localStagingDir);
+ FileUtils.deleteQuietly(_localStagingDir);
+ }
+}
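
The addAdditionalSegmentGeneratorConfigs hook above is the intended extension point for custom mappers. A hypothetical subclass as a sketch; the date-partitioned input layout and the class name are assumptions for illustration, while the override signature and setSegmentNamePostfix come from the code above.

    package org.apache.pinot.hadoop.job.mapper;

    import org.apache.hadoop.fs.Path;
    import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;

    // Hypothetical mapper that derives the segment name postfix from a date
    // partition in the input path, e.g. .../2019-02-11/part-0.avro
    public class DatePartitionedSegmentCreationMapper extends SegmentCreationMapper {

      @Override
      protected void addAdditionalSegmentGeneratorConfigs(SegmentGeneratorConfig segmentGeneratorConfig,
          Path hdfsInputFile, int sequenceId) {
        String datePartition = hdfsInputFile.getParent().getName();
        segmentGeneratorConfig.setSegmentNamePostfix(datePartition + "-" + sequenceId);
      }
    }

Combined with the plugable mapper class support in SegmentCreationJob, such a subclass can be picked up by the job without touching the base mapper.
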
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/PushLocation.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/PushLocation.java
index 579950c..53ad7ed 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/PushLocation.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/PushLocation.java
@@ -18,13 +18,25 @@
*/
package org.apache.pinot.hadoop.utils;
+import java.util.ArrayList;
+import java.util.List;
+
+
public class PushLocation {
private final String _host;
private final int _port;
- private PushLocation(PushLocationBuilder pushLocationBuilder) {
- _host = pushLocationBuilder._host;
- _port = pushLocationBuilder._port;
+ public PushLocation(String host, int port) {
+ _host = host;
+ _port = port;
+ }
+
+ public static List<PushLocation> getPushLocations(String[] hosts, int port) {
+ List<PushLocation> pushLocations = new ArrayList<>(hosts.length);
+ for (String host : hosts) {
+ pushLocations.add(new PushLocation(host, port));
+ }
+ return pushLocations;
}
public String getHost() {
@@ -35,28 +47,6 @@ public class PushLocation {
return _port;
}
- public static class PushLocationBuilder {
- private String _host;
- private int _port;
-
- public PushLocationBuilder() {
- }
-
- public PushLocationBuilder setHost(String host) {
- _host = host;
- return this;
- }
-
- public PushLocationBuilder setPort(int port) {
- _port = port;
- return this;
- }
-
- public PushLocation build() {
- return new PushLocation(this);
- }
- }
-
@Override
public String toString() {
return _host + ":" + _port;
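
The builder is gone in favor of a plain constructor plus a static factory. A minimal usage sketch (host names hypothetical):

    import java.util.List;
    import org.apache.pinot.hadoop.utils.PushLocation;

    public class PushLocationExample {
      public static void main(String[] args) {
        String[] hosts = {"controller1", "controller2"};
        List<PushLocation> pushLocations = PushLocation.getPushLocations(hosts, 9000);
        // toString() renders host:port, so this prints controller1:9000 and controller2:9000
        pushLocations.forEach(System.out::println);
      }
    }
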
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
index e682f85..36e083c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
@@ -155,13 +155,13 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
properties.setProperty(JobConfigConstants.PUSH_TO_PORT, getDefaultControllerConfiguration().getControllerPort());
// Run segment creation job
- SegmentCreationJob creationJob = new SegmentCreationJob("TestSegmentCreation", properties);
+ SegmentCreationJob creationJob = new SegmentCreationJob(properties);
Configuration config = _mrCluster.getConfig();
creationJob.setConf(config);
creationJob.run();
// Run segment push job
- SegmentTarPushJob pushJob = new SegmentTarPushJob("TestSegmentPush", properties);
+ SegmentTarPushJob pushJob = new SegmentTarPushJob(properties);
pushJob.setConf(_mrCluster.getConfig());
pushJob.run();
}
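
Outside the test harness the two jobs chain the same way. A minimal driver sketch mirroring the test flow; the table name, paths, and controller coordinates are hypothetical, and SegmentCreationJob additionally needs the input-path and schema properties covered earlier in this diff.

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.pinot.hadoop.job.JobConfigConstants;
    import org.apache.pinot.hadoop.job.SegmentCreationJob;
    import org.apache.pinot.hadoop.job.SegmentTarPushJob;

    public class BuildAndPushDriver {
      public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty(JobConfigConstants.SEGMENT_TABLE_NAME, "myTable");        // hypothetical
        properties.setProperty(JobConfigConstants.PATH_TO_OUTPUT, "hdfs:///pinot/out");  // hypothetical
        properties.setProperty(JobConfigConstants.PUSH_TO_HOSTS, "controller1");         // hypothetical
        properties.setProperty(JobConfigConstants.PUSH_TO_PORT, "9000");                 // hypothetical

        // Build segments, then push the resulting tar files, as in the test above
        SegmentCreationJob creationJob = new SegmentCreationJob(properties);
        creationJob.setConf(new Configuration());
        creationJob.run();

        SegmentTarPushJob pushJob = new SegmentTarPushJob(properties);
        pushJob.setConf(new Configuration());
        pushJob.run();
      }
    }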