You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/02/12 23:49:08 UTC
[incubator-pinot] branch periodic_task updated: Address comments
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch periodic_task
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/periodic_task by this push:
new 7654227 Address comments
7654227 is described below
commit 765422772fd460ac55e3986e8dfed2fd2f6b5001
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Tue Feb 12 15:48:54 2019 -0800
Address comments
---
.../apache/pinot/common/metrics/ControllerMeter.java | 2 ++
.../controller/helix/core/minion/PinotTaskManager.java | 18 ++++++++----------
.../core/periodictask/ControllerPeriodicTask.java | 15 +++++++++++----
.../pinot/core/periodictask/BasePeriodicTask.java | 18 +++++++++++-------
4 files changed, 32 insertions(+), 21 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index 9d522bc..9f9299e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -56,6 +56,8 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
// Introducing a new stream agnostic metric to replace LLC_KAFKA_DATA_LOSS.
// We can phase out LLC_KAFKA_DATA_LOSS once we have collected sufficient metrics for the new one
LLC_STREAM_DATA_LOSS("dataLoss", false),
+ CONTROLLER_PERIODIC_TASK_RUN("periodicTaskRun", false),
+ CONTROLLER_PERIODIC_TASK_ERROR("periodicTaskError", false),
NUMBER_TIMES_SCHEDULE_TASKS_CALLED("tasks", true),
NUMBER_TASKS_SUBMITTED("tasks", false),
NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED("SegmentUploadTimeouts", true),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 4defa04..6bb5c9d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -88,10 +88,12 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
public synchronized Map<String, String> scheduleTasks() {
Map<String, String> tasksScheduled = scheduleTasks(_pinotHelixResourceManager.getAllTables());
- // For non-leader controller, perform non-leader cleanup
+ // NOTE: this method might be called from the Rest API instead of the periodic task scheduler on non-leader
+ // controllers, so if the task is stopped (non-leader controller), clean up the task
if (!isStarted()) {
- nonLeaderCleanUp();
+ cleanUpTask();
}
+
return tasksScheduled;
}
@@ -149,13 +151,6 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
return tasksScheduled;
}
- private void nonLeaderCleanUp() {
- LOGGER.info("Performing non-leader cleanups");
- for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
- _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
- }
- }
-
@Override
protected void processTables(List<String> tableNamesWithType) {
scheduleTasks(tableNamesWithType);
@@ -163,6 +158,9 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
@Override
public void cleanUpTask() {
- nonLeaderCleanUp();
+ LOGGER.info("Cleaning up all task generators");
+ for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
+ _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
+ }
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index eecc5e4..7e764f3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.periodictask;
import java.util.List;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.periodictask.BasePeriodicTask;
@@ -50,7 +51,13 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
@Override
protected final void runTask() {
- processTables(_pinotHelixResourceManager.getAllTables());
+ _controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L);
+ try {
+ processTables(_pinotHelixResourceManager.getAllTables());
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while running task: {}", _taskName, e);
+ _controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L);
+ }
}
/**
@@ -82,7 +89,7 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
}
/**
- * Can be override to provide context before processing the tables.
+ * Can be overridden to provide context before processing the tables.
*/
protected C preprocess() {
return null;
@@ -106,14 +113,14 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
}
/**
- * Can be override to perform cleanups after processing the tables.
+ * Can be overridden to perform cleanups after processing the tables.
*/
protected void postprocess(C context) {
postprocess();
}
/**
- * Can be override to perform cleanups after processing the tables.
+ * Can be overridden to perform cleanups after processing the tables.
*/
protected void postprocess() {
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
index 74b6b64..eb15aa9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
public abstract class BasePeriodicTask implements PeriodicTask {
private static final Logger LOGGER = LoggerFactory.getLogger(BasePeriodicTask.class);
- // Wait for at most 5 minutes while calling stop() for task to terminate
+ // Wait for at most 30 seconds while calling stop() for task to terminate
private static final long MAX_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L;
protected final String _taskName;
@@ -97,7 +97,9 @@ public abstract class BasePeriodicTask implements PeriodicTask {
}
/**
- * Can be override for extra task setups.
+ * Can be overridden for extra task setups. This method will be called when the periodic task starts.
+ * <p>
+ * Possible setups include adding or resetting the metric values.
*/
protected void setUpTask() {
}
@@ -136,8 +138,8 @@ public abstract class BasePeriodicTask implements PeriodicTask {
/**
* {@inheritDoc}
* <p>
- * This method sets {@code started} flag to false. If the task is running, this method will block for at most 5
- * minutes until the task finishes.
+ * This method sets {@code started} flag to false. If the task is running, this method will block for at most 30
+ * seconds until the task finishes.
*/
@Override
public final synchronized void stop() {
@@ -164,9 +166,9 @@ public abstract class BasePeriodicTask implements PeriodicTask {
}
long waitTimeMs = System.currentTimeMillis() - startTimeMs;
if (_running) {
- LOGGER.info("Task: {} is finished in {}ms", waitTimeMs);
- } else {
LOGGER.warn("Task: {} is not finished in {}ms", waitTimeMs);
+ } else {
+ LOGGER.info("Task: {} is finished in {}ms", waitTimeMs);
}
}
@@ -178,7 +180,9 @@ public abstract class BasePeriodicTask implements PeriodicTask {
}
/**
- * Can be override for extra task cleanups.
+ * Can be overridden for extra task cleanups. This method will be called when the periodic task stops.
+ * <p>
+ * Possible cleanups include removing or resetting the metric values.
*/
protected void cleanUpTask() {
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org