You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/04/03 04:56:39 UTC

[incubator-pinot] branch fix_service_status created (now 27d168d)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch fix_service_status
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 27d168d  In ServiceStatus, replace Apache math Pair with inner StatusDescriptionPair class

This branch includes the following new commits:

     new 27d168d  In ServiceStatus, replace Apache math Pair with inner StatusDescriptionPair class

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: In ServiceStatus, replace Apache math Pair with inner StatusDescriptionPair class

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch fix_service_status
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 27d168d85a47c7093cd60347bb688edbba544511
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Tue Apr 2 21:54:41 2019 -0700

    In ServiceStatus, replace Apache math Pair with inner StatusDescriptionPair class
    
    Also fix the integration test to correctly include resources in ServiceStatus
---
 .../apache/pinot/common/utils/ServiceStatus.java   |  64 +++++++------
 .../tests/OfflineClusterIntegrationTest.java       | 103 +++++++++------------
 2 files changed, 83 insertions(+), 84 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
index 3fc8d01..5beea88 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
@@ -24,7 +24,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.commons.math3.util.Pair;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -49,12 +48,11 @@ public class ServiceStatus {
     STARTING, GOOD, BAD
   }
 
-  public static String STATUS_DESCRIPTION_NONE = "None";
-  public static String STATUS_DESCRIPTION_INIT = "Init";
+  public static final String STATUS_DESCRIPTION_NONE = "None";
+  public static final String STATUS_DESCRIPTION_INIT = "Init";
+  public static final String STATUS_DESCRIPTION_NO_HELIX_STATE = "Helix state does not exist";
 
