Posted to commits@pinot.apache.org by ne...@apache.org on 2022/06/15 21:39:24 UTC
[pinot] branch master updated: Allow moveToFinalLocation in METADATA push based on config (#8823)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 378bdec11c Allow moveToFinalLocation in METADATA push based on config (#8823)
378bdec11c is described below
commit 378bdec11c68a54366cd98b0f2eda5807e454e2f
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Wed Jun 15 14:39:18 2022 -0700
Allow moveToFinalLocation in METADATA push based on config (#8823)
METADATA push did not support the moveSegmentToFinalLocation option. This meant that if segments had been generated in a location other than the deep store, there was no way to move them into the deep store without manual scripting.
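For reference, below is a minimal sketch (not part of this commit) of how a standalone METADATA push job could enable the new behavior, mirroring the integration test added in this change; the class name, table name, output directory and controller URI are placeholder assumptions.

// Sketch: METADATA push that asks the controller to copy segments into the deep store.
// Table name, directories and controller URI below are placeholders.
import com.google.common.collect.Lists;
import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;

public class MetadataPushWithCopyExample {
  public static void main(String[] args)
      throws Exception {
    PushJobSpec pushJobSpec = new PushJobSpec();
    // New option from this commit: have the controller copy the segment into the deep store
    pushJobSpec.setCopyToDeepStoreForMetadataPush(true);

    SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
    jobSpec.setPushJobSpec(pushJobSpec);

    PinotFSSpec fsSpec = new PinotFSSpec();
    fsSpec.setScheme("file");
    fsSpec.setClassName("org.apache.pinot.spi.filesystem.LocalPinotFS");
    jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec));
    // Directory that holds the generated segment tars (outside the deep store)
    jobSpec.setOutputDirURI("/path/to/generated/segment/tars");

    TableSpec tableSpec = new TableSpec();
    tableSpec.setTableName("myTable");
    jobSpec.setTableSpec(tableSpec);

    PinotClusterSpec clusterSpec = new PinotClusterSpec();
    clusterSpec.setControllerURI("http://localhost:9000");
    jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec});

    SegmentMetadataPushJobRunner runner = new SegmentMetadataPushJobRunner();
    runner.init(jobSpec);
    runner.run();
  }
}

When the flag is left unset the controller does not copy anything, so existing metadata-push jobs keep their old behavior.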
---
.../common/utils/FileUploadDownloadClient.java | 6 +
.../PinotSegmentUploadDownloadRestletResource.java | 46 +++--
.../pinot/controller/api/upload/ZKOperator.java | 79 ++++---
.../controller/api/upload/ZKOperatorTest.java | 141 ++++++++++---
.../tests/ClusterIntegrationTestUtils.java | 20 +-
.../tests/SegmentUploadIntegrationTest.java | 229 +++++++++++++++++++++
.../segment/local/utils/SegmentPushUtils.java | 4 +
.../spi/ingestion/batch/spec/PushJobSpec.java | 13 ++
8 files changed, 472 insertions(+), 66 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 55931077b7..767e0f16c4 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -73,6 +73,12 @@ public class FileUploadDownloadClient implements AutoCloseable {
public static final String UPLOAD_TYPE = "UPLOAD_TYPE";
public static final String REFRESH_ONLY = "REFRESH_ONLY";
public static final String DOWNLOAD_URI = "DOWNLOAD_URI";
+
+ /**
+ * This header is only used for METADATA push, to allow the controller to copy the segment to the deep store
+ * if the segment was not placed in the deep store to begin with.
+ */
+ public static final String COPY_SEGMENT_TO_DEEP_STORE = "COPY_SEGMENT_TO_DEEP_STORE";
public static final String SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER = "Pinot-SegmentZKMetadataCustomMapModifier";
public static final String CRYPTER = "CRYPTER";
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index e675b7637e..cda4e37b50 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -197,7 +197,7 @@ public class PinotSegmentUploadDownloadRestletResource {
}
private SuccessResponse uploadSegment(@Nullable String tableName, TableType tableType,
- @Nullable FormDataMultiPart multiPart, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection,
+ @Nullable FormDataMultiPart multiPart, boolean copySegmentToFinalLocation, boolean enableParallelPushProtection,
boolean allowRefresh, HttpHeaders headers, Request request) {
if (StringUtils.isNotEmpty(tableName)) {
TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName);
@@ -213,13 +213,15 @@ public class PinotSegmentUploadDownloadRestletResource {
extractHttpHeader(headers, CommonConstants.Controller.TABLE_NAME_HTTP_HEADER);
String uploadTypeStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE);
- String downloadURI = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
+ String sourceDownloadURIStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
String crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER);
String ingestionDescriptor = extractHttpHeader(headers, CommonConstants.Controller.INGESTION_DESCRIPTOR);
File tempEncryptedFile = null;
File tempDecryptedFile = null;
File tempSegmentDir = null;
+ // The download URI to store in the segment ZK metadata
+ String segmentDownloadURIStr = sourceDownloadURIStr;
try {
ControllerFilePathProvider provider = ControllerFilePathProvider.getInstance();
String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID();
@@ -238,20 +240,22 @@ public class PinotSegmentUploadDownloadRestletResource {
"Segment file (as multipart/form-data) is required for SEGMENT upload mode",
Response.Status.BAD_REQUEST);
}
- if (!moveSegmentToFinalLocation && StringUtils.isEmpty(downloadURI)) {
+ if (!copySegmentToFinalLocation && StringUtils.isEmpty(sourceDownloadURIStr)) {
throw new ControllerApplicationException(LOGGER,
- "Download URI is required if segment should not be copied to the deep store",
+ "Source download URI is required in header field 'DOWNLOAD_URI' if segment should not be copied to "
+ + "the deep store",
Response.Status.BAD_REQUEST);
}
createSegmentFileFromMultipart(multiPart, destFile);
segmentSizeInBytes = destFile.length();
break;
case URI:
- if (StringUtils.isEmpty(downloadURI)) {
- throw new ControllerApplicationException(LOGGER, "Download URI is required for URI upload mode",
+ if (StringUtils.isEmpty(sourceDownloadURIStr)) {
+ throw new ControllerApplicationException(LOGGER,
+ "Source download URI is required in header field 'DOWNLOAD_URI' for URI upload mode",
Response.Status.BAD_REQUEST);
}
- downloadSegmentFileFromURI(downloadURI, destFile, tableName);
+ downloadSegmentFileFromURI(sourceDownloadURIStr, destFile, tableName);
segmentSizeInBytes = destFile.length();
break;
case METADATA:
@@ -260,14 +264,19 @@ public class PinotSegmentUploadDownloadRestletResource {
"Segment metadata file (as multipart/form-data) is required for METADATA upload mode",
Response.Status.BAD_REQUEST);
}
- if (StringUtils.isEmpty(downloadURI)) {
- throw new ControllerApplicationException(LOGGER, "Download URI is required for METADATA upload mode",
+ if (StringUtils.isEmpty(sourceDownloadURIStr)) {
+ throw new ControllerApplicationException(LOGGER,
+ "Source download URI is required in header field 'DOWNLOAD_URI' for METADATA upload mode",
Response.Status.BAD_REQUEST);
}
- moveSegmentToFinalLocation = false;
+ // Override copySegmentToFinalLocation if the COPY_SEGMENT_TO_DEEP_STORE header is provided,
+ // else default to false for backward compatibility
+ String copySegmentToDeepStore =
+ extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE);
+ copySegmentToFinalLocation = Boolean.parseBoolean(copySegmentToDeepStore);
createSegmentFileFromMultipart(multiPart, destFile);
try {
- URI segmentURI = new URI(downloadURI);
+ URI segmentURI = new URI(sourceDownloadURIStr);
PinotFS pinotFS = PinotFSFactory.create(segmentURI.getScheme());
segmentSizeInBytes = pinotFS.length(segmentURI);
} catch (Exception e) {
@@ -332,24 +341,25 @@ public class PinotSegmentUploadDownloadRestletResource {
// Update download URI if controller is responsible for moving the segment to the deep store
URI finalSegmentLocationURI = null;
- if (moveSegmentToFinalLocation) {
+ if (copySegmentToFinalLocation) {
URI dataDirURI = provider.getDataDirURI();
String dataDirPath = dataDirURI.toString();
String encodedSegmentName = URIUtils.encode(segmentName);
String finalSegmentLocationPath = URIUtils.getPath(dataDirPath, rawTableName, encodedSegmentName);
if (dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) {
- downloadURI = URIUtils.getPath(provider.getVip(), "segments", rawTableName, encodedSegmentName);
+ segmentDownloadURIStr = URIUtils.getPath(provider.getVip(), "segments", rawTableName, encodedSegmentName);
} else {
- downloadURI = finalSegmentLocationPath;
+ segmentDownloadURIStr = finalSegmentLocationPath;
}
finalSegmentLocationURI = URIUtils.getUri(finalSegmentLocationPath);
}
- LOGGER.info("Using download URI: {} for segment: {} of table: {} (move segment: {})", downloadURI, segmentFile,
- tableNameWithType, moveSegmentToFinalLocation);
+ LOGGER.info("Using segment download URI: {} for segment: {} of table: {} (move segment: {})",
+ segmentDownloadURIStr, segmentFile, tableNameWithType, copySegmentToFinalLocation);
ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
- zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, finalSegmentLocationURI, segmentFile,
- downloadURI, crypterName, segmentSizeInBytes, enableParallelPushProtection, allowRefresh, headers);
+ zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI,
+ segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes,
+ enableParallelPushProtection, allowRefresh, headers);
return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + tableNameWithType);
} catch (WebApplicationException e) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index b75d112ce2..62bc770846 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.api.upload;
+import com.google.common.base.Preconditions;
import java.io.File;
import java.net.URI;
import javax.annotation.Nullable;
@@ -29,6 +30,7 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifi
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.FileUploadDownloadClient.FileUploadType;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
@@ -60,7 +62,8 @@ public class ZKOperator {
}
public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata,
- @Nullable URI finalSegmentLocationURI, File segmentFile, String downloadUrl, @Nullable String crypterName,
+ FileUploadType uploadType, @Nullable URI finalSegmentLocationURI, File segmentFile,
+ @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr, @Nullable String crypterName,
long segmentSizeInBytes, boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers)
throws Exception {
String segmentName = segmentMetadata.getName();
@@ -76,8 +79,9 @@ public class ZKOperator {
Response.Status.GONE);
}
LOGGER.info("Adding new segment: {} to table: {}", segmentName, tableNameWithType);
- processNewSegment(tableNameWithType, segmentMetadata, finalSegmentLocationURI, segmentFile, downloadUrl,
- crypterName, segmentSizeInBytes, enableParallelPushProtection, headers);
+ processNewSegment(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI, segmentFile,
+ sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes, enableParallelPushProtection,
+ headers);
} else {
// Refresh an existing segment
if (!allowRefresh) {
@@ -89,16 +93,16 @@ public class ZKOperator {
tableNameWithType), Response.Status.CONFLICT);
}
LOGGER.info("Segment: {} already exists in table: {}, refreshing it", segmentName, tableNameWithType);
- processExistingSegment(tableNameWithType, segmentMetadata, existingSegmentMetadataZNRecord,
- finalSegmentLocationURI, segmentFile, downloadUrl, crypterName, segmentSizeInBytes,
- enableParallelPushProtection, headers);
+ processExistingSegment(tableNameWithType, segmentMetadata, uploadType, existingSegmentMetadataZNRecord,
+ finalSegmentLocationURI, segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName,
+ segmentSizeInBytes, enableParallelPushProtection, headers);
}
}
private void processExistingSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
- ZNRecord existingSegmentMetadataZNRecord, @Nullable URI finalSegmentLocationURI, File segmentFile,
- String downloadUrl, @Nullable String crypterName, long segmentSizeInBytes, boolean enableParallelPushProtection,
- HttpHeaders headers)
+ FileUploadType uploadType, ZNRecord existingSegmentMetadataZNRecord, @Nullable URI finalSegmentLocationURI,
+ File segmentFile, @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr,
+ @Nullable String crypterName, long segmentSizeInBytes, boolean enableParallelPushProtection, HttpHeaders headers)
throws Exception {
String segmentName = segmentMetadata.getName();
int expectedVersion = existingSegmentMetadataZNRecord.getVersion();
@@ -179,8 +183,7 @@ public class ZKOperator {
"New segment crc {} is different than the existing segment crc {}. Updating ZK metadata and refreshing "
+ "segment {}", newCrc, existingCrc, segmentName);
if (finalSegmentLocationURI != null) {
- moveSegmentToPermanentDirectory(segmentFile, finalSegmentLocationURI);
- LOGGER.info("Moved segment: {} of table: {} to final location: {}", segmentName, tableNameWithType,
+ copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr,
finalSegmentLocationURI);
}
@@ -191,12 +194,12 @@ public class ZKOperator {
if (customMapModifier == null) {
// If no modifier is provided, use the custom map from the segment metadata
segmentZKMetadata.setCustomMap(null);
- ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl,
- crypterName, segmentSizeInBytes);
+ ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata,
+ segmentDownloadURIStr, crypterName, segmentSizeInBytes);
} else {
// If modifier is provided, first set the custom map from the segment metadata, then apply the modifier
- ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl,
- crypterName, segmentSizeInBytes);
+ ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata,
+ segmentDownloadURIStr, crypterName, segmentSizeInBytes);
segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap()));
}
if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) {
@@ -237,16 +240,17 @@ public class ZKOperator {
}
}
- private void processNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
- @Nullable URI finalSegmentLocationURI, File segmentFile, String downloadUrl, @Nullable String crypterName,
- long segmentSizeInBytes, boolean enableParallelPushProtection, HttpHeaders headers)
+ private void processNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, FileUploadType uploadType,
+ @Nullable URI finalSegmentLocationURI, File segmentFile, @Nullable String sourceDownloadURIStr,
+ String segmentDownloadURIStr, @Nullable String crypterName, long segmentSizeInBytes,
+ boolean enableParallelPushProtection, HttpHeaders headers)
throws Exception {
String segmentName = segmentMetadata.getName();
SegmentZKMetadata newSegmentZKMetadata;
try {
newSegmentZKMetadata =
- ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata, downloadUrl, crypterName,
- segmentSizeInBytes);
+ ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata, segmentDownloadURIStr,
+ crypterName, segmentSizeInBytes);
} catch (IllegalArgumentException e) {
throw new ControllerApplicationException(LOGGER,
String.format("Got invalid segment metadata when adding segment: %s for table: %s, reason: %s", segmentName,
@@ -274,8 +278,7 @@ public class ZKOperator {
if (finalSegmentLocationURI != null) {
try {
- moveSegmentToPermanentDirectory(segmentFile, finalSegmentLocationURI);
- LOGGER.info("Moved segment: {} of table: {} to final location: {}", segmentName, tableNameWithType,
+ copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr,
finalSegmentLocationURI);
} catch (Exception e) {
// Cleanup the Zk entry and the segment from the permanent directory if it exists.
@@ -310,9 +313,39 @@ public class ZKOperator {
}
}
- private void moveSegmentToPermanentDirectory(File segmentFile, URI finalSegmentLocationURI)
+ private void copySegmentToDeepStore(String tableNameWithType, String segmentName, FileUploadType uploadType,
+ File segmentFile, String sourceDownloadURIStr, URI finalSegmentLocationURI)
+ throws Exception {
+ if (uploadType == FileUploadType.METADATA) {
+ // In Metadata push, local segmentFile only contains metadata.
+ // Copy segment over from sourceDownloadURI to final location.
+ copyFromSegmentURIToDeepStore(new URI(sourceDownloadURIStr), finalSegmentLocationURI);
+ LOGGER.info("Copied segment: {} of table: {} to final location: {}", segmentName, tableNameWithType,
+ finalSegmentLocationURI);
+ } else {
+ // In push types other than METADATA, local segmentFile contains the complete segment.
+ // Move local segment to final location
+ copyFromSegmentFileToDeepStore(segmentFile, finalSegmentLocationURI);
+ LOGGER.info("Copied segment: {} of table: {} to final location: {}", segmentName, tableNameWithType,
+ finalSegmentLocationURI);
+ }
+ }
+
+ private void copyFromSegmentFileToDeepStore(File segmentFile, URI finalSegmentLocationURI)
throws Exception {
LOGGER.info("Copying segment from: {} to: {}", segmentFile.getAbsolutePath(), finalSegmentLocationURI);
PinotFSFactory.create(finalSegmentLocationURI.getScheme()).copyFromLocalFile(segmentFile, finalSegmentLocationURI);
}
+
+ private void copyFromSegmentURIToDeepStore(URI sourceDownloadURI, URI finalSegmentLocationURI)
+ throws Exception {
+ if (sourceDownloadURI.equals(finalSegmentLocationURI)) {
+ LOGGER.info("Skip copying segment as sourceDownloadURI: {} is the same as finalSegmentLocationURI",
+ sourceDownloadURI);
+ } else {
+ Preconditions.checkState(sourceDownloadURI.getScheme().equals(finalSegmentLocationURI.getScheme()));
+ LOGGER.info("Copying segment from: {} to: {}", sourceDownloadURI, finalSegmentLocationURI);
+ PinotFSFactory.create(finalSegmentLocationURI.getScheme()).copy(sourceDownloadURI, finalSegmentLocationURI);
+ }
+ }
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
index 28c33eb256..ef9910677d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
@@ -18,29 +18,39 @@
*/
package org.apache.pinot.controller.api.upload;
+import com.google.common.collect.ImmutableList;
import java.io.File;
import java.net.URI;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.FileUploadDownloadClient.FileUploadType;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -51,6 +61,9 @@ import static org.testng.Assert.*;
public class ZKOperatorTest {
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "ZKOperatorTest");
+ private static final File SEGMENT_DIR = new File(TEMP_DIR, "segmentDir");
+ private static final File DATA_DIR = new File(TEMP_DIR, "dataDir");
private static final String RAW_TABLE_NAME = "testTable";
private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
@@ -66,6 +79,7 @@ public class ZKOperatorTest {
@BeforeClass
public void setUp()
throws Exception {
+ FileUtils.deleteQuietly(TEMP_DIR);
TEST_INSTANCE.setupSharedStateAndValidate();
_resourceManager = TEST_INSTANCE.getHelixResourceManager();
@@ -93,6 +107,87 @@ public class ZKOperatorTest {
return streamConfigs;
}
+ private File generateSegment()
+ throws Exception {
+ Schema schema =
+ new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).addSingleValueDimension("colA", DataType.INT).build();
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+ File outputDir = new File(SEGMENT_DIR, "segment");
+ config.setOutDir(outputDir.getAbsolutePath());
+ config.setSegmentName(SEGMENT_NAME);
+ GenericRow row = new GenericRow();
+ row.putValue("colA", "100");
+ List<GenericRow> rows = ImmutableList.of(row);
+
+ SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+ driver.init(config, new GenericRowRecordReader(rows));
+ driver.build();
+ File segmentTar = new File(SEGMENT_DIR, SEGMENT_NAME + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ TarGzCompressionUtils.createTarGzFile(new File(outputDir, SEGMENT_NAME),
+ new File(SEGMENT_DIR, SEGMENT_NAME + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION));
+ FileUtils.deleteQuietly(outputDir);
+ return segmentTar;
+ }
+
+ private void checkSegmentZkMetadata(String segmentName, long crc, long creationTime) {
+ SegmentZKMetadata segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, segmentName);
+ assertNotNull(segmentZKMetadata);
+ assertEquals(segmentZKMetadata.getCrc(), crc);
+ assertEquals(segmentZKMetadata.getCreationTime(), creationTime);
+ long pushTime = segmentZKMetadata.getPushTime();
+ assertTrue(pushTime > 0);
+ assertEquals(segmentZKMetadata.getRefreshTime(), Long.MIN_VALUE);
+ assertEquals(segmentZKMetadata.getDownloadUrl(), "downloadUrl");
+ assertEquals(segmentZKMetadata.getCrypterName(), "crypter");
+ assertEquals(segmentZKMetadata.getSegmentUploadStartTime(), -1);
+ assertEquals(segmentZKMetadata.getSizeInBytes(), 10);
+ }
+
+ @Test
+ public void testMetadataUploadType()
+ throws Exception {
+ String segmentName = "metadataTest";
+ FileUtils.deleteQuietly(TEMP_DIR);
+ ZKOperator zkOperator = new ZKOperator(_resourceManager, mock(ControllerConf.class), mock(ControllerMetrics.class));
+
+ SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
+ when(segmentMetadata.getName()).thenReturn(segmentName);
+ when(segmentMetadata.getCrc()).thenReturn("12345");
+ when(segmentMetadata.getIndexCreationTime()).thenReturn(123L);
+ HttpHeaders httpHeaders = mock(HttpHeaders.class);
+
+ File segmentTar = generateSegment();
+ String sourceDownloadURIStr = segmentTar.toURI().toString();
+ File segmentFile = new File("metadataOnly");
+
+ // with finalSegmentLocation not null
+ File finalSegmentLocation = new File(DATA_DIR, segmentName);
+ Assert.assertFalse(finalSegmentLocation.exists());
+ zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.METADATA,
+ finalSegmentLocation.toURI(), segmentFile, sourceDownloadURIStr, "downloadUrl", "crypter", 10, true, true,
+ httpHeaders);
+ Assert.assertTrue(finalSegmentLocation.exists());
+ Assert.assertTrue(segmentTar.exists());
+ checkSegmentZkMetadata(segmentName, 12345L, 123L);
+
+ _resourceManager.deleteSegment(OFFLINE_TABLE_NAME, segmentName);
+ // Wait for the segment Zk entry to be deleted.
+ TestUtils.waitForCondition(aVoid -> {
+ SegmentZKMetadata segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, segmentName);
+ return segmentZKMetadata == null;
+ }, 30_000L, "Failed to delete segmentZkMetadata.");
+
+ FileUtils.deleteQuietly(DATA_DIR);
+ // with finalSegmentLocation null
+ zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.METADATA, null,
+ segmentFile, sourceDownloadURIStr, "downloadUrl", "crypter", 10, true, true, httpHeaders);
+ Assert.assertFalse(finalSegmentLocation.exists());
+ Assert.assertTrue(segmentTar.exists());
+ checkSegmentZkMetadata(segmentName, 12345L, 123L);
+ }
+
@Test
public void testCompleteSegmentOperations()
throws Exception {
@@ -110,9 +205,8 @@ public class ZKOperatorTest {
URI finalSegmentLocationURI =
URIUtils.getUri("mockPath", OFFLINE_TABLE_NAME, URIUtils.encode(segmentMetadata.getName()));
File segmentFile = new File(new File("foo/bar"), "mockChild");
-
- zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, finalSegmentLocationURI, segmentFile,
- "downloadUrl", "crypter", 10, true, true, httpHeaders);
+ zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT,
+ finalSegmentLocationURI, segmentFile, "downloadUrl", "downloadUrl", "crypter", 10, true, true, httpHeaders);
fail();
} catch (Exception e) {
// Expected
@@ -124,8 +218,8 @@ public class ZKOperatorTest {
return segmentZKMetadata == null;
}, 30_000L, "Failed to delete segmentZkMetadata.");
- zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "downloadUrl", "crypter", 10,
- true, true, httpHeaders);
+ zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+ "downloadUrl", "downloadUrl", "crypter", 10, true, true, httpHeaders);
SegmentZKMetadata segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
assertNotNull(segmentZKMetadata);
@@ -141,8 +235,8 @@ public class ZKOperatorTest {
// Upload the same segment with allowRefresh = false. Validate that an exception is thrown.
try {
- zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "otherDownloadUrl",
- "otherCrypter", 10, true, false, httpHeaders);
+ zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+ "otherDownloadUrl", "otherDownloadUrl", "otherCrypter", 10, true, false, httpHeaders);
fail();
} catch (Exception e) {
// Expected
@@ -151,8 +245,8 @@ public class ZKOperatorTest {
// Refresh the segment with unmatched IF_MATCH field
when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("123");
try {
- zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "otherDownloadUrl",
- "otherCrypter", 10, true, true, httpHeaders);
+ zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+ "otherDownloadUrl", "otherDownloadUrl", "otherCrypter", 10, true, true, httpHeaders);
fail();
} catch (Exception e) {
// Expected
@@ -162,8 +256,8 @@ public class ZKOperatorTest {
// downloadURL and crypter
when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("12345");
when(segmentMetadata.getIndexCreationTime()).thenReturn(456L);
- zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "otherDownloadUrl",
- "otherCrypter", 10, true, true, httpHeaders);
+ zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+ "otherDownloadUrl", "otherDownloadUrl", "otherCrypter", 10, true, true, httpHeaders);
segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
assertNotNull(segmentZKMetadata);
@@ -185,8 +279,8 @@ public class ZKOperatorTest {
when(segmentMetadata.getIndexCreationTime()).thenReturn(789L);
// Add a tiny sleep to guarantee that refresh time is different from the previous round
Thread.sleep(10);
- zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "otherDownloadUrl",
- "otherCrypter", 100, true, true, httpHeaders);
+ zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+ "otherDownloadUrl", "otherDownloadUrl", "otherCrypter", 100, true, true, httpHeaders);
segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
assertNotNull(segmentZKMetadata);
@@ -209,8 +303,8 @@ public class ZKOperatorTest {
SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
when(segmentMetadata.getName()).thenReturn(SEGMENT_NAME);
when(segmentMetadata.getCrc()).thenReturn("12345");
- zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, null, null, "downloadUrl", null, 10,
- true, true, mock(HttpHeaders.class));
+ zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+ "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class));
SegmentZKMetadata segmentZKMetadata = _resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, SEGMENT_NAME);
assertNotNull(segmentZKMetadata);
@@ -222,8 +316,8 @@ public class ZKOperatorTest {
when(segmentMetadata.getName()).thenReturn(LLC_SEGMENT_NAME);
when(segmentMetadata.getCrc()).thenReturn("23456");
try {
- zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, null, null, "downloadUrl", null, 10,
- true, true, mock(HttpHeaders.class));
+ zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+ "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class));
fail();
} catch (ControllerApplicationException e) {
assertEquals(e.getResponse().getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
@@ -233,8 +327,8 @@ public class ZKOperatorTest {
// Uploading a segment with LLC segment name and start/end offset should succeed
when(segmentMetadata.getStartOffset()).thenReturn("0");
when(segmentMetadata.getEndOffset()).thenReturn("1234");
- zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, null, null, "downloadUrl", null, 10,
- true, true, mock(HttpHeaders.class));
+ zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+ "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class));
segmentZKMetadata = _resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, LLC_SEGMENT_NAME);
assertNotNull(segmentZKMetadata);
@@ -246,8 +340,8 @@ public class ZKOperatorTest {
when(segmentMetadata.getCrc()).thenReturn("34567");
when(segmentMetadata.getStartOffset()).thenReturn(null);
when(segmentMetadata.getEndOffset()).thenReturn(null);
- zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, null, null, "downloadUrl", null, 10,
- true, true, mock(HttpHeaders.class));
+ zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+ "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class));
segmentZKMetadata = _resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, LLC_SEGMENT_NAME);
assertNotNull(segmentZKMetadata);
@@ -259,8 +353,8 @@ public class ZKOperatorTest {
when(segmentMetadata.getCrc()).thenReturn("45678");
when(segmentMetadata.getStartOffset()).thenReturn("1234");
when(segmentMetadata.getEndOffset()).thenReturn("2345");
- zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, null, null, "downloadUrl", null, 10,
- true, true, mock(HttpHeaders.class));
+ zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+ "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class));
segmentZKMetadata = _resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, LLC_SEGMENT_NAME);
assertNotNull(segmentZKMetadata);
@@ -271,6 +365,7 @@ public class ZKOperatorTest {
@AfterClass
public void tearDown() {
+ FileUtils.deleteQuietly(TEMP_DIR);
TEST_INSTANCE.cleanup();
}
}
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index fbea4e6220..21abfd6430 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -283,12 +283,28 @@ public class ClusterIntegrationTestUtils {
public static void buildSegmentFromAvro(File avroFile, TableConfig tableConfig,
org.apache.pinot.spi.data.Schema schema, int segmentIndex, File segmentDir, File tarDir)
throws Exception {
+ // Test segment with space and special character in the file name
+ buildSegmentFromAvro(avroFile, tableConfig, schema, segmentIndex + " %", segmentDir, tarDir);
+ }
+
+ /**
+ * Builds one Pinot segment from the given Avro file.
+ *
+ * @param avroFile Avro file
+ * @param tableConfig Pinot table config
+ * @param schema Pinot schema
+ * @param segmentNamePostfix Segment name postfix
+ * @param segmentDir Output directory for the un-tarred segments
+ * @param tarDir Output directory for the tarred segments
+ */
+ public static void buildSegmentFromAvro(File avroFile, TableConfig tableConfig,
+ org.apache.pinot.spi.data.Schema schema, String segmentNamePostfix, File segmentDir, File tarDir)
+ throws Exception {
SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
segmentGeneratorConfig.setInputFilePath(avroFile.getPath());
segmentGeneratorConfig.setOutDir(segmentDir.getPath());
segmentGeneratorConfig.setTableName(tableConfig.getTableName());
- // Test segment with space and special character in the file name
- segmentGeneratorConfig.setSegmentNamePostfix(segmentIndex + " %");
+ segmentGeneratorConfig.setSegmentNamePostfix(segmentNamePostfix);
// Build the segment
SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java
new file mode 100644
index 0000000000..ae9d3989b6
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Test for advanced push types.
+ * Currently only tests METADATA push type.
+ * todo: add test for URI push
+ */
+public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest {
+
+ @Override
+ protected Map<String, String> getStreamConfigs() {
+ return null;
+ }
+
+ @Override
+ protected String getSortedColumn() {
+ return null;
+ }
+
+ @Override
+ protected List<String> getInvertedIndexColumns() {
+ return null;
+ }
+
+ @Override
+ protected List<String> getNoDictionaryColumns() {
+ return null;
+ }
+
+ @Override
+ protected List<String> getRangeIndexColumns() {
+ return null;
+ }
+
+ @Override
+ protected List<String> getBloomFilterColumns() {
+ return null;
+ }
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+ // Start Zk
+ startZk();
+
+ // Start the Pinot cluster
+ startController();
+ startBroker();
+ startServer();
+ }
+
+ @Test
+ public void testUploadAndQuery()
+ throws Exception {
+ // Create and upload the schema and table config
+ Schema schema = createSchema();
+ addSchema(schema);
+ TableConfig offlineTableConfig = createOfflineTableConfig();
+ addTableConfig(offlineTableConfig);
+
+ List<File> avroFiles = getAllAvroFiles();
+
+ // Create 1 segment, for METADATA push WITH move to final location
+ ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(0), offlineTableConfig, schema, "_with_move",
+ _segmentDir, _tarDir);
+
+ SegmentMetadataPushJobRunner runner = new SegmentMetadataPushJobRunner();
+ SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
+ PushJobSpec pushJobSpec = new PushJobSpec();
+ // Set copyToDeepStoreForMetadataPush to true
+ pushJobSpec.setCopyToDeepStoreForMetadataPush(true);
+ jobSpec.setPushJobSpec(pushJobSpec);
+ PinotFSSpec fsSpec = new PinotFSSpec();
+ fsSpec.setScheme("file");
+ fsSpec.setClassName("org.apache.pinot.spi.filesystem.LocalPinotFS");
+ jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec));
+ jobSpec.setOutputDirURI(_tarDir.getAbsolutePath());
+ TableSpec tableSpec = new TableSpec();
+ tableSpec.setTableName(DEFAULT_TABLE_NAME);
+ jobSpec.setTableSpec(tableSpec);
+ PinotClusterSpec clusterSpec = new PinotClusterSpec();
+ clusterSpec.setControllerURI(_controllerBaseApiUrl);
+ jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec});
+
+ File dataDir = new File(_controllerConfig.getDataDir());
+ File dataDirSegments = new File(dataDir, DEFAULT_TABLE_NAME);
+
+ // Not present in dataDir, only present in sourceDir
+ Assert.assertFalse(dataDirSegments.exists());
+ Assert.assertEquals(_tarDir.listFiles().length, 1);
+
+ runner.init(jobSpec);
+ runner.run();
+
+ // Segment should be seen in dataDir
+ Assert.assertTrue(dataDirSegments.exists());
+ Assert.assertEquals(dataDirSegments.listFiles().length, 1);
+ Assert.assertEquals(_tarDir.listFiles().length, 1);
+
+ // test segment loaded
+ JsonNode segmentsList = getSegmentsList();
+ Assert.assertEquals(segmentsList.size(), 1);
+ String segmentNameWithMove = segmentsList.get(0).asText();
+ Assert.assertTrue(segmentNameWithMove.endsWith("_with_move"));
+ long numDocs = getNumDocs(segmentNameWithMove);
+ testCountStar(numDocs);
+
+ // Clear segment and tar dir
+ for (File segment : _segmentDir.listFiles()) {
+ FileUtils.deleteQuietly(segment);
+ }
+ for (File tar : _tarDir.listFiles()) {
+ FileUtils.deleteQuietly(tar);
+ }
+
+ // Create 1 segment, for METADATA push WITHOUT move to final location
+ ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(1), offlineTableConfig, schema, "_without_move",
+ _segmentDir, _tarDir);
+ jobSpec.setPushJobSpec(new PushJobSpec());
+ runner = new SegmentMetadataPushJobRunner();
+
+ Assert.assertEquals(dataDirSegments.listFiles().length, 1);
+ Assert.assertEquals(_tarDir.listFiles().length, 1);
+
+ runner.init(jobSpec);
+ runner.run();
+
+ // should not see new segments in dataDir
+ Assert.assertEquals(dataDirSegments.listFiles().length, 1);
+ Assert.assertEquals(_tarDir.listFiles().length, 1);
+
+ // test segment loaded
+ segmentsList = getSegmentsList();
+ Assert.assertEquals(segmentsList.size(), 2);
+ String segmentNameWithoutMove = null;
+ for (JsonNode segment : segmentsList) {
+ if (segment.asText().endsWith("_without_move")) {
+ segmentNameWithoutMove = segment.asText();
+ }
+ }
+ Assert.assertNotNull(segmentNameWithoutMove);
+ numDocs += getNumDocs(segmentNameWithoutMove);
+ testCountStar(numDocs);
+ }
+
+ private long getNumDocs(String segmentName)
+ throws IOException {
+ return JsonUtils.stringToJsonNode(
+ sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(DEFAULT_TABLE_NAME, segmentName)))
+ .get("segment.total.docs").asLong();
+ }
+
+ private JsonNode getSegmentsList()
+ throws IOException {
+ return JsonUtils.stringToJsonNode(sendGetRequest(
+ _controllerRequestURLBuilder.forSegmentListAPIWithTableType(DEFAULT_TABLE_NAME,
+ TableType.OFFLINE.toString())))
+ .get(0).get("OFFLINE");
+ }
+
+ protected void testCountStar(final long countStarResult) {
+ TestUtils.waitForCondition(new Function<Void, Boolean>() {
+ @Nullable
+ @Override
+ public Boolean apply(@Nullable Void aVoid) {
+ try {
+ return getCurrentCountStarResult() == countStarResult;
+ } catch (Exception e) {
+ return null;
+ }
+ }
+ }, 100L, 300_000, "Failed to load " + countStarResult + " documents", true);
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ dropOfflineTable(getTableName());
+ stopServer();
+ stopBroker();
+ stopController();
+ stopZk();
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
index 1d91e5bb4e..b756c7f760 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
@@ -264,6 +264,10 @@ public class SegmentPushUtils implements Serializable {
headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath));
headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
FileUploadDownloadClient.FileUploadType.METADATA.toString()));
+ if (spec.getPushJobSpec() != null) {
+ headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE,
+ String.valueOf(spec.getPushJobSpec().getCopyToDeepStoreForMetadataPush())));
+ }
headers.addAll(AuthProviderUtils.toRequestHeaders(authProvider));
SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
index 40944978ca..04481baf1f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
@@ -41,6 +41,11 @@ public class PushJobSpec implements Serializable {
*/
private long _pushRetryIntervalMillis = 1000;
+ /**
+ * Applicable for URI and METADATA push types.
+ * If true, and if the segment was not already in the deep store, copy it to the deep store.
+ */
+ private boolean _copyToDeepStoreForMetadataPush;
/**
* Used in SegmentUriPushJobRunner, which is used to composite the segment uri to send to pinot controller.
* The URI sends to controller is in the format ${segmentUriPrefix}${segmentPath}${segmentUriSuffix}
@@ -121,4 +126,12 @@ public class PushJobSpec implements Serializable {
public void setPushParallelism(int pushParallelism) {
_pushParallelism = pushParallelism;
}
+
+ public boolean getCopyToDeepStoreForMetadataPush() {
+ return _copyToDeepStoreForMetadataPush;
+ }
+
+ public void setCopyToDeepStoreForMetadataPush(boolean copyToDeepStoreForMetadataPush) {
+ _copyToDeepStoreForMetadataPush = copyToDeepStoreForMetadataPush;
+ }
}
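End to end, the PushJobSpec option above is what becomes the COPY_SEGMENT_TO_DEEP_STORE request header assembled in SegmentPushUtils. The following is an illustrative sketch (an assumed helper, not part of the commit) of the headers a METADATA push sends; segmentUriPath is a placeholder for the segment's location in shared storage.

import java.util.ArrayList;
import java.util.List;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.apache.pinot.common.utils.FileUploadDownloadClient;

public class MetadataPushHeadersSketch {
  public static List<Header> buildHeaders(String segmentUriPath, boolean copyToDeepStore) {
    List<Header> headers = new ArrayList<>();
    // Where the controller (and servers) can fetch the full segment from
    headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath));
    // Mark the upload as a METADATA push: only segment metadata travels in the request body
    headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
        FileUploadDownloadClient.FileUploadType.METADATA.toString()));
    // New in this commit: "true" makes the controller copy the segment from DOWNLOAD_URI into its
    // deep store; absent or "false" keeps the old behavior of leaving the segment where it is
    headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE,
        String.valueOf(copyToDeepStore)));
    return headers;
  }
}

On the controller side, PinotSegmentUploadDownloadRestletResource reads this header for METADATA uploads, and ZKOperator then either copies the segment from the source download URI into the deep store or leaves it in place.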