Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/08/13 06:35:52 UTC

[GitHub] [incubator-pinot] chenboat opened a new pull request #5857: Add a Deepstore bypass integration test with minor bug fixes to make the feature work.

chenboat opened a new pull request #5857:
URL: https://github.com/apache/incubator-pinot/pull/5857


   ## Description
   Add an integration test that extends RealtimeClusterIntegrationTest but uses 2 servers and a fake PinotFS as the deep store for segments. The test enables the peer-to-peer segment download scheme to verify that Pinot servers can download segments from peer servers even when the deep store is down. This is done by controlled injection of failures in the fake PinotFS segment upload API (i.e., copyFromLocal) for all segments whose sequence number is a multiple of 5.
   
   Besides the standard tests, it also verifies that:
    (1) all segments on all servers are in either ONLINE or CONSUMING state, and
    (2) for segments that failed during the deep-store upload, the corresponding segment download URL in ZK is empty.
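   The failure-injection rule can be sketched as a small standalone predicate (a minimal sketch for illustration, not the actual MockPinotFS code; it assumes LLC segment names of the form `tableName__partitionId__sequenceNumber__timestamp`):

   ```java
   // Standalone sketch of the failure-injection rule: the deep-store upload is
   // forced to fail for every segment whose sequence number is a multiple of 5.
   // Assumes LLC segment names like "mytable__0__5__20200813T0000Z".
   public class UploadFailureRule {
     static final int UPLOAD_FAILURE_MOD = 5;

     static boolean shouldFailUpload(String segmentName) {
       String[] parts = segmentName.split("__");
       if (parts.length < 3) {
         return false; // Not an LLC segment name; let the upload succeed.
       }
       return Integer.parseInt(parts[2]) % UPLOAD_FAILURE_MOD == 0;
     }

     public static void main(String[] args) {
       System.out.println(shouldFailUpload("mytable__0__5__20200813T0000Z")); // true
       System.out.println(shouldFailUpload("mytable__1__7__20200813T0000Z")); // false
     }
   }
   ```

   Keying the failures off the sequence number (rather than, say, the table or partition) guarantees that every partition eventually hits both the upload-success and upload-failure paths as segments roll.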
   
   ## Upgrade Notes
   Does this PR prevent a zero-downtime upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)
   * [ ] Yes (Please label this PR as **<code>backward-incompat</code>** and complete the section below on Release Notes)
   
   Does this PR fix a zero-downtime upgrade problem introduced earlier?
   * [ ] Yes (Please label this PR as **<code>backward-incompat</code>** and complete the section below on Release Notes)
   
   Does this PR otherwise need attention when creating release notes? Things to consider:
   - New configuration options
   - Deprecation of configurations
   - Signature changes to public methods/interfaces
   - New plugins added or old plugins removed
   * [ ] Yes (Please label this PR as **<code>release-notes</code>** and complete the section on Release Notes)
   ## Release Notes
   If you have tagged this PR as either backward-incompat or release-notes,
   you MUST add the text here that you would like to see appear in the release notes of
   the next release.
   
   If you have a series of commits adding or enabling a feature, then
   add this section only in the final commit that marks the feature complete.
   Refer to earlier release notes for examples of such text.
   
   ## Documentation
   If you have introduced a new feature or configuration, please add it to the documentation as well.
   See https://docs.pinot.apache.org/developers/developers-and-contributors/update-document
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] codecov-commenter commented on pull request #5857: [Deepstore by-pass] Add a Deepstore bypass integration test with minor bug fixes.

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #5857:
URL: https://github.com/apache/incubator-pinot/pull/5857#issuecomment-685315760


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/5857?src=pr&el=h1) Report
   > Merging [#5857](https://codecov.io/gh/apache/incubator-pinot/pull/5857?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) will **increase** coverage by `0.94%`.
   > The diff coverage is `70.06%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/5857/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz)](https://codecov.io/gh/apache/incubator-pinot/pull/5857?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #5857      +/-   ##
   ==========================================
   + Coverage   66.44%   67.38%   +0.94%     
   ==========================================
     Files        1075     1192     +117     
     Lines       54773    62655    +7882     
     Branches     8168     9561    +1393     
   ==========================================
   + Hits        36396    42223    +5827     
   - Misses      15700    17328    +1628     
   - Partials     2677     3104     +427     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | #integration | `43.65% <46.55%> (?)` | |
   | #unittests | `58.84% <57.79%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/5857?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/5857/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/5857/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `52.83% <0.00%> (-13.84%)` | :arrow_down: |
   | [...ava/org/apache/pinot/client/AbstractResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/5857/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Fic3RyYWN0UmVzdWx0U2V0LmphdmE=) | `53.33% <0.00%> (-3.81%)` | :arrow_down: |
   | [.../main/java/org/apache/pinot/client/Connection.java](https://codecov.io/gh/apache/incubator-pinot/pull/5857/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Nvbm5lY3Rpb24uamF2YQ==) | `44.44% <0.00%> (-4.40%)` | :arrow_down: |
   | [.../org/apache/pinot/client/ResultTableResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/5857/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L1Jlc3VsdFRhYmxlUmVzdWx0U2V0LmphdmE=) | `24.00% <0.00%> (-10.29%)` | :arrow_down: |
   | [...not/common/lineage/SegmentLineageAccessHelper.java](https://codecov.io/gh/apache/incubator-pinot/pull/5857/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbGluZWFnZS9TZWdtZW50TGluZWFnZUFjY2Vzc0hlbHBlci5qYXZh) | `93.33% <ø> (ø)` | |
   | [...ache/pinot/common/lineage/SegmentLineageUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/5857/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbGluZWFnZS9TZWdtZW50TGluZWFnZVV0aWxzLmphdmE=) | `33.33% <ø> (ø)` | |
   | [...ot/common/messages/RoutingTableRebuildMessage.java](https://codecov.io/gh/apache/incubator-pinot/pull/5857/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWVzc2FnZXMvUm91dGluZ1RhYmxlUmVidWlsZE1lc3NhZ2UuamF2YQ==) | `54.54% <ø> (ø)` | |
   | [...ache/pinot/common/metadata/ZKMetadataProvider.java](https://codecov.io/gh/apache/incubator-pinot/pull/5857/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0YWRhdGEvWktNZXRhZGF0YVByb3ZpZGVyLmphdmE=) | `72.22% <ø> (+5.36%)` | :arrow_up: |
   | [...metadata/segment/LLCRealtimeSegmentZKMetadata.java](https://codecov.io/gh/apache/incubator-pinot/pull/5857/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0YWRhdGEvc2VnbWVudC9MTENSZWFsdGltZVNlZ21lbnRaS01ldGFkYXRhLmphdmE=) | `47.61% <ø> (+3.35%)` | :arrow_up: |
   | ... and [863 more](https://codecov.io/gh/apache/incubator-pinot/pull/5857/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/5857?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/5857?src=pr&el=footer). Last update [d444285...7d0cce6](https://codecov.io/gh/apache/incubator-pinot/pull/5857?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] chenboat merged pull request #5857: [Deepstore by-pass] Add a Deepstore bypass integration test with minor bug fixes.

Posted by GitBox <gi...@apache.org>.
chenboat merged pull request #5857:
URL: https://github.com/apache/incubator-pinot/pull/5857


   




[GitHub] [incubator-pinot] chenboat commented on a change in pull request #5857: [Deepstore by-pass] Add a Deepstore bypass integration test with minor bug fixes.

Posted by GitBox <gi...@apache.org>.
chenboat commented on a change in pull request #5857:
URL: https://github.com/apache/incubator-pinot/pull/5857#discussion_r481719872



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java
##########
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.avro.reflect.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.spi.config.table.CompletionConfig;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.LocalPinotFS;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.controller.ControllerConf.ALLOW_HLC_TABLES;
+import static org.apache.pinot.controller.ControllerConf.ENABLE_SPLIT_COMMIT;
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Integration test that extends RealtimeClusterIntegrationTest but uses a low-level Kafka consumer and a fake PinotFS
+ * as the deep store for segments. This test enables the peer-to-peer segment download scheme to verify that Pinot
+ * servers can download segments from peer servers even when the deep store is down. This is done by injecting failures
+ * in the fake PinotFS segment upload API (i.e., copyFromLocal) for all segments whose sequence number is a multiple
+ * of 5.
+ *
+ * Besides the standard tests, it also verifies that
+ * (1) all segments on all servers are in either ONLINE or CONSUMING state, and
+ * (2) for segments that failed during the deep-store upload, the corresponding segment download URL in ZK is empty.
+ */
+public class PeerDownloadLLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PeerDownloadLLCRealtimeClusterIntegrationTest.class);
+
+  private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
+  private static final long RANDOM_SEED = System.currentTimeMillis();
+  private static final Random RANDOM = new Random(RANDOM_SEED);
+  private static final int NUM_SERVERS = 2;
+  public static final int UPLOAD_FAILURE_MOD = 5;
+
+  private final boolean _isDirectAlloc = true; // Must be true; otherwise an indexing exception is triggered.
+  private final boolean _isConsumerDirConfigured = true;
+  private final boolean _enableSplitCommit = true;
+  private final boolean _enableLeadControllerResource = RANDOM.nextBoolean();
+  private static File PINOT_FS_ROOT_DIR;
+
+  @BeforeClass
+  @Override
+  public void setUp()
+      throws Exception {
+    System.out.println(String.format(
+        "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableSplitCommit: %s, enableLeadControllerResource: %s",
+        RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured, _enableSplitCommit, _enableLeadControllerResource));
+
+    PINOT_FS_ROOT_DIR = new File(FileUtils.getTempDirectoryPath() + File.separator + System.currentTimeMillis() + "/");
+    Preconditions.checkState(PINOT_FS_ROOT_DIR.mkdir(), "Failed to make a dir for " + PINOT_FS_ROOT_DIR.getPath());
+
+    // Remove the consumer directory
+    File consumerDirectory = new File(CONSUMER_DIRECTORY);
+    if (consumerDirectory.exists()) {
+      FileUtils.deleteDirectory(consumerDirectory);
+    }
+    super.setUp();
+  }
+
+
+  @Override
+  public void startServer() {
+    startServers(NUM_SERVERS);
+  }
+
+  @Override
+  public void addTableConfig(TableConfig tableConfig)
+      throws IOException {
+    SegmentsValidationAndRetentionConfig segmentsValidationAndRetentionConfig =
+        new SegmentsValidationAndRetentionConfig();
+    CompletionConfig completionConfig = new CompletionConfig("DOWNLOAD");
+    segmentsValidationAndRetentionConfig.setCompletionConfig(completionConfig);
+    segmentsValidationAndRetentionConfig.setReplicasPerPartition(String.valueOf(NUM_SERVERS));
+    // Important: enable peer to peer download.
+    segmentsValidationAndRetentionConfig.setPeerSegmentDownloadScheme("http");
+    tableConfig.setValidationConfig(segmentsValidationAndRetentionConfig);
+    tableConfig.getValidationConfig().setTimeColumnName(this.getTimeColumnName());
+
+    sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString());
+  }
+
+
+  @Override
+  public void startController() {
+    Map<String, Object> controllerConfig = getDefaultControllerConfiguration();
+    controllerConfig.put(ALLOW_HLC_TABLES, false);
+    controllerConfig.put(ENABLE_SPLIT_COMMIT, _enableSplitCommit);
+    // Override the data dir config.
+    controllerConfig.put(ControllerConf.DATA_DIR, "mockfs://" + getHelixClusterName());
+    controllerConfig.put(ControllerConf.LOCAL_TEMP_DIR, FileUtils.getTempDirectory().getAbsolutePath());
+    // Use the mock PinotFS as the PinotFS.
+    controllerConfig.put("pinot.controller.storage.factory.class.mockfs",
+        "org.apache.pinot.integration.tests.PeerDownloadLLCRealtimeClusterIntegrationTest$MockPinotFS");
+    startController(controllerConfig);
+    enableResourceConfigForLeadControllerResource(_enableLeadControllerResource);
+  }
+
+  @Override
+  protected boolean useLlc() {
+    return true;
+  }
+
+  @Nullable
+  @Override
+  protected String getLoadMode() {
+    return "MMAP";
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration configuration) {
+    configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, true);
+    configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION, _isDirectAlloc);
+    configuration.setProperty(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY + ".class.mockfs",
+        "org.apache.pinot.integration.tests.PeerDownloadLLCRealtimeClusterIntegrationTest$MockPinotFS");
+    // Set the segment deep store uri.
+    configuration.setProperty("pinot.server.instance.segment.store.uri", "mockfs://" + getHelixClusterName());
+    // For setting the HDFS segment fetcher.
+    configuration.setProperty(CommonConstants.Server.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY + ".protocols", "file,http");
+    if (_isConsumerDirConfigured) {
+      configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, CONSUMER_DIRECTORY);
+    }
+    if (_enableSplitCommit) {
+      configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true);
+      configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true);
+    }
+  }
+
+  @Test
+  public void testConsumerDirectoryExists() {
+    File consumerDirectory = new File(CONSUMER_DIRECTORY, "mytable_REALTIME");
+    assertEquals(consumerDirectory.exists(), _isConsumerDirConfigured,
+        "The off heap consumer directory does not exist");
+  }
+
+  @Test
+  public void testSegmentFlushSize() {
+    String zkSegmentsPath = "/SEGMENTS/" + TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+    List<String> segmentNames = _propertyStore.getChildNames(zkSegmentsPath, 0);
+    for (String segmentName : segmentNames) {
+      ZNRecord znRecord = _propertyStore.get(zkSegmentsPath + "/" + segmentName, null, 0);
+      assertEquals(znRecord.getSimpleField(CommonConstants.Segment.FLUSH_THRESHOLD_SIZE),
+          Integer.toString(getRealtimeSegmentFlushSize() / getNumKafkaPartitions()),
+          "Segment: " + segmentName + " does not have the expected flush size");
+    }
+  }
+
+  @Test
+  public void testSegmentDownloadURLs() {
+    // Verify that segments whose sequence number is a multiple of UPLOAD_FAILURE_MOD have an empty download URL in ZK.
+    String zkSegmentsPath = "/SEGMENTS/" + TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+    List<String> segmentNames = _propertyStore.getChildNames(zkSegmentsPath, 0);
+    for (String segmentName : segmentNames) {
+      ZNRecord znRecord = _propertyStore.get(zkSegmentsPath + "/" + segmentName, null, 0);
+      String downloadURL = znRecord.getSimpleField("segment.realtime.download.url");
+      String numberOfDoc = znRecord.getSimpleField("segment.total.docs");
+      if (numberOfDoc.equals("-1")) {
+        // This is a consuming segment so the download url is null.
+        Assert.assertNull(downloadURL);
+        continue;
+      }
+      int seqNum = Integer.parseInt(segmentName.split("__")[2]);
+      if (seqNum % UPLOAD_FAILURE_MOD == 0) {
+        Assert.assertEquals("", downloadURL);
+      } else {
+        Assert.assertTrue(downloadURL.startsWith("mockfs://"));
+      }
+    }
+  }
+
+  @Test
+  public void testAllSegmentsAreOnlineOrConsuming() {
+    ExternalView externalView =
+        HelixHelper.getExternalViewForResource(_helixAdmin, getHelixClusterName(),
+            TableNameBuilder.REALTIME.tableNameWithType(getTableName()));
+    Assert.assertEquals("2", externalView.getReplicas());
+    // Verify for each segment e, the state of e in its 2 hosting servers is either ONLINE or CONSUMING
+    for(String segment : externalView.getPartitionSet()) {
+      Map<String, String> instanceToStateMap = externalView.getStateMap(segment);
+      Assert.assertEquals(2, instanceToStateMap.size());
+      for (Map.Entry<String, String> instanceState : instanceToStateMap.entrySet()) {
+        Assert.assertTrue("ONLINE".equalsIgnoreCase(instanceState.getValue()) || "CONSUMING"
+            .equalsIgnoreCase(instanceState.getValue()));
+      }
+    }
+  }
+
+  @Test(expectedExceptions = IOException.class)
+  public void testAddHLCTableShouldFail()
+      throws IOException {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
+        .setStreamConfigs(Collections.singletonMap("stream.kafka.consumer.type", "HIGHLEVEL")).build();
+    sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString());
+  }
+
+  // MockPinotFS is a LocalPinotFS whose root directory is re-based under _basePath.
+  public static class MockPinotFS extends PinotFS {
+    LocalPinotFS _localPinotFS = new LocalPinotFS();
+    File _basePath;
+    @Override
+    public void init(PinotConfiguration config) {
+      _localPinotFS.init(config);
+      _basePath = PeerDownloadLLCRealtimeClusterIntegrationTest.PINOT_FS_ROOT_DIR;
+    }
+
+    @Override
+    public boolean mkdir(URI uri)
+        throws IOException {
+      try {
+        return _localPinotFS.mkdir(new URI(_basePath + uri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public boolean delete(URI segmentUri, boolean forceDelete)
+        throws IOException {
+      try {
+        return _localPinotFS.delete(new URI(_basePath + segmentUri.getPath()), forceDelete);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public boolean doMove(URI srcUri, URI dstUri)
+        throws IOException {
+      try {
+        LOGGER.warn("Moving from {} to {}", srcUri, dstUri);
+        return _localPinotFS.doMove(new URI(_basePath + srcUri.getPath()), new URI(_basePath + dstUri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public boolean copy(URI srcUri, URI dstUri)
+        throws IOException {
+      try {
+        return _localPinotFS.copy(new URI(_basePath + srcUri.getPath()), new URI(_basePath + dstUri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public boolean exists(URI fileUri)
+        throws IOException {
+      try {
+        return _localPinotFS.exists(new URI(_basePath + fileUri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public long length(URI fileUri)
+        throws IOException {
+      try {
+        return _localPinotFS.length(new URI(_basePath + fileUri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public String[] listFiles(URI fileUri, boolean recursive)
+        throws IOException {
+      try {
+        return _localPinotFS.listFiles(new URI(_basePath + fileUri.getPath()), recursive);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public void copyToLocalFile(URI srcUri, File dstFile)
+        throws Exception {
+      _localPinotFS.copyToLocalFile(new URI(_basePath + srcUri.getPath()), dstFile);
+    }
+
+    @Override
+    public void copyFromLocalFile(File srcFile, URI dstUri)
+        throws Exception {
+      // Inject failures for segments whose seq number mod 5 is 0.
+      String[] parts = srcFile.getName().split("__");
+      if (parts.length > 3 && (Integer.parseInt(parts[2]) % UPLOAD_FAILURE_MOD == 0)) {

Review comment:
       Done. Thanks for the good idea.
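
The quoted diff is truncated at the injection point inside copyFromLocalFile. A simplified, self-contained analogue of that method is sketched below (an assumption based on the class javadoc, not the merged code: the real method throws for the selected segments and otherwise delegates to a LocalPinotFS rooted under _basePath):

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for MockPinotFS.copyFromLocalFile (hypothetical class;
// the real implementation delegates successful copies to a LocalPinotFS).
public class FakeSegmentStore {
  static final int UPLOAD_FAILURE_MOD = 5;
  final List<String> uploaded = new ArrayList<>();

  public void copyFromLocalFile(File srcFile) throws IOException {
    // Inject failures for segments whose sequence number is a multiple of 5.
    String[] parts = srcFile.getName().split("__");
    if (parts.length > 3 && Integer.parseInt(parts[2]) % UPLOAD_FAILURE_MOD == 0) {
      throw new IOException("Injected deep-store failure for " + srcFile.getName());
    }
    uploaded.add(srcFile.getName()); // Stands in for the delegated local copy.
  }
}
```

Because the controller records an empty download URL when this upload throws, the testSegmentDownloadURLs check above follows directly from this injection rule.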






[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #5857: [Deepstore by-pass] Add a Deepstore bypass integration test with minor bug fixes.

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #5857:
URL: https://github.com/apache/incubator-pinot/pull/5857#discussion_r481348629



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java
##########
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.avro.reflect.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.spi.config.table.CompletionConfig;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.LocalPinotFS;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.controller.ControllerConf.ALLOW_HLC_TABLES;
+import static org.apache.pinot.controller.ControllerConf.ENABLE_SPLIT_COMMIT;
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Integration test that extends RealtimeClusterIntegrationTest but uses low-level Kafka consumer and a fake PinotFS as
+ * the deep store for segments. This test enables the peer to peer segment download scheme to test Pinot servers can
+ * download segments from peer servers even the deep store is down. This is done by injection of failures in
+ * the fake PinotFS segment upload api (i.e., copyFromLocal) for all segments whose seq number mod 5 is 0.
+ *
+ * Besides standard tests, it also verifies that
+ * (1) All the segments on all servers are in either ONLINE or CONSUMING states
+ * (2) For segments failed during deep store upload, the corresponding segment download url string is empty in Zk.
+ */
+public class PeerDownloadLLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PeerDownloadLLCRealtimeClusterIntegrationTest.class);
+
+  private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
+  private static final long RANDOM_SEED = System.currentTimeMillis();
+  private static final Random RANDOM = new Random(RANDOM_SEED);
+  private static final int NUM_SERVERS = 2;
+  public static final int UPLOAD_FAILURE_MOD = 5;
+
+  private final boolean _isDirectAlloc = true; //Set as true; otherwise trigger indexing exception.
+  private final boolean _isConsumerDirConfigured = true;
+  private final boolean _enableSplitCommit = true;
+  private final boolean _enableLeadControllerResource = RANDOM.nextBoolean();
+  private static File PINOT_FS_ROOT_DIR;
+
+  @BeforeClass
+  @Override
+  public void setUp()
+      throws Exception {
+    System.out.println(String.format(
+        "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableSplitCommit: %s, enableLeadControllerResource: %s",
+        RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured, _enableSplitCommit, _enableLeadControllerResource));
+
+    PINOT_FS_ROOT_DIR = new File(FileUtils.getTempDirectoryPath() + File.separator + System.currentTimeMillis() + "/");
+    Preconditions.checkState(PINOT_FS_ROOT_DIR.mkdir(), "Failed to make a dir for " + PINOT_FS_ROOT_DIR.getPath());
+
+    // Remove the consumer directory
+    File consumerDirectory = new File(CONSUMER_DIRECTORY);
+    if (consumerDirectory.exists()) {
+      FileUtils.deleteDirectory(consumerDirectory);
+    }
+    super.setUp();
+  }
+
+
+  @Override
+  public void startServer() {
+    startServers(NUM_SERVERS);
+  }
+
+  @Override
+  public void addTableConfig(TableConfig tableConfig)
+      throws IOException {
+    SegmentsValidationAndRetentionConfig segmentsValidationAndRetentionConfig =
+        new SegmentsValidationAndRetentionConfig();
+    CompletionConfig completionConfig = new CompletionConfig("DOWNLOAD");
+    segmentsValidationAndRetentionConfig.setCompletionConfig(completionConfig);
+    segmentsValidationAndRetentionConfig.setReplicasPerPartition(String.valueOf(NUM_SERVERS));
+    // Important: enable peer to peer download.
+    segmentsValidationAndRetentionConfig.setPeerSegmentDownloadScheme("http");
+    tableConfig.setValidationConfig(segmentsValidationAndRetentionConfig);
+    tableConfig.getValidationConfig().setTimeColumnName(this.getTimeColumnName());
+
+    sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString());
+  }
+
+  @Override
+  public void startController() {
+    Map<String, Object> controllerConfig = getDefaultControllerConfiguration();
+    controllerConfig.put(ALLOW_HLC_TABLES, false);
+    controllerConfig.put(ENABLE_SPLIT_COMMIT, _enableSplitCommit);
+    // Override the data dir config.
+    controllerConfig.put(ControllerConf.DATA_DIR, "mockfs://" + getHelixClusterName());
+    controllerConfig.put(ControllerConf.LOCAL_TEMP_DIR, FileUtils.getTempDirectory().getAbsolutePath());
+    // Use the mock PinotFS as the PinotFS.
+    controllerConfig.put("pinot.controller.storage.factory.class.mockfs",
+        "org.apache.pinot.integration.tests.PeerDownloadLLCRealtimeClusterIntegrationTest$MockPinotFS");
+    startController(controllerConfig);
+    enableResourceConfigForLeadControllerResource(_enableLeadControllerResource);
+  }
+
+  @Override
+  protected boolean useLlc() {
+    return true;
+  }
+
+  @Nullable
+  @Override
+  protected String getLoadMode() {
+    return "MMAP";
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration configuration) {
+    configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, true);
+    configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION, _isDirectAlloc);
+    configuration.setProperty(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY + ".class.mockfs",
+        "org.apache.pinot.integration.tests.PeerDownloadLLCRealtimeClusterIntegrationTest$MockPinotFS");
+    // Set the segment deep store uri.
+    configuration.setProperty("pinot.server.instance.segment.store.uri", "mockfs://" + getHelixClusterName());
+    // Register the protocols supported by the segment fetcher factory.
+    configuration.setProperty(CommonConstants.Server.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY + ".protocols", "file,http");
+    if (_isConsumerDirConfigured) {
+      configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, CONSUMER_DIRECTORY);
+    }
+    if (_enableSplitCommit) {
+      configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true);
+      configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true);
+    }
+  }
+
+  @Test
+  public void testConsumerDirectoryExists() {
+    File consumerDirectory = new File(CONSUMER_DIRECTORY, "mytable_REALTIME");
+    assertEquals(consumerDirectory.exists(), _isConsumerDirConfigured,
+        "The off-heap consumer directory existence does not match the configuration");
+  }
+
+  @Test
+  public void testSegmentFlushSize() {
+    String zkSegmentsPath = "/SEGMENTS/" + TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+    List<String> segmentNames = _propertyStore.getChildNames(zkSegmentsPath, 0);
+    for (String segmentName : segmentNames) {
+      ZNRecord znRecord = _propertyStore.get(zkSegmentsPath + "/" + segmentName, null, 0);
+      assertEquals(znRecord.getSimpleField(CommonConstants.Segment.FLUSH_THRESHOLD_SIZE),
+          Integer.toString(getRealtimeSegmentFlushSize() / getNumKafkaPartitions()),
+          "Segment: " + segmentName + " does not have the expected flush size");
+    }
+  }
+
+  @Test
+  public void testSegmentDownloadURLs() {
+    // Verify that all segments of even partition number have empty download url in zk.
+    String zkSegmentsPath = "/SEGMENTS/" + TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+    List<String> segmentNames = _propertyStore.getChildNames(zkSegmentsPath, 0);
+    for (String segmentName : segmentNames) {
+      ZNRecord znRecord = _propertyStore.get(zkSegmentsPath + "/" + segmentName, null, 0);
+      String downloadURL = znRecord.getSimpleField("segment.realtime.download.url");
+      String numberOfDoc = znRecord.getSimpleField("segment.total.docs");
+      if ("-1".equals(numberOfDoc)) {
+        // This is a consuming segment, so the download url is not set yet.
+        Assert.assertNull(downloadURL);
+        continue;
+      }
+      int seqNum = Integer.parseInt(segmentName.split("__")[2]);
+      if (seqNum % UPLOAD_FAILURE_MOD == 0) {
+        Assert.assertEquals(downloadURL, "");
+      } else {
+        Assert.assertTrue(downloadURL.startsWith("mockfs://"));
+      }
+    }
+  }
+
+  @Test
+  public void testAllSegmentsAreOnlineOrConsuming() {
+    ExternalView externalView =
+        HelixHelper.getExternalViewForResource(_helixAdmin, getHelixClusterName(),
+            TableNameBuilder.REALTIME.tableNameWithType(getTableName()));
+    Assert.assertEquals(externalView.getReplicas(), "2");
+    // Verify that for each segment, its state on both hosting servers is either ONLINE or CONSUMING.
+    for (String segment : externalView.getPartitionSet()) {
+      Map<String, String> instanceToStateMap = externalView.getStateMap(segment);
+      Assert.assertEquals(instanceToStateMap.size(), 2);
+      for (Map.Entry<String, String> instanceState : instanceToStateMap.entrySet()) {
+        Assert.assertTrue("ONLINE".equalsIgnoreCase(instanceState.getValue()) || "CONSUMING"
+            .equalsIgnoreCase(instanceState.getValue()));
+      }
+    }
+  }
+
+  @Test(expectedExceptions = IOException.class)
+  public void testAddHLCTableShouldFail()
+      throws IOException {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
+        .setStreamConfigs(Collections.singletonMap("stream.kafka.consumer.type", "HIGHLEVEL")).build();
+    sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString());
+  }
+
+  // MockPinotFS is a LocalPinotFS whose root directory is rebased under _basePath.
+  public static class MockPinotFS extends PinotFS {
+    LocalPinotFS _localPinotFS = new LocalPinotFS();
+    File _basePath;
+    @Override
+    public void init(PinotConfiguration config) {
+      _localPinotFS.init(config);
+      _basePath = PeerDownloadLLCRealtimeClusterIntegrationTest.PINOT_FS_ROOT_DIR;
+    }
+
+    @Override
+    public boolean mkdir(URI uri)
+        throws IOException {
+      try {
+        return _localPinotFS.mkdir(new URI(_basePath + uri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public boolean delete(URI segmentUri, boolean forceDelete)
+        throws IOException {
+      try {
+        return _localPinotFS.delete(new URI(_basePath + segmentUri.getPath()), forceDelete);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public boolean doMove(URI srcUri, URI dstUri)
+        throws IOException {
+      try {
+        LOGGER.warn("Moving from {} to {}", srcUri, dstUri);
+        return _localPinotFS.doMove(new URI(_basePath + srcUri.getPath()), new URI(_basePath + dstUri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public boolean copy(URI srcUri, URI dstUri)
+        throws IOException {
+      try {
+        return _localPinotFS.copy(new URI(_basePath + srcUri.getPath()), new URI(_basePath + dstUri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public boolean exists(URI fileUri)
+        throws IOException {
+      try {
+        return _localPinotFS.exists(new URI(_basePath + fileUri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public long length(URI fileUri)
+        throws IOException {
+      try {
+        return _localPinotFS.length(new URI(_basePath + fileUri.getPath()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public String[] listFiles(URI fileUri, boolean recursive)
+        throws IOException {
+      try {
+        return _localPinotFS.listFiles(new URI(_basePath + fileUri.getPath()), recursive);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+
+    @Override
+    public void copyToLocalFile(URI srcUri, File dstFile)
+        throws Exception {
+      _localPinotFS.copyToLocalFile(new URI(_basePath + srcUri.getPath()), dstFile);
+    }
+
+    @Override
+    public void copyFromLocalFile(File srcFile, URI dstUri)
+        throws Exception {
+      // Inject failures for segments whose sequence number mod UPLOAD_FAILURE_MOD is 0.
+      String[] parts = srcFile.getName().split("__");
+      if (parts.length > 3 && (Integer.parseInt(parts[2]) % UPLOAD_FAILURE_MOD == 0)) {

Review comment:
       Can you use LLCSegmentName for this logic?
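As a hedged sketch of this suggestion: Pinot's `LLCSegmentName` parses low-level-consumer segment names of the form `<tableName>__<partitionId>__<sequenceNumber>__<creationTime>`, so the manual `__` split could be replaced by constructing the parser and reading the sequence number. The stand-in class below only mirrors that sequence-number extraction for illustration; it is not the real `LLCSegmentName` API, and the mod constant is assumed from the test.

```java
// Self-contained illustration of the reviewer's suggestion. In the actual
// test, Pinot's LLCSegmentName would do this parsing; this stand-in mirrors
// only the sequence-number extraction used by copyFromLocalFile's failure
// injection.
public class SegmentNameSketch {
  static final int UPLOAD_FAILURE_MOD = 5; // assumed value of the test constant

  // Returns true when the injected upload failure should fire for this name.
  static boolean shouldFailUpload(String segmentName) {
    String[] parts = segmentName.split("__");
    if (parts.length < 4) {
      return false; // not an LLC segment name, never inject a failure
    }
    int sequenceNumber = Integer.parseInt(parts[2]);
    return sequenceNumber % UPLOAD_FAILURE_MOD == 0;
  }

  public static void main(String[] args) {
    // Sequence number 10 is a multiple of 5, so the upload is failed.
    System.out.println(shouldFailUpload("mytable__3__10__20200813T0635Z"));
    // Sequence number 7 is not, so the upload succeeds.
    System.out.println(shouldFailUpload("mytable__3__7__20200813T0635Z"));
  }
}
```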




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org