You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/04/03 05:09:11 UTC

[incubator-pinot] branch master updated: In ServiceStatus, replace Apache math Pair with inner StatusDescriptionPair class (#4062)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 402e5ad  In ServiceStatus, replace Apache math Pair with inner StatusDescriptionPair class (#4062)
402e5ad is described below

commit 402e5ada034bb9ade04f6ea4a3690155876b3efa
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Apr 2 22:09:07 2019 -0700

    In ServiceStatus, replace Apache math Pair with inner StatusDescriptionPair class (#4062)
    
    Also fix the integration test to correctly include resources in ServiceStatus
---
 .../apache/pinot/common/utils/ServiceStatus.java   |  62 +++++++------
 .../tests/OfflineClusterIntegrationTest.java       | 103 +++++++++------------
 2 files changed, 81 insertions(+), 84 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
index 3fc8d01..d8792a6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
@@ -24,7 +24,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.commons.math3.util.Pair;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -49,12 +48,11 @@ public class ServiceStatus {
     STARTING, GOOD, BAD
   }
 
-  public static String STATUS_DESCRIPTION_NONE = "None";
-  public static String STATUS_DESCRIPTION_INIT = "Init";
+  public static final String STATUS_DESCRIPTION_NONE = "None";
+  public static final String STATUS_DESCRIPTION_INIT = "Init";
+  public static final String STATUS_DESCRIPTION_NO_HELIX_STATE = "Helix state does not exist";
 
-  private static String _noHelixState = "Helix state does not exist";
-
-  private static int MAX_RESOURCE_NAMES_TO_LOG = 5;
+  private static final int MAX_RESOURCE_NAMES_TO_LOG = 5;
 
   /**
    * Callback that returns the status of the service.
@@ -173,7 +171,7 @@ public class ServiceStatus {
         }
       }
       _numTotalResourcesToMonitor = _resourcesToMonitor.size();
-      _minResourcesStartCount = (int)Math.round(Math.ceil(minResourcesStartPercent * _numTotalResourcesToMonitor/100));
+      _minResourcesStartCount = (int) Math.ceil(minResourcesStartPercent * _numTotalResourcesToMonitor / 100);
 
       LOGGER.info("Monitoring {} resources: {} for start up of instance {}", _numTotalResourcesToMonitor,
           getResourceListAsString(), _instanceName);
@@ -189,7 +187,7 @@ public class ServiceStatus {
       _resourcesToMonitor = new HashSet<>(resourcesToMonitor);
       _numTotalResourcesToMonitor = _resourcesToMonitor.size();
 
-      _minResourcesStartCount = (int)Math.round(Math.ceil(minResourcesStartPercent * _numTotalResourcesToMonitor/100));
+      _minResourcesStartCount = (int) Math.ceil(minResourcesStartPercent * _numTotalResourcesToMonitor / 100);
       LOGGER.info("Monitoring {} resources: {} for start up of instance {}", _numTotalResourcesToMonitor,
           getResourceListAsString(), _instanceName);
     }
@@ -238,19 +236,18 @@ public class ServiceStatus {
           _resourceIterator = _resourcesToMonitor.iterator();
         }
         resourceName = _resourceIterator.next();
-        Pair<Status, String> statusPair = evaluateResourceStatus(resourceName);
+        StatusDescriptionPair statusDescriptionPair = evaluateResourceStatus(resourceName);
 
-        Status status = statusPair.getFirst();
-        if (status == Status.GOOD) {
+        if (statusDescriptionPair._status == Status.GOOD) {
           // Resource is done starting up, remove it from the set
           _resourceIterator.remove();
         } else {
           _statusDescription = String
               .format("%s, waitingFor=%s, resource=%s, numResourcesLeft=%d, numTotalResources=%d, minStartCount=%d,",
-                  statusPair.getSecond(), getMatchName(), resourceName, _resourcesToMonitor.size(),
+                  statusDescriptionPair._description, getMatchName(), resourceName, _resourcesToMonitor.size(),
                   _numTotalResourcesToMonitor, _minResourcesStartCount);
 
-          return status;
+          return statusDescriptionPair._status;
         }
       }
 
@@ -275,35 +272,36 @@ public class ServiceStatus {
         _resourceIterator = _resourcesToMonitor.iterator();
         while (_resourceIterator.hasNext()) {
           String resource = _resourceIterator.next();
-          Pair<Status, String> statusPair = evaluateResourceStatus(resource);
-          if (statusPair.getFirst() == Status.GOOD) {
+          StatusDescriptionPair statusDescriptionPair = evaluateResourceStatus(resource);
+          if (statusDescriptionPair._status == Status.GOOD) {
             _resourceIterator.remove();
           } else {
-            LOGGER.info("Resource: {}, StatusDescription: {}", resource, statusPair.getSecond());
+            LOGGER.info("Resource: {}, StatusDescription: {}", resource, statusDescriptionPair._description);
             if (--logCount <= 0) {
               break;
             }
           }
         }
-        _statusDescription = String.format("waitingFor=%s, numResourcesLeft=%d, numTotalResources=%d, minStartCount=%d,"
-                + " resourceList=%s", getMatchName(), _resourcesToMonitor.size(), _numTotalResourcesToMonitor,
-            _minResourcesStartCount, getResourceListAsString());
+        _statusDescription = String
+            .format("waitingFor=%s, numResourcesLeft=%d, numTotalResources=%d, minStartCount=%d," + " resourceList=%s",
+                getMatchName(), _resourcesToMonitor.size(), _numTotalResourcesToMonitor, _minResourcesStartCount,
+                getResourceListAsString());
         LOGGER.info("Instance {} returning GOOD because {}", _instanceName, _statusDescription);
       }
 
       return Status.GOOD;
     }
 
-    private Pair<Status, String> evaluateResourceStatus(String resourceName) {
+    private StatusDescriptionPair evaluateResourceStatus(String resourceName) {
       IdealState idealState = getResourceIdealState(resourceName);
       // If the resource has been removed or disabled, ignore it
       if (idealState == null || !idealState.isEnabled()) {
-        return new Pair(Status.GOOD, STATUS_DESCRIPTION_NONE);
+        return new StatusDescriptionPair(Status.GOOD, STATUS_DESCRIPTION_NONE);
       }
 
       T helixState = getState(resourceName);
       if (helixState == null) {
-        return new Pair(Status.STARTING, _noHelixState);
+        return new StatusDescriptionPair(Status.STARTING, STATUS_DESCRIPTION_NO_HELIX_STATE);
       }
 
       // Check that all partitions that are supposed to be in any state other than OFFLINE have the same status in the
@@ -325,13 +323,13 @@ public class ServiceStatus {
           if ("ERROR".equals(currentStateStatus)) {
             LOGGER.error(String.format("Resource: %s, partition: %s is in ERROR state", resourceName, partitionName));
           } else {
-            String description = String.format("partition=%s, expected=%s, found=%s", partitionName,
-                idealStateStatus, currentStateStatus);
-            return new Pair(Status.STARTING, description);
+            String description = String
+                .format("partition=%s, expected=%s, found=%s", partitionName, idealStateStatus, currentStateStatus);
+            return new StatusDescriptionPair(Status.STARTING, description);
           }
         }
       }
-      return new Pair(Status.GOOD, STATUS_DESCRIPTION_NONE);
+      return new StatusDescriptionPair(Status.GOOD, STATUS_DESCRIPTION_NONE);
     }
 
     private String getResourceListAsString() {
@@ -358,6 +356,7 @@ public class ServiceStatus {
    */
   public static class IdealStateAndCurrentStateMatchServiceStatusCallback extends IdealStateMatchServiceStatusCallback<CurrentState> {
     private static final String MATCH_NAME = "CurrentStateMatch";
+
     public IdealStateAndCurrentStateMatchServiceStatusCallback(HelixManager helixManager, String clusterName,
         String instanceName, double minResourcesStartPercent) {
       super(helixManager, clusterName, instanceName, minResourcesStartPercent);
@@ -394,6 +393,7 @@ public class ServiceStatus {
    */
   public static class IdealStateAndExternalViewMatchServiceStatusCallback extends IdealStateMatchServiceStatusCallback<ExternalView> {
     private static final String MATCH_NAME = "ExternalViewMatch";
+
     public IdealStateAndExternalViewMatchServiceStatusCallback(HelixManager helixManager, String clusterName,
         String instanceName, double minResourcesStartPercent) {
       super(helixManager, clusterName, instanceName, minResourcesStartPercent);
@@ -428,4 +428,14 @@ public class ServiceStatus {
       return MATCH_NAME;
     }
   }
+
+  private static class StatusDescriptionPair {
+    Status _status;
+    String _description;
+
+    StatusDescriptionPair(Status status, String description) {
+      _status = status;
+      _description = description;
+    }
+  }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index ad9884d..14b694f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -20,7 +20,6 @@ package org.apache.pinot.integration.tests;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
@@ -32,7 +31,6 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.utils.CommonConstants;
@@ -61,7 +59,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
       "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
 
-  private static final List<String> UPDATED_BLOOM_FLITER_COLUMNS = Arrays.asList("Carrier");
+  private static final List<String> UPDATED_BLOOM_FILTER_COLUMNS = Collections.singletonList("Carrier");
   private static final String TEST_UPDATED_BLOOM_FILTER_QUERY = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'CA'";
 
   // For default columns test
@@ -95,23 +93,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     startBrokers(getNumBrokers());
     startServers(getNumServers());
 
-    // Set up service status callbacks
-    List<String> instances = _helixAdmin.getInstancesInCluster(_clusterName);
-    for (String instance : instances) {
-      if (instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)) {
-        _serviceStatusCallbacks.add(
-            new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _clusterName, instance,
-                Collections.singletonList(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE), 100.0));
-      }
-      if (instance.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) {
-        _serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
-            .of(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, _clusterName,
-                    instance, 100.0),
-                new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _clusterName,
-                    instance, 100.0))));
-      }
-    }
-
     // Unpack the Avro files
     List<File> avroFiles = unpackAvroData(_tempDir);
 
