You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/09/30 04:47:44 UTC
[incubator-pinot] branch master updated: Adding push job type of
segment metadata only mode (#5967)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4f2e767 Adding push job type of segment metadata only mode (#5967)
4f2e767 is described below
commit 4f2e767f5ccd123e689ecab719a83f0c16f41b2c
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Tue Sep 29 21:47:27 2020 -0700
Adding push job type of segment metadata only mode (#5967)
* Adding push job type of segment metadata only mode
* Adding clean up for temp files
* Address comments
* Adding integration tests
---
.../common/utils/FileUploadDownloadClient.java | 13 +-
.../PinotSegmentUploadDownloadRestletResource.java | 23 ++--
.../pinot/integration/tests/ClusterTest.java | 44 +++++-
.../ingestion/batch/common/SegmentPushUtils.java | 147 +++++++++++++++++++++
.../hadoop/HadoopSegmentMetadataPushJobRunner.java | 104 +++++++++++++++
.../spark/SparkSegmentMetadataPushJobRunner.java | 131 ++++++++++++++++++
.../segmentCreationAndMetadataPushJobSpec.yaml | 53 ++++++++
.../standalone/SegmentMetadataPushJobRunner.java | 91 +++++++++++++
.../spi/ingestion/batch/IngestionJobLauncher.java | 9 +-
.../ingestion/batch/runner/IngestionJobRunner.java | 1 +
.../batch/spec/ExecutionFrameworkSpec.java | 19 ++-
.../batch/spec/SegmentGenerationJobSpec.java | 2 +
12 files changed, 606 insertions(+), 31 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index b18f1e2..4c2e30e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -87,7 +87,7 @@ public class FileUploadDownloadClient implements Closeable {
}
public enum FileUploadType {
- URI, JSON, SEGMENT;
+ URI, JSON, SEGMENT, METADATA;
public static FileUploadType getDefaultUploadType() {
return SEGMENT;
@@ -103,7 +103,6 @@ public class FileUploadDownloadClient implements Closeable {
private static final String SCHEMA_PATH = "/schemas";
private static final String OLD_SEGMENT_PATH = "/segments";
private static final String SEGMENT_PATH = "/v2/segments";
- private static final String SEGMENT_METADATA_PATH = "/segmentmetadata";
private static final String TABLES_PATH = "/tables";
private static final String TYPE_DELIMITER = "?type=";
@@ -194,16 +193,6 @@ public class FileUploadDownloadClient implements Closeable {
return getURI(HTTP, host, port, SEGMENT_PATH);
}
- public static URI getUploadSegmentMetadataHttpURI(String host, int port)
- throws URISyntaxException {
- return getURI(HTTP, host, port, SEGMENT_METADATA_PATH);
- }
-
- public static URI getUploadSegmentMetadataHttpsURI(String host, int port)
- throws URISyntaxException {
- return getURI(HTTPS, host, port, SEGMENT_METADATA_PATH);
- }
-
public static URI getUploadSegmentHttpsURI(String host, int port)
throws URISyntaxException {
return getURI(HTTPS, host, port, SEGMENT_PATH);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 8da5b53..2ae966d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -195,7 +195,6 @@ public class PinotSegmentUploadDownloadRestletResource {
crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER);
downloadUri = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
}
-
File tempEncryptedFile = null;
File tempDecryptedFile = null;
File tempSegmentDir = null;
@@ -207,9 +206,8 @@ public class PinotSegmentUploadDownloadRestletResource {
tempSegmentDir = new File(provider.getUntarredFileTempDir(), tempFileName);
boolean uploadedSegmentIsEncrypted = !Strings.isNullOrEmpty(crypterClassNameInHeader);
-
- File dstFile = uploadedSegmentIsEncrypted ? tempEncryptedFile : tempDecryptedFile;
FileUploadDownloadClient.FileUploadType uploadType = getUploadType(uploadTypeStr);
+ File dstFile = uploadedSegmentIsEncrypted ? tempEncryptedFile : tempDecryptedFile;
switch (uploadType) {
case URI:
downloadSegmentFileFromURI(downloadUri, dstFile, tableName);
@@ -217,6 +215,11 @@ public class PinotSegmentUploadDownloadRestletResource {
case SEGMENT:
createSegmentFileFromMultipart(multiPart, dstFile);
break;
+ case METADATA:
+ moveSegmentToFinalLocation = false;
+ Preconditions.checkState(downloadUri != null, "Download URI is required in segment metadata upload mode");
+ createSegmentFileFromMultipart(multiPart, dstFile);
+ break;
default:
throw new UnsupportedOperationException("Unsupported upload type: " + uploadType);
}
@@ -235,11 +238,11 @@ public class PinotSegmentUploadDownloadRestletResource {
String rawTableName;
if (tableName != null && !tableName.isEmpty()) {
rawTableName = TableNameBuilder.extractRawTableName(tableName);
- LOGGER.info("Uploading segment {} to table: {}, (Derived from API parameter)", segmentName, tableName);
+ LOGGER.info("Uploading a segment {} to table: {}, push type {}, (Derived from API parameter)", segmentName, tableName, uploadType);
} else {
// TODO: remove this when we completely deprecate the table name from segment metadata
rawTableName = segmentMetadata.getTableName();
- LOGGER.info("Uploading a segment {} to table: {}, (Derived from segment metadata)", segmentName, tableName);
+ LOGGER.info("Uploading a segment {} to table: {}, push type {}, (Derived from segment metadata)", segmentName, tableName, uploadType);
}
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
@@ -247,10 +250,12 @@ public class PinotSegmentUploadDownloadRestletResource {
LOGGER.info("Processing upload request for segment: {} of table: {} from client: {}, ingestion descriptor: {}",
segmentName, offlineTableName, clientAddress, ingestionDescriptor);
- // Validate segment
- new SegmentValidator(_pinotHelixResourceManager, _controllerConf, _executor, _connectionManager,
- _controllerMetrics, _leadControllerManager.isLeaderForTable(offlineTableName))
- .validateOfflineSegment(offlineTableName, segmentMetadata, tempSegmentDir);
+ // Skip segment validation if upload only segment metadata
+ if (uploadType != FileUploadDownloadClient.FileUploadType.METADATA) {
+ // Validate segment
+ new SegmentValidator(_pinotHelixResourceManager, _controllerConf, _executor, _connectionManager,
+ _controllerMetrics, _leadControllerManager.isLeaderForTable(offlineTableName)).validateOfflineSegment(offlineTableName, segmentMetadata, tempSegmentDir);
+ }
// Encrypt segment
String crypterClassNameInTableConfig =
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index d603fee..6ffc836 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -22,9 +22,14 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
+import com.google.common.collect.ImmutableList;
import java.io.File;
+import java.io.IOException;
import java.net.URI;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -42,8 +47,13 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.commons.io.FileUtils;
+import org.apache.http.Header;
import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.message.BasicNameValuePair;
import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.core.requesthandler.PinotQueryRequest;
import org.apache.pinot.common.utils.CommonConstants.Broker;
import org.apache.pinot.common.utils.CommonConstants.Helix;
@@ -253,17 +263,25 @@ public abstract class ClusterTest extends ControllerTest {
try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
if (numSegments == 1) {
File segmentTarFile = segmentTarFiles[0];
- assertEquals(fileUploadDownloadClient
- .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName).getStatusCode(),
- HttpStatus.SC_OK);
+ if (System.currentTimeMillis() % 2 == 0) {
+ assertEquals(fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName).getStatusCode(),
+ HttpStatus.SC_OK);
+ } else {
+ assertEquals(
+ uploadSegmentWithOnlyMetadata(tableName, uploadSegmentHttpURI, fileUploadDownloadClient, segmentTarFile), HttpStatus.SC_OK);
+ }
} else {
// Upload all segments in parallel
ExecutorService executorService = Executors.newFixedThreadPool(numSegments);
List<Future<Integer>> futures = new ArrayList<>(numSegments);
for (File segmentTarFile : segmentTarFiles) {
- futures.add(executorService.submit(() -> fileUploadDownloadClient
- .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName)
- .getStatusCode()));
+ futures.add(executorService.submit(() -> {
+ if (System.currentTimeMillis() % 2 == 0) {
+ return fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName).getStatusCode();
+ } else {
+ return uploadSegmentWithOnlyMetadata(tableName, uploadSegmentHttpURI, fileUploadDownloadClient, segmentTarFile);
+ }
+ }));
}
executorService.shutdown();
for (Future<Integer> future : futures) {
@@ -273,6 +291,20 @@ public abstract class ClusterTest extends ControllerTest {
}
}
+ private int uploadSegmentWithOnlyMetadata(String tableName, URI uploadSegmentHttpURI,
+ FileUploadDownloadClient fileUploadDownloadClient, File segmentTarFile)
+ throws IOException, HttpErrorStatusException {
+ List<Header> headers = ImmutableList.of(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
+ "file://" + segmentTarFile.getParentFile().getAbsolutePath() + "/" + URLEncoder.encode(segmentTarFile.getName(), StandardCharsets.UTF_8.toString())),
+ new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, FileUploadDownloadClient.FileUploadType.METADATA.toString()));
+ // Add table name as a request parameter
+ NameValuePair
+ tableNameValuePair = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName);
+ List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
+ return fileUploadDownloadClient
+ .uploadSegmentMetadata(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, headers, parameters, fileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode();
+ }
+
public static class AvroFileSchemaKafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
private static final Logger LOGGER = LoggerFactory.getLogger(AvroFileSchemaKafkaAvroMessageDecoder.class);
public static File avroFile;
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
index bc5e91a..71ccfbe 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
@@ -19,17 +19,28 @@
package org.apache.pinot.plugin.ingestion.batch.common;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.NameValuePair;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.message.BasicNameValuePair;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
@@ -177,4 +188,140 @@ public class SegmentPushUtils implements Serializable {
}
}
}
+
+ /**
+ * This method takes a map of segment downloadURI to corresponding tar file path, and push those segments in metadata mode.
+ * The steps are:
+ * 1. Download segment from tar file path;
+ * 2. Untar segment metadata and creation meta files from the tar file to a segment metadata directory;
+ * 3. Tar this segment metadata directory into a tar file
+ * 4. Generate a POST request with segmentDownloadURI in header to push tar file to Pinot controller.
+ *
+ * @param spec is the segment generation job spec
+ * @param fileSystem is the PinotFs used to copy segment tar file
+ * @param segmentUriToTarPathMap contains the map of segment DownloadURI to segment tar file path
+ * @throws Exception
+ */
+ public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)
+ throws Exception {
+ String tableName = spec.getTableSpec().getTableName();
+ LOGGER.info("Start pushing segment metadata: {} to locations: {} for table {}",
+ segmentUriToTarPathMap,
+ Arrays.toString(spec.getPinotClusterSpecs()), tableName);
+ for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+ String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+ String fileName = new File(tarFilePath).getName();
+ Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
+ String segmentName = fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length());
+ File segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath));
+ try {
+ for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+ URI controllerURI;
+ try {
+ controllerURI = new URI(pinotClusterSpec.getControllerURI());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'");
+ }
+ LOGGER.info("Pushing segment: {} to location: {} for table {}", segmentName, controllerURI, tableName);
+ int attempts = 1;
+ if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushAttempts() > 0) {
+ attempts = spec.getPushJobSpec().getPushAttempts();
+ }
+ long retryWaitMs = 1000L;
+ if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+ retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+ }
+ RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> {
+ try {
+ List<Header> headers = ImmutableList.of(
+ new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath),
+ new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, FileUploadDownloadClient.FileUploadType.METADATA.toString()));
+ // Add table name as a request parameter
+ NameValuePair tableNameValuePair =
+ new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName);
+ List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
+ SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadata(FileUploadDownloadClient.getUploadSegmentURI(controllerURI),
+ segmentName, segmentMetadataFile, headers, parameters, FILE_UPLOAD_DOWNLOAD_CLIENT.DEFAULT_SOCKET_TIMEOUT_MS);
+ LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName,
+ controllerURI, response.getStatusCode(), response.getResponse());
+ return true;
+ } catch (HttpErrorStatusException e) {
+ int statusCode = e.getStatusCode();
+ if (statusCode >= 500) {
+ // Temporary exception
+ LOGGER
+ .warn("Caught temporary exception while pushing table: {} segment: {} to {}, will retry", tableName,
+ segmentName, controllerURI, e);
+ return false;
+ } else {
+ // Permanent exception
+ LOGGER.error("Caught permanent exception while pushing table: {} segment: {} to {}, won't retry",
+ tableName, segmentName, controllerURI, e);
+ throw e;
+ }
+ }
+ });
+ }
+ } finally {
+ FileUtils.deleteQuietly(segmentMetadataFile);
+ }
+ }
+ }
+
+ public static Map<String, String> getSegmentUriToTarPathMap(URI outputDirURI, String uriPrefix, String uriSuffix, String[] files) {
+ Map<String, String> segmentUriToTarPathMap = new HashMap<>();
+ for (String file : files) {
+ URI uri = URI.create(file);
+ if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
+ URI updatedURI = SegmentPushUtils.generateSegmentTarURI(outputDirURI, uri, uriPrefix,uriSuffix);
+ segmentUriToTarPathMap.put(updatedURI.toString(), file);
+ }
+ }
+ return segmentUriToTarPathMap;
+ }
+
+ /**
+ * Generate a segment metadata only tar file, which contains only metadata.properties and creation.meta file.
+ * The purpose of this is to create a lean tar to push to Pinot controller for adding segments without downloading
+ * the complete segment and untar the segment tarball.
+ *
+ * 1. Download segment tar file to temp dir;
+ * 2. Extract only metadata.properties and creation.meta files from the segment tar file;
+ * 3. Tar both files into a segment metadata file.
+ *
+ */
+ private static File generateSegmentMetadataFile(PinotFS fileSystem, URI tarFileURI)
+ throws Exception {
+ String uuid = UUID.randomUUID().toString();
+ File tarFile = new File(FileUtils.getTempDirectory(), "segmentTar-" + uuid + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ File segmentMetadataDir = new File(FileUtils.getTempDirectory(), "segmentMetadataDir-" + uuid);
+ try {
+ fileSystem.copyToLocalFile(tarFileURI, tarFile);
+ if (segmentMetadataDir.exists()) {
+ FileUtils.forceDelete(segmentMetadataDir);
+ }
+ FileUtils.forceMkdir(segmentMetadataDir);
+
+ // Extract metadata.properties
+ LOGGER.info("Trying to untar Metadata file from: [{}] to [{}]", tarFile, segmentMetadataDir);
+ TarGzCompressionUtils.untarOneFile(tarFile, V1Constants.MetadataKeys.METADATA_FILE_NAME,
+ new File(segmentMetadataDir, V1Constants.MetadataKeys.METADATA_FILE_NAME));
+
+ // Extract creation.meta
+ LOGGER.info("Trying to untar CreationMeta file from: [{}] to [{}]", tarFile, segmentMetadataDir);
+ TarGzCompressionUtils.untarOneFile(tarFile, V1Constants.SEGMENT_CREATION_META,
+ new File(segmentMetadataDir, V1Constants.SEGMENT_CREATION_META));
+
+ File segmentMetadataTarFile = new File(FileUtils.getTempDirectory(), "segmentMetadata-" + uuid + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ if (segmentMetadataTarFile.exists()) {
+ FileUtils.forceDelete(segmentMetadataTarFile);
+ }
+ LOGGER.info("Trying to tar segment metadata dir [{}] to [{}]", segmentMetadataDir, segmentMetadataTarFile);
+ TarGzCompressionUtils.createTarGzFile(segmentMetadataDir, segmentMetadataTarFile);
+ return segmentMetadataTarFile;
+ } finally {
+ FileUtils.deleteQuietly(tarFile);
+ FileUtils.deleteQuietly(segmentMetadataDir);
+ }
+ }
}
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentMetadataPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentMetadataPushJobRunner.java
new file mode 100644
index 0000000..d605afa
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentMetadataPushJobRunner.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.ingestion.batch.hadoop;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+
+
+public class HadoopSegmentMetadataPushJobRunner implements IngestionJobRunner, Serializable {
+ private SegmentGenerationJobSpec _spec;
+
+ public HadoopSegmentMetadataPushJobRunner() {
+ }
+
+ public HadoopSegmentMetadataPushJobRunner(SegmentGenerationJobSpec spec) {
+ init(spec);
+ }
+
+ @Override
+ public void init(SegmentGenerationJobSpec spec) {
+ _spec = spec;
+ }
+
+ @Override
+ public void run() {
+ //init all file systems
+ List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
+ for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+ PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
+ }
+
+ //Get outputFS for writing output pinot segments
+ URI outputDirURI;
+ try {
+ outputDirURI = new URI(_spec.getOutputDirURI());
+ if (outputDirURI.getScheme() == null) {
+ outputDirURI = new File(_spec.getOutputDirURI()).toURI();
+ }
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("outputDirURI is not valid - '" + _spec.getOutputDirURI() + "'");
+ }
+ PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
+ //Get list of files to process
+ String[] files;
+ try {
+ files = outputDirFS.listFiles(outputDirURI, true);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to list all files under outputDirURI - '" + outputDirURI + "'");
+ }
+
+ List<String> segmentsToPush = new ArrayList<>();
+ for (String file : files) {
+ if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
+ segmentsToPush.add(file);
+ }
+ }
+
+ int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
+ if (pushParallelism < 1) {
+ pushParallelism = segmentsToPush.size();
+ }
+ // Push from driver
+
+ try {
+ Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
+ .getSegmentUriToTarPathMap(outputDirURI, _spec.getPushJobSpec().getSegmentUriPrefix(),
+ _spec.getPushJobSpec().getSegmentUriSuffix(), segmentsToPush.toArray(new String[0]));
+ SegmentPushUtils
+ .sendSegmentUriAndMetadata(_spec, PinotFSFactory.create(outputDirURI.getScheme()), segmentUriToTarPathMap);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
new file mode 100644
index 0000000..5b07db4
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.ingestion.batch.spark;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.VoidFunction;
+
+
+public class SparkSegmentMetadataPushJobRunner implements IngestionJobRunner, Serializable {
+ private SegmentGenerationJobSpec _spec;
+
+ public SparkSegmentMetadataPushJobRunner() {
+ }
+
+ public SparkSegmentMetadataPushJobRunner(SegmentGenerationJobSpec spec) {
+ init(spec);
+ }
+
+ @Override
+ public void init(SegmentGenerationJobSpec spec) {
+ _spec = spec;
+ }
+
+ @Override
+ public void run() {
+ //init all file systems
+ List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
+ for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+ PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
+ }
+
+ //Get outputFS for writing output pinot segments
+ URI outputDirURI;
+ try {
+ outputDirURI = new URI(_spec.getOutputDirURI());
+ if (outputDirURI.getScheme() == null) {
+ outputDirURI = new File(_spec.getOutputDirURI()).toURI();
+ }
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("outputDirURI is not valid - '" + _spec.getOutputDirURI() + "'");
+ }
+ PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
+ //Get list of files to process
+ String[] files;
+ try {
+ files = outputDirFS.listFiles(outputDirURI, true);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to list all files under outputDirURI - '" + outputDirURI + "'");
+ }
+
+ List<String> segmentsToPush = new ArrayList<>();
+ for (String file : files) {
+ if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
+ segmentsToPush.add(file);
+ }
+ }
+
+ int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
+ if (pushParallelism < 1) {
+ pushParallelism = segmentsToPush.size();
+ }
+ if (pushParallelism == 1) {
+ // Push from driver
+ try {
+ SegmentPushUtils.pushSegments(_spec, outputDirFS, segmentsToPush);
+ } catch (RetriableOperationException | AttemptsExceededException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
+ JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush, pushParallelism);
+ URI finalOutputDirURI = outputDirURI;
+ // Prevent using lambda expression in Spark to avoid potential serialization exceptions, use inner function instead.
+ pathRDD.foreach(new VoidFunction<String>() {
+ @Override
+ public void call(String segmentTarPath)
+ throws Exception {
+ for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+ PinotFSFactory
+ .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
+ }
+ try {
+ Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
+ .getSegmentUriToTarPathMap(finalOutputDirURI, _spec.getPushJobSpec().getSegmentUriPrefix(),
+ _spec.getPushJobSpec().getSegmentUriSuffix(), new String[]{segmentTarPath});
+ SegmentPushUtils.sendSegmentUriAndMetadata(_spec, PinotFSFactory.create(finalOutputDirURI.getScheme()),
+ segmentUriToTarPathMap);
+ } catch (RetriableOperationException | AttemptsExceededException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+ }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/resources/segmentCreationAndMetadataPushJobSpec.yaml b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/resources/segmentCreationAndMetadataPushJobSpec.yaml
new file mode 100644
index 0000000..e6a5d32
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/resources/segmentCreationAndMetadataPushJobSpec.yaml
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+executionFrameworkSpec:
+ name: 'spark'
+ segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
+ segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
+ segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'
+ segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner'
+jobType: SegmentCreationAndMetadataPush
+inputDirURI: 'file:///path/to/input'
+includeFileNamePattern: 'glob:**/*.parquet'
+excludeFileNamePattern: 'glob:**/*.avro'
+outputDirURI: 'file:///path/to/output'
+overwriteOutput: true
+pinotFSSpecs:
+ - scheme: file
+ className: org.apache.pinot.spi.filesystem.LocalPinotFS
+recordReaderSpec:
+ dataFormat: 'parquet'
+ className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader'
+tableSpec:
+ tableName: 'myTable'
+ schemaURI: 'http://localhost:9000/tables/myTable/schema'
+ tableConfigURI: 'http://localhost:9000/tables/myTable'
+pinotClusterSpecs:
+ - controllerURI: 'localhost:9000'
+pushJobSpec:
+
+ # pushParallelism: push job parallelism, default is 1.
+ pushParallelism: 2
+
+ # pushAttempts: number of attempts for push job, default is 1, which means no retry.
+ pushAttempts: 2
+
+ # pushRetryIntervalMillis: retry wait Ms, default to 1 second.
+ pushRetryIntervalMillis: 1000
\ No newline at end of file
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentMetadataPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentMetadataPushJobRunner.java
new file mode 100644
index 0000000..cad8cf6
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentMetadataPushJobRunner.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.ingestion.batch.standalone;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+
+
+public class SegmentMetadataPushJobRunner implements IngestionJobRunner {
+
+ private SegmentGenerationJobSpec _spec;
+
+ public SegmentMetadataPushJobRunner() {
+ }
+
+ public SegmentMetadataPushJobRunner(SegmentGenerationJobSpec spec) {
+ init(spec);
+ }
+
+ @Override
+ public void init(SegmentGenerationJobSpec spec) {
+ _spec = spec;
+ if (_spec.getPushJobSpec() == null) {
+ throw new RuntimeException("Missing PushJobSpec");
+ }
+ }
+
+ @Override
+ public void run() {
+ //init all file systems
+ List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
+ for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+ PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
+ }
+
+ //Get outputFS for writing output Pinot segments
+ URI outputDirURI;
+ try {
+ outputDirURI = new URI(_spec.getOutputDirURI());
+ if (outputDirURI.getScheme() == null) {
+ outputDirURI = new File(_spec.getOutputDirURI()).toURI();
+ }
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("outputDirURI is not valid - '" + _spec.getOutputDirURI() + "'");
+ }
+ PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
+
+ //Get list of files to process
+ String[] files;
+ try {
+ files = outputDirFS.listFiles(outputDirURI, true);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to list all files under outputDirURI - '" + outputDirURI + "'");
+ }
+ Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
+ .getSegmentUriToTarPathMap(outputDirURI, _spec.getPushJobSpec().getSegmentUriPrefix(),
+ _spec.getPushJobSpec().getSegmentUriSuffix(), files);
+ try {
+ SegmentPushUtils.sendSegmentUriAndMetadata(_spec, outputDirFS, segmentUriToTarPathMap);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
index 9412fc2..63ffcd7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
@@ -106,6 +106,9 @@ public class IngestionJobLauncher {
case SegmentUriPush:
kickoffIngestionJob(spec, executionFramework.getSegmentUriPushJobRunnerClassName());
break;
+ case SegmentMetadataPush:
+ kickoffIngestionJob(spec, executionFramework.getSegmentMetadataPushJobRunnerClassName());
+ break;
case SegmentCreationAndTarPush:
kickoffIngestionJob(spec, executionFramework.getSegmentGenerationJobRunnerClassName());
kickoffIngestionJob(spec, executionFramework.getSegmentTarPushJobRunnerClassName());
@@ -114,6 +117,10 @@ public class IngestionJobLauncher {
kickoffIngestionJob(spec, executionFramework.getSegmentGenerationJobRunnerClassName());
kickoffIngestionJob(spec, executionFramework.getSegmentUriPushJobRunnerClassName());
break;
+ case SegmentCreationAndMetadataPush:
+ kickoffIngestionJob(spec, executionFramework.getSegmentGenerationJobRunnerClassName());
+ kickoffIngestionJob(spec, executionFramework.getSegmentMetadataPushJobRunnerClassName());
+ break;
default:
LOGGER.error("Unsupported job type - {}. Support job types: {}", spec.getJobType(),
Arrays.toString(PinotIngestionJobType.values()));
@@ -139,6 +146,6 @@ public class IngestionJobLauncher {
}
enum PinotIngestionJobType {
- SegmentCreation, SegmentTarPush, SegmentUriPush, SegmentCreationAndTarPush, SegmentCreationAndUriPush,
+ SegmentCreation, SegmentTarPush, SegmentUriPush, SegmentMetadataPush, SegmentCreationAndTarPush, SegmentCreationAndUriPush, SegmentCreationAndMetadataPush,
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/runner/IngestionJobRunner.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/runner/IngestionJobRunner.java
index 51b2f6e..4917cbf 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/runner/IngestionJobRunner.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/runner/IngestionJobRunner.java
@@ -28,6 +28,7 @@ import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
* SegmentGenerationJobRunner
* SegmentTarPushJobRunner
* SegmentUriPushJobRunner
+ * SegmentMetadataPushJobRunner
*
*/
public interface IngestionJobRunner {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/ExecutionFrameworkSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/ExecutionFrameworkSpec.java
index 7e379c7..68a11bf 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/ExecutionFrameworkSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/ExecutionFrameworkSpec.java
@@ -32,21 +32,26 @@ public class ExecutionFrameworkSpec implements Serializable {
private String _name;
/**
- * The class implements org.apache.pinot.spi.batch.ingestion.runner.SegmentGenerationJobRunner interface.
+ * The class implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
*/
private String _segmentGenerationJobRunnerClassName;
/**
- * The class implements org.apache.pinot.spi.batch.ingestion.runner.SegmentTarPushJobRunner interface.
+ * The class implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
*/
private String _segmentTarPushJobRunnerClassName;
/**
- * The class implements org.apache.pinot.spi.batch.ingestion.runner.SegmentUriPushJobRunner interface.
+ * The class implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
*/
private String _segmentUriPushJobRunnerClassName;
/**
+ * The class implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
+ */
+ private String _segmentMetadataPushJobRunnerClassName;
+
+ /**
* Extra configs for execution framework.
*/
private Map<String, String> _extraConfigs;
@@ -90,4 +95,12 @@ public class ExecutionFrameworkSpec implements Serializable {
public void setExtraConfigs(Map<String, String> extraConfigs) {
_extraConfigs = extraConfigs;
}
+
+ public String getSegmentMetadataPushJobRunnerClassName() {
+ return _segmentMetadataPushJobRunnerClassName;
+ }
+
+ public void setSegmentMetadataPushJobRunnerClassName(String segmentMetadataPushJobRunnerClassName) {
+ _segmentMetadataPushJobRunnerClassName = segmentMetadataPushJobRunnerClassName;
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
index e41d5a4..ab199bc 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
@@ -38,8 +38,10 @@ public class SegmentGenerationJobSpec implements Serializable {
* 'SegmentCreation'
* 'SegmentTarPush'
* 'SegmentUriPush'
+ * 'SegmentMetadataPush'
* 'SegmentCreationAndTarPush'
* 'SegmentCreationAndUriPush'
+ * 'SegmentCreationAndMetadataPush'
*/
private String _jobType;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org