You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ti...@apache.org on 2020/09/02 19:19:23 UTC
[incubator-pinot] branch master updated: [Deepstore by-pass]Add a
Deepstore bypass integration test with minor bug fixes. (#5857)
This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7f12105 [Deepstore by-pass]Add a Deepstore bypass integration test with minor bug fixes. (#5857)
7f12105 is described below
commit 7f1210539591e04fbb5fc4ef53e197405a40c727
Author: Ting Chen <ti...@uber.com>
AuthorDate: Wed Sep 2 12:19:09 2020 -0700
[Deepstore by-pass]Add a Deepstore bypass integration test with minor bug fixes. (#5857)
* Add an integration test for peer segment download for LLC. Serveral minor fixes.
---
.../common/utils/fetcher/HttpSegmentFetcher.java | 15 +
.../manager/realtime/PinotFSSegmentUploader.java | 6 +-
.../pinot/core/util/PeerServerSegmentFinder.java | 27 +-
.../realtime/PinotFSSegmentUploaderTest.java | 4 +-
...rDownloadLLCRealtimeClusterIntegrationTest.java | 379 +++++++++++++++++++++
5 files changed, 422 insertions(+), 9 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
index 5b04eda..fcaee6c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
@@ -21,6 +21,7 @@ package org.apache.pinot.common.utils.fetcher;
import java.io.File;
import java.net.URI;
+import java.util.List;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -64,4 +65,18 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
}
});
}
+
+ @Override
+ public void fetchSegmentToLocalWithoutRetry(URI uri, File dest)
+ throws Exception {
+ try {
+ int statusCode = _httpClient.downloadFile(uri, dest);
+ _logger
+ .info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(),
+ statusCode);
+ } catch (Exception e) {
+ _logger.warn("Caught exception while downloading segment from: {} to: {}", uri, dest, e);
+ throw e;
+ }
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
index 75a7fff..263435e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.data.manager.realtime;
import java.io.File;
import java.net.URI;
+import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory;
/**
* A segment uploader which does segment upload to a segment store (with store root dir configured as
* _segmentStoreUriStr) using PinotFS within a configurable timeout period. The final segment location would be in the
- * URI _segmentStoreUriStr/_tableNameWithType/segmentName if successful.
+ * URI _segmentStoreUriStr/_tableNameWithType/segmentName+random_uuid if successful.
*/
public class PinotFSSegmentUploader implements SegmentUploader {
private Logger LOGGER = LoggerFactory.getLogger(PinotFSSegmentUploader.class);
@@ -57,7 +58,7 @@ public class PinotFSSegmentUploader implements SegmentUploader {
}
Callable<URI> uploadTask = () -> {
URI destUri = new URI(StringUtil
- .join(File.separator, _segmentStoreUriStr, segmentName.getTableName(), segmentName.getSegmentName()));
+ .join(File.separator, _segmentStoreUriStr, segmentName.getTableName(), segmentName.getSegmentName() + UUID.randomUUID().toString()));
try {
PinotFS pinotFS = PinotFSFactory.create(new URI(_segmentStoreUriStr).getScheme());
// Check and delete any existing segment file.
@@ -74,6 +75,7 @@ public class PinotFSSegmentUploader implements SegmentUploader {
Future<URI> future = _executorService.submit(uploadTask);
try {
URI segmentLocation = future.get(_timeoutInMs, TimeUnit.MILLISECONDS);
+ LOGGER.info("Successfully upload segment {} to {}.", segmentName, segmentLocation);
return segmentLocation;
} catch (InterruptedException e) {
LOGGER.info("Interrupted while waiting for segment upload of {} to {}.", segmentName, _segmentStoreUriStr);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java b/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
index 76fd152..2747f5b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
@@ -35,16 +35,20 @@ import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * PeerServerSegmentFinder discovers all the servers having the input segment in a ONLINE state through external view of
- * a Pinot table.
+ * PeerServerSegmentFinder discovers all the servers having the input segment in an ONLINE state through external view
+ * of a Pinot table. It performs retries during the discovery to minimize the chance of Helix state propagation delay.
*/
public class PeerServerSegmentFinder {
private static final Logger _logger = LoggerFactory.getLogger(PeerServerSegmentFinder.class);
+ private static final int MAX_NUM_ATTEMPTS = 5;
+ private static final int INITIAL_DELAY_MS = 500;
+ private static final double DELAY_SCALE_FACTOR = 2;
/**
*
@@ -65,13 +69,27 @@ public class PeerServerSegmentFinder {
_logger.error("ClusterName not found");
return ListUtils.EMPTY_LIST;
}
+ final List<URI> onlineServerURIs = new ArrayList<>();
+ try {
+ RetryPolicies.exponentialBackoffRetryPolicy(MAX_NUM_ATTEMPTS, INITIAL_DELAY_MS, DELAY_SCALE_FACTOR).attempt(() -> {
+ getOnlineServersFromExternalView(segmentName, downloadScheme, tableNameWithType, helixAdmin, clusterName,
+ onlineServerURIs);
+ return onlineServerURIs.size() > 0;
+ });
+ } catch (Exception e) {
+ _logger.error("Failure in getting online servers for segment {}", segmentName, e);
+ }
+ return onlineServerURIs;
+ }
+
+ private static void getOnlineServersFromExternalView(String segmentName, String downloadScheme,
+ String tableNameWithType, HelixAdmin helixAdmin, String clusterName, List<URI> onlineServerURIs) {
ExternalView externalViewForResource =
HelixHelper.getExternalViewForResource(helixAdmin, clusterName, tableNameWithType);
if (externalViewForResource == null) {
_logger.warn("External View not found for table {}", tableNameWithType);
- return ListUtils.EMPTY_LIST;
+ return;
}
- List<URI> onlineServerURIs = new ArrayList<>();
// Find out the ONLINE servers serving the segment.
Map<String, String> instanceToStateMap = externalViewForResource.getStateMap(segmentName);
for (Map.Entry<String, String> instanceState : instanceToStateMap.entrySet()) {
@@ -89,7 +107,6 @@ public class PeerServerSegmentFinder {
}
}
}
- return onlineServerURIs;
}
private static int getServerAdminPort(HelixAdmin helixAdmin, String clusterName, String instanceId) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
index dc93bbf..3ee81e0 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
@@ -60,14 +60,14 @@ public class PinotFSSegmentUploaderTest {
public void testSuccessfulUpload() {
SegmentUploader segmentUploader = new PinotFSSegmentUploader("hdfs://root", TIMEOUT_IN_MS);
URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
- Assert.assertEquals(segmentURI.toString(), StringUtil.join(File.separator,"hdfs://root", _llcSegmentName.getTableName(), _llcSegmentName.getSegmentName()));
+ Assert.assertTrue(segmentURI.toString().startsWith(StringUtil.join(File.separator,"hdfs://root", _llcSegmentName.getTableName(), _llcSegmentName.getSegmentName())));
}
@Test
public void testSegmentAlreadyExist() {
SegmentUploader segmentUploader = new PinotFSSegmentUploader("existing://root", TIMEOUT_IN_MS);
URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
- Assert.assertEquals(segmentURI.toString(), StringUtil.join(File.separator,"existing://root", _llcSegmentName.getTableName(), _llcSegmentName.getSegmentName()));
+ Assert.assertTrue(segmentURI.toString().startsWith(StringUtil.join(File.separator,"existing://root", _llcSegmentName.getTableName(), _llcSegmentName.getSegmentName())));
}
@Test
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java
new file mode 100644
index 0000000..ea8ea51
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.avro.reflect.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.spi.config.table.CompletionConfig;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.LocalPinotFS;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.controller.ControllerConf.ALLOW_HLC_TABLES;
+import static org.apache.pinot.controller.ControllerConf.ENABLE_SPLIT_COMMIT;
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Integration test that extends RealtimeClusterIntegrationTest but uses low-level Kafka consumer and a fake PinotFS as
+ * the deep store for segments. This test enables the peer to peer segment download scheme to test Pinot servers can
+ * download segments from peer servers even when the deep store is down. This is done by injection of failures in
+ * the fake PinotFS segment upload api (i.e., copyFromLocal) for all segments whose seq number mod 5 is 0.
+ *
+ * Besides standard tests, it also verifies that
+ * (1) All the segments on all servers are in either ONLINE or CONSUMING states
+ * (2) For segments failed during deep store upload, the corresponding segment download url string is empty in Zk.
+ */
+public class PeerDownloadLLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PeerDownloadLLCRealtimeClusterIntegrationTest.class);
+
+ private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
+ private static final long RANDOM_SEED = System.currentTimeMillis();
+ private static final Random RANDOM = new Random(RANDOM_SEED);
+ private static final int NUM_SERVERS = 2;
+ public static final int UPLOAD_FAILURE_MOD = 5;
+
+ private final boolean _isDirectAlloc = true; //Set as true; otherwise trigger indexing exception.
+ private final boolean _isConsumerDirConfigured = true;
+ private final boolean _enableSplitCommit = true;
+ private final boolean _enableLeadControllerResource = RANDOM.nextBoolean();
+ private static File PINOT_FS_ROOT_DIR;
+
+ @BeforeClass
+ @Override
+ public void setUp()
+ throws Exception {
+ System.out.println(String.format(
+ "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableSplitCommit: %s, enableLeadControllerResource: %s",
+ RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured, _enableSplitCommit, _enableLeadControllerResource));
+
+ PINOT_FS_ROOT_DIR = new File(FileUtils.getTempDirectoryPath() + File.separator + System.currentTimeMillis() + "/");
+ Preconditions.checkState(PINOT_FS_ROOT_DIR.mkdir(), "Failed to make a dir for " + PINOT_FS_ROOT_DIR.getPath());
+
+ // Remove the consumer directory
+ File consumerDirectory = new File(CONSUMER_DIRECTORY);
+ if (consumerDirectory.exists()) {
+ FileUtils.deleteDirectory(consumerDirectory);
+ }
+ super.setUp();
+ }
+
+
+ @Override
+ public void startServer() {
+ startServers(NUM_SERVERS);
+ }
+
+ @Override
+ public void addTableConfig(TableConfig tableConfig)
+ throws IOException {
+ SegmentsValidationAndRetentionConfig segmentsValidationAndRetentionConfig =
+ new SegmentsValidationAndRetentionConfig();
+ CompletionConfig completionConfig = new CompletionConfig("DOWNLOAD");
+ segmentsValidationAndRetentionConfig.setCompletionConfig(completionConfig);
+ segmentsValidationAndRetentionConfig.setReplicasPerPartition(String.valueOf(NUM_SERVERS));
+ // Important: enable peer to peer download.
+ segmentsValidationAndRetentionConfig.setPeerSegmentDownloadScheme("http");
+ tableConfig.setValidationConfig(segmentsValidationAndRetentionConfig);
+ tableConfig.getValidationConfig().setTimeColumnName(this.getTimeColumnName());
+
+ sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString());
+ }
+
+
+ @Override
+ public void startController() {
+ Map<String, Object> controllerConfig = getDefaultControllerConfiguration();
+ controllerConfig.put(ALLOW_HLC_TABLES, false);
+ controllerConfig.put(ENABLE_SPLIT_COMMIT, _enableSplitCommit);
+ // Override the data dir config.
+ controllerConfig.put(ControllerConf.DATA_DIR, "mockfs://" + getHelixClusterName());
+ controllerConfig.put(ControllerConf.LOCAL_TEMP_DIR, FileUtils.getTempDirectory().getAbsolutePath());
+ // Use the mock PinotFS as the PinotFS.
+ controllerConfig.put("pinot.controller.storage.factory.class.mockfs",
+ "org.apache.pinot.integration.tests.PeerDownloadLLCRealtimeClusterIntegrationTest$MockPinotFS");
+ startController(controllerConfig);
+ enableResourceConfigForLeadControllerResource(_enableLeadControllerResource);
+ }
+
+ @Override
+ protected boolean useLlc() {
+ return true;
+ }
+
+ @Nullable
+ @Override
+ protected String getLoadMode() {
+ return "MMAP";
+ }
+
+ @Override
+ protected void overrideServerConf(PinotConfiguration configuration) {
+ configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, true);
+ configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION, _isDirectAlloc);
+ configuration.setProperty(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY + ".class.mockfs",
+ "org.apache.pinot.integration.tests.PeerDownloadLLCRealtimeClusterIntegrationTest$MockPinotFS");
+ // Set the segment deep store uri.
+ configuration.setProperty("pinot.server.instance.segment.store.uri", "mockfs://" + getHelixClusterName());
+ // For setting the HDFS segment fetcher.
+ configuration.setProperty(CommonConstants.Server.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY + ".protocols", "file,http");
+ if (_isConsumerDirConfigured) {
+ configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, CONSUMER_DIRECTORY);
+ }
+ if (_enableSplitCommit) {
+ configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true);
+ configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true);
+ }
+ }
+
+ @Test
+ public void testConsumerDirectoryExists() {
+ File consumerDirectory = new File(CONSUMER_DIRECTORY, "mytable_REALTIME");
+ assertEquals(consumerDirectory.exists(), _isConsumerDirConfigured,
+ "The off heap consumer directory does not exist");
+ }
+
+ @Test
+ public void testSegmentFlushSize() {
+ String zkSegmentsPath = "/SEGMENTS/" + TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+ List<String> segmentNames = _propertyStore.getChildNames(zkSegmentsPath, 0);
+ for (String segmentName : segmentNames) {
+ ZNRecord znRecord = _propertyStore.get(zkSegmentsPath + "/" + segmentName, null, 0);
+ assertEquals(znRecord.getSimpleField(CommonConstants.Segment.FLUSH_THRESHOLD_SIZE),
+ Integer.toString(getRealtimeSegmentFlushSize() / getNumKafkaPartitions()),
+ "Segment: " + segmentName + " does not have the expected flush size");
+ }
+ }
+
+ @Test
+ public void testSegmentDownloadURLs() {
+ // Verify that all segments of even partition number have empty download url in zk.
+ String zkSegmentsPath = "/SEGMENTS/" + TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+ List<String> segmentNames = _propertyStore.getChildNames(zkSegmentsPath, 0);
+ for (String segmentName : segmentNames) {
+ ZNRecord znRecord = _propertyStore.get(zkSegmentsPath + "/" + segmentName, null, 0);
+ String downloadURL = znRecord.getSimpleField("segment.realtime.download.url");
+ String numberOfDoc = znRecord.getSimpleField("segment.total.docs");
+ if (numberOfDoc.equals("-1")) {
+ // This is a consuming segment so the download url is null.
+ Assert.assertNull(downloadURL);
+ continue;
+ }
+ int seqNum = Integer.parseInt(segmentName.split("__")[2]);
+ if (seqNum % UPLOAD_FAILURE_MOD == 0) {
+ Assert.assertEquals("", downloadURL);
+ } else {
+ Assert.assertTrue(downloadURL.startsWith("mockfs://"));
+ }
+ }
+ }
+
+ @Test
+ public void testAllSegmentsAreOnlineOrConsuming() {
+ ExternalView externalView =
+ HelixHelper.getExternalViewForResource(_helixAdmin, getHelixClusterName(),
+ TableNameBuilder.REALTIME.tableNameWithType(getTableName()));
+ Assert.assertEquals("2", externalView.getReplicas());
+ // Verify for each segment e, the state of e in its 2 hosting servers is either ONLINE or CONSUMING
+ for(String segment : externalView.getPartitionSet()) {
+ Map<String, String> instanceToStateMap = externalView.getStateMap(segment);
+ Assert.assertEquals(2, instanceToStateMap.size());
+ for (Map.Entry<String, String> instanceState : instanceToStateMap.entrySet()) {
+ Assert.assertTrue("ONLINE".equalsIgnoreCase(instanceState.getValue()) || "CONSUMING"
+ .equalsIgnoreCase(instanceState.getValue()));
+ }
+ }
+ }
+
+ @Test(expectedExceptions = IOException.class)
+ public void testAddHLCTableShouldFail()
+ throws IOException {
+ TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
+ .setStreamConfigs(Collections.singletonMap("stream.kafka.consumer.type", "HIGHLEVEL")).build();
+ sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString());
+ }
+
+ // MockPinotFS is a localPinotFS whose root directory is configured as _basePath;
+ public static class MockPinotFS extends PinotFS {
+ LocalPinotFS _localPinotFS = new LocalPinotFS();
+ File _basePath;
+ @Override
+ public void init(PinotConfiguration config) {
+ _localPinotFS.init(config);
+ _basePath = PeerDownloadLLCRealtimeClusterIntegrationTest.PINOT_FS_ROOT_DIR;
+ }
+
+ @Override
+ public boolean mkdir(URI uri)
+ throws IOException {
+ try {
+ return _localPinotFS.mkdir(new URI(_basePath + uri.getPath()));
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+
+ @Override
+ public boolean delete(URI segmentUri, boolean forceDelete)
+ throws IOException {
+ try {
+ return _localPinotFS.delete(new URI(_basePath + segmentUri.getPath()), forceDelete);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+
+ @Override
+ public boolean doMove(URI srcUri, URI dstUri)
+ throws IOException {
+ try {
+ LOGGER.warn("Moving from {} to {}", srcUri, dstUri);
+ return _localPinotFS.doMove(new URI(_basePath + srcUri.getPath()), new URI(_basePath + dstUri.getPath()));
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+
+ @Override
+ public boolean copy(URI srcUri, URI dstUri)
+ throws IOException {
+ try {
+ return _localPinotFS.copy(new URI(_basePath + srcUri.getPath()), new URI(_basePath + dstUri.getPath()));
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+
+ @Override
+ public boolean exists(URI fileUri)
+ throws IOException {
+ try {
+ return _localPinotFS.exists(new URI(_basePath + fileUri.getPath()));
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+
+ @Override
+ public long length(URI fileUri)
+ throws IOException {
+ try {
+ return _localPinotFS.length(new URI(_basePath + fileUri.getPath()));
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+
+ @Override
+ public String[] listFiles(URI fileUri, boolean recursive)
+ throws IOException {
+ try {
+ return _localPinotFS.listFiles(new URI(_basePath + fileUri.getPath()), recursive);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void copyToLocalFile(URI srcUri, File dstFile)
+ throws Exception {
+ _localPinotFS.copyToLocalFile(new URI(_basePath + srcUri.getPath()), dstFile);
+ }
+
+ @Override
+ public void copyFromLocalFile(File srcFile, URI dstUri)
+ throws Exception {
+ // Inject failures for segments whose seq number mod 5 is 0.
+ if (new LLCSegmentName(srcFile.getName()).getSequenceNumber() % UPLOAD_FAILURE_MOD == 0) {
+ throw new IllegalArgumentException(srcFile.getAbsolutePath());
+ }
+ try {
+ _localPinotFS.copyFromLocalFile(srcFile, new URI(_basePath + dstUri.getPath()));
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+
+ @Override
+ public boolean isDirectory(URI uri)
+ throws IOException {
+ try {
+ return _localPinotFS.isDirectory(new URI(_basePath + uri.getPath()));
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+
+ @Override
+ public long lastModified(URI uri)
+ throws IOException {
+ try {
+ return _localPinotFS.lastModified(new URI(_basePath + uri.getPath()));
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+
+ @Override
+ public boolean touch(URI uri)
+ throws IOException {
+ try {
+ return _localPinotFS.touch(new URI(_basePath + uri.getPath()));
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+
+ @Override
+ public InputStream open(URI uri)
+ throws IOException {
+ try {
+ return _localPinotFS.open(new URI(_basePath + uri.getPath()));
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org