@@ -140,6 +121,21 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     // Upload all segments
     uploadSegments(_tarDir);
 
+    // Set up service status callbacks
+    // NOTE: put this step after creating the table and uploading all segments so that brokers and servers can find the
+    // resources to monitor
+    List<String> instances = _helixAdmin.getInstancesInCluster(_clusterName);
+    for (String instance : instances) {
+      if (instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) || instance
+          .startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) {
+        _serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
+            .of(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, _clusterName,
+                    instance, 100.0),
+                new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _clusterName,
+                    instance, 100.0))));
+      }
+    }
+
     // Wait for all documents loaded
     waitForAllDocsLoaded(600_000L);
   }
@@ -184,17 +180,14 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
 
-    TestUtils.waitForCondition(new Function<Void, Boolean>() {
-      @Override
-      public Boolean apply(@Nullable Void aVoid) {
-        try {
-          JsonNode queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
-          // Total docs should not change during reload
-          assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
-          return queryResponse.get("numEntriesScannedInFilter").asLong() == 0L;
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode queryResponse1 = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
+        // Total docs should not change during reload
+        assertEquals(queryResponse1.get("totalDocs").asLong(), numTotalDocs);
+        return queryResponse1.get("numEntriesScannedInFilter").asLong() == 0L;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
       }
     }, 600_000L, "Failed to generate inverted index");
   }
