You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2019/06/06 00:39:04 UTC
[incubator-pinot] branch master updated: Add
OfflineSegmentIntervalChecker to PeriodicTasksIntegrationTests (#4278)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8ffa381 Add OfflineSegmentIntervalChecker to PeriodicTasksIntegrationTests (#4278)
8ffa381 is described below
commit 8ffa381f32a034aae070e087e16db2e416df88cc
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Wed Jun 5 17:38:59 2019 -0700
Add OfflineSegmentIntervalChecker to PeriodicTasksIntegrationTests (#4278)
* Add OfflineSegmentIntervalChecker to PeriodicTasksIntegrationTests
* Addressed comments
---
.../pinot/common/metrics/ValidationMetrics.java | 14 ++++++--
.../apache/pinot/controller/ControllerConf.java | 5 +++
.../validation/OfflineSegmentIntervalChecker.java | 5 +++
.../ControllerPeriodicTasksIntegrationTests.java | 37 +++++++++++++++++++---
4 files changed, 54 insertions(+), 7 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
index 88d300b..8f4b4fd 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.metrics;
+import com.google.common.annotations.VisibleForTesting;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
@@ -33,7 +34,6 @@ import java.util.concurrent.TimeUnit;
*/
public class ValidationMetrics {
private final MetricsRegistry _metricsRegistry;
-
private final Map<String, Long> _gaugeValues = new HashMap<>();
private final Set<MetricName> _metricNames = new HashSet<>();
@@ -216,7 +216,8 @@ public class ValidationMetrics {
makeGauge(fullGaugeName, makeMetricName(fullGaugeName), _storedValueGaugeFactory, segmentCount);
}
- private String makeGaugeName(final String resource, final String gaugeName) {
+ @VisibleForTesting
+ public static String makeGaugeName(final String resource, final String gaugeName) {
return "pinot.controller." + resource + "." + gaugeName;
}
@@ -246,4 +247,13 @@ public class ValidationMetrics {
_metricNames.clear();
_gaugeValues.clear();
}
+
+ @VisibleForTesting
+ public long getValueOfGauge(final String fullGaugeName) {
+ Long value = _gaugeValues.get(fullGaugeName);
+ if (value == null) {
+ return 0;
+ }
+ return value;
+ }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 0b23fdb..d5a620e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -629,6 +629,11 @@ public class ControllerConf extends PropertiesConfiguration {
initialDelayInSeconds);
}
+ public void setOfflineSegmentIntervalCheckerInitialDelayInSeconds(long initialDelayInSeconds) {
+ setProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_INITIAL_DELAY_IN_SECONDS,
+ initialDelayInSeconds);
+ }
+
public long getPeriodicTaskInitialDelayInSeconds() {
return ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds();
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 08e7c41..7f19395 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -183,6 +183,11 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
return numTotalDocs;
}
+ @VisibleForTesting
+ public ValidationMetrics getValidationMetrics() {
+ return _validationMetrics;
+ }
+
@Override
public void cleanUpTask() {
LOGGER.info("Unregister all the validation metrics.");
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
index a947373..a0a2c5c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
@@ -31,9 +31,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
-import kafka.server.KafkaServerStartable;
import org.apache.commons.io.FileUtils;
-import org.apache.helix.HelixAdmin;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.config.TableConfig;
@@ -45,10 +43,12 @@ import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.ValidationMetrics;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.retry.RetryPolicies;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
import org.apache.pinot.util.TestUtils;
@@ -74,7 +74,8 @@ import org.testng.annotations.Test;
* See group = "segmentStatusChecker" for example.
* The tables needed for the test will be created in beforeTask(), and dropped in afterTask()
*
- * The groups run sequentially in the order: segmentStatusChecker -> realtimeSegmentRelocation -> brokerResourceValidationManager -> ....
+ * The groups run sequentially in the order: segmentStatusChecker -> realtimeSegmentRelocation ->
+ * brokerResourceValidationManager -> OfflineSegmentIntervalChecker ....
*/
public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrationTestSet {
@@ -107,8 +108,10 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
controllerConf.setStatusCheckerFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
controllerConf.setRealtimeSegmentRelocationInitialDelayInSeconds(PERIODIC_TASK_INITIAL_DELAY_SECONDS);
controllerConf.setRealtimeSegmentRelocatorFrequency(PERIODIC_TASK_FREQ);
- controllerConf.setBrokerResourceValidationInitialDelayInSeconds(PERIODIC_TASK_FREQ_SECONDS);
+ controllerConf.setBrokerResourceValidationInitialDelayInSeconds(PERIODIC_TASK_INITIAL_DELAY_SECONDS);
controllerConf.setBrokerResourceValidationFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
+ controllerConf.setOfflineSegmentIntervalCheckerInitialDelayInSeconds(PERIODIC_TASK_INITIAL_DELAY_SECONDS);
+ controllerConf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
startController(controllerConf);
startBroker();
@@ -472,7 +475,31 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
dropOfflineTable(table2);
}
- // TODO: tests for other ControllerPeriodicTasks (RetentionManager, OfflineSegmentIntervalChecker, RealtimeSegmentValidationManager)
+ @Test(groups = "offlineSegmentIntervalChecker", dependsOnGroups = "brokerResourceValidationManager")
+ public void testOfflineSegmentIntervalChecker()
+ throws Exception {
+ OfflineSegmentIntervalChecker offlineSegmentIntervalChecker = _controllerStarter.getOfflineSegmentIntervalChecker();
+ ValidationMetrics validationMetrics = offlineSegmentIntervalChecker.getValidationMetrics();
+
+ String tablNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(DEFAULT_TABLE_NAME);
+
+ // Wait until OfflineSegmentIntervalChecker gets executed
+ TestUtils.waitForCondition(
+ input -> validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "SegmentCount"))
+ > 0, 60_000, "Timed out waiting for OfflineSegmentIntervalChecker");
+
+ // Test the validation metrics values updated by OfflineSegmentIntervalChecker against the known values
+ // from segment metadata
+ Assert.assertEquals(
+ validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "SegmentCount")), 12);
+ Assert.assertEquals(
+ validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "missingSegmentCount")), 0);
+ Assert.assertEquals(
+ validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "TotalDocumentCount")),
+ 115545);
+ }
+
+ // TODO: tests for other ControllerPeriodicTasks (RetentionManagert , RealtimeSegmentValidationManager)
@Override
protected boolean isUsingNewConfigFormat() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org