You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/03/05 01:02:02 UTC

[GitHub] [incubator-pinot] chenboat commented on a change in pull request #4914: [POC] By-passing deep-store requirement for Realtime segment completion

chenboat commented on a change in pull request #4914: [POC] By-passing deep-store requirement for Realtime segment completion
URL: https://github.com/apache/incubator-pinot/pull/4914#discussion_r388023936
 
 

 ##########
 File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 ##########
 @@ -257,21 +280,78 @@ public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMe
     final File segmentFolder = new File(_indexDir, segmentName);
     FileUtils.deleteQuietly(segmentFolder);
     try {
-      SegmentFetcherFactory.getInstance().getSegmentFetcherBasedOnURI(uri).fetchSegmentToLocal(uri, tempFile);
-      _logger.info("Downloaded file from {} to {}; Length of downloaded file: {}", uri, tempFile, tempFile.length());
+      boolean downloadResult = downloadFromURI(uri, tempFile);
+      if (!downloadResult) {
+         String peerServerUri = getPeerServerURI(segmentName);
+         if (peerServerUri == null || !downloadFromURI(peerServerUri, tempFile)) {
+           _logger.warn("Download segment {} from {} failed.", segmentName, peerServerUri);
+           return;
+         }
+      }
       TarGzCompressionUtils.unTar(tempFile, tempSegmentFolder);
-      _logger.info("Uncompressed file {} into tmp dir {}", tempFile, tempSegmentFolder);
+      _logger.warn("Uncompressed file {} into tmp dir {}", tempFile, tempSegmentFolder);
       FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], segmentFolder);
-      _logger.info("Replacing LLC Segment {}", segmentName);
+      _logger.warn("Replacing LLC Segment {}", segmentName);
       replaceLLSegment(segmentName, indexLoadingConfig);
     } catch (Exception e) {
+      _logger.error("Failed to download segment {}.", e.getMessage());
       throw new RuntimeException(e);
     } finally {
       FileUtils.deleteQuietly(tempFile);
       FileUtils.deleteQuietly(tempSegmentFolder);
     }
   }
 
+  // Return the address of an ONLINE server hosting a segment.
+  private String getPeerServerURI(String segmentName) {
+    ExternalView externalViewForResource = HelixHelper.getExternalViewForResource(_helixAdmin, _clusterName, _tableNameWithType);
+    if (externalViewForResource == null ) {
+      _logger.warn("External View not found for segment {}", segmentName);
+      return null;
+    }
+    // Find out the ONLINE server serving the segment.
+    for(String segment : externalViewForResource.getPartitionSet()) {
+      if (!segmentName.equals(segment)) {
+        continue;
+      }
+
+      Map<String, String> instanceToStateMap = externalViewForResource.getStateMap(segmentName);
+      for (Map.Entry<String, String> instanceState : instanceToStateMap.entrySet()) {
+        if ("ONLINE".equals(instanceState.getValue())) {
+          _logger.info("Found ONLINE server {} for segment {}.", instanceState.getKey(), segmentName);
+          String instanceId = instanceState.getKey();
+
+          String namePortStr = instanceId.split(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)[1];
+          String hostName = namePortStr.split("_")[0];
+
+          Map<String, String> instanceConfig =
+              HelixHelper.getInstanceConfigsMapFor(instanceId, _clusterName, _helixAdmin);
+          int port;
+          try {
+            port = Integer.parseInt(instanceConfig.get(CommonConstants.Helix.Instance.ADMIN_PORT_KEY));
+          } catch (Exception e) {
+            port = CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT;
+          }
+
+          return StringUtil.join("/", "http://" + hostName + ":" + port,
 
 Review comment:
   done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org