-  private static String _noHelixState = "Helix state does not exist";
-
-  private static int MAX_RESOURCE_NAMES_TO_LOG = 5;
+  private static final int MAX_RESOURCE_NAMES_TO_LOG = 5;
 
   /**
    * Callback that returns the status of the service.
@@ -173,7 +171,8 @@ public class ServiceStatus {
         }
       }
       _numTotalResourcesToMonitor = _resourcesToMonitor.size();
-      _minResourcesStartCount = (int)Math.round(Math.ceil(minResourcesStartPercent * _numTotalResourcesToMonitor/100));
+      _minResourcesStartCount =
+          (int) Math.round(Math.ceil(minResourcesStartPercent * _numTotalResourcesToMonitor / 100));
 
       LOGGER.info("Monitoring {} resources: {} for start up of instance {}", _numTotalResourcesToMonitor,
           getResourceListAsString(), _instanceName);
@@ -189,7 +188,8 @@ public class ServiceStatus {
       _resourcesToMonitor = new HashSet<>(resourcesToMonitor);
       _numTotalResourcesToMonitor = _resourcesToMonitor.size();
 
-      _minResourcesStartCount = (int)Math.round(Math.ceil(minResourcesStartPercent * _numTotalResourcesToMonitor/100));
+      _minResourcesStartCount =
+          (int) Math.round(Math.ceil(minResourcesStartPercent * _numTotalResourcesToMonitor / 100));
       LOGGER.info("Monitoring {} resources: {} for start up of instance {}", _numTotalResourcesToMonitor,
           getResourceListAsString(), _instanceName);
     }
@@ -238,19 +238,18 @@ public class ServiceStatus {
           _resourceIterator = _resourcesToMonitor.iterator();
         }
         resourceName = _resourceIterator.next();
-        Pair<Status, String> statusPair = evaluateResourceStatus(resourceName);
+        StatusDescriptionPair statusDescriptionPair = evaluateResourceStatus(resourceName);
 
-        Status status = statusPair.getFirst();
-        if (status == Status.GOOD) {
+        if (statusDescriptionPair._status == Status.GOOD) {
           // Resource is done starting up, remove it from the set
           _resourceIterator.remove();
         } else {
           _statusDescription = String
               .format("%s, waitingFor=%s, resource=%s, numResourcesLeft=%d, numTotalResources=%d, minStartCount=%d,",
-                  statusPair.getSecond(), getMatchName(), resourceName, _resourcesToMonitor.size(),
+                  statusDescriptionPair._description, getMatchName(), resourceName, _resourcesToMonitor.size(),
                   _numTotalResourcesToMonitor, _minResourcesStartCount);
 
-          return status;
+          return statusDescriptionPair._status;
         }
       }
 
@@ -275,35 +274,36 @@ public class ServiceStatus {
         _resourceIterator = _resourcesToMonitor.iterator();
         while (_resourceIterator.hasNext()) {
           String resource = _resourceIterator.next();
-          Pair<Status, String> statusPair = evaluateResourceStatus(resource);
-          if (statusPair.getFirst() == Status.GOOD) {
+          StatusDescriptionPair statusDescriptionPair = evaluateResourceStatus(resource);
+          if (statusDescriptionPair._status == Status.GOOD) {
             _resourceIterator.remove();
           } else {
-            LOGGER.info("Resource: {}, StatusDescription: {}", resource, statusPair.getSecond());
+            LOGGER.info("Resource: {}, StatusDescription: {}", resource, statusDescriptionPair._description);
             if (--logCount <= 0) {
               break;
             }
           }
         }
-        _statusDescription = String.format("waitingFor=%s, numResourcesLeft=%d, numTotalResources=%d, minStartCount=%d,"
-                + " resourceList=%s", getMatchName(), _resourcesToMonitor.size(), _numTotalResourcesToMonitor,
-            _minResourcesStartCount, getResourceListAsString());
+        _statusDescription = String
+            .format("waitingFor=%s, numResourcesLeft=%d, numTotalResources=%d, minStartCount=%d," + " resourceList=%s",
+                getMatchName(), _resourcesToMonitor.size(), _numTotalResourcesToMonitor, _minResourcesStartCount,
+                getResourceListAsString());
         LOGGER.info("Instance {} returning GOOD because {}", _instanceName, _statusDescription);
       }
 
       return Status.GOOD;
     }
 
-    private Pair<Status, String> evaluateResourceStatus(String resourceName) {
+    private StatusDescriptionPair evaluateResourceStatus(String resourceName) {
       IdealState idealState = getResourceIdealState(resourceName);
       // If the resource has been removed or disabled, ignore it
       if (idealState == null || !idealState.isEnabled()) {
-        return new Pair(Status.GOOD, STATUS_DESCRIPTION_NONE);
+        return new StatusDescriptionPair(Status.GOOD, STATUS_DESCRIPTION_NONE);
       }
 
       T helixState = getState(resourceName);
       if (helixState == null) {
-        return new Pair(Status.STARTING, _noHelixState);
+        return new StatusDescriptionPair(Status.STARTING, STATUS_DESCRIPTION_NO_HELIX_STATE);
       }
 
       // Check that all partitions that are supposed to be in any state other than OFFLINE have the same status in the
@@ -325,13 +325,13 @@ public class ServiceStatus {
           if ("ERROR".equals(currentStateStatus)) {
             LOGGER.error(String.format("Resource: %s, partition: %s is in ERROR state", resourceName, partitionName));
           } else {
-            String description = String.format("partition=%s, expected=%s, found=%s", partitionName,
-                idealStateStatus, currentStateStatus);
-            return new Pair(Status.STARTING, description);
+            String description = String
+                .format("partition=%s, expected=%s, found=%s", partitionName, idealStateStatus, currentStateStatus);
+            return new StatusDescriptionPair(Status.STARTING, description);
           }
         }
       }
-      return new Pair(Status.GOOD, STATUS_DESCRIPTION_NONE);
+      return new StatusDescriptionPair(Status.GOOD, STATUS_DESCRIPTION_NONE);
     }
 
     private String getResourceListAsString() {
@@ -358,6 +358,7 @@ public class ServiceStatus {
    */
   public static class IdealStateAndCurrentStateMatchServiceStatusCallback extends IdealStateMatchServiceStatusCallback<CurrentState> {
     private static final String MATCH_NAME = "CurrentStateMatch";
+
     public IdealStateAndCurrentStateMatchServiceStatusCallback(HelixManager helixManager, String clusterName,
         String instanceName, double minResourcesStartPercent) {
       super(helixManager, clusterName, instanceName, minResourcesStartPercent);
@@ -394,6 +395,7 @@ public class ServiceStatus {
    */
   public static class IdealStateAndExternalViewMatchServiceStatusCallback extends IdealStateMatchServiceStatusCallback<ExternalView> {
     private static final String MATCH_NAME = "ExternalViewMatch";
+
     public IdealStateAndExternalViewMatchServiceStatusCallback(HelixManager helixManager, String clusterName,
         String instanceName, double minResourcesStartPercent) {
       super(helixManager, clusterName, instanceName, minResourcesStartPercent);
@@ -428,4 +430,14 @@ public class ServiceStatus {
       return MATCH_NAME;
     }
   }
+
+  private static class StatusDescriptionPair {
+    Status _status;
+    String _description;
+
+    StatusDescriptionPair(Status status, String description) {
+      _status = status;
+      _description = description;
+    }
+  }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index ad9884d..14b694f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -20,7 +20,6 @@ package org.apache.pinot.integration.tests;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
@@ -32,7 +31,6 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.utils.CommonConstants;
@@ -61,7 +59,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
       "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
 
-  private static final List<String> UPDATED_BLOOM_FLITER_COLUMNS = Arrays.asList("Carrier");
+  private static final List<String> UPDATED_BLOOM_FILTER_COLUMNS = Collections.singletonList("Carrier");
   private static final String TEST_UPDATED_BLOOM_FILTER_QUERY = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'CA'";
 
   // For default columns test
@@ -95,23 +93,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     startBrokers(getNumBrokers());
     startServers(getNumServers());
 
-    // Set up service status callbacks
-    List<String> instances = _helixAdmin.getInstancesInCluster(_clusterName);
-    for (String instance : instances) {
-      if (instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)) {
-        _serviceStatusCallbacks.add(
-            new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _clusterName, instance,
-                Collections.singletonList(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE), 100.0));
-      }
-      if (instance.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) {
-        _serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
-            .of(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, _clusterName,
-                    instance, 100.0),
-                new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _clusterName,
-                    instance, 100.0))));
-      }
-    }
-
     // Unpack the Avro files
     List<File> avroFiles = unpackAvroData(_tempDir);
 
