Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/05/05 21:12:17 UTC

[GitHub] [incubator-pinot] chenboat opened a new pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

chenboat opened a new pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336


   This segment fetcher downloads a segment by first discovering a server that hosts it through the external view of the Pinot table, and then downloading the segment from that peer server using a configured http or https fetcher. By default, HttpSegmentFetcher is used.
   
   @mcvsubbu 
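   A minimal sketch of the discovery step described above, under stated assumptions: the external view is modeled as a plain map of segmentName -> (instanceName -> state), the same shape Helix returns from `ExternalView.getStateMap(segmentName)`, so the sketch needs no Helix dependency. The class and method names are hypothetical, and picking a random ONLINE replica is just one plausible way to spread download load.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical, Helix-free sketch of peer discovery through an external view.
// The view is modeled as segmentName -> (instanceName -> state).
class PeerDiscoverySketch {
  static final String ONLINE = "ONLINE";

  // Returns every instance that reports the segment as ONLINE, in map iteration order.
  static List<String> findOnlineServers(Map<String, Map<String, String>> externalView, String segmentName) {
    List<String> servers = new ArrayList<>();
    Map<String, String> instanceStates = externalView.get(segmentName);
    if (instanceStates == null) {
      return servers;  // the segment is not in the external view at all
    }
    for (Map.Entry<String, String> entry : instanceStates.entrySet()) {
      if (ONLINE.equals(entry.getValue())) {
        servers.add(entry.getKey());
      }
    }
    return servers;
  }

  // Picks one ONLINE peer at random so repeated downloads do not all hit the same replica.
  static String pickPeer(List<String> servers, Random random) {
    if (servers.isEmpty()) {
      throw new IllegalStateException("No ONLINE peer hosts the segment");
    }
    return servers.get(random.nextInt(servers.size()));
  }
}
```

   Replicas in states other than ONLINE (for example a replica still CONSUMING) are skipped, since they do not yet hold the committed segment.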


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r436849376



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -268,20 +273,81 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
   }
 
   public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMetadata llcSegmentMetadata,
-      IndexLoadingConfig indexLoadingConfig) {
+      IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
     final String uri = llcSegmentMetadata.getDownloadUrl();
+    if (!"PEER".equalsIgnoreCase(uri)) {

Review comment:
       "PEER" is an unparseable URI, so it is not a good idea to put it into the downloadUrl in the metadata. It is best to leave it unset (or explicitly set it to null).
   I also suggest floating metadata changes in an email, as we do for table config changes. I suggest we change things as follows:
   (1) Change the segment completion protocol to carry "PEER" in the URI string (or maybe a specific URI like "peer://uri/of/server/segmentName").
   (2) On the controller, if it receives a peer scheme from the server, it knows that the segment needs to be committed without a URI in the segment metadata.
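   A minimal sketch of the download decision in `downloadAndReplaceSegment` if the suggestion above is adopted, assuming the segment's downloadUrl is left null or empty (instead of "PEER") when no deep-store copy exists. In Java terms, `new URI("PEER")` only yields a scheme-less relative reference, so no scheme-based fetcher could be chosen for it anyway. The class, the `Downloader` interface and the "deepstore"/"peer" return values are hypothetical; the fallback order mirrors the diff.

```java
// Hypothetical sketch: a null/empty downloadUrl means "no deep-store copy, fetch from a peer".
class DownloadDecisionSketch {
  interface Downloader {
    void download(String segmentName) throws Exception;
  }

  // Returns which source finally provided the segment: "deepstore" or "peer".
  static String downloadSegment(String segmentName, String downloadUrl, boolean peerDownloadEnabled,
      Downloader deepStore, Downloader peer) throws Exception {
    boolean hasDeepStoreCopy = downloadUrl != null && !downloadUrl.isEmpty();
    if (hasDeepStoreCopy) {
      try {
        deepStore.download(segmentName);
        return "deepstore";
      } catch (Exception e) {
        if (!peerDownloadEnabled) {
          throw e;  // no peer fallback configured for this table
        }
        // otherwise fall through to the peer download below
      }
    } else if (!peerDownloadEnabled) {
      throw new IllegalStateException("Peer segment download not enabled for segment " + segmentName);
    }
    peer.download(segmentName);
    return "peer";
  }
}
```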
   

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -268,20 +273,81 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
   }
 
   public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMetadata llcSegmentMetadata,
-      IndexLoadingConfig indexLoadingConfig) {
+      IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
     final String uri = llcSegmentMetadata.getDownloadUrl();
+    if (!"PEER".equalsIgnoreCase(uri)) {
+      try {
+        downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri);
+      } catch (Exception e) {
+        // Download from deep store failed; try to download from peer if peer download is setup for the table.
+        if (isPeerSegmentDownloadEnabled(tableConfig)) {
+          downloadSegmentFromPeer(segmentName, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), indexLoadingConfig);
+        } else {
+          throw e;
+        }
+      }
+    } else {
+      if (isPeerSegmentDownloadEnabled(tableConfig)) {
+        downloadSegmentFromPeer(segmentName, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), indexLoadingConfig);
+      } else {
+        throw new RuntimeException("Peer segment download not enabled for segment " + segmentName);
+      }
+    }
+  }
+
+  private void downloadSegmentFromDeepStore(String segmentName, IndexLoadingConfig indexLoadingConfig, String uri) {
     File tempSegmentFolder = new File(_indexDir, "tmp-" + segmentName + "." + System.currentTimeMillis());
     File tempFile = new File(_indexDir, segmentName + ".tar.gz");
-    final File segmentFolder = new File(_indexDir, segmentName);
-    FileUtils.deleteQuietly(segmentFolder);
     try {
       SegmentFetcherFactory.fetchSegmentToLocal(uri, tempFile);
       _logger.info("Downloaded file from {} to {}; Length of downloaded file: {}", uri, tempFile, tempFile.length());
-      TarGzCompressionUtils.unTar(tempFile, tempSegmentFolder);
-      _logger.info("Uncompressed file {} into tmp dir {}", tempFile, tempSegmentFolder);
-      FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], segmentFolder);
-      _logger.info("Replacing LLC Segment {}", segmentName);
-      replaceLLSegment(segmentName, indexLoadingConfig);
+      untarAndMoveSegment(segmentName, indexLoadingConfig, tempSegmentFolder, tempFile);
+    } catch (Exception e) {
+      _logger.warn("Failed to download segment {} from deep store: ", segmentName, e);
+      throw new RuntimeException(e);
+    } finally {
+      FileUtils.deleteQuietly(tempFile);
+      FileUtils.deleteQuietly(tempSegmentFolder);
+    }
+  }
+
+  private void untarAndMoveSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, File tempSegmentFolder,
+      File tempFile)
+      throws IOException, ArchiveException {
+    TarGzCompressionUtils.unTar(tempFile, tempSegmentFolder);
+    _logger.info("Uncompressed file {} into tmp dir {}", tempFile, tempSegmentFolder);
+    final File segmentFolder = new File(_indexDir, segmentName);
+    FileUtils.deleteQuietly(segmentFolder);
+    FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], segmentFolder);
+    _logger.info("Replacing LLC Segment {}", segmentName);
+    replaceLLSegment(segmentName, indexLoadingConfig);
+  }
+
+  private boolean isPeerSegmentDownloadEnabled(TableConfig tableConfig) {
+    return SegmentFetcherFactory.HTTP_PROTOCOL
+        .equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())
+        || SegmentFetcherFactory.HTTPS_PROTOCOL
+        .equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
+  }
+
+  private void downloadSegmentFromPeer(String segmentName, String downloadScheme, IndexLoadingConfig indexLoadingConfig) {
+    File tempSegmentFolder = new File(_indexDir, "tmp-" + segmentName + "." + System.currentTimeMillis());
+    File tempFile = new File(_indexDir, segmentName + ".tar.gz");
+    try {
+      RetryPolicies.exponentialBackoffRetryPolicy(RETRY_COUNT, RETRY_WAIT_MS, RETRY_DELAY_SCALE_FACTOR).attempt(() -> {

Review comment:
       fetchSegmentToLocal already does retries
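   To illustrate why the outer retry is redundant: when a call that already retries is wrapped in a second retry policy, the attempts multiply. The sketch below uses a hypothetical exponential-backoff helper (not Pinot's `RetryPolicies` API) and counts how many times an always-failing fetch runs when both layers retry 3 times.

```java
import java.util.concurrent.Callable;

// Hypothetical retry helper used only to show that nested retries multiply attempts.
class NestedRetrySketch {
  static <T> T retry(int maxAttempts, long initialDelayMs, double delayScaleFactor, Callable<T> op)
      throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    long delayMs = initialDelayMs;
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return op.call();
      } catch (Exception e) {
        last = e;
        if (attempt < maxAttempts) {
          Thread.sleep(delayMs);  // back off before the next attempt
          delayMs = (long) (delayMs * delayScaleFactor);
        }
      }
    }
    throw last;  // all attempts failed; surface the last failure
  }
}
```

   With 3 inner and 3 outer attempts, a persistently failing peer is contacted 3 x 3 = 9 times, and the backoff delays compound as well.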

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
##########
@@ -35,11 +35,11 @@ private SegmentFetcherFactory() {
 
   public static final String PROTOCOLS_KEY = "protocols";
   public static final String SEGMENT_FETCHER_CLASS_KEY_SUFFIX = ".class";
+  public static final String HTTP_PROTOCOL = "http";

Review comment:
       I think there are a few definitions of these floating around in the code. It may be useful to take this opportunity to move them to a Constants class.
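   A minimal sketch of what that could look like, assuming a shared holder for the protocol names; `ProtocolConstants` is a hypothetical name, not an existing Pinot class. It also shows that `isPeerSegmentDownloadEnabled` from the diff becomes a short check once the scheme is read once and compared against the shared constants.

```java
// Hypothetical shared constants holder; the real class name and location may differ.
final class ProtocolConstants {
  static final String HTTP = "http";
  static final String HTTPS = "https";

  private ProtocolConstants() {
  }
}

class PeerDownloadConfigSketch {
  // Same check as isPeerSegmentDownloadEnabled in the diff, against shared constants.
  // equalsIgnoreCase(null) returns false, so an unset scheme disables peer download.
  static boolean isPeerSegmentDownloadEnabled(String peerSegmentDownloadScheme) {
    return ProtocolConstants.HTTP.equalsIgnoreCase(peerSegmentDownloadScheme)
        || ProtocolConstants.HTTPS.equalsIgnoreCase(peerSegmentDownloadScheme);
  }
}
```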



[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r420452949



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PeerServerSegmentFetcher.java
##########
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.fetcher;
+
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This segment fetcher downloads the segment by first discovering the server having the segment through external view
+ * of a Pinot table and then downloading the segment from the peer server using a configured http or https fetcher. By
+ * default, HttpSegmentFetcher is used.
+ * The format of expected segment address uri is
+ *    server:///segment_name
+ * Note the host component is empty.
+ * To use this segment fetcher, servers need to put "server" in their segment fetcher protocol.
+ */
+public class PeerServerSegmentFetcher extends BaseSegmentFetcher {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PeerServerSegmentFetcher.class);
+  private static final String PEER_2_PEER_PROTOCOL = "server";
+  private static final String DOWNLOADER_CLASS = ".downloader.class";
+  private HelixManager _helixManager;
+  private String _helixClusterName;
+  private HttpSegmentFetcher _httpSegmentFetcher;
+
+  public PeerServerSegmentFetcher(Configuration config, HelixManager helixManager, String helixClusterName) {
+    _helixManager = helixManager;
+    _helixClusterName = helixClusterName;
+    String segmentDownloaderClass = config.getString(PEER_2_PEER_PROTOCOL + DOWNLOADER_CLASS);

Review comment:
       nit: I would declare the DOWNLOADER_CLASS without the starting dot, and add the intermediate dot here.
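   A small sketch of the suggested key layout, assuming the same two constants as in the diff: the suffix is declared without the leading dot and the separator is added where the key is built.

```java
// Hypothetical sketch of building the config key with an explicit separator.
class ConfigKeySketch {
  private static final String PEER_2_PEER_PROTOCOL = "server";
  // Declared without the leading dot, as suggested.
  private static final String DOWNLOADER_CLASS = "downloader.class";

  static String downloaderClassKey() {
    return String.join(".", PEER_2_PEER_PROTOCOL, DOWNLOADER_CLASS);
  }
}
```

   `ConfigKeySketch.downloaderClassKey()` yields the same key as before, `server.downloader.class`, but neither constant carries a stray separator.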

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PeerServerSegmentFetcher.java
##########
@@ -0,0 +1,149 @@
+public class PeerServerSegmentFetcher extends BaseSegmentFetcher {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PeerServerSegmentFetcher.class);

Review comment:
       We don't need this logger, right? We can use `_logger`?
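   A sketch of why an inherited `_logger` is enough, assuming (as the comment implies) that the base fetcher already exposes one. It uses `java.util.logging` only to stay dependency-free; Pinot itself uses SLF4J. The class names are hypothetical.

```java
import java.util.logging.Logger;

// Hypothetical base class: getClass() resolves to the runtime subclass,
// so each subclass automatically gets a logger named after itself.
abstract class BaseFetcherSketch {
  protected final Logger _logger = Logger.getLogger(getClass().getName());
}

class PeerFetcherSketch extends BaseFetcherSketch {
  void fetch(String segmentName) {
    // No private static LOGGER needed; the inherited one already carries this class's name.
    _logger.fine("Fetching segment " + segmentName);
  }
}
```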

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PeerServerSegmentFetcher.java
##########
@@ -0,0 +1,149 @@
+public class PeerServerSegmentFetcher extends BaseSegmentFetcher {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PeerServerSegmentFetcher.class);
+  private static final String PEER_2_PEER_PROTOCOL = "server";
+  private static final String DOWNLOADER_CLASS = ".downloader.class";
+  private HelixManager _helixManager;
+  private String _helixClusterName;
+  private HttpSegmentFetcher _httpSegmentFetcher;
+
+  public PeerServerSegmentFetcher(Configuration config, HelixManager helixManager, String helixClusterName) {
+    _helixManager = helixManager;
+    _helixClusterName = helixClusterName;
+    String segmentDownloaderClass = config.getString(PEER_2_PEER_PROTOCOL + DOWNLOADER_CLASS);
+    try {
+      _httpSegmentFetcher = PluginManager.get().createInstance(segmentDownloaderClass);

Review comment:
       So, essentially we are saying that it can be either an http or an https segment fetcher (or some other class that extends one of these), right? We need to get the config story right. Can we configure just the scheme, construct the rest of the URL here, and initialize the right fetcher? Do you anticipate any class other than these two being used?
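   A sketch of "configure just the scheme", under the assumption that only http and https are needed. `Fetcher`, `HttpFetcher` and `HttpsFetcher` are hypothetical stand-ins for HttpSegmentFetcher and HttpsSegmentFetcher. Note that a Java `switch` on a null String throws NullPointerException, so a missing setting is normalized to the http default before switching.

```java
import java.util.Locale;

// Hypothetical sketch: pick the fetcher from a configured scheme only.
class FetcherSelectionSketch {
  interface Fetcher {
    String scheme();
  }

  static final class HttpFetcher implements Fetcher {
    public String scheme() {
      return "http";
    }
  }

  static final class HttpsFetcher implements Fetcher {
    public String scheme() {
      return "https";
    }
  }

  static Fetcher forScheme(String configuredScheme) {
    // switch on a null String would throw NullPointerException, so default first.
    String scheme = configuredScheme == null ? "http" : configuredScheme.toLowerCase(Locale.ROOT);
    switch (scheme) {
      case "https":
        return new HttpsFetcher();
      case "http":
        return new HttpFetcher();
      default:
        throw new IllegalArgumentException("Unsupported peer download scheme: " + configuredScheme);
    }
  }
}
```

   Rejecting unknown schemes (rather than silently falling back to http) is a design choice here; it surfaces config typos early.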



[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r434191769



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PeerServerSegmentFetcher.java
##########
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.fetcher;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+/**
+ * This segment fetcher downloads the segment by first discovering the server having the segment through external view
+ * of a Pinot table and then downloading the segment from the peer server using a configured http or https fetcher. By
+ * default, HttpSegmentFetcher is used.
+ * The format of expected segment address uri is
+ *    peer:///segment_name
+ * Note the host component is empty.
+ */
+public class PeerServerSegmentFetcher extends BaseSegmentFetcher {
+  private static final String PEER_2_PEER_PROTOCOL = "peer";
+  private static final String PEER_SEGMENT_DOWNLOAD_SCHEME = "peerSegmentDownloadScheme";
+  private HelixManager _helixManager;
+  private String _helixClusterName;
+  private HttpSegmentFetcher _httpSegmentFetcher;
+  // The value is either https or http
+  private final String _httpScheme;
+
+  public PeerServerSegmentFetcher(Configuration config, HelixManager helixManager, String helixClusterName) {
+    _helixManager = helixManager;
+    _helixClusterName = helixClusterName;
+    switch (config.getString(PEER_SEGMENT_DOWNLOAD_SCHEME)) {
+      case "https":
+        _httpSegmentFetcher = new HttpsSegmentFetcher();
+        _httpScheme = "https";
+        break;
+      default:
+        _httpSegmentFetcher = new HttpSegmentFetcher();
+        _httpScheme = "http";
+    }
+    _httpSegmentFetcher.init(config);
+  }
+
+  public PeerServerSegmentFetcher(HttpSegmentFetcher httpSegmentFetcher, String httpScheme, HelixManager helixManager,
+      String helixClusterName) {
+    _helixManager = helixManager;
+    _helixClusterName = helixClusterName;
+    _httpSegmentFetcher = httpSegmentFetcher;
+    _httpScheme = httpScheme;
+  }
+
+  @Override
+  public void fetchSegmentToLocalWithoutRetry(URI uri, File dest)
+      throws Exception {
+    if (!PEER_2_PEER_PROTOCOL.equals(uri.getScheme())) {

Review comment:
       Not sure how the peer-to-peer protocol fits in here. The caller can create an http or https segment fetcher based on the table config, right? The only extra logic the caller needs is constructing the URL. Maybe that can go in a utils class, or just be a method in the caller (if exactly one caller uses it).
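   A sketch of such a utility method, assuming the caller already knows the peer's host and admin port (for example from its instance config). The class name and the `/segments/{tableNameWithType}/{segmentName}` path are assumptions for illustration, not a documented endpoint contract.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical utility that builds a peer download URL for a plain http/https fetcher.
final class PeerDownloadUris {
  private PeerDownloadUris() {
  }

  static URI build(String scheme, String host, int adminPort, String tableNameWithType, String segmentName)
      throws URISyntaxException {
    // The multi-argument URI constructor quotes any illegal characters in the path.
    String path = "/segments/" + tableNameWithType + "/" + segmentName;
    return new URI(scheme, null, host, adminPort, path, null, null);
  }
}
```

   The resulting URI can then be handed to an ordinary http or https segment fetcher, with no special peer scheme involved.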

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PeerServerSegmentFetcher.java
##########
@@ -0,0 +1,155 @@
+public class PeerServerSegmentFetcher extends BaseSegmentFetcher {
+  private static final String PEER_2_PEER_PROTOCOL = "peer";
+  private static final String PEER_SEGMENT_DOWNLOAD_SCHEME = "peerSegmentDownloadScheme";
+  private HelixManager _helixManager;
+  private String _helixClusterName;
+  private HttpSegmentFetcher _httpSegmentFetcher;
+  // The value is either https or http
+  private final String _httpScheme;
+
+  public PeerServerSegmentFetcher(Configuration config, HelixManager helixManager, String helixClusterName) {
+    _helixManager = helixManager;
+    _helixClusterName = helixClusterName;
+    switch (config.getString(PEER_SEGMENT_DOWNLOAD_SCHEME)) {
+      case "https":

Review comment:
       Change these to definitions in the constants class (they may already be there). Use those constants in PR #5478 as well, instead of hard-coded strings.

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PeerServerSegmentFetcher.java
##########
@@ -0,0 +1,155 @@
+public class PeerServerSegmentFetcher extends BaseSegmentFetcher {
+  private static final String PEER_2_PEER_PROTOCOL = "peer";
+  private static final String PEER_SEGMENT_DOWNLOAD_SCHEME = "peerSegmentDownloadScheme";
+  private HelixManager _helixManager;
+  private String _helixClusterName;
+  private HttpSegmentFetcher _httpSegmentFetcher;
+  // The value is either https or http
+  private final String _httpScheme;
+
+  public PeerServerSegmentFetcher(Configuration config, HelixManager helixManager, String helixClusterName) {
+    _helixManager = helixManager;
+    _helixClusterName = helixClusterName;
+    switch (config.getString(PEER_SEGMENT_DOWNLOAD_SCHEME)) {

Review comment:
       we should get the config from the tableConfig



[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r449272474



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -57,9 +61,12 @@
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 
+import static org.apache.pinot.common.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD;
+
 
 @ThreadSafe
 public class RealtimeTableDataManager extends BaseTableDataManager {
+  private static final String EMPTY_URL = "";

Review comment:
       this is not needed anymore



[GitHub] [incubator-pinot] chenboat commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
chenboat commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r448601742



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
##########
@@ -35,11 +35,11 @@ private SegmentFetcherFactory() {
 
   public static final String PROTOCOLS_KEY = "protocols";
   public static final String SEGMENT_FETCHER_CLASS_KEY_SUFFIX = ".class";
+  public static final String HTTP_PROTOCOL = "http";

Review comment:
       done.



[GitHub] [incubator-pinot] chenboat commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
chenboat commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r448548586



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
##########
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.collections.ListUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * PeerServerSegmentFinder discovers all the servers having the input segment in an ONLINE state through external view of
+ * a Pinot table.
+ */
+public class PeerServerSegmentFinder {
+  private static final Logger _logger = LoggerFactory.getLogger(PeerServerSegmentFinder.class);
+  /**
+   *
+   * @param segmentName
+   * @param downloadScheme Can be either http or https.
+   * @param helixManager
+   * @return a list of uri strings of the form http(s)://hostname:port/segments/tablenameWithType/segmentName
+   * for the servers hosting ONLINE segments; empty list if no such server found.
+   */
+  public static List<URI> getPeerServerURIs(String segmentName, String downloadScheme, HelixManager helixManager) {
+    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    String tableNameWithType =
+        TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(llcSegmentName.getTableName());
+
+    HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
+    String clusterName = helixManager.getClusterName();
+    if (clusterName == null) {
+      _logger.error("ClusterName not found");
+      return ListUtils.EMPTY_LIST;
+    }
+    ExternalView externalViewForResource =
+        HelixHelper.getExternalViewForResource(helixAdmin, clusterName, tableNameWithType);
+    if (externalViewForResource == null) {
+      _logger.warn("External View not found for table {}", tableNameWithType);
+      return ListUtils.EMPTY_LIST;
+    }
+    List<URI> onlineServerURIs = new ArrayList<>();
+    // Find out the ONLINE server serving the segment.
+    for (String segment : externalViewForResource.getPartitionSet()) {

Review comment:
       Good catch. I was already using getStateMaps a few lines below. Revised.






[GitHub] [incubator-pinot] mcvsubbu merged pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
mcvsubbu merged pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336


   




[GitHub] [incubator-pinot] chenboat commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
chenboat commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r422427031



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PeerServerSegmentFetcher.java
##########
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.fetcher;
+
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This segment fetcher downloads the segment by first discovering the server having the segment through external view
+ * of a Pinot table and then downloading the segment from the peer server using a configured http or https fetcher. By
+ * default, HttpSegmentFetcher is used.
+ * The format of the expected segment address uri is
+ *    server:///segment_name
+ * Note the host component is empty.
+ * To use this segment fetcher, servers need to put "server" in their segment fetcher protocol.
+ */
+public class PeerServerSegmentFetcher extends BaseSegmentFetcher {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PeerServerSegmentFetcher.class);
+  private static final String PEER_2_PEER_PROTOCOL = "server";
+  private static final String DOWNLOADER_CLASS = ".downloader.class";
+  private HelixManager _helixManager;
+  private String _helixClusterName;
+  private HttpSegmentFetcher _httpSegmentFetcher;
+
+  public PeerServerSegmentFetcher(Configuration config, HelixManager helixManager, String helixClusterName) {
+    _helixManager = helixManager;
+    _helixClusterName = helixClusterName;
+    String segmentDownloaderClass = config.getString(PEER_2_PEER_PROTOCOL + DOWNLOADER_CLASS);
+    try {
+      _httpSegmentFetcher = PluginManager.get().createInstance(segmentDownloaderClass);

Review comment:
       The config I use now passes in the fetcher class (mainly for ease of testing). I could also pass in the scheme (http or https) together with the class, but let us fold this config choice into the broader discussion we had offline so that we plan everything in one place. The updated design is here: https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#By-passingdeep-storerequirementforRealtimesegmentcompletion-Configchange
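The fetcher's javadoc in the diff above specifies segment addresses of the form `server:///segment_name` (empty host), and the constructor reads its downloader class from the key `PEER_2_PEER_PROTOCOL + DOWNLOADER_CLASS`. A minimal JDK-only sketch of how such a URI could be validated and the segment name extracted; the class `PeerSegmentUriSketch` and its methods are hypothetical illustrations, not Pinot code:

```java
import java.net.URI;

/** Hypothetical sketch; mirrors the constants quoted in the diff, not the merged PeerServerSegmentFetcher. */
final class PeerSegmentUriSketch {
  static final String PEER_2_PEER_PROTOCOL = "server";
  static final String DOWNLOADER_CLASS = ".downloader.class";

  private PeerSegmentUriSketch() {
  }

  /** Config key holding the downloader class, i.e. "server.downloader.class". */
  static String downloaderClassKey() {
    return PEER_2_PEER_PROTOCOL + DOWNLOADER_CLASS;
  }

  /** Returns segment_name from a URI of the form server:///segment_name (host component must be empty). */
  static String segmentNameOf(URI uri) {
    if (!PEER_2_PEER_PROTOCOL.equals(uri.getScheme())) {
      throw new IllegalArgumentException("Not a peer-download URI: " + uri);
    }
    if (uri.getHost() != null) {
      throw new IllegalArgumentException("Expected an empty host component: " + uri);
    }
    String path = uri.getPath();
    if (path == null || path.length() <= 1) {
      throw new IllegalArgumentException("Missing segment name: " + uri);
    }
    // Drop the leading '/' that precedes the segment name.
    return path.substring(1);
  }
}
```

The empty host is what signals that the actual server must be discovered at download time (via the external view) rather than encoded in the URI.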






[GitHub] [incubator-pinot] chenboat commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
chenboat commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r448741283



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -268,21 +276,79 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
   }
 
   public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMetadata llcSegmentMetadata,
-      IndexLoadingConfig indexLoadingConfig) {
+      IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
     final String uri = llcSegmentMetadata.getDownloadUrl();
+    if (uri != null && !uri.isEmpty()) {

Review comment:
       Added the CommonConstants.Segments.METADATA_URI_FOR_PEER_DOWNLOAD constant.






[GitHub] [incubator-pinot] codecov-commenter commented on pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#issuecomment-642301046


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/5336?src=pr&el=h1) Report
   > :exclamation: No coverage uploaded for pull request base (`master@5ebcacf`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference#section-missing-base-commit).
   > The diff coverage is `55.55%`.
   
   ```diff
   @@            Coverage Diff            @@
   ##             master    #5336   +/-   ##
   =========================================
     Coverage          ?   66.32%           
   =========================================
     Files             ?     1105           
     Lines             ?    56965           
     Branches          ?     8516           
   =========================================
     Hits              ?    37782           
     Misses            ?    16393           
     Partials          ?     2790           
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | #integrationtests | `44.79% <25.25%> (?)` | |
   | #unittests | `56.77% <37.37%> (?)` | |
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/5336?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...org/apache/pinot/common/utils/CommonConstants.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvQ29tbW9uQ29uc3RhbnRzLmphdmE=) | `39.02% <ø> (ø)` | |
   | [...e/pinot/common/utils/FileUploadDownloadClient.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvRmlsZVVwbG9hZERvd25sb2FkQ2xpZW50LmphdmE=) | `49.40% <ø> (ø)` | |
   | [...ava/org/apache/pinot/common/utils/SchemaUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvU2NoZW1hVXRpbHMuamF2YQ==) | `9.45% <0.00%> (ø)` | |
   | [...he/pinot/common/utils/config/TableConfigUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvY29uZmlnL1RhYmxlQ29uZmlnVXRpbHMuamF2YQ==) | `87.61% <0.00%> (ø)` | |
   | [...ot/common/utils/fetcher/SegmentFetcherFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvZmV0Y2hlci9TZWdtZW50RmV0Y2hlckZhY3RvcnkuamF2YQ==) | `89.47% <ø> (ø)` | |
   | [...he/pinot/common/utils/webhdfs/WebHdfsV1Client.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvd2ViaGRmcy9XZWJIZGZzVjFDbGllbnQuamF2YQ==) | `0.00% <ø> (ø)` | |
   | [...org/apache/pinot/controller/ControllerStarter.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9Db250cm9sbGVyU3RhcnRlci5qYXZh) | `73.35% <0.00%> (ø)` | |
   | [...altime/ServerSegmentCompletionProtocolHandler.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VydmVyL3JlYWx0aW1lL1NlcnZlclNlZ21lbnRDb21wbGV0aW9uUHJvdG9jb2xIYW5kbGVyLmphdmE=) | `35.00% <ø> (ø)` | |
   | [...in/java/org/apache/pinot/minion/MinionStarter.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtbWluaW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9taW5pb24vTWluaW9uU3RhcnRlci5qYXZh) | `82.55% <ø> (ø)` | |
   | [...va/org/apache/pinot/controller/ControllerConf.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9Db250cm9sbGVyQ29uZi5qYXZh) | `49.47% <33.33%> (ø)` | |
   | ... and [1114 more](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/5336?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/5336?src=pr&el=footer). Last update [5ebcacf...c7fa444](https://codecov.io/gh/apache/incubator-pinot/pull/5336?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r448664817



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -268,21 +276,79 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
   }
 
   public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMetadata llcSegmentMetadata,
-      IndexLoadingConfig indexLoadingConfig) {
+      IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
     final String uri = llcSegmentMetadata.getDownloadUrl();
+    if (uri != null && !uri.isEmpty()) {

Review comment:
       Check that uri is not equal to `CommonConstants.Segments.SomeConstant`, which we define as `""`. The controller and server should use the same constant.
   Also, uri can never be null, since we never write it as null for completed segments. You can remove that check.
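A minimal sketch of the suggested check, assuming the shared constant is the `CommonConstants.Segments.METADATA_URI_FOR_PEER_DOWNLOAD` mentioned elsewhere in this thread and that its value is `""` as proposed here (the merged value may differ). The class and method names are hypothetical:

```java
/**
 * Hypothetical sketch. METADATA_URI_FOR_PEER_DOWNLOAD mirrors the constant named in the thread;
 * its value "" follows the reviewer's suggestion and is an assumption.
 */
final class PeerDownloadUriCheckSketch {
  static final String METADATA_URI_FOR_PEER_DOWNLOAD = "";

  private PeerDownloadUriCheckSketch() {
  }

  /** True if the download URL points at a deep-store copy rather than the peer-download sentinel. */
  static boolean hasDeepStoreCopy(String downloadUrl) {
    // No null check: per the review, the URL is never written as null for a completed segment.
    return !METADATA_URI_FOR_PEER_DOWNLOAD.equals(downloadUrl);
  }
}
```

Keeping the sentinel in one shared constant means the controller (which writes it) and the server (which compares against it) cannot drift apart.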






[GitHub] [incubator-pinot] chenboat commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
chenboat commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r448548971



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -268,20 +273,81 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
   }
 
   public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMetadata llcSegmentMetadata,
-      IndexLoadingConfig indexLoadingConfig) {
+      IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
     final String uri = llcSegmentMetadata.getDownloadUrl();
+    if (!"PEER".equalsIgnoreCase(uri)) {
+      try {
+        downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri);
+      } catch (Exception e) {
+        // Download from deep store failed; try to download from peer if peer download is setup for the table.
+        if (isPeerSegmentDownloadEnabled(tableConfig)) {
+          downloadSegmentFromPeer(segmentName, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), indexLoadingConfig);
+        } else {
+          throw e;
+        }
+      }
+    } else {
+      if (isPeerSegmentDownloadEnabled(tableConfig)) {
+        downloadSegmentFromPeer(segmentName, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), indexLoadingConfig);
+      } else {
+        throw new RuntimeException("Peer segment download not enabled for segment " + segmentName);
+      }
+    }
+  }
+
+  private void downloadSegmentFromDeepStore(String segmentName, IndexLoadingConfig indexLoadingConfig, String uri) {
     File tempSegmentFolder = new File(_indexDir, "tmp-" + segmentName + "." + System.currentTimeMillis());
     File tempFile = new File(_indexDir, segmentName + ".tar.gz");
-    final File segmentFolder = new File(_indexDir, segmentName);
-    FileUtils.deleteQuietly(segmentFolder);
     try {
       SegmentFetcherFactory.fetchSegmentToLocal(uri, tempFile);
       _logger.info("Downloaded file from {} to {}; Length of downloaded file: {}", uri, tempFile, tempFile.length());
-      TarGzCompressionUtils.unTar(tempFile, tempSegmentFolder);
-      _logger.info("Uncompressed file {} into tmp dir {}", tempFile, tempSegmentFolder);
-      FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], segmentFolder);
-      _logger.info("Replacing LLC Segment {}", segmentName);
-      replaceLLSegment(segmentName, indexLoadingConfig);
+      untarAndMoveSegment(segmentName, indexLoadingConfig, tempSegmentFolder, tempFile);
+    } catch (Exception e) {
+      _logger.warn("Failed to download segment {} from deep store: ", segmentName, e);
+      throw new RuntimeException(e);
+    } finally {
+      FileUtils.deleteQuietly(tempFile);
+      FileUtils.deleteQuietly(tempSegmentFolder);
+    }
+  }
+
+  private void untarAndMoveSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, File tempSegmentFolder,
+      File tempFile)
+      throws IOException, ArchiveException {
+    TarGzCompressionUtils.unTar(tempFile, tempSegmentFolder);
+    _logger.info("Uncompressed file {} into tmp dir {}", tempFile, tempSegmentFolder);
+    final File segmentFolder = new File(_indexDir, segmentName);
+    FileUtils.deleteQuietly(segmentFolder);
+    FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], segmentFolder);
+    _logger.info("Replacing LLC Segment {}", segmentName);
+    replaceLLSegment(segmentName, indexLoadingConfig);
+  }
+
+  private boolean isPeerSegmentDownloadEnabled(TableConfig tableConfig) {
+    return SegmentFetcherFactory.HTTP_PROTOCOL
+        .equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())
+        || SegmentFetcherFactory.HTTPS_PROTOCOL
+        .equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
+  }
+
+  private void downloadSegmentFromPeer(String segmentName, String downloadScheme, IndexLoadingConfig indexLoadingConfig) {
+    File tempSegmentFolder = new File(_indexDir, "tmp-" + segmentName + "." + System.currentTimeMillis());
+    File tempFile = new File(_indexDir, segmentName + ".tar.gz");
+    try {
+      RetryPolicies.exponentialBackoffRetryPolicy(RETRY_COUNT, RETRY_WAIT_MS, RETRY_DELAY_SCALE_FACTOR).attempt(() -> {

Review comment:
       Done. Thanks for the pointer.






[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r448097186



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -268,20 +276,82 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
   }
 
   public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMetadata llcSegmentMetadata,
-      IndexLoadingConfig indexLoadingConfig) {
+      IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
     final String uri = llcSegmentMetadata.getDownloadUrl();
+    if (uri != null && !uri.isEmpty()) {
+      try {
+        downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri);
+      } catch (Exception e) {
+        // Download from deep store failed; try to download from peer if peer download is setup for the table.

Review comment:
       Add a warn log here with the segment name and exception.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
##########
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.collections.ListUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * PeerServerSegmentFinder discovers all the servers having the input segment in an ONLINE state through external view of
+ * a Pinot table.
+ */
+public class PeerServerSegmentFinder {
+  private static final Logger _logger = LoggerFactory.getLogger(PeerServerSegmentFinder.class);
+  /**
+   *
+   * @param segmentName
+   * @param downloadScheme Can be either http or https.
+   * @param helixManager
+   * @return a list of uri strings of the form http(s)://hostname:port/segments/tablenameWithType/segmentName
+   * for the servers hosting ONLINE segments; empty list if no such server found.
+   */
+  public static List<URI> getPeerServerURIs(String segmentName, String downloadScheme, HelixManager helixManager) {
+    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    String tableNameWithType =
+        TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(llcSegmentName.getTableName());
+
+    HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
+    String clusterName = helixManager.getClusterName();
+    if (clusterName == null) {
+      _logger.error("ClusterName not found");
+      return ListUtils.EMPTY_LIST;
+    }
+    ExternalView externalViewForResource =
+        HelixHelper.getExternalViewForResource(helixAdmin, clusterName, tableNameWithType);
+    if (externalViewForResource == null) {
+      _logger.warn("External View not found for table {}", tableNameWithType);
+      return ListUtils.EMPTY_LIST;
+    }
+    List<URI> onlineServerURIs = new ArrayList<>();
+    // Find out the ONLINE server serving the segment.
+    for (String segment : externalViewForResource.getPartitionSet()) {

Review comment:
       Why are we looping here? Why not just get the state map for the segment?
   `ExternalView.getStateMap(segmentName)` returns a `Map<String, String>` that we can iterate over.
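A minimal sketch of the suggested approach: iterate only the requested segment's state map (instance id to state) and build a `http(s)://hostname:port/segments/tableNameWithType/segmentName` URI for each ONLINE instance. To stay runnable without Helix, the sketch takes the state map as a plain `Map` (the shape `getStateMap` returns) and resolves addresses through a hypothetical `HostPort` lookup that stands in for whatever instance-config lookup the real code performs:

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: builds peer download URIs from one segment's state map (instance id -> state). */
final class PeerUriFromStateMapSketch {
  /** Hypothetical host/port holder standing in for the per-instance config lookup. */
  static final class HostPort {
    final String host;
    final int port;

    HostPort(String host, int port) {
      this.host = host;
      this.port = port;
    }
  }

  private PeerUriFromStateMapSketch() {
  }

  static List<URI> onlinePeerUris(Map<String, String> stateMap, Map<String, HostPort> instances, String scheme,
      String tableNameWithType, String segmentName) {
    List<URI> uris = new ArrayList<>();
    if (stateMap == null) {
      // The segment is not present in the external view at all.
      return uris;
    }
    for (Map.Entry<String, String> entry : stateMap.entrySet()) {
      if (!"ONLINE".equals(entry.getValue())) {
        continue;
      }
      HostPort hostPort = instances.get(entry.getKey());
      if (hostPort == null) {
        continue; // No address known for this instance; skip it.
      }
      try {
        // Form: http(s)://hostname:port/segments/tableNameWithType/segmentName
        uris.add(new URI(scheme, null, hostPort.host, hostPort.port,
            "/segments/" + tableNameWithType + "/" + segmentName, null, null));
      } catch (URISyntaxException e) {
        // Skip instances whose address cannot form a valid URI.
      }
    }
    return uris;
  }
}
```

This touches only one entry of the external view instead of scanning every partition in `getPartitionSet()`.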

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
##########
@@ -75,6 +77,16 @@ public void fetchSegmentToLocal(URI uri, File dest)
     });
   }
 
+  @Override
+  public void fetchSegmentToLocal(List<URI> uris, File dest)

Review comment:
       Nice.
   How about changing this to apply retries with backoff, using one of the uris at random on each attempt? Something similar to lines 68-77.
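One way to read this suggestion, sketched with the JDK only: each attempt picks a random URI from the list, and failures are retried with exponentially growing waits. The `SingleFetch` interface and all parameter names are hypothetical; the actual `BaseSegmentFetcher` would presumably reuse Pinot's own retry policy (as in the `RetryPolicies.exponentialBackoffRetryPolicy(...)` call quoted in this PR) rather than a hand-written loop:

```java
import java.net.URI;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch of "retry with backoff, picking one of the URIs at random on each attempt". */
final class RandomUriRetrySketch {
  /** Hypothetical single-attempt fetch: returns normally on success, throws on failure. */
  interface SingleFetch {
    void fetch(URI uri) throws Exception;
  }

  private RandomUriRetrySketch() {
  }

  /** Returns the URI that succeeded; throws once maxAttempts attempts have all failed. */
  static URI fetchWithRetries(List<URI> uris, SingleFetch fetch, int maxAttempts, long initialWaitMs,
      double scaleFactor, Random random) throws Exception {
    if (uris.isEmpty()) {
      throw new IllegalArgumentException("No URIs to fetch from");
    }
    long waitMs = initialWaitMs;
    Exception lastFailure = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      // Pick a fresh random peer each time so one bad server does not absorb every retry.
      URI uri = uris.get(random.nextInt(uris.size()));
      try {
        fetch.fetch(uri);
        return uri;
      } catch (Exception e) {
        lastFailure = e;
        if (attempt < maxAttempts) {
          Thread.sleep(waitMs);
          waitMs = (long) (waitMs * scaleFactor); // exponential backoff
        }
      }
    }
    throw new Exception("All " + maxAttempts + " fetch attempts failed", lastFailure);
  }
}
```

Randomizing per attempt spreads load across replicas and avoids repeatedly hitting a peer that is down.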

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -268,20 +276,82 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
   }
 
   public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMetadata llcSegmentMetadata,
-      IndexLoadingConfig indexLoadingConfig) {
+      IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
     final String uri = llcSegmentMetadata.getDownloadUrl();
+    if (uri != null && !uri.isEmpty()) {
+      try {
+        downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri);
+      } catch (Exception e) {
+        // Download from deep store failed; try to download from peer if peer download is setup for the table.
+        if (isPeerSegmentDownloadEnabled(tableConfig)) {
+          downloadSegmentFromPeer(segmentName, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), indexLoadingConfig);

Review comment:
       This needs another try/catch for exceptions, and a similar log if the peer download fails.
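A minimal sketch of the control flow requested in these two comments: a warn log with the segment name and exception when the deep-store download fails, then a separately guarded peer download with its own log. The `DownloadStep` interface and method names are hypothetical, and `java.util.logging` stands in for the SLF4J logger used in the PR so the sketch runs without dependencies:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch: deep-store-first download with a logged, guarded fallback to peers. */
final class SegmentDownloadFallbackSketch {
  private static final Logger LOGGER = Logger.getLogger(SegmentDownloadFallbackSketch.class.getName());

  /** Hypothetical download step: returns normally on success, throws on failure. */
  interface DownloadStep {
    void run() throws Exception;
  }

  private SegmentDownloadFallbackSketch() {
  }

  static void downloadWithFallback(String segmentName, DownloadStep fromDeepStore, DownloadStep fromPeer,
      boolean peerDownloadEnabled) {
    try {
      fromDeepStore.run();
      return;
    } catch (Exception e) {
      // Warn log with the segment name and the exception.
      LOGGER.log(Level.WARNING, "Failed to download segment " + segmentName + " from deep store", e);
      if (!peerDownloadEnabled) {
        throw new RuntimeException(
            "Deep-store download failed and peer download is not enabled for segment " + segmentName, e);
      }
    }
    try {
      fromPeer.run();
    } catch (Exception e) {
      // Separate try/catch around the peer download, with a matching log.
      LOGGER.log(Level.WARNING, "Failed to download segment " + segmentName + " from peer servers", e);
      throw new RuntimeException("Peer download failed for segment " + segmentName, e);
    }
  }
}
```

Keeping the two attempts in separate try blocks makes the logs say which source failed, instead of a single generic failure.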

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -268,20 +273,81 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
   }
 
   public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMetadata llcSegmentMetadata,
-      IndexLoadingConfig indexLoadingConfig) {
+      IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
     final String uri = llcSegmentMetadata.getDownloadUrl();
+    if (!"PEER".equalsIgnoreCase(uri)) {
+      try {
+        downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri);
+      } catch (Exception e) {
+        // Download from deep store failed; try to download from peer if peer download is setup for the table.
+        if (isPeerSegmentDownloadEnabled(tableConfig)) {
+          downloadSegmentFromPeer(segmentName, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), indexLoadingConfig);
+        } else {
+          throw e;
+        }
+      }
+    } else {
+      if (isPeerSegmentDownloadEnabled(tableConfig)) {
+        downloadSegmentFromPeer(segmentName, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), indexLoadingConfig);
+      } else {
+        throw new RuntimeException("Peer segment download not enabled for segment " + segmentName);
+      }
+    }
+  }
+
+  private void downloadSegmentFromDeepStore(String segmentName, IndexLoadingConfig indexLoadingConfig, String uri) {
     File tempSegmentFolder = new File(_indexDir, "tmp-" + segmentName + "." + System.currentTimeMillis());
     File tempFile = new File(_indexDir, segmentName + ".tar.gz");
-    final File segmentFolder = new File(_indexDir, segmentName);
-    FileUtils.deleteQuietly(segmentFolder);
     try {
       SegmentFetcherFactory.fetchSegmentToLocal(uri, tempFile);
       _logger.info("Downloaded file from {} to {}; Length of downloaded file: {}", uri, tempFile, tempFile.length());
-      TarGzCompressionUtils.unTar(tempFile, tempSegmentFolder);
-      _logger.info("Uncompressed file {} into tmp dir {}", tempFile, tempSegmentFolder);
-      FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], segmentFolder);
-      _logger.info("Replacing LLC Segment {}", segmentName);
-      replaceLLSegment(segmentName, indexLoadingConfig);
+      untarAndMoveSegment(segmentName, indexLoadingConfig, tempSegmentFolder, tempFile);
+    } catch (Exception e) {
+      _logger.warn("Failed to download segment {} from deep store: ", segmentName, e);
+      throw new RuntimeException(e);
+    } finally {
+      FileUtils.deleteQuietly(tempFile);
+      FileUtils.deleteQuietly(tempSegmentFolder);
+    }
+  }
+
+  private void untarAndMoveSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, File tempSegmentFolder,
+      File tempFile)
+      throws IOException, ArchiveException {
+    TarGzCompressionUtils.unTar(tempFile, tempSegmentFolder);
+    _logger.info("Uncompressed file {} into tmp dir {}", tempFile, tempSegmentFolder);
+    final File segmentFolder = new File(_indexDir, segmentName);
+    FileUtils.deleteQuietly(segmentFolder);
+    FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], segmentFolder);
+    _logger.info("Replacing LLC Segment {}", segmentName);
+    replaceLLSegment(segmentName, indexLoadingConfig);
+  }
+
+  private boolean isPeerSegmentDownloadEnabled(TableConfig tableConfig) {
+    return SegmentFetcherFactory.HTTP_PROTOCOL
+        .equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())
+        || SegmentFetcherFactory.HTTPS_PROTOCOL
+        .equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
+  }
+
+  private void downloadSegmentFromPeer(String segmentName, String downloadScheme, IndexLoadingConfig indexLoadingConfig) {
+    File tempSegmentFolder = new File(_indexDir, "tmp-" + segmentName + "." + System.currentTimeMillis());
+    File tempFile = new File(_indexDir, segmentName + ".tar.gz");
+    try {
+      RetryPolicies.exponentialBackoffRetryPolicy(RETRY_COUNT, RETRY_WAIT_MS, RETRY_DELAY_SCALE_FACTOR).attempt(() -> {

Review comment:
       Move this retry logic to the BaseSegmentFetcher like I suggest there.
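The suggestion is to keep one retry loop in the base fetcher, so that each concrete fetcher only implements a single download attempt. It might look roughly like the minimal sketch below. This is not Pinot's actual `BaseSegmentFetcher`. The class `RetryingFetcherSketch`, the method `fetchOnce`, and the constructor parameters are hypothetical. The backoff only loosely mirrors the `exponentialBackoffRetryPolicy(RETRY_COUNT, RETRY_WAIT_MS, RETRY_DELAY_SCALE_FACTOR)` call in the diff.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical base class (not Pinot's API): subclasses implement a single
 * download attempt, and the base class owns the exponential-backoff retry loop.
 */
abstract class RetryingFetcherSketch {
  private final int _maxAttempts;
  private final long _initialWaitMs;
  private final double _delayScaleFactor;

  RetryingFetcherSketch(int maxAttempts, long initialWaitMs, double delayScaleFactor) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    _maxAttempts = maxAttempts;
    _initialWaitMs = initialWaitMs;
    _delayScaleFactor = delayScaleFactor;
  }

  /** One download attempt with no retry; implemented by concrete fetchers. */
  protected abstract void fetchOnce(String segmentName) throws Exception;

  /** Retries fetchOnce with exponential backoff and returns the number of attempts used. */
  public final int fetchWithRetry(String segmentName) throws Exception {
    long waitMs = _initialWaitMs;
    Exception lastFailure = null;
    for (int attempt = 1; attempt <= _maxAttempts; attempt++) {
      try {
        fetchOnce(segmentName);
        return attempt;
      } catch (Exception e) {
        lastFailure = e;
        if (attempt < _maxAttempts) {
          // Wait, then grow the delay by the scale factor before the next attempt.
          TimeUnit.MILLISECONDS.sleep(waitMs);
          waitMs = (long) (waitMs * _delayScaleFactor);
        }
      }
    }
    throw new Exception("Failed to fetch segment " + segmentName + " after " + _maxAttempts + " attempts",
        lastFailure);
  }
}
```

With the loop in one place, both the deep-store path and the peer path in `RealtimeTableDataManager` could call a single retrying entry point instead of each wrapping its own retry policy.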






[GitHub] [incubator-pinot] chenboat commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
chenboat commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r428992578



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PeerServerSegmentFetcher.java
##########
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.fetcher;
+
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This segment fetcher downloads the segment by first discovering the server having the segment through external view
+ * of a Pinot table and then downloading the segment from the peer server using a configured http or https fetcher. By
+ * default, HttpSegmentFetcher is used.
+ * The format of the expected segment address URI is
+ *    server:///segment_name
+ * Note the host component is empty.
+ * To use this segment fetcher, servers need to put "server" in their segment fetcher protocol.
+ */
+public class PeerServerSegmentFetcher extends BaseSegmentFetcher {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PeerServerSegmentFetcher.class);
+  private static final String PEER_2_PEER_PROTOCOL = "server";
+  private static final String DOWNLOADER_CLASS = ".downloader.class";
+  private HelixManager _helixManager;
+  private String _helixClusterName;
+  private HttpSegmentFetcher _httpSegmentFetcher;
+
+  public PeerServerSegmentFetcher(Configuration config, HelixManager helixManager, String helixClusterName) {
+    _helixManager = helixManager;
+    _helixClusterName = helixClusterName;
+    String segmentDownloaderClass = config.getString(PEER_2_PEER_PROTOCOL + DOWNLOADER_CLASS);

Review comment:
       Not applicable anymore, as this config value is no longer used.






[GitHub] [incubator-pinot] chenboat commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
chenboat commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r428992491



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PeerServerSegmentFetcher.java
##########
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.fetcher;
+
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This segment fetcher downloads the segment by first discovering the server having the segment through external view
+ * of a Pinot table and then downloading the segment from the peer server using a configured http or https fetcher. By
+ * default, HttpSegmentFetcher is used.
+ * The format of the expected segment address URI is
+ *    server:///segment_name
+ * Note the host component is empty.
+ * To use this segment fetcher, servers need to put "server" in their segment fetcher protocol.
+ */
+public class PeerServerSegmentFetcher extends BaseSegmentFetcher {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PeerServerSegmentFetcher.class);

Review comment:
       done.
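The class Javadoc quoted above says the fetcher first discovers a server that hosts the segment through the table's external view. The minimal sketch below shows that lookup. To keep it runnable without Helix, it models the external view as a plain `segment -> (instance -> state)` map instead of Helix's `ExternalView`. The class and method names are hypothetical. It keeps only replicas in the `ONLINE` state, on the assumption that only those hold a completed copy of the segment. A `CONSUMING` replica is still building it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: finds peer servers that currently serve a completed segment. */
final class PeerDiscoverySketch {
  static final String ONLINE = "ONLINE";

  private PeerDiscoverySketch() {
  }

  /**
   * @param externalView segment name -> (instance id -> state), a plain-map stand-in for an external view
   * @return instance ids whose replica of the segment is ONLINE, sorted for deterministic output
   */
  static List<String> onlineServersForSegment(Map<String, Map<String, String>> externalView, String segmentName) {
    Map<String, String> instanceStates = externalView.get(segmentName);
    if (instanceStates == null) {
      return Collections.emptyList();
    }
    List<String> servers = new ArrayList<>();
    for (Map.Entry<String, String> entry : instanceStates.entrySet()) {
      if (ONLINE.equals(entry.getValue())) {
        servers.add(entry.getKey());
      }
    }
    Collections.sort(servers);
    return servers;
  }
}
```

A real implementation would still need to map each instance id to a reachable host and port, for example from the instance config. That step is omitted here.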






[GitHub] [incubator-pinot] chenboat commented on pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
chenboat commented on pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#issuecomment-640962562


   > Maybe you can introduce a multiple-URI segment fetcher, provide all the peer segment URIs to it, and let the SegmentFetcher select a random URI on each retry?
   
   I am not sure adding a new segment fetcher class is a good idea. The segmentFinder class already returns a random URI on each retry.
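Choosing a random peer on every attempt, which both proposals rely on, can be sketched as follows. This is a simplification under stated assumptions: the candidate list is passed in once, whereas re-querying the external view on each retry would also pick up replica changes. `RandomPeerRetrySketch`, `downloadFromRandomPeer`, and the `attemptDownload` callback are hypothetical names, not Pinot APIs.

```java
import java.net.URI;
import java.util.List;
import java.util.Random;
import java.util.function.Predicate;

/** Hypothetical helper: retries a download against a randomly chosen peer each time. */
final class RandomPeerRetrySketch {
  private RandomPeerRetrySketch() {
  }

  /**
   * Picks a random peer on every attempt, so that a single unhealthy server
   * does not absorb every retry. Returns the URI whose download succeeded.
   */
  static URI downloadFromRandomPeer(List<URI> peerUris, int maxAttempts, Random random,
      Predicate<URI> attemptDownload) {
    if (peerUris.isEmpty()) {
      throw new IllegalStateException("No peer currently serves the segment");
    }
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      URI candidate = peerUris.get(random.nextInt(peerUris.size()));
      if (attemptDownload.test(candidate)) {
        return candidate;
      }
    }
    throw new IllegalStateException("All " + maxAttempts + " download attempts failed");
  }
}
```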




[GitHub] [incubator-pinot] chenboat commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
chenboat commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r445709820



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PeerServerSegmentFetcher.java
##########
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.fetcher;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+/**
+ * This segment fetcher downloads the segment by first discovering the server having the segment through external view
+ * of a Pinot table and then downloading the segment from the peer server using a configured http or https fetcher. By
+ * default, HttpSegmentFetcher is used.
+ * The format of the expected segment address URI is
+ *    peer:///segment_name
+ * Note the host component is empty.
+ */
+public class PeerServerSegmentFetcher extends BaseSegmentFetcher {
+  private static final String PEER_2_PEER_PROTOCOL = "peer";
+  private static final String PEER_SEGMENT_DOWNLOAD_SCHEME = "peerSegmentDownloadScheme";
+  private HelixManager _helixManager;
+  private String _helixClusterName;
+  private HttpSegmentFetcher _httpSegmentFetcher;
+  // The value is either https or http
+  private final String _httpScheme;
+
+  public PeerServerSegmentFetcher(Configuration config, HelixManager helixManager, String helixClusterName) {
+    _helixManager = helixManager;
+    _helixClusterName = helixClusterName;
+    switch (config.getString(PEER_SEGMENT_DOWNLOAD_SCHEME)) {

Review comment:
       this is obsolete now.






[GitHub] [incubator-pinot] codecov-commenter edited a comment on pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#issuecomment-642301046


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/5336?src=pr&el=h1) Report
   > :exclamation: No coverage uploaded for pull request base (`master@5ebcacf`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference#section-missing-base-commit).
   > The diff coverage is `55.55%`.
   
   
   ```diff
   @@            Coverage Diff            @@
   ##             master    #5336   +/-   ##
   =========================================
     Coverage          ?   66.32%           
   =========================================
     Files             ?     1105           
     Lines             ?    56965           
     Branches          ?     8516           
   =========================================
     Hits              ?    37782           
     Misses            ?    16393           
     Partials          ?     2790           
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | #integrationtests | `44.79% <25.25%> (?)` | |
   | #unittests | `56.77% <37.37%> (?)` | |
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/5336?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...org/apache/pinot/common/utils/CommonConstants.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvQ29tbW9uQ29uc3RhbnRzLmphdmE=) | `39.02% <ø> (ø)` | |
   | [...e/pinot/common/utils/FileUploadDownloadClient.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvRmlsZVVwbG9hZERvd25sb2FkQ2xpZW50LmphdmE=) | `49.40% <ø> (ø)` | |
   | [...ava/org/apache/pinot/common/utils/SchemaUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvU2NoZW1hVXRpbHMuamF2YQ==) | `9.45% <0.00%> (ø)` | |
   | [...he/pinot/common/utils/config/TableConfigUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvY29uZmlnL1RhYmxlQ29uZmlnVXRpbHMuamF2YQ==) | `87.61% <0.00%> (ø)` | |
   | [...ot/common/utils/fetcher/SegmentFetcherFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvZmV0Y2hlci9TZWdtZW50RmV0Y2hlckZhY3RvcnkuamF2YQ==) | `89.47% <ø> (ø)` | |
   | [...he/pinot/common/utils/webhdfs/WebHdfsV1Client.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvd2ViaGRmcy9XZWJIZGZzVjFDbGllbnQuamF2YQ==) | `0.00% <ø> (ø)` | |
   | [...org/apache/pinot/controller/ControllerStarter.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9Db250cm9sbGVyU3RhcnRlci5qYXZh) | `73.35% <0.00%> (ø)` | |
   | [...altime/ServerSegmentCompletionProtocolHandler.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VydmVyL3JlYWx0aW1lL1NlcnZlclNlZ21lbnRDb21wbGV0aW9uUHJvdG9jb2xIYW5kbGVyLmphdmE=) | `35.00% <ø> (ø)` | |
   | [...in/java/org/apache/pinot/minion/MinionStarter.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtbWluaW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9taW5pb24vTWluaW9uU3RhcnRlci5qYXZh) | `82.55% <ø> (ø)` | |
   | [...va/org/apache/pinot/controller/ControllerConf.java](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9Db250cm9sbGVyQ29uZi5qYXZh) | `49.47% <33.33%> (ø)` | |
   | ... and [1114 more](https://codecov.io/gh/apache/incubator-pinot/pull/5336/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/5336?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/5336?src=pr&el=footer). Last update [5ebcacf...c7fa444](https://codecov.io/gh/apache/incubator-pinot/pull/5336?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] chenboat commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
chenboat commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r448535327



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -268,20 +276,82 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
   }
 
   public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMetadata llcSegmentMetadata,
-      IndexLoadingConfig indexLoadingConfig) {
+      IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
     final String uri = llcSegmentMetadata.getDownloadUrl();
+    if (uri != null && !uri.isEmpty()) {
+      try {
+        downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri);
+      } catch (Exception e) {
+        // Download from deep store failed; try to download from peer if peer download is set up for the table.
+        if (isPeerSegmentDownloadEnabled(tableConfig)) {
+          downloadSegmentFromPeer(segmentName, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), indexLoadingConfig);

Review comment:
       The try/catch is in the downloadSegmentFromPeer method already. Added a warning log there.
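The control flow in the revised `downloadAndReplaceSegment` hunk can be modelled as the minimal sketch below, with the two download paths abstracted as callbacks so that the branching can be exercised on its own. The flow is: use the deep store when the metadata carries a non-empty download URL; if that fails and peer download is enabled, fall back to a peer; otherwise rethrow. `SegmentDownload`, `DownloadFallbackSketch`, and the string return values are hypothetical.

```java
/** Hypothetical stand-in for one download path (deep store or peer). */
@FunctionalInterface
interface SegmentDownload {
  void run() throws Exception;
}

final class DownloadFallbackSketch {
  private DownloadFallbackSketch() {
  }

  /**
   * Tries the deep store first when a download URL is present, falls back to a
   * peer if allowed, and rethrows otherwise. Returns which source was used.
   */
  static String download(String downloadUrl, boolean peerDownloadEnabled, SegmentDownload fromDeepStore,
      SegmentDownload fromPeer) throws Exception {
    boolean hasDeepStoreUrl = downloadUrl != null && !downloadUrl.isEmpty();
    if (hasDeepStoreUrl) {
      try {
        fromDeepStore.run();
        return "deep-store";
      } catch (Exception e) {
        if (!peerDownloadEnabled) {
          throw e;
        }
        // Deep-store download failed; fall through to the peer download below.
      }
    } else if (!peerDownloadEnabled) {
      throw new IllegalStateException("No deep-store URL and peer segment download is disabled");
    }
    fromPeer.run();
    return "peer";
  }
}
```

Keeping the peer call outside the `catch` block avoids nesting a second failure inside the first one's handler. Logging the peer failure can then stay inside the peer method, as the comment above describes.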






[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r422387472



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PeerServerSegmentFetcher.java
##########
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.fetcher;
+
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This segment fetcher downloads the segment by first discovering the server having the segment through external view
+ * of a Pinot table and then downloading the segment from the peer server using a configured http or https fetcher. By
+ * default, HttpSegmentFetcher is used.
+ * The format of the expected segment address URI is
+ *    server:///segment_name
+ * Note the host component is empty.
+ * To use this segment fetcher, servers need to put "server" in their segment fetcher protocol.
+ */
+public class PeerServerSegmentFetcher extends BaseSegmentFetcher {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PeerServerSegmentFetcher.class);
+  private static final String PEER_2_PEER_PROTOCOL = "server";
+  private static final String DOWNLOADER_CLASS = ".downloader.class";
+  private HelixManager _helixManager;
+  private String _helixClusterName;
+  private HttpSegmentFetcher _httpSegmentFetcher;
+
+  public PeerServerSegmentFetcher(Configuration config, HelixManager helixManager, String helixClusterName) {
+    _helixManager = helixManager;
+    _helixClusterName = helixClusterName;
+    String segmentDownloaderClass = config.getString(PEER_2_PEER_PROTOCOL + DOWNLOADER_CLASS);
+    try {
+      _httpSegmentFetcher = PluginManager.get().createInstance(segmentDownloaderClass);

Review comment:
       I meant: construct the URI in this class (as you are doing), but based on the configured scheme.
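Building the peer download URI from the configured scheme, together with the http/https check that `isPeerSegmentDownloadEnabled` performs in the diff, might look like the sketch below. It is a minimal illustration under assumptions. The `/segments/{tableNameWithType}/{segmentName}` path is an assumed server endpoint, not taken from the PR, and `PeerDownloadUriSketch` and its methods are hypothetical. It uses the seven-argument `java.net.URI` constructor, which validates the components.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Locale;

/** Hypothetical helper: builds a peer download URI from the configured http/https scheme. */
final class PeerDownloadUriSketch {
  private PeerDownloadUriSketch() {
  }

  /** True only for the two schemes the diff accepts for peer download: http and https. */
  static boolean isSupportedScheme(String scheme) {
    return "http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme);
  }

  /**
   * Builds the download URI for a segment hosted on a peer server.
   * The "/segments/{table}/{segment}" path is an assumed endpoint, for illustration only.
   */
  static URI buildPeerDownloadUri(String scheme, String host, int port, String tableNameWithType,
      String segmentName) {
    if (!isSupportedScheme(scheme)) {
      throw new IllegalArgumentException("Unsupported peer download scheme: " + scheme);
    }
    try {
      return new URI(scheme.toLowerCase(Locale.ROOT), null, host, port,
          "/segments/" + tableNameWithType + "/" + segmentName, null, null);
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException("Invalid peer download URI for segment " + segmentName, e);
    }
  }
}
```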






[GitHub] [incubator-pinot] chenboat commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
chenboat commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r437074675



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -268,20 +273,81 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
   }
 
   public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMetadata llcSegmentMetadata,
-      IndexLoadingConfig indexLoadingConfig) {
+      IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
     final String uri = llcSegmentMetadata.getDownloadUrl();
+    if (!"PEER".equalsIgnoreCase(uri)) {
+      try {
+        downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri);
+      } catch (Exception e) {
+        // Download from deep store failed; try to download from peer if peer download is set up for the table.
+        if (isPeerSegmentDownloadEnabled(tableConfig)) {
+          downloadSegmentFromPeer(segmentName, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), indexLoadingConfig);
+        } else {
+          throw e;
+        }
+      }
+    } else {
+      if (isPeerSegmentDownloadEnabled(tableConfig)) {
+        downloadSegmentFromPeer(segmentName, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), indexLoadingConfig);
+      } else {
+        throw new RuntimeException("Peer segment download not enabled for segment " + segmentName);
+      }
+    }
+  }
+
+  private void downloadSegmentFromDeepStore(String segmentName, IndexLoadingConfig indexLoadingConfig, String uri) {
     File tempSegmentFolder = new File(_indexDir, "tmp-" + segmentName + "." + System.currentTimeMillis());
     File tempFile = new File(_indexDir, segmentName + ".tar.gz");
-    final File segmentFolder = new File(_indexDir, segmentName);
-    FileUtils.deleteQuietly(segmentFolder);
     try {
       SegmentFetcherFactory.fetchSegmentToLocal(uri, tempFile);
       _logger.info("Downloaded file from {} to {}; Length of downloaded file: {}", uri, tempFile, tempFile.length());
-      TarGzCompressionUtils.unTar(tempFile, tempSegmentFolder);
-      _logger.info("Uncompressed file {} into tmp dir {}", tempFile, tempSegmentFolder);
-      FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], segmentFolder);
-      _logger.info("Replacing LLC Segment {}", segmentName);
-      replaceLLSegment(segmentName, indexLoadingConfig);
+      untarAndMoveSegment(segmentName, indexLoadingConfig, tempSegmentFolder, tempFile);
+    } catch (Exception e) {
+      _logger.warn("Failed to download segment {} from deep store: ", segmentName, e);
+      throw new RuntimeException(e);
+    } finally {
+      FileUtils.deleteQuietly(tempFile);
+      FileUtils.deleteQuietly(tempSegmentFolder);
+    }
+  }
+
+  private void untarAndMoveSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, File tempSegmentFolder,
+      File tempFile)
+      throws IOException, ArchiveException {
+    TarGzCompressionUtils.unTar(tempFile, tempSegmentFolder);
+    _logger.info("Uncompressed file {} into tmp dir {}", tempFile, tempSegmentFolder);
+    final File segmentFolder = new File(_indexDir, segmentName);
+    FileUtils.deleteQuietly(segmentFolder);
+    FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], segmentFolder);
+    _logger.info("Replacing LLC Segment {}", segmentName);
+    replaceLLSegment(segmentName, indexLoadingConfig);
+  }
+
+  private boolean isPeerSegmentDownloadEnabled(TableConfig tableConfig) {
+    return SegmentFetcherFactory.HTTP_PROTOCOL
+        .equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())
+        || SegmentFetcherFactory.HTTPS_PROTOCOL
+        .equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
+  }
+
+  private void downloadSegmentFromPeer(String segmentName, String downloadScheme, IndexLoadingConfig indexLoadingConfig) {
+    File tempSegmentFolder = new File(_indexDir, "tmp-" + segmentName + "." + System.currentTimeMillis());
+    File tempFile = new File(_indexDir, segmentName + ".tar.gz");
+    try {
+      RetryPolicies.exponentialBackoffRetryPolicy(RETRY_COUNT, RETRY_WAIT_MS, RETRY_DELAY_SCALE_FACTOR).attempt(() -> {

Review comment:
       I know. Right now neither the SegmentFetcherFactory nor the SegmentFetcher exposes the withoutRetry version as a public interface. More importantly, retries here also make the download more robust, right?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -268,20 +273,81 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
   }
 
   public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMetadata llcSegmentMetadata,
-      IndexLoadingConfig indexLoadingConfig) {
+      IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
     final String uri = llcSegmentMetadata.getDownloadUrl();
+    if (!"PEER".equalsIgnoreCase(uri)) {

Review comment:
       How about putting a valid URI string like peer:///segmentName in the metadata? We do not need to specify the concrete server. When servers need to do a peer download, they will use the segmentFinder.
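A `peer:///segmentName` download URL can be produced and checked with plain `java.net.URI`, as the sketch below shows. Because the host component is empty, the URI names no particular server, which matches the idea that the concrete peer is chosen later. This is a hedged sketch, not code from the PR. `PeerUriSketch` and its methods are hypothetical, and the rejection of URIs that carry a host is an assumed policy.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical helper for the proposed peer:///segmentName download URL. */
final class PeerUriSketch {
  static final String PEER_SCHEME = "peer";

  private PeerUriSketch() {
  }

  /** Encodes "served by some peer" without naming a server: the authority is empty. */
  static String toPeerUri(String segmentName) {
    return PEER_SCHEME + ":///" + segmentName;
  }

  /** True if a metadata download URL uses the peer scheme (null-safe). */
  static boolean isPeerUri(String downloadUrl) {
    if (downloadUrl == null) {
      return false;
    }
    try {
      return PEER_SCHEME.equalsIgnoreCase(new URI(downloadUrl).getScheme());
    } catch (URISyntaxException e) {
      return false;
    }
  }

  /** Extracts the segment name from peer:///segmentName and rejects URIs that name a host. */
  static String segmentNameFromPeerUri(String downloadUrl) {
    URI uri = URI.create(downloadUrl);
    if (!PEER_SCHEME.equalsIgnoreCase(uri.getScheme()) || uri.getAuthority() != null) {
      throw new IllegalArgumentException("Expected peer:///segmentName but got " + downloadUrl);
    }
    String path = uri.getPath();
    if (path == null || path.length() <= 1) {
      throw new IllegalArgumentException("Missing segment name in " + downloadUrl);
    }
    // Drop the leading '/' of the path.
    return path.substring(1);
  }
}
```

One design consequence: unlike the bare `"PEER"` marker in the earlier diff, a well-formed URI can be routed by scheme like any other download URL.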

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -268,20 +273,81 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
   }
 
   public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMetadata llcSegmentMetadata,
-      IndexLoadingConfig indexLoadingConfig) {
+      IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
     final String uri = llcSegmentMetadata.getDownloadUrl();
+    if (!"PEER".equalsIgnoreCase(uri)) {

Review comment:
       For sending the email: yes, and I will send it.




[GitHub] [incubator-pinot] chenboat commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
chenboat commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r422427031



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PeerServerSegmentFetcher.java
##########
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.fetcher;
+
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This segment fetcher downloads the segment by first discovering the server having the segment through external view
+ * of a Pinot table and then downloading the segment from the peer server using a configured http or https fetcher. By
+ * default, HttpSegmentFetcher is used.
+ * The format of the expected segment address uri is
+ *    server:///segment_name
+ * Note the host component is empty.
+ * To use this segment fetcher, servers need to put "server" in their segment fetcher protocol.
+ */
+public class PeerServerSegmentFetcher extends BaseSegmentFetcher {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PeerServerSegmentFetcher.class);
+  private static final String PEER_2_PEER_PROTOCOL = "server";
+  private static final String DOWNLOADER_CLASS = ".downloader.class";
+  private HelixManager _helixManager;
+  private String _helixClusterName;
+  private HttpSegmentFetcher _httpSegmentFetcher;
+
+  public PeerServerSegmentFetcher(Configuration config, HelixManager helixManager, String helixClusterName) {
+    _helixManager = helixManager;
+    _helixClusterName = helixClusterName;
+    String segmentDownloaderClass = config.getString(PEER_2_PEER_PROTOCOL + DOWNLOADER_CLASS);
+    try {
+      _httpSegmentFetcher = PluginManager.get().createInstance(segmentDownloaderClass);

Review comment:
       The config I use now passes in the fetcher class (mainly for ease of testing). I can also pass in the scheme (http or https) together with the class. But let us fold this config choice into the broader discussion we had offline so that we plan everything in one discussion.
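
   One way to pass the scheme together with the class is to read both from the same flat config. This is a minimal sketch over a plain `Map`: the `server.downloader.class` key mirrors `PEER_2_PEER_PROTOCOL + DOWNLOADER_CLASS` in the diff, while the `server.downloader.scheme` key and the `PeerDownloadConfig` class are hypothetical. The default class is `HttpSegmentFetcher`, matching the PR description.

```java
import java.util.Locale;
import java.util.Map;

/**
 * Hypothetical sketch: resolve the peer downloader class and scheme from one flat config map.
 * Only "server.downloader.class" mirrors the diff; the scheme key is an assumption.
 */
public class PeerDownloadConfig {
  static final String CLASS_KEY = "server.downloader.class";
  static final String SCHEME_KEY = "server.downloader.scheme"; // hypothetical key
  static final String DEFAULT_CLASS = "org.apache.pinot.common.utils.fetcher.HttpSegmentFetcher";
  static final String DEFAULT_SCHEME = "http";

  final String downloaderClass;
  final String scheme;

  private PeerDownloadConfig(String downloaderClass, String scheme) {
    this.downloaderClass = downloaderClass;
    this.scheme = scheme;
  }

  /** Reads both settings, applies defaults, and rejects schemes other than http/https. */
  static PeerDownloadConfig from(Map<String, String> config) {
    String downloaderClass = config.getOrDefault(CLASS_KEY, DEFAULT_CLASS);
    String scheme = config.getOrDefault(SCHEME_KEY, DEFAULT_SCHEME).toLowerCase(Locale.ROOT);
    if (!"http".equals(scheme) && !"https".equals(scheme)) {
      throw new IllegalArgumentException("Unsupported peer download scheme: " + scheme);
    }
    return new PeerDownloadConfig(downloaderClass, scheme);
  }

  public static void main(String[] args) {
    PeerDownloadConfig cfg = from(Map.of(SCHEME_KEY, "HTTPS"));
    System.out.println(cfg.downloaderClass + " over " + cfg.scheme);
  }
}
```

   Validating the scheme once at startup surfaces a misconfiguration early, rather than on every segment fetch.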




[GitHub] [incubator-pinot] chenboat commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
chenboat commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r449273838



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -57,9 +61,12 @@
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 
+import static org.apache.pinot.common.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD;
+
 
 @ThreadSafe
 public class RealtimeTableDataManager extends BaseTableDataManager {
+  private static final String EMPTY_URL = "";

Review comment:
       done.




[GitHub] [incubator-pinot] chenboat commented on a change in pull request #5336: [Part 4] Deep-store bypass for LLC: Add a peer to peer segment fetcher.

Posted by GitBox <gi...@apache.org>.
chenboat commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r422361717



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PeerServerSegmentFetcher.java
##########
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.fetcher;
+
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This segment fetcher downloads the segment by first discovering the server having the segment through external view
+ * of a Pinot table and then downloading the segment from the peer server using a configured http or https fetcher. By
+ * default, HttpSegmentFetcher is used.
+ * The format of the expected segment address uri is
+ *    server:///segment_name
+ * Note the host component is empty.
+ * To use this segment fetcher, servers need to put "server" in their segment fetcher protocol.
+ */
+public class PeerServerSegmentFetcher extends BaseSegmentFetcher {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PeerServerSegmentFetcher.class);
+  private static final String PEER_2_PEER_PROTOCOL = "server";
+  private static final String DOWNLOADER_CLASS = ".downloader.class";
+  private HelixManager _helixManager;
+  private String _helixClusterName;
+  private HttpSegmentFetcher _httpSegmentFetcher;
+
+  public PeerServerSegmentFetcher(Configuration config, HelixManager helixManager, String helixClusterName) {
+    _helixManager = helixManager;
+    _helixClusterName = helixClusterName;
+    String segmentDownloaderClass = config.getString(PEER_2_PEER_PROTOCOL + DOWNLOADER_CLASS);
+    try {
+      _httpSegmentFetcher = PluginManager.get().createInstance(segmentDownloaderClass);

Review comment:
       Because this fetcher downloads segments from peer servers, the http and https segment fetchers should be sufficient, given the download API that servers support. 
   
   What do you mean by "construct the rest of the url here"? In the constructor? The complete URL is a function of the segment, and we do not have segment info here. Right now, server discovery and URL construction are done on a per-segment-fetch basis. @mcvsubbu 
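
   To make the per-segment point concrete: the URL cannot be built in the constructor because it depends on which servers currently host the segment. The sketch below models the external view as a plain map (segment to instance to state), picks a random ONLINE replica, and builds the URL for that one segment. `PeerUrlBuilder`, the instance-to-host:port map, and the `/segments/{tableNameWithType}/{segmentName}` download path are assumptions for illustration, not confirmed Pinot APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * Hypothetical sketch of per-segment peer download URL construction. The external view is a
 * plain map (segment -> instance -> state); names are assumed to be URL-safe, so no encoding.
 */
public class PeerUrlBuilder {
  static final String ONLINE = "ONLINE";

  /** Instances that report the segment as ONLINE, in the map's iteration order. */
  static List<String> onlineServers(Map<String, Map<String, String>> externalView, String segmentName) {
    List<String> servers = new ArrayList<>();
    for (Map.Entry<String, String> entry : externalView.getOrDefault(segmentName, Map.of()).entrySet()) {
      if (ONLINE.equals(entry.getValue())) {
        servers.add(entry.getKey());
      }
    }
    return servers;
  }

  /** Picks one ONLINE replica at random and builds the download URL for this segment only. */
  static String downloadUrl(String scheme, String tableNameWithType, String segmentName,
      Map<String, Map<String, String>> externalView, Map<String, String> hostPortByInstance, Random random) {
    List<String> servers = onlineServers(externalView, segmentName);
    if (servers.isEmpty()) {
      throw new IllegalStateException("No ONLINE replica found for segment " + segmentName);
    }
    String instance = servers.get(random.nextInt(servers.size()));
    String hostPort = hostPortByInstance.get(instance);
    if (hostPort == null) {
      throw new IllegalStateException("No endpoint known for instance " + instance);
    }
    // Assumed server download path: /segments/{tableNameWithType}/{segmentName}
    return scheme + "://" + hostPort + "/segments/" + tableNameWithType + "/" + segmentName;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> externalView =
        Map.of("seg1", Map.of("Server_a_8098", "ONLINE", "Server_b_8098", "CONSUMING"));
    Map<String, String> endpoints = Map.of("Server_a_8098", "a:8097", "Server_b_8098", "b:8097");
    // Only Server_a_8098 is ONLINE, so the random choice is deterministic here.
    System.out.println(downloadUrl("http", "myTable_REALTIME", "seg1", externalView, endpoints, new Random()));
  }
}
```

   Choosing a replica at random spreads peer downloads across the servers that hold the segment instead of always hitting the same one.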



