You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ak...@apache.org on 2019/06/11 17:19:41 UTC
[incubator-pinot] branch master updated: [TE] Improve log debugging - leverage logback capabilities to log thread id; make logging class concise (#4291)
This is an automated email from the ASF dual-hosted git repository.
akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 23cb9ed [TE] Improve log debugging - leverage logback capabilities to log thread id; make logging class concise (#4291)
23cb9ed is described below
commit 23cb9ed1aa6934d0ae55c775309fd5f5be4dffd1
Author: Akshay Rai <ak...@gmail.com>
AuthorDate: Tue Jun 11 10:19:36 2019 -0700
[TE] Improve log debugging - leverage logback capabilities to log thread id; make logging class concise (#4291)
Sample log lines:
[06-07-2019 16:35:12] INFO [task-executor-2] o.a.p.t.a.t.TaskDriver : Finding next task to execute
[06-07-2019 16:35:12] INFO [task-executor-2] o.a.p.t.a.t.TaskDriver : Trying to find a task to execute
[06-07-2019 16:35:34] INFO [pool-11-thread-1] o.a.p.t.d.a.DetectionAlertScheduler : Scheduling all the subscription configs
[06-07-2019 16:35:34] INFO [pool-11-thread-1] o.a.p.t.d.a.DetectionAlertScheduler : Scheduled jobs [DETECTION_ALERT_104821830]
---
.../pinot/thirdeye/anomaly/task/TaskDriver.java | 40 ++++++++++++----------
.../pinot/thirdeye/detection/alert/AlertUtils.java | 2 +-
.../detection/alert/DetectionAlertScheduler.java | 4 +--
.../detection/alert/DetectionAlertTaskRunner.java | 21 ++++++------
.../alert/scheme/DetectionEmailAlerter.java | 2 +-
5 files changed, 34 insertions(+), 35 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
index 8273437..c544afa 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
@@ -19,12 +19,13 @@
package org.apache.pinot.thirdeye.anomaly.task;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.pinot.thirdeye.anomaly.classification.classifier.AnomalyClassifierFactory;
import org.apache.pinot.thirdeye.anomaly.utils.AnomalyUtils;
import org.apache.pinot.thirdeye.anomaly.utils.ThirdeyeMetricsUtil;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
import org.apache.pinot.thirdeye.detector.email.filter.AlertFilterFactory;
-import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -70,7 +71,9 @@ public class TaskDriver {
driverConfiguration = thirdEyeAnomalyConfiguration.getTaskDriverConfiguration();
workerId = thirdEyeAnomalyConfiguration.getId();
taskDAO = DAO_REGISTRY.getTaskDAO();
- taskExecutorService = Executors.newFixedThreadPool(driverConfiguration.getMaxParallelTasks());
+ taskExecutorService = Executors.newFixedThreadPool(
+ driverConfiguration.getMaxParallelTasks(),
+ new ThreadFactoryBuilder().setNameFormat("task-executor-%d").build());
taskContext = new TaskContext();
taskContext.setAnomalyFunctionFactory(anomalyFunctionFactory);
taskContext.setThirdEyeAnomalyConfiguration(thirdEyeAnomalyConfiguration);
@@ -85,7 +88,7 @@ public class TaskDriver {
Callable callable = new Callable() {
@Override public Object call() throws Exception {
while (!shutdown) {
- LOG.info("Thread {} : Finding next task to execute.", Thread.currentThread().getId());
+ LOG.info("Finding next task to execute");
// select a task to execute, and update it to RUNNING
TaskDTO anomalyTaskSpec = TaskDriver.this.acquireTask();
@@ -95,7 +98,7 @@ public class TaskDriver {
ThirdeyeMetricsUtil.taskCounter.inc();
try {
- LOG.info("Thread {} : Executing task: {} {}", Thread.currentThread().getId(), anomalyTaskSpec.getJobName(),
+ LOG.info("Executing task: {} {}", anomalyTaskSpec.getJobName(),
anomalyTaskSpec.getTaskInfo());
// execute the selected task
@@ -105,7 +108,7 @@ public class TaskDriver {
updateTaskStartTime(anomalyTaskSpec.getId());
List<TaskResult> taskResults = taskRunner.execute(taskInfo, taskContext);
- LOG.info("Thread {} : DONE Executing task: {}", Thread.currentThread().getId(), anomalyTaskSpec.getId());
+ LOG.info("DONE Executing task: {}", anomalyTaskSpec.getId());
// update status to COMPLETED
updateStatusAndTaskEndTime(anomalyTaskSpec.getId(), TaskStatus.RUNNING, TaskStatus.COMPLETED, "");
ThirdeyeMetricsUtil.taskSuccessCounter.inc();
@@ -124,17 +127,17 @@ public class TaskDriver {
} finally {
long elapsedTime = System.nanoTime() - tStart;
- LOG.info("Thread {} : Task {} took {} nano seconds", Thread.currentThread().getId(),
- anomalyTaskSpec.getId(), elapsedTime);
+ LOG.info("Task {} took {} nano seconds", anomalyTaskSpec.getId(), elapsedTime);
ThirdeyeMetricsUtil.taskDurationCounter.inc(elapsedTime);
}
}
}
+ LOG.info("Thread safely quiting");
return 0;
}
};
taskExecutorService.submit(callable);
- LOG.info("Thread {} : Started task driver", Thread.currentThread().getId());
+ LOG.info("Starting task driver");
}
}
@@ -149,7 +152,7 @@ public class TaskDriver {
* @return null if system is shutting down.
*/
private TaskDTO acquireTask() {
- LOG.info("Thread {} : Trying to find a task to execute", Thread.currentThread().getId());
+ LOG.info("Trying to find a task to execute");
while (!shutdown) {
List<TaskDTO> anomalyTasks = new ArrayList<>();
boolean hasFetchError = false;
@@ -166,7 +169,7 @@ public class TaskDriver {
}
if (CollectionUtils.isNotEmpty(anomalyTasks)) {
- LOG.info("Thread {} : Found {} tasks in waiting state", Thread.currentThread().getId(), anomalyTasks.size());
+ LOG.info("Found {} tasks in waiting state", anomalyTasks.size());
// shuffle candidate tasks to avoid synchronized patterns across threads (and hosts)
Collections.shuffle(anomalyTasks);
@@ -179,14 +182,13 @@ public class TaskDriver {
success = taskDAO
.updateStatusAndWorkerId(workerId, anomalyTaskSpec.getId(), allowedOldTaskStatus,
TaskStatus.RUNNING, anomalyTaskSpec.getVersion());
- LOG.info("Thread {} : Trying to acquire task id [{}], success status: [{}] with version [{}]",
- Thread.currentThread().getId(), anomalyTaskSpec.getId(), success, anomalyTaskSpec.getVersion());
+ LOG.info("Trying to acquire task id [{}], success status: [{}] with version [{}]",
+ anomalyTaskSpec.getId(), success, anomalyTaskSpec.getVersion());
} catch (Exception e) {
- LOG.warn("Thread {} : Got exception when acquiring task. (Worker Id: {})", Thread.currentThread().getId(),
- workerId, e);
+ LOG.warn("Got exception when acquiring task. (Worker Id: {})", workerId, e);
}
if (success) {
- LOG.info("Thread {} has acquired task: {}", Thread.currentThread().getId(), anomalyTaskSpec);
+ LOG.info("Acquired task: {}", anomalyTaskSpec);
return anomalyTaskSpec;
}
}
@@ -216,21 +218,21 @@ public class TaskDriver {
}
private void updateTaskStartTime(long taskId) {
- LOG.info("Thread {} : Starting updateTaskStartTime for task id {}", Thread.currentThread().getId(), taskId);
+ LOG.info("Starting updateTaskStartTime for task id {}", taskId);
try {
long startTime = System.currentTimeMillis();
taskDAO.updateTaskStartTime(taskId, startTime);
- LOG.info("Thread {} : Updated task start time {}", Thread.currentThread().getId(), startTime);
+ LOG.info("Updated task start time {}", startTime);
} catch (Exception e) {
LOG.error("Exception in updating task start time", e);
}
}
private void updateStatusAndTaskEndTime(long taskId, TaskStatus oldStatus, TaskStatus newStatus, String message) {
- LOG.info("Thread {} : Starting updateStatus for task id {}", Thread.currentThread().getId(), taskId);
+ LOG.info("Starting updateStatus for task id {}", taskId);
try {
taskDAO.updateStatusAndTaskEndTime(taskId, oldStatus, newStatus, System.currentTimeMillis(), message);
- LOG.info("Thread {} : Updated status {}", Thread.currentThread().getId(), newStatus);
+ LOG.info("Updated status {}", newStatus);
} catch (Exception e) {
LOG.error("Exception in updating status and task end time", e);
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java
index a298267..d715555 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java
@@ -84,7 +84,7 @@ public class AlertUtils {
public static long getHighWaterMark(Collection<MergedAnomalyResultDTO> anomalies) {
if (anomalies.isEmpty()) {
- return -1;
+ return 0;
}
return Collections.max(Collections2.transform(anomalies, mergedAnomalyResultDTO -> mergedAnomalyResultDTO.getId()));
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertScheduler.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertScheduler.java
index 6554a7a..3e091f3 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertScheduler.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertScheduler.java
@@ -20,13 +20,11 @@
package org.apache.pinot.thirdeye.detection.alert;
import java.util.stream.Collectors;
-import org.apache.pinot.thirdeye.anomaly.alert.v2.AlertJobSchedulerV2;
import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
import org.apache.pinot.thirdeye.anomaly.utils.AnomalyUtils;
import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
import org.apache.pinot.thirdeye.datasource.DAORegistry;
-import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
@@ -53,7 +51,7 @@ import org.slf4j.LoggerFactory;
* in the cron scheduler.
*/
public class DetectionAlertScheduler implements Runnable {
- private static final Logger LOG = LoggerFactory.getLogger(AlertJobSchedulerV2.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DetectionAlertScheduler.class);
private static final int DEFAULT_ALERT_DELAY = 1;
private static final TimeUnit DEFAULT_ALERT_DELAY_UNIT = TimeUnit.MINUTES;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java
index b4fba8f..c644ed0 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java
@@ -80,19 +80,18 @@ public class DetectionAlertTaskRunner implements TaskRunner {
}
private void updateAlertConfigWatermarks(DetectionAlertFilterResult result, DetectionAlertConfigDTO alertConfig) {
- long highWaterMark = AlertUtils.getHighWaterMark(result.getAllAnomalies());
- if (alertConfig.getHighWaterMark() != null) {
- highWaterMark = Math.max(alertConfig.getHighWaterMark(), highWaterMark);
- }
+ if (!result.getAllAnomalies().isEmpty()) {
+ long highWaterMark = AlertUtils.getHighWaterMark(result.getAllAnomalies());
+ if (alertConfig.getHighWaterMark() != null) {
+ highWaterMark = Math.max(alertConfig.getHighWaterMark(), highWaterMark);
+ }
- alertConfig.setHighWaterMark(highWaterMark);
- alertConfig.setVectorClocks(
- AlertUtils.mergeVectorClock(alertConfig.getVectorClocks(),
- AlertUtils.makeVectorClock(result.getAllAnomalies()))
- );
+ alertConfig.setHighWaterMark(highWaterMark);
+ alertConfig.setVectorClocks(AlertUtils.mergeVectorClock(alertConfig.getVectorClocks(), AlertUtils.makeVectorClock(result.getAllAnomalies())));
- LOG.info("Updating watermarks for alertConfigDAO : {}", alertConfig.getId());
- this.alertConfigDAO.save(alertConfig);
+ LOG.info("Updating watermarks for alertConfigDAO : {}", alertConfig.getId());
+ this.alertConfigDAO.save(alertConfig);
+ }
}
@Override
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
index 56d834f..cf383c2 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
@@ -182,7 +182,7 @@ public class DetectionEmailAlerter extends DetectionAlertScheme {
public void run() throws Exception {
Preconditions.checkNotNull(result);
if (result.getAllAnomalies().size() == 0) {
- LOG.info("Zero anomalies found, skipping sending email alert for {}", config.getId());
+ LOG.info("Zero anomalies found, skipping email alert for {}", config.getId());
return;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org