@@ -140,6 +121,21 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     // Upload all segments
     uploadSegments(_tarDir);
 
+    // Set up service status callbacks
+    // NOTE: put this step after creating the table and uploading all segments so that brokers and servers can find the
+    // resources to monitor
+    List<String> instances = _helixAdmin.getInstancesInCluster(_clusterName);
+    for (String instance : instances) {
+      if (instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) || instance
+          .startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) {
+        _serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
+            .of(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, _clusterName,
+                    instance, 100.0),
+                new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _clusterName,
+                    instance, 100.0))));
+      }
+    }
+
     // Wait for all documents loaded
     waitForAllDocsLoaded(600_000L);
   }
@@ -184,17 +180,14 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
 
-    TestUtils.waitForCondition(new Function<Void, Boolean>() {
-      @Override
-      public Boolean apply(@Nullable Void aVoid) {
-        try {
-          JsonNode queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
-          // Total docs should not change during reload
-          assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
-          return queryResponse.get("numEntriesScannedInFilter").asLong() == 0L;
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode queryResponse1 = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
+        // Total docs should not change during reload
+        assertEquals(queryResponse1.get("totalDocs").asLong(), numTotalDocs);
+        return queryResponse1.get("numEntriesScannedInFilter").asLong() == 0L;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
       }
     }, 600_000L, "Failed to generate inverted index");
   }
@@ -208,23 +201,20 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     // Update table config and trigger reload
     updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, null,
-        UPDATED_BLOOM_FLITER_COLUMNS, getTaskConfig());
+        UPDATED_BLOOM_FILTER_COLUMNS, getTaskConfig());
 
     updateTableConfiguration();
 
     sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
 
-    TestUtils.waitForCondition(new Function<Void, Boolean>() {
-      @Override
-      public Boolean apply(@Nullable Void aVoid) {
-        try {
-          JsonNode queryResponse = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
-          // Total docs should not change during reload
-          assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
-          return queryResponse.get("numSegmentsProcessed").asLong() == 0L;
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode queryResponse1 = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
+        // Total docs should not change during reload
+        assertEquals(queryResponse1.get("totalDocs").asLong(), numTotalDocs);
+        return queryResponse1.get("numSegmentsProcessed").asLong() == 0L;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
       }
     }, 600_000L, "Failed to generate inverted index");
   }
@@ -285,22 +275,19 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
       errorMessage = "Failed to remove default columns";
     }
 
-    TestUtils.waitForCondition(new Function<Void, Boolean>() {
-      @Override
-      public Boolean apply(@Nullable Void aVoid) {
-        try {
-          JsonNode queryResponse = postQuery(TEST_DEFAULT_COLUMNS_QUERY);
-          // Total docs should not change during reload
-          assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
-          long count = queryResponse.get("aggregationResults").get(0).get("value").asLong();
-          if (withExtraColumns) {
-            return count == numTotalDocs;
-          } else {
-            return count == 0;
-          }
-        } catch (Exception e) {
-          throw new RuntimeException(e);
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode queryResponse = postQuery(TEST_DEFAULT_COLUMNS_QUERY);
+        // Total docs should not change during reload
+        assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
+        long count = queryResponse.get("aggregationResults").get(0).get("value").asLong();
+        if (withExtraColumns) {
+          return count == numTotalDocs;
+        } else {
+          return count == 0;
         }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
       }
     }, 600_000L, errorMessage);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org