@@ -208,23 +201,20 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     // Update table config and trigger reload
     updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, null,
-        UPDATED_BLOOM_FLITER_COLUMNS, getTaskConfig());
+        UPDATED_BLOOM_FILTER_COLUMNS, getTaskConfig());
 
     updateTableConfiguration();
 
     sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
 
-    TestUtils.waitForCondition(new Function<Void, Boolean>() {
-      @Override
-      public Boolean apply(@Nullable Void aVoid) {
-        try {
-          JsonNode queryResponse = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
-          // Total docs should not change during reload
-          assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
-          return queryResponse.get("numSegmentsProcessed").asLong() == 0L;
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode queryResponse1 = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
+        // Total docs should not change during reload
+        assertEquals(queryResponse1.get("totalDocs").asLong(), numTotalDocs);
+        return queryResponse1.get("numSegmentsProcessed").asLong() == 0L;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
       }
     }, 600_000L, "Failed to generate inverted index");
   }
@@ -285,22 +275,19 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
       errorMessage = "Failed to remove default columns";
     }
 
-    TestUtils.waitForCondition(new Function<Void, Boolean>() {
-      @Override
-      public Boolean apply(@Nullable Void aVoid) {
-        try {
-          JsonNode queryResponse = postQuery(TEST_DEFAULT_COLUMNS_QUERY);
-          // Total docs should not change during reload
-          assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
-          long count = queryResponse.get("aggregationResults").get(0).get("value").asLong();
-          if (withExtraColumns) {
-            return count == numTotalDocs;
-          } else {
-            return count == 0;
-          }
-        } catch (Exception e) {
-          throw new RuntimeException(e);
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode queryResponse = postQuery(TEST_DEFAULT_COLUMNS_QUERY);
+        // Total docs should not change during reload
+        assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
+        long count = queryResponse.get("aggregationResults").get(0).get("value").asLong();
+        if (withExtraColumns) {
+          return count == numTotalDocs;
+        } else {
+          return count == 0;
         }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
       }
     }, 600_000L, errorMessage);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org