You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/02/12 23:49:08 UTC

[incubator-pinot] branch periodic_task updated: Address comments

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch periodic_task
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/periodic_task by this push:
     new 7654227  Address comments
7654227 is described below

commit 765422772fd460ac55e3986e8dfed2fd2f6b5001
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Tue Feb 12 15:48:54 2019 -0800

    Address comments
---
 .../apache/pinot/common/metrics/ControllerMeter.java   |  2 ++
 .../controller/helix/core/minion/PinotTaskManager.java | 18 ++++++++----------
 .../core/periodictask/ControllerPeriodicTask.java      | 15 +++++++++++----
 .../pinot/core/periodictask/BasePeriodicTask.java      | 18 +++++++++++-------
 4 files changed, 32 insertions(+), 21 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index 9d522bc..9f9299e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -56,6 +56,8 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
   // Introducing a new stream agnostic metric to replace LLC_KAFKA_DATA_LOSS.
   // We can phase out LLC_KAFKA_DATA_LOSS once we have collected sufficient metrics for the new one
   LLC_STREAM_DATA_LOSS("dataLoss", false),
+  CONTROLLER_PERIODIC_TASK_RUN("periodicTaskRun", false),
+  CONTROLLER_PERIODIC_TASK_ERROR("periodicTaskError", false),
   NUMBER_TIMES_SCHEDULE_TASKS_CALLED("tasks", true),
   NUMBER_TASKS_SUBMITTED("tasks", false),
   NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED("SegmentUploadTimeouts", true),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 4defa04..6bb5c9d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -88,10 +88,12 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
   public synchronized Map<String, String> scheduleTasks() {
     Map<String, String> tasksScheduled = scheduleTasks(_pinotHelixResourceManager.getAllTables());
 
-    // For non-leader controller, perform non-leader cleanup
+    // NOTE: this method might be called from the Rest API instead of the periodic task scheduler on non-leader
+    // controllers, so if the task is stopped (non-leader controller), clean up the task
     if (!isStarted()) {
-      nonLeaderCleanUp();
+      cleanUpTask();
     }
+
     return tasksScheduled;
   }
 
@@ -149,13 +151,6 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
     return tasksScheduled;
   }
 
-  private void nonLeaderCleanUp() {
-    LOGGER.info("Performing non-leader cleanups");
-    for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
-      _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
-    }
-  }
-
   @Override
   protected void processTables(List<String> tableNamesWithType) {
     scheduleTasks(tableNamesWithType);
@@ -163,6 +158,9 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
 
   @Override
   public void cleanUpTask() {
-    nonLeaderCleanUp();
+    LOGGER.info("Cleaning up all task generators");
+    for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
+      _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
+    }
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index eecc5e4..7e764f3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.periodictask;
 import java.util.List;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.periodictask.BasePeriodicTask;
@@ -50,7 +51,13 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
 
   @Override
   protected final void runTask() {
-    processTables(_pinotHelixResourceManager.getAllTables());
+    _controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L);
+    try {
+      processTables(_pinotHelixResourceManager.getAllTables());
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while running task: {}", _taskName, e);
+      _controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L);
+    }
   }
 
   /**
@@ -82,7 +89,7 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
   }
 
   /**
-   * Can be override to provide context before processing the tables.
+   * Can be overridden to provide context before processing the tables.
    */
   protected C preprocess() {
     return null;
@@ -106,14 +113,14 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
   }
 
   /**
-   * Can be override to perform cleanups after processing the tables.
+   * Can be overridden to perform cleanups after processing the tables.
    */
   protected void postprocess(C context) {
     postprocess();
   }
 
   /**
-   * Can be override to perform cleanups after processing the tables.
+   * Can be overridden to perform cleanups after processing the tables.
    */
   protected void postprocess() {
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
index 74b6b64..eb15aa9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
 public abstract class BasePeriodicTask implements PeriodicTask {
   private static final Logger LOGGER = LoggerFactory.getLogger(BasePeriodicTask.class);
 
-  // Wait for at most 5 minutes while calling stop() for task to terminate
+  // Wait for at most 30 seconds while calling stop() for task to terminate
   private static final long MAX_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L;
 
   protected final String _taskName;
@@ -97,7 +97,9 @@ public abstract class BasePeriodicTask implements PeriodicTask {
   }
 
   /**
-   * Can be override for extra task setups.
+   * Can be overridden for extra task setups. This method will be called when the periodic task starts.
+   * <p>
+   * Possible setups include adding or resetting the metric values.
    */
   protected void setUpTask() {
   }
@@ -136,8 +138,8 @@ public abstract class BasePeriodicTask implements PeriodicTask {
   /**
    * {@inheritDoc}
    * <p>
-   * This method sets {@code started} flag to false. If the task is running, this method will block for at most 5
-   * minutes until the task finishes.
+   * This method sets {@code started} flag to false. If the task is running, this method will block for at most 30
+   * seconds until the task finishes.
    */
   @Override
   public final synchronized void stop() {
@@ -164,9 +166,9 @@ public abstract class BasePeriodicTask implements PeriodicTask {
       }
       long waitTimeMs = System.currentTimeMillis() - startTimeMs;
       if (_running) {
-        LOGGER.info("Task: {} is finished in {}ms", waitTimeMs);
-      } else {
         LOGGER.warn("Task: {} is not finished in {}ms", waitTimeMs);
+      } else {
+        LOGGER.info("Task: {} is finished in {}ms", waitTimeMs);
       }
     }
 
@@ -178,7 +180,9 @@ public abstract class BasePeriodicTask implements PeriodicTask {
   }
 
   /**
-   * Can be override for extra task cleanups.
+   * Can be overridden for extra task cleanups. This method will be called when the periodic task stops.
+   * <p>
+   * Possible cleanups include removing or resetting the metric values.
    */
   protected void cleanUpTask() {
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org