You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/04/03 05:09:11 UTC
[incubator-pinot] branch master updated: In ServiceStatus,
replace Apache math Pair with inner StatusDescriptionPair class
(#4062)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 402e5ad In ServiceStatus, replace Apache math Pair with inner StatusDescriptionPair class (#4062)
402e5ad is described below
commit 402e5ada034bb9ade04f6ea4a3690155876b3efa
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Apr 2 22:09:07 2019 -0700
In ServiceStatus, replace Apache math Pair with inner StatusDescriptionPair class (#4062)
Also fix the integration test to correctly include resources in ServiceStatus
---
.../apache/pinot/common/utils/ServiceStatus.java | 62 +++++++------
.../tests/OfflineClusterIntegrationTest.java | 103 +++++++++------------
2 files changed, 81 insertions(+), 84 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
index 3fc8d01..d8792a6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
@@ -24,7 +24,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.commons.math3.util.Pair;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
@@ -49,12 +48,11 @@ public class ServiceStatus {
STARTING, GOOD, BAD
}
- public static String STATUS_DESCRIPTION_NONE = "None";
- public static String STATUS_DESCRIPTION_INIT = "Init";
+ public static final String STATUS_DESCRIPTION_NONE = "None";
+ public static final String STATUS_DESCRIPTION_INIT = "Init";
+ public static final String STATUS_DESCRIPTION_NO_HELIX_STATE = "Helix state does not exist";
- private static String _noHelixState = "Helix state does not exist";
-
- private static int MAX_RESOURCE_NAMES_TO_LOG = 5;
+ private static final int MAX_RESOURCE_NAMES_TO_LOG = 5;
/**
* Callback that returns the status of the service.
@@ -173,7 +171,7 @@ public class ServiceStatus {
}
}
_numTotalResourcesToMonitor = _resourcesToMonitor.size();
- _minResourcesStartCount = (int)Math.round(Math.ceil(minResourcesStartPercent * _numTotalResourcesToMonitor/100));
+ _minResourcesStartCount = (int) Math.ceil(minResourcesStartPercent * _numTotalResourcesToMonitor / 100);
LOGGER.info("Monitoring {} resources: {} for start up of instance {}", _numTotalResourcesToMonitor,
getResourceListAsString(), _instanceName);
@@ -189,7 +187,7 @@ public class ServiceStatus {
_resourcesToMonitor = new HashSet<>(resourcesToMonitor);
_numTotalResourcesToMonitor = _resourcesToMonitor.size();
- _minResourcesStartCount = (int)Math.round(Math.ceil(minResourcesStartPercent * _numTotalResourcesToMonitor/100));
+ _minResourcesStartCount = (int) Math.ceil(minResourcesStartPercent * _numTotalResourcesToMonitor / 100);
LOGGER.info("Monitoring {} resources: {} for start up of instance {}", _numTotalResourcesToMonitor,
getResourceListAsString(), _instanceName);
}
@@ -238,19 +236,18 @@ public class ServiceStatus {
_resourceIterator = _resourcesToMonitor.iterator();
}
resourceName = _resourceIterator.next();
- Pair<Status, String> statusPair = evaluateResourceStatus(resourceName);
+ StatusDescriptionPair statusDescriptionPair = evaluateResourceStatus(resourceName);
- Status status = statusPair.getFirst();
- if (status == Status.GOOD) {
+ if (statusDescriptionPair._status == Status.GOOD) {
// Resource is done starting up, remove it from the set
_resourceIterator.remove();
} else {
_statusDescription = String
.format("%s, waitingFor=%s, resource=%s, numResourcesLeft=%d, numTotalResources=%d, minStartCount=%d,",
- statusPair.getSecond(), getMatchName(), resourceName, _resourcesToMonitor.size(),
+ statusDescriptionPair._description, getMatchName(), resourceName, _resourcesToMonitor.size(),
_numTotalResourcesToMonitor, _minResourcesStartCount);
- return status;
+ return statusDescriptionPair._status;
}
}
@@ -275,35 +272,36 @@ public class ServiceStatus {
_resourceIterator = _resourcesToMonitor.iterator();
while (_resourceIterator.hasNext()) {
String resource = _resourceIterator.next();
- Pair<Status, String> statusPair = evaluateResourceStatus(resource);
- if (statusPair.getFirst() == Status.GOOD) {
+ StatusDescriptionPair statusDescriptionPair = evaluateResourceStatus(resource);
+ if (statusDescriptionPair._status == Status.GOOD) {
_resourceIterator.remove();
} else {
- LOGGER.info("Resource: {}, StatusDescription: {}", resource, statusPair.getSecond());
+ LOGGER.info("Resource: {}, StatusDescription: {}", resource, statusDescriptionPair._description);
if (--logCount <= 0) {
break;
}
}
}
- _statusDescription = String.format("waitingFor=%s, numResourcesLeft=%d, numTotalResources=%d, minStartCount=%d,"
- + " resourceList=%s", getMatchName(), _resourcesToMonitor.size(), _numTotalResourcesToMonitor,
- _minResourcesStartCount, getResourceListAsString());
+ _statusDescription = String
+ .format("waitingFor=%s, numResourcesLeft=%d, numTotalResources=%d, minStartCount=%d," + " resourceList=%s",
+ getMatchName(), _resourcesToMonitor.size(), _numTotalResourcesToMonitor, _minResourcesStartCount,
+ getResourceListAsString());
LOGGER.info("Instance {} returning GOOD because {}", _instanceName, _statusDescription);
}
return Status.GOOD;
}
- private Pair<Status, String> evaluateResourceStatus(String resourceName) {
+ private StatusDescriptionPair evaluateResourceStatus(String resourceName) {
IdealState idealState = getResourceIdealState(resourceName);
// If the resource has been removed or disabled, ignore it
if (idealState == null || !idealState.isEnabled()) {
- return new Pair(Status.GOOD, STATUS_DESCRIPTION_NONE);
+ return new StatusDescriptionPair(Status.GOOD, STATUS_DESCRIPTION_NONE);
}
T helixState = getState(resourceName);
if (helixState == null) {
- return new Pair(Status.STARTING, _noHelixState);
+ return new StatusDescriptionPair(Status.STARTING, STATUS_DESCRIPTION_NO_HELIX_STATE);
}
// Check that all partitions that are supposed to be in any state other than OFFLINE have the same status in the
@@ -325,13 +323,13 @@ public class ServiceStatus {
if ("ERROR".equals(currentStateStatus)) {
LOGGER.error(String.format("Resource: %s, partition: %s is in ERROR state", resourceName, partitionName));
} else {
- String description = String.format("partition=%s, expected=%s, found=%s", partitionName,
- idealStateStatus, currentStateStatus);
- return new Pair(Status.STARTING, description);
+ String description = String
+ .format("partition=%s, expected=%s, found=%s", partitionName, idealStateStatus, currentStateStatus);
+ return new StatusDescriptionPair(Status.STARTING, description);
}
}
}
- return new Pair(Status.GOOD, STATUS_DESCRIPTION_NONE);
+ return new StatusDescriptionPair(Status.GOOD, STATUS_DESCRIPTION_NONE);
}
private String getResourceListAsString() {
@@ -358,6 +356,7 @@ public class ServiceStatus {
*/
public static class IdealStateAndCurrentStateMatchServiceStatusCallback extends IdealStateMatchServiceStatusCallback<CurrentState> {
private static final String MATCH_NAME = "CurrentStateMatch";
+
public IdealStateAndCurrentStateMatchServiceStatusCallback(HelixManager helixManager, String clusterName,
String instanceName, double minResourcesStartPercent) {
super(helixManager, clusterName, instanceName, minResourcesStartPercent);
@@ -394,6 +393,7 @@ public class ServiceStatus {
*/
public static class IdealStateAndExternalViewMatchServiceStatusCallback extends IdealStateMatchServiceStatusCallback<ExternalView> {
private static final String MATCH_NAME = "ExternalViewMatch";
+
public IdealStateAndExternalViewMatchServiceStatusCallback(HelixManager helixManager, String clusterName,
String instanceName, double minResourcesStartPercent) {
super(helixManager, clusterName, instanceName, minResourcesStartPercent);
@@ -428,4 +428,14 @@ public class ServiceStatus {
return MATCH_NAME;
}
}
+
+ private static class StatusDescriptionPair {
+ Status _status;
+ String _description;
+
+ StatusDescriptionPair(Status status, String description) {
+ _status = status;
+ _description = description;
+ }
+ }
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index ad9884d..14b694f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -20,7 +20,6 @@ package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
@@ -32,7 +31,6 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.utils.CommonConstants;
@@ -61,7 +59,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
"SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
- private static final List<String> UPDATED_BLOOM_FLITER_COLUMNS = Arrays.asList("Carrier");
+ private static final List<String> UPDATED_BLOOM_FILTER_COLUMNS = Collections.singletonList("Carrier");
private static final String TEST_UPDATED_BLOOM_FILTER_QUERY = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'CA'";
// For default columns test
@@ -95,23 +93,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
startBrokers(getNumBrokers());
startServers(getNumServers());
- // Set up service status callbacks
- List<String> instances = _helixAdmin.getInstancesInCluster(_clusterName);
- for (String instance : instances) {
- if (instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)) {
- _serviceStatusCallbacks.add(
- new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _clusterName, instance,
- Collections.singletonList(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE), 100.0));
- }
- if (instance.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) {
- _serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
- .of(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, _clusterName,
- instance, 100.0),
- new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _clusterName,
- instance, 100.0))));
- }
- }
-
// Unpack the Avro files
List<File> avroFiles = unpackAvroData(_tempDir);
@@ -140,6 +121,21 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
// Upload all segments
uploadSegments(_tarDir);
+ // Set up service status callbacks
+ // NOTE: put this step after creating the table and uploading all segments so that brokers and servers can find the
+ // resources to monitor
+ List<String> instances = _helixAdmin.getInstancesInCluster(_clusterName);
+ for (String instance : instances) {
+ if (instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) || instance
+ .startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) {
+ _serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
+ .of(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, _clusterName,
+ instance, 100.0),
+ new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _clusterName,
+ instance, 100.0))));
+ }
+ }
+
// Wait for all documents loaded
waitForAllDocsLoaded(600_000L);
}
@@ -184,17 +180,14 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
- TestUtils.waitForCondition(new Function<Void, Boolean>() {
- @Override
- public Boolean apply(@Nullable Void aVoid) {
- try {
- JsonNode queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
- // Total docs should not change during reload
- assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
- return queryResponse.get("numEntriesScannedInFilter").asLong() == 0L;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ JsonNode queryResponse1 = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
+ // Total docs should not change during reload
+ assertEquals(queryResponse1.get("totalDocs").asLong(), numTotalDocs);
+ return queryResponse1.get("numEntriesScannedInFilter").asLong() == 0L;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}, 600_000L, "Failed to generate inverted index");
}
@@ -208,23 +201,20 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
// Update table config and trigger reload
updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, null,
- UPDATED_BLOOM_FLITER_COLUMNS, getTaskConfig());
+ UPDATED_BLOOM_FILTER_COLUMNS, getTaskConfig());
updateTableConfiguration();
sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
- TestUtils.waitForCondition(new Function<Void, Boolean>() {
- @Override
- public Boolean apply(@Nullable Void aVoid) {
- try {
- JsonNode queryResponse = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
- // Total docs should not change during reload
- assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
- return queryResponse.get("numSegmentsProcessed").asLong() == 0L;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ JsonNode queryResponse1 = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
+ // Total docs should not change during reload
+ assertEquals(queryResponse1.get("totalDocs").asLong(), numTotalDocs);
+ return queryResponse1.get("numSegmentsProcessed").asLong() == 0L;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}, 600_000L, "Failed to generate inverted index");
}
@@ -285,22 +275,19 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
errorMessage = "Failed to remove default columns";
}
- TestUtils.waitForCondition(new Function<Void, Boolean>() {
- @Override
- public Boolean apply(@Nullable Void aVoid) {
- try {
- JsonNode queryResponse = postQuery(TEST_DEFAULT_COLUMNS_QUERY);
- // Total docs should not change during reload
- assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
- long count = queryResponse.get("aggregationResults").get(0).get("value").asLong();
- if (withExtraColumns) {
- return count == numTotalDocs;
- } else {
- return count == 0;
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ JsonNode queryResponse = postQuery(TEST_DEFAULT_COLUMNS_QUERY);
+ // Total docs should not change during reload
+ assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
+ long count = queryResponse.get("aggregationResults").get(0).get("value").asLong();
+ if (withExtraColumns) {
+ return count == numTotalDocs;
+ } else {
+ return count == 0;
}
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}, 600_000L, errorMessage);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org