You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ti...@apache.org on 2020/09/02 19:19:23 UTC

[incubator-pinot] branch master updated: [Deepstore by-pass]Add a Deepstore bypass integration test with minor bug fixes. (#5857)

This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f12105  [Deepstore by-pass]Add a Deepstore bypass integration test with minor bug fixes. (#5857)
7f12105 is described below

commit 7f1210539591e04fbb5fc4ef53e197405a40c727
Author: Ting Chen <ti...@uber.com>
AuthorDate: Wed Sep 2 12:19:09 2020 -0700

    [Deepstore by-pass]Add a Deepstore bypass integration test with minor bug fixes. (#5857)
    
    
    * Add an integration test for peer segment download for LLC. Serveral minor fixes.
---
 .../common/utils/fetcher/HttpSegmentFetcher.java   |  15 +
 .../manager/realtime/PinotFSSegmentUploader.java   |   6 +-
 .../pinot/core/util/PeerServerSegmentFinder.java   |  27 +-
 .../realtime/PinotFSSegmentUploaderTest.java       |   4 +-
 ...rDownloadLLCRealtimeClusterIntegrationTest.java | 379 +++++++++++++++++++++
 5 files changed, 422 insertions(+), 9 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
index 5b04eda..fcaee6c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
@@ -21,6 +21,7 @@ package org.apache.pinot.common.utils.fetcher;
 import java.io.File;
 import java.net.URI;
 
+import java.util.List;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -64,4 +65,18 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
       }
     });
   }
