You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2019/06/06 00:39:04 UTC

[incubator-pinot] branch master updated: Add OfflineSegmentIntervalChecker to PeriodicTasksIntegrationTests (#4278)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ffa381  Add OfflineSegmentIntervalChecker to PeriodicTasksIntegrationTests (#4278)
8ffa381 is described below

commit 8ffa381f32a034aae070e087e16db2e416df88cc
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Wed Jun 5 17:38:59 2019 -0700

    Add OfflineSegmentIntervalChecker to PeriodicTasksIntegrationTests (#4278)
    
    * Add OfflineSegmentIntervalChecker to PeriodicTasksIntegrationTests
    
    * Addressed comments
---
 .../pinot/common/metrics/ValidationMetrics.java    | 14 ++++++--
 .../apache/pinot/controller/ControllerConf.java    |  5 +++
 .../validation/OfflineSegmentIntervalChecker.java  |  5 +++
 .../ControllerPeriodicTasksIntegrationTests.java   | 37 +++++++++++++++++++---
 4 files changed, 54 insertions(+), 7 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
index 88d300b..8f4b4fd 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.metrics;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.yammer.metrics.core.Gauge;
 import com.yammer.metrics.core.MetricName;
 import com.yammer.metrics.core.MetricsRegistry;
@@ -33,7 +34,6 @@ import java.util.concurrent.TimeUnit;
  */
 public class ValidationMetrics {
   private final MetricsRegistry _metricsRegistry;
-
   private final Map<String, Long> _gaugeValues = new HashMap<>();
   private final Set<MetricName> _metricNames = new HashSet<>();
 
@@ -216,7 +216,8 @@ public class ValidationMetrics {
     makeGauge(fullGaugeName, makeMetricName(fullGaugeName), _storedValueGaugeFactory, segmentCount);
   }
 
-  private String makeGaugeName(final String resource, final String gaugeName) {
+  @VisibleForTesting
+  public static String makeGaugeName(final String resource, final String gaugeName) {
     return "pinot.controller." + resource + "." + gaugeName;
   }
 
@@ -246,4 +247,13 @@ public class ValidationMetrics {
     _metricNames.clear();
     _gaugeValues.clear();
   }
+
+  @VisibleForTesting
+  public long getValueOfGauge(final String fullGaugeName) {
+    Long value  = _gaugeValues.get(fullGaugeName);
+    if (value == null) {
+      return 0;
+    }
+    return value;
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 0b23fdb..d5a620e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -629,6 +629,11 @@ public class ControllerConf extends PropertiesConfiguration {
         initialDelayInSeconds);
   }
 
+  public void setOfflineSegmentIntervalCheckerInitialDelayInSeconds(long initialDelayInSeconds) {
+    setProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_INITIAL_DELAY_IN_SECONDS,
+        initialDelayInSeconds);
+  }
+
   public long getPeriodicTaskInitialDelayInSeconds() {
     return ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds();
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 08e7c41..7f19395 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -183,6 +183,11 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
     return numTotalDocs;
   }
 
+  @VisibleForTesting
+  public ValidationMetrics getValidationMetrics() {
+    return _validationMetrics;
+  }
+
   @Override
   public void cleanUpTask() {
     LOGGER.info("Unregister all the validation metrics.");
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
index a947373..a0a2c5c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
@@ -31,9 +31,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nullable;
-import kafka.server.KafkaServerStartable;
 import org.apache.commons.io.FileUtils;
-import org.apache.helix.HelixAdmin;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.config.TableConfig;
@@ -45,10 +43,12 @@ import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.ValidationMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
 import org.apache.pinot.util.TestUtils;
@@ -74,7 +74,8 @@ import org.testng.annotations.Test;
  * See group = "segmentStatusChecker" for example.
  * The tables needed for the test will be created in beforeTask(), and dropped in afterTask()
  *
- * The groups run sequentially in the order: segmentStatusChecker -> realtimeSegmentRelocation -> brokerResourceValidationManager -> ....
+ * The groups run sequentially in the order: segmentStatusChecker -> realtimeSegmentRelocation ->
+ * brokerResourceValidationManager -> offlineSegmentIntervalChecker -> ....
  */
 public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrationTestSet {
 
@@ -107,8 +108,10 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
     controllerConf.setStatusCheckerFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
     controllerConf.setRealtimeSegmentRelocationInitialDelayInSeconds(PERIODIC_TASK_INITIAL_DELAY_SECONDS);
     controllerConf.setRealtimeSegmentRelocatorFrequency(PERIODIC_TASK_FREQ);
-    controllerConf.setBrokerResourceValidationInitialDelayInSeconds(PERIODIC_TASK_FREQ_SECONDS);
+    controllerConf.setBrokerResourceValidationInitialDelayInSeconds(PERIODIC_TASK_INITIAL_DELAY_SECONDS);
     controllerConf.setBrokerResourceValidationFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
+    controllerConf.setOfflineSegmentIntervalCheckerInitialDelayInSeconds(PERIODIC_TASK_INITIAL_DELAY_SECONDS);
+    controllerConf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
 
     startController(controllerConf);
     startBroker();
@@ -472,7 +475,31 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
     dropOfflineTable(table2);
   }
 
-  // TODO: tests for other ControllerPeriodicTasks (RetentionManager, OfflineSegmentIntervalChecker, RealtimeSegmentValidationManager)
+  @Test(groups = "offlineSegmentIntervalChecker", dependsOnGroups = "brokerResourceValidationManager")
+  public void testOfflineSegmentIntervalChecker()
+      throws Exception {
+    OfflineSegmentIntervalChecker offlineSegmentIntervalChecker = _controllerStarter.getOfflineSegmentIntervalChecker();
+    ValidationMetrics validationMetrics = offlineSegmentIntervalChecker.getValidationMetrics();
+
+    String tablNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(DEFAULT_TABLE_NAME);
+
+    // Wait until OfflineSegmentIntervalChecker gets executed
+    TestUtils.waitForCondition(
+        input -> validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "SegmentCount"))
+            > 0, 60_000, "Timed out waiting for OfflineSegmentIntervalChecker");
+
+    // Test the validation metrics values updated by OfflineSegmentIntervalChecker against the known values
+    // from segment metadata
+    Assert.assertEquals(
+        validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "SegmentCount")), 12);
+    Assert.assertEquals(
+        validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "missingSegmentCount")), 0);
+    Assert.assertEquals(
+        validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "TotalDocumentCount")),
+        115545);
+  }
+
+  // TODO: tests for other ControllerPeriodicTasks (RetentionManager, RealtimeSegmentValidationManager)
 
   @Override
   protected boolean isUsingNewConfigFormat() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org