You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2020/07/29 20:39:19 UTC

[incubator-pinot] branch add-untar-failure-server-meter created (now ef93477)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch add-untar-failure-server-meter
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at ef93477  Add untar failure server meter

This branch includes the following new commits:

     new ef93477  Add untar failure server meter

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Add untar failure server meter

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-untar-failure-server-meter
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit ef93477b20f84ffe57e0ee39481d3c759d5f86b7
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Wed Jul 29 13:38:55 2020 -0700

    Add untar failure server meter
---
 .../apache/pinot/common/metrics/ServerMeter.java   |  1 +
 .../server/starter/helix/HelixServerStarter.java   | 10 +++----
 .../starter/helix/SegmentFetcherAndLoader.java     | 32 ++++++++++++++--------
 3 files changed, 26 insertions(+), 17 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 3438415..0a8a30e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -61,6 +61,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   NUM_MISSING_SEGMENTS("segments", false),
   RELOAD_FAILURES("segments", false),
   REFRESH_FAILURES("segments", false),
+  UNTAR_FAILURES("segments", false),
 
   // Netty connection metrics
   NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index e86e697..09b18ce 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -159,13 +159,13 @@ public class HelixServerStarter implements ServiceStartable {
         _serverConf.getProperty(CommonConstants.Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false) ? NetUtil
             .getHostnameOrAddress() : NetUtil.getHostAddress());
     _port = _serverConf.getProperty(KEY_OF_SERVER_NETTY_PORT, DEFAULT_SERVER_NETTY_PORT);
-    
+
     _instanceId = Optional.ofNullable(_serverConf.getProperty(CONFIG_OF_INSTANCE_ID))
 
         // InstanceId is not configured. Fallback to an auto generated config.
         .orElseGet(this::initializeDefaultInstanceId);
   }
-  
+
   private String initializeDefaultInstanceId() {
     String instanceId = PREFIX_OF_SERVER_INSTANCE + _host + "_" + _port;
 
@@ -367,8 +367,9 @@ public class HelixServerStarter implements ServiceStartable {
         .init(_serverConf.subset(SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER));
     ServerConf serverInstanceConfig = DefaultHelixStarterServerConfig.getDefaultHelixServerConfig(_serverConf);
     _serverInstance = new ServerInstance(serverInstanceConfig, _helixManager);
+    ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
     InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
-    SegmentFetcherAndLoader fetcherAndLoader = new SegmentFetcherAndLoader(_serverConf, instanceDataManager);
+    SegmentFetcherAndLoader fetcherAndLoader = new SegmentFetcherAndLoader(_serverConf, instanceDataManager, serverMetrics);
     StateModelFactory<?> stateModelFactory =
         new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager, fetcherAndLoader);
     _helixManager.getStateMachineEngine()
@@ -400,7 +401,6 @@ public class HelixServerStarter implements ServiceStartable {
     _adminApiApplication.start(adminApiPort);
     setAdminApiPort(adminApiPort);
 
-    ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
     // Register message handler factory
     SegmentMessageHandlerFactory messageHandlerFactory =
         new SegmentMessageHandlerFactory(fetcherAndLoader, instanceDataManager, serverMetrics);
@@ -643,7 +643,7 @@ public class HelixServerStarter implements ServiceStartable {
     properties.put(KEY_OF_SERVER_NETTY_PORT, port);
     properties.put(CONFIG_OF_INSTANCE_DATA_DIR, "/tmp/PinotServer/test" + port + "/index");
     properties.put(CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, "/tmp/PinotServer/test" + port + "/segmentTar");
-    
+
     HelixServerStarter serverStarter =
         new HelixServerStarter("quickstart", "localhost:2191", new PinotConfiguration(properties));
     serverStarter.start();
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
index 85256af..e5f3c35 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
@@ -28,6 +28,8 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
@@ -53,10 +55,12 @@ public class SegmentFetcherAndLoader {
   private static final String ENCODED_SUFFIX = ".enc";
 
   private final InstanceDataManager _instanceDataManager;
+  private final ServerMetrics _serverMetrics;
 
-  public SegmentFetcherAndLoader(PinotConfiguration config, InstanceDataManager instanceDataManager)
+  public SegmentFetcherAndLoader(PinotConfiguration config, InstanceDataManager instanceDataManager, ServerMetrics serverMetrics)
       throws Exception {
     _instanceDataManager = instanceDataManager;
+    _serverMetrics = serverMetrics;
 
     PinotConfiguration pinotFSConfig = config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY);
     PinotConfiguration segmentFetcherFactoryConfig =
@@ -202,18 +206,22 @@ public class SegmentFetcherAndLoader {
           .info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName, tableName,
               uri, tempTarFile, tempTarFile.length());
 
-      // If an exception is thrown when untarring, it means the tar file is broken OR not found after the retry.
-      // Thus, there's no need to retry again.
-      File tempIndexDir = TarGzCompressionUtils.untar(tempTarFile, tempSegmentDir).get(0);
-
-      File indexDir = new File(new File(_instanceDataManager.getSegmentDataDirectory(), tableName), segmentName);
-      if (indexDir.exists()) {
-        LOGGER.info("Deleting existing index directory for segment: {} for table: {}", segmentName, tableName);
-        FileUtils.deleteDirectory(indexDir);
+      try {
+        // If an exception is thrown when untarring, it means the tar file is broken OR not found after the retry.
+        // Thus, there's no need to retry again.
+        File tempIndexDir = TarGzCompressionUtils.untar(tempTarFile, tempSegmentDir).get(0);
+        File indexDir = new File(new File(_instanceDataManager.getSegmentDataDirectory(), tableName), segmentName);
+        if (indexDir.exists()) {
+          LOGGER.info("Deleting existing index directory for segment: {} for table: {}", segmentName, tableName);
+          FileUtils.deleteDirectory(indexDir);
+        }
+        FileUtils.moveDirectory(tempIndexDir, indexDir);
+        LOGGER.info("Successfully downloaded segment: {} for table: {} to: {}", segmentName, tableName, indexDir);
+        return indexDir.getAbsolutePath();
+      } catch (Exception e) {
+        _serverMetrics.addMeteredTableValue(tableName, ServerMeter.UNTAR_FAILURES, 1L);
+        throw e;
       }
-      FileUtils.moveDirectory(tempIndexDir, indexDir);
-      LOGGER.info("Successfully downloaded segment: {} for table: {} to: {}", segmentName, tableName, indexDir);
-      return indexDir.getAbsolutePath();
     } finally {
       FileUtils.deleteQuietly(tempDir);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org