+
+  @Override
+  public void fetchSegmentToLocalWithoutRetry(URI uri, File dest)
+      throws Exception {
+    try {
+      int statusCode = _httpClient.downloadFile(uri, dest);
+      _logger
+          .info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(),
+              statusCode);
+    }  catch (Exception e) {
+      _logger.warn("Caught exception while downloading segment from: {} to: {}", uri, dest, e);
+      throw e;
+    }
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
index 75a7fff..263435e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.data.manager.realtime;
 
 import java.io.File;
 import java.net.URI;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory;
 /**
  * A segment uploader which does segment upload to a segment store (with store root dir configured as
  * _segmentStoreUriStr) using PinotFS within a configurable timeout period. The final segment location would be in the
- * URI _segmentStoreUriStr/_tableNameWithType/segmentName if successful.
+ * URI _segmentStoreUriStr/_tableNameWithType/segmentName+random_uuid if successful.
  */
 public class PinotFSSegmentUploader implements SegmentUploader {
   private Logger LOGGER = LoggerFactory.getLogger(PinotFSSegmentUploader.class);
@@ -57,7 +58,7 @@ public class PinotFSSegmentUploader implements SegmentUploader {
     }
     Callable<URI> uploadTask = () -> {
       URI destUri = new URI(StringUtil
-          .join(File.separator, _segmentStoreUriStr, segmentName.getTableName(), segmentName.getSegmentName()));
+          .join(File.separator, _segmentStoreUriStr, segmentName.getTableName(), segmentName.getSegmentName() + UUID.randomUUID().toString()));
       try {
         PinotFS pinotFS = PinotFSFactory.create(new URI(_segmentStoreUriStr).getScheme());
         // Check and delete any existing segment file.
@@ -74,6 +75,7 @@ public class PinotFSSegmentUploader implements SegmentUploader {
     Future<URI> future = _executorService.submit(uploadTask);
     try {
       URI segmentLocation = future.get(_timeoutInMs, TimeUnit.MILLISECONDS);
+      LOGGER.info("Successfully upload segment {} to {}.", segmentName, segmentLocation);
       return segmentLocation;
     } catch (InterruptedException e) {
       LOGGER.info("Interrupted while waiting for segment upload of {} to {}.", segmentName, _segmentStoreUriStr);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java b/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
index 76fd152..2747f5b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
@@ -35,16 +35,20 @@ import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * PeerServerSegmentFinder discovers all the servers having the input segment in a ONLINE state through external view of
- * a Pinot table.
+ * PeerServerSegmentFinder discovers all the servers having the input segment in an ONLINE state through external view
+ * of a Pinot table. It performs retries during the discovery to minimize the chance of Helix state propagation delay.
  */
 public class PeerServerSegmentFinder {
   private static final Logger _logger = LoggerFactory.getLogger(PeerServerSegmentFinder.class);
+  private static final int MAX_NUM_ATTEMPTS = 5;
+  private static final int INITIAL_DELAY_MS = 500;
+  private static final double DELAY_SCALE_FACTOR = 2;
 
   /**
    *
@@ -65,13 +69,27 @@ public class PeerServerSegmentFinder {
       _logger.error("ClusterName not found");
       return ListUtils.EMPTY_LIST;
     }
+    final List<URI> onlineServerURIs = new ArrayList<>();
+    try {
+      RetryPolicies.exponentialBackoffRetryPolicy(MAX_NUM_ATTEMPTS, INITIAL_DELAY_MS, DELAY_SCALE_FACTOR).attempt(() -> {
+        getOnlineServersFromExternalView(segmentName, downloadScheme, tableNameWithType, helixAdmin, clusterName,
+            onlineServerURIs);
+        return onlineServerURIs.size() > 0;
+      });
+    } catch (Exception e) {
+      _logger.error("Failure in getting online servers for segment {}", segmentName, e);
+    }
+    return onlineServerURIs;
+  }
+
+  private static void getOnlineServersFromExternalView(String segmentName, String downloadScheme,
+      String tableNameWithType, HelixAdmin helixAdmin, String clusterName, List<URI> onlineServerURIs) {
     ExternalView externalViewForResource =
         HelixHelper.getExternalViewForResource(helixAdmin, clusterName, tableNameWithType);
     if (externalViewForResource == null) {
       _logger.warn("External View not found for table {}", tableNameWithType);
-      return ListUtils.EMPTY_LIST;
+      return;
     }
-    List<URI> onlineServerURIs = new ArrayList<>();
     // Find out the ONLINE servers serving the segment.
     Map<String, String> instanceToStateMap = externalViewForResource.getStateMap(segmentName);
     for (Map.Entry<String, String> instanceState : instanceToStateMap.entrySet()) {
@@ -89,7 +107,6 @@ public class PeerServerSegmentFinder {
         }
       }
     }
-    return onlineServerURIs;
   }
 
   private static int getServerAdminPort(HelixAdmin helixAdmin, String clusterName, String instanceId) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
index dc93bbf..3ee81e0 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
@@ -60,14 +60,14 @@ public class PinotFSSegmentUploaderTest {
   public void testSuccessfulUpload() {
     SegmentUploader segmentUploader = new PinotFSSegmentUploader("hdfs://root", TIMEOUT_IN_MS);
     URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
-    Assert.assertEquals(segmentURI.toString(), StringUtil.join(File.separator,"hdfs://root", _llcSegmentName.getTableName(), _llcSegmentName.getSegmentName()));
+    Assert.assertTrue(segmentURI.toString().startsWith(StringUtil.join(File.separator,"hdfs://root", _llcSegmentName.getTableName(), _llcSegmentName.getSegmentName())));
   }
 
   @Test
   public void testSegmentAlreadyExist() {
     SegmentUploader segmentUploader = new PinotFSSegmentUploader("existing://root", TIMEOUT_IN_MS);
     URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
-    Assert.assertEquals(segmentURI.toString(), StringUtil.join(File.separator,"existing://root", _llcSegmentName.getTableName(), _llcSegmentName.getSegmentName()));
+    Assert.assertTrue(segmentURI.toString().startsWith(StringUtil.join(File.separator,"existing://root", _llcSegmentName.getTableName(), _llcSegmentName.getSegmentName())));
   }
 
   @Test
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java
new file mode 100644
index 0000000..ea8ea51
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.avro.reflect.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.spi.config.table.CompletionConfig;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.LocalPinotFS;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.controller.ControllerConf.ALLOW_HLC_TABLES;
+import static org.apache.pinot.controller.ControllerConf.ENABLE_SPLIT_COMMIT;
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Integration test that extends RealtimeClusterIntegrationTest but uses low-level Kafka consumer and a fake PinotFS as
+ * the deep store for segments. This test enables the peer to peer segment download scheme to test Pinot servers can
+ * download segments from peer servers even when the deep store is down. This is done by injection of failures in
+ * the fake PinotFS segment upload api (i.e., copyFromLocal) for all segments whose seq number mod 5 is 0.
+ *
+ * Besides standard tests, it also verifies that
+ * (1) All the segments on all servers are in either ONLINE or CONSUMING states
+ * (2) For segments failed during deep store upload, the corresponding segment download url string is empty in Zk.
+ */
+public class PeerDownloadLLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PeerDownloadLLCRealtimeClusterIntegrationTest.class);
+
+  private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
+  private static final long RANDOM_SEED = System.currentTimeMillis();
+  private static final Random RANDOM = new Random(RANDOM_SEED);
+  private static final int NUM_SERVERS = 2;
+  public static final int UPLOAD_FAILURE_MOD = 5;
+
+  private final boolean _isDirectAlloc = true; //Set as true; otherwise trigger indexing exception.
+  private final boolean _isConsumerDirConfigured = true;
+  private final boolean _enableSplitCommit = true;
+  private final boolean _enableLeadControllerResource = RANDOM.nextBoolean();
+  private static File PINOT_FS_ROOT_DIR;
+
+  @BeforeClass
+  @Override
+  public void setUp()
+      throws Exception {
+    System.out.println(String.format(
+        "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableSplitCommit: %s, enableLeadControllerResource: %s",
+        RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured, _enableSplitCommit, _enableLeadControllerResource));
+
+    PINOT_FS_ROOT_DIR = new File(FileUtils.getTempDirectoryPath() + File.separator + System.currentTimeMillis() + "/");
+    Preconditions.checkState(PINOT_FS_ROOT_DIR.mkdir(), "Failed to make a dir for " + PINOT_FS_ROOT_DIR.getPath());
+
+    // Remove the consumer directory
+    File consumerDirectory = new File(CONSUMER_DIRECTORY);
+    if (consumerDirectory.exists()) {
+      FileUtils.deleteDirectory(consumerDirectory);
+    }
+    super.setUp();
+  }
+
+
+  @Override
+  public void startServer() {
+    startServers(NUM_SERVERS);
+  }
+
+  @Override
+  public void addTableConfig(TableConfig tableConfig)
+      throws IOException {
+    SegmentsValidationAndRetentionConfig segmentsValidationAndRetentionConfig =
+        new SegmentsValidationAndRetentionConfig();
+    CompletionConfig completionConfig = new CompletionConfig("DOWNLOAD");
+    segmentsValidationAndRetentionConfig.setCompletionConfig(completionConfig);
+    segmentsValidationAndRetentionConfig.setReplicasPerPartition(String.valueOf(NUM_SERVERS));
+    // Important: enable peer to peer download.
+    segmentsValidationAndRetentionConfig.setPeerSegmentDownloadScheme("http");
+    tableConfig.setValidationConfig(segmentsValidationAndRetentionConfig);
+    tableConfig.getValidationConfig().setTimeColumnName(this.getTimeColumnName());
+
+    sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString());
+  }
+
+
+  @Override
+  public void startController() {
+    Map<String, Object> controllerConfig = getDefaultControllerConfiguration();
+    controllerConfig.put(ALLOW_HLC_TABLES, false);
+    controllerConfig.put(ENABLE_SPLIT_COMMIT, _enableSplitCommit);
+    // Override the data dir config.
+    controllerConfig.put(ControllerConf.DATA_DIR, "mockfs://" + getHelixClusterName());
+    controllerConfig.put(ControllerConf.LOCAL_TEMP_DIR, FileUtils.getTempDirectory().getAbsolutePath());
+    // Use the mock PinotFS as the PinotFS.
+    controllerConfig.put("pinot.controller.storage.factory.class.mockfs",
+        "org.apache.pinot.integration.tests.PeerDownloadLLCRealtimeClusterIntegrationTest$MockPinotFS");
+    startController(controllerConfig);
+    enableResourceConfigForLeadControllerResource(_enableLeadControllerResource);
+  }
+
+  @Override
+  protected boolean useLlc() {
+    return true;
+  }
+
+  @Nullable
+  @Override
+  protected String getLoadMode() {
+    return "MMAP";
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration configuration) {
+    configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, true);
+    configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION, _isDirectAlloc);
+    configuration.setProperty(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY + ".class.mockfs",
+        "org.apache.pinot.integration.tests.PeerDownloadLLCRealtimeClusterIntegrationTest$MockPinotFS");
+    // Set the segment deep store uri.
+    configuration.setProperty("pinot.server.instance.segment.store.uri", "mockfs://" + getHelixClusterName());
+    // For setting the HDFS segment fetcher.
+    configuration.setProperty(CommonConstants.Server.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY + ".protocols", "file,http");
+    if (_isConsumerDirConfigured) {
+      configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, CONSUMER_DIRECTORY);
+    }
+    if (_enableSplitCommit) {
+      configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true);
+      configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true);
+    }
+  }
+
+  @Test
+  public void testConsumerDirectoryExists() {
+    File consumerDirectory = new File(CONSUMER_DIRECTORY, "mytable_REALTIME");
+    assertEquals(consumerDirectory.exists(), _isConsumerDirConfigured,
+        "The off heap consumer directory does not exist");
+  }
+
+  @Test
+  public void testSegmentFlushSize() {
+    String zkSegmentsPath = "/SEGMENTS/" + TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+    List<String> segmentNames = _propertyStore.getChildNames(zkSegmentsPath, 0);
+    for (String segmentName : segmentNames) {
+      ZNRecord znRecord = _propertyStore.get(zkSegmentsPath + "/" + segmentName, null, 0);
+      assertEquals(znRecord.getSimpleField(CommonConstants.Segment.FLUSH_THRESHOLD_SIZE),
+          Integer.toString(getRealtimeSegmentFlushSize() / getNumKafkaPartitions()),
+          "Segment: " + segmentName + " does not have the expected flush size");
+    }
+  }
+
+  @Test
+  public void testSegmentDownloadURLs() {
+    // Verify that all segments of even partition number have empty download url in zk.
+    String zkSegmentsPath = "/SEGMENTS/" + TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+    List<String> segmentNames = _propertyStore.getChildNames(zkSegmentsPath, 0);
+    for (String segmentName : segmentNames) {
+      ZNRecord znRecord = _propertyStore.get(zkSegmentsPath + "/" + segmentName, null, 0);
+      String downloadURL = znRecord.getSimpleField("segment.realtime.download.url");
+      String numberOfDoc = znRecord.getSimpleField("segment.total.docs");
+      if (numberOfDoc.equals("-1")) {
+        // This is a consuming segment so the download url is null.
+        Assert.assertNull(downloadURL);
+        continue;
+      }
+      int seqNum = Integer.parseInt(segmentName.split("__")[2]);
+      if (seqNum % UPLOAD_FAILURE_MOD == 0) {
+        Assert.assertEquals("", downloadURL);
+      } else {
+        Assert.assertTrue(downloadURL.startsWith("mockfs://"));
+      }
+    }
+  }
+
+  @Test
+  public void testAllSegmentsAreOnlineOrConsuming() {
+    ExternalView externalView =
+        HelixHelper.getExternalViewForResource(_helixAdmin, getHelixClusterName(),
+            TableNameBuilder.REALTIME.tableNameWithType(getTableName()));
+    Assert.assertEquals("2", externalView.getReplicas());
+    // Verify for each segment e, the state of e in its 2 hosting servers is either ONLINE or CONSUMING
+    for(String segment : externalView.getPartitionSet()) {
+      Map<String, String> instanceToStateMap = externalView.getStateMap(segment);
+      Assert.assertEquals(2, instanceToStateMap.size());
+      for (Map.Entry<String, String> instanceState : instanceToStateMap.entrySet()) {
+        Assert.assertTrue("ONLINE".equalsIgnoreCase(instanceState.getValue()) || "CONSUMING"
+            .equalsIgnoreCase(instanceState.getValue()));
+      }
+    }
+  }
+
+  @Test(expectedExceptions = IOException.class)
+  public void testAddHLCTableShouldFail()
+      throws IOException {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
+        .setStreamConfigs(Collections.singletonMap("stream.kafka.consumer.type", "HIGHLEVEL")).build();
+    sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString());
+  }
+
+  // MockPinotFS is a localPinotFS whose root directory is configured as _basePath;
+  public static class MockPinotFS extends PinotFS {
+    LocalPinotFS _localPinotFS = new LocalPinotFS();
+    File _basePath;
+    @Override
+    public void init(PinotConfiguration config) {
+      _localPinotFS.init(config);
+      _basePath = PeerDownloadLLCRealtimeClusterIntegrationTest.PINOT_FS_ROOT_DIR;
+    }
+
+    @Override
+    public boolean mkdir(URI uri)
+        throws IOException {
+      try {
+        return _localPinotFS.mkdir(new URI(_basePath + uri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public boolean delete(URI segmentUri, boolean forceDelete)
+        throws IOException {
+      try {
+        return _localPinotFS.delete(new URI(_basePath + segmentUri.getPath()), forceDelete);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public boolean doMove(URI srcUri, URI dstUri)
+        throws IOException {
+      try {
+        LOGGER.warn("Moving from {} to {}", srcUri, dstUri);
+        return _localPinotFS.doMove(new URI(_basePath + srcUri.getPath()), new URI(_basePath + dstUri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public boolean copy(URI srcUri, URI dstUri)
+        throws IOException {
+      try {
+        return _localPinotFS.copy(new URI(_basePath + srcUri.getPath()), new URI(_basePath + dstUri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public boolean exists(URI fileUri)
+        throws IOException {
+      try {
+        return _localPinotFS.exists(new URI(_basePath + fileUri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public long length(URI fileUri)
+        throws IOException {
+      try {
+        return _localPinotFS.length(new URI(_basePath + fileUri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public String[] listFiles(URI fileUri, boolean recursive)
+        throws IOException {
+      try {
+        return _localPinotFS.listFiles(new URI(_basePath + fileUri.getPath()), recursive);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public void copyToLocalFile(URI srcUri, File dstFile)
+        throws Exception {
+      _localPinotFS.copyToLocalFile(new URI(_basePath + srcUri.getPath()), dstFile);
+    }
+
+    @Override
+    public void copyFromLocalFile(File srcFile, URI dstUri)
+        throws Exception {
+      // Inject failures for segments whose seq number mod 5 is 0.
+      if (new LLCSegmentName(srcFile.getName()).getSequenceNumber() % UPLOAD_FAILURE_MOD == 0) {
+        throw new IllegalArgumentException(srcFile.getAbsolutePath());
+      }
+      try {
+        _localPinotFS.copyFromLocalFile(srcFile, new URI(_basePath + dstUri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public boolean isDirectory(URI uri)
+        throws IOException {
+      try {
+        return _localPinotFS.isDirectory(new URI(_basePath + uri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public long lastModified(URI uri)
+        throws IOException {
+      try {
+        return _localPinotFS.lastModified(new URI(_basePath + uri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public boolean touch(URI uri)
+        throws IOException {
+      try {
+        return _localPinotFS.touch(new URI(_basePath + uri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public InputStream open(URI uri)
+        throws IOException {
+      try {
+        return _localPinotFS.open(new URI(_basePath + uri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org