You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by ak...@apache.org on 2019/06/11 17:19:41 UTC

[incubator-pinot] branch master updated: [TE] Improve log debugging - leverage logback capabilities to log thread id; make logging class concise (#4291)

This is an automated email from the ASF dual-hosted git repository.

akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 23cb9ed  [TE] Improve log debugging - leverage logback capabilities to log thread id; make logging class concise (#4291)
23cb9ed is described below

commit 23cb9ed1aa6934d0ae55c775309fd5f5be4dffd1
Author: Akshay Rai <ak...@gmail.com>
AuthorDate: Tue Jun 11 10:19:36 2019 -0700

    [TE] Improve log debugging - leverage logback capabilities to log thread id; make logging class concise (#4291)
    
    Sample log lines:
    
    [06-07-2019 16:35:12] INFO  [task-executor-2] o.a.p.t.a.t.TaskDriver : Finding next task to execute
    [06-07-2019 16:35:12] INFO  [task-executor-2] o.a.p.t.a.t.TaskDriver : Trying to find a task to execute
    [06-07-2019 16:35:34] INFO  [pool-11-thread-1] o.a.p.t.d.a.DetectionAlertScheduler : Scheduling all the subscription configs
    [06-07-2019 16:35:34] INFO  [pool-11-thread-1] o.a.p.t.d.a.DetectionAlertScheduler : Scheduled jobs [DETECTION_ALERT_104821830]
---
 .../pinot/thirdeye/anomaly/task/TaskDriver.java    | 40 ++++++++++++----------
 .../pinot/thirdeye/detection/alert/AlertUtils.java |  2 +-
 .../detection/alert/DetectionAlertScheduler.java   |  4 +--
 .../detection/alert/DetectionAlertTaskRunner.java  | 21 ++++++------
 .../alert/scheme/DetectionEmailAlerter.java        |  2 +-
 5 files changed, 34 insertions(+), 35 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
index 8273437..c544afa 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
@@ -19,12 +19,13 @@
 
 package org.apache.pinot.thirdeye.anomaly.task;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.pinot.thirdeye.anomaly.classification.classifier.AnomalyClassifierFactory;
 import org.apache.pinot.thirdeye.anomaly.utils.AnomalyUtils;
 import org.apache.pinot.thirdeye.anomaly.utils.ThirdeyeMetricsUtil;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
 import org.apache.pinot.thirdeye.detector.email.filter.AlertFilterFactory;
 
-import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -70,7 +71,9 @@ public class TaskDriver {
     driverConfiguration = thirdEyeAnomalyConfiguration.getTaskDriverConfiguration();
     workerId = thirdEyeAnomalyConfiguration.getId();
     taskDAO = DAO_REGISTRY.getTaskDAO();
-    taskExecutorService = Executors.newFixedThreadPool(driverConfiguration.getMaxParallelTasks());
+    taskExecutorService = Executors.newFixedThreadPool(
+            driverConfiguration.getMaxParallelTasks(),
+            new ThreadFactoryBuilder().setNameFormat("task-executor-%d").build());
     taskContext = new TaskContext();
     taskContext.setAnomalyFunctionFactory(anomalyFunctionFactory);
     taskContext.setThirdEyeAnomalyConfiguration(thirdEyeAnomalyConfiguration);
@@ -85,7 +88,7 @@ public class TaskDriver {
       Callable callable = new Callable() {
         @Override public Object call() throws Exception {
           while (!shutdown) {
-            LOG.info("Thread {} : Finding next task to execute.", Thread.currentThread().getId());
+            LOG.info("Finding next task to execute");
 
             // select a task to execute, and update it to RUNNING
             TaskDTO anomalyTaskSpec = TaskDriver.this.acquireTask();
@@ -95,7 +98,7 @@ public class TaskDriver {
               ThirdeyeMetricsUtil.taskCounter.inc();
 
               try {
-                LOG.info("Thread {} : Executing task: {} {}", Thread.currentThread().getId(), anomalyTaskSpec.getJobName(),
+                LOG.info("Executing task: {} {}", anomalyTaskSpec.getJobName(),
                     anomalyTaskSpec.getTaskInfo());
 
                 // execute the selected task
@@ -105,7 +108,7 @@ public class TaskDriver {
 
                 updateTaskStartTime(anomalyTaskSpec.getId());
                 List<TaskResult> taskResults = taskRunner.execute(taskInfo, taskContext);
-                LOG.info("Thread {} : DONE Executing task: {}", Thread.currentThread().getId(), anomalyTaskSpec.getId());
+                LOG.info("DONE Executing task: {}", anomalyTaskSpec.getId());
                 // update status to COMPLETED
                 updateStatusAndTaskEndTime(anomalyTaskSpec.getId(), TaskStatus.RUNNING, TaskStatus.COMPLETED, "");
                 ThirdeyeMetricsUtil.taskSuccessCounter.inc();
@@ -124,17 +127,17 @@ public class TaskDriver {
 
               } finally {
                 long elapsedTime = System.nanoTime() - tStart;
-                LOG.info("Thread {} : Task {} took {} nano seconds", Thread.currentThread().getId(),
-                    anomalyTaskSpec.getId(), elapsedTime);
+                LOG.info("Task {} took {} nano seconds", anomalyTaskSpec.getId(), elapsedTime);
                 ThirdeyeMetricsUtil.taskDurationCounter.inc(elapsedTime);
               }
             }
           }
+          LOG.info("Thread safely quiting");
           return 0;
         }
       };
       taskExecutorService.submit(callable);
-      LOG.info("Thread {} : Started task driver", Thread.currentThread().getId());
+      LOG.info("Starting task driver");
     }
   }
 
@@ -149,7 +152,7 @@ public class TaskDriver {
    * @return null if system is shutting down.
    */
   private TaskDTO acquireTask() {
-    LOG.info("Thread {} : Trying to find a task to execute", Thread.currentThread().getId());
+    LOG.info("Trying to find a task to execute");
     while (!shutdown) {
       List<TaskDTO> anomalyTasks = new ArrayList<>();
       boolean hasFetchError = false;
@@ -166,7 +169,7 @@ public class TaskDriver {
       }
 
       if (CollectionUtils.isNotEmpty(anomalyTasks)) {
-        LOG.info("Thread {} : Found {} tasks in waiting state", Thread.currentThread().getId(), anomalyTasks.size());
+        LOG.info("Found {} tasks in waiting state", anomalyTasks.size());
 
         // shuffle candidate tasks to avoid synchronized patterns across threads (and hosts)
         Collections.shuffle(anomalyTasks);
@@ -179,14 +182,13 @@ public class TaskDriver {
             success = taskDAO
                 .updateStatusAndWorkerId(workerId, anomalyTaskSpec.getId(), allowedOldTaskStatus,
                     TaskStatus.RUNNING, anomalyTaskSpec.getVersion());
-            LOG.info("Thread {} : Trying to acquire task id [{}], success status: [{}] with version [{}]",
-                Thread.currentThread().getId(), anomalyTaskSpec.getId(), success, anomalyTaskSpec.getVersion());
+            LOG.info("Trying to acquire task id [{}], success status: [{}] with version [{}]",
+                anomalyTaskSpec.getId(), success, anomalyTaskSpec.getVersion());
           } catch (Exception e) {
-            LOG.warn("Thread {} : Got exception when acquiring task. (Worker Id: {})", Thread.currentThread().getId(),
-                workerId, e);
+            LOG.warn("Got exception when acquiring task. (Worker Id: {})", workerId, e);
           }
           if (success) {
-            LOG.info("Thread {} has acquired task: {}", Thread.currentThread().getId(), anomalyTaskSpec);
+            LOG.info("Acquired task: {}", anomalyTaskSpec);
             return anomalyTaskSpec;
           }
         }
@@ -216,21 +218,21 @@ public class TaskDriver {
   }
 
   private void updateTaskStartTime(long taskId) {
-    LOG.info("Thread {} : Starting updateTaskStartTime for task id {}", Thread.currentThread().getId(), taskId);
+    LOG.info("Starting updateTaskStartTime for task id {}", taskId);
     try {
       long startTime = System.currentTimeMillis();
       taskDAO.updateTaskStartTime(taskId, startTime);
-      LOG.info("Thread {} : Updated task start time {}", Thread.currentThread().getId(), startTime);
+      LOG.info("Updated task start time {}", startTime);
     } catch (Exception e) {
       LOG.error("Exception in updating task start time", e);
     }
   }
 
   private void updateStatusAndTaskEndTime(long taskId, TaskStatus oldStatus, TaskStatus newStatus, String message) {
-    LOG.info("Thread {} : Starting updateStatus for task id {}", Thread.currentThread().getId(), taskId);
+    LOG.info("Starting updateStatus for task id {}", taskId);
     try {
       taskDAO.updateStatusAndTaskEndTime(taskId, oldStatus, newStatus, System.currentTimeMillis(), message);
-      LOG.info("Thread {} : Updated status {}", Thread.currentThread().getId(), newStatus);
+      LOG.info("Updated status {}", newStatus);
     } catch (Exception e) {
       LOG.error("Exception in updating status and task end time", e);
     }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java
index a298267..d715555 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java
@@ -84,7 +84,7 @@ public class AlertUtils {
 
   public static long getHighWaterMark(Collection<MergedAnomalyResultDTO> anomalies) {
     if (anomalies.isEmpty()) {
-      return -1;
+      return 0;
     }
     return Collections.max(Collections2.transform(anomalies, mergedAnomalyResultDTO -> mergedAnomalyResultDTO.getId()));
   }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertScheduler.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertScheduler.java
index 6554a7a..3e091f3 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertScheduler.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertScheduler.java
@@ -20,13 +20,11 @@
 package org.apache.pinot.thirdeye.detection.alert;
 
 import java.util.stream.Collectors;
-import org.apache.pinot.thirdeye.anomaly.alert.v2.AlertJobSchedulerV2;
 import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
 import org.apache.pinot.thirdeye.anomaly.utils.AnomalyUtils;
 import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
 import org.apache.pinot.thirdeye.datasource.DAORegistry;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Executors;
@@ -53,7 +51,7 @@ import org.slf4j.LoggerFactory;
  * in the cron scheduler.
  */
 public class DetectionAlertScheduler implements Runnable {
-  private static final Logger LOG = LoggerFactory.getLogger(AlertJobSchedulerV2.class);
+  private static final Logger LOG = LoggerFactory.getLogger(DetectionAlertScheduler.class);
   private static final int DEFAULT_ALERT_DELAY = 1;
   private static final TimeUnit DEFAULT_ALERT_DELAY_UNIT = TimeUnit.MINUTES;
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java
index b4fba8f..c644ed0 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java
@@ -80,19 +80,18 @@ public class DetectionAlertTaskRunner implements TaskRunner {
   }
 
   private void updateAlertConfigWatermarks(DetectionAlertFilterResult result, DetectionAlertConfigDTO alertConfig) {
-    long highWaterMark = AlertUtils.getHighWaterMark(result.getAllAnomalies());
-    if (alertConfig.getHighWaterMark() != null) {
-      highWaterMark = Math.max(alertConfig.getHighWaterMark(), highWaterMark);
-    }
+    if (!result.getAllAnomalies().isEmpty()) {
+      long highWaterMark = AlertUtils.getHighWaterMark(result.getAllAnomalies());
+      if (alertConfig.getHighWaterMark() != null) {
+        highWaterMark = Math.max(alertConfig.getHighWaterMark(), highWaterMark);
+      }
 
-    alertConfig.setHighWaterMark(highWaterMark);
-    alertConfig.setVectorClocks(
-        AlertUtils.mergeVectorClock(alertConfig.getVectorClocks(),
-        AlertUtils.makeVectorClock(result.getAllAnomalies()))
-    );
+      alertConfig.setHighWaterMark(highWaterMark);
+      alertConfig.setVectorClocks(AlertUtils.mergeVectorClock(alertConfig.getVectorClocks(), AlertUtils.makeVectorClock(result.getAllAnomalies())));
 
-    LOG.info("Updating watermarks for alertConfigDAO : {}", alertConfig.getId());
-    this.alertConfigDAO.save(alertConfig);
+      LOG.info("Updating watermarks for alertConfigDAO : {}", alertConfig.getId());
+      this.alertConfigDAO.save(alertConfig);
+    }
   }
 
   @Override
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
index 56d834f..cf383c2 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
@@ -182,7 +182,7 @@ public class DetectionEmailAlerter extends DetectionAlertScheme {
   public void run() throws Exception {
     Preconditions.checkNotNull(result);
     if (result.getAllAnomalies().size() == 0) {
-      LOG.info("Zero anomalies found, skipping sending email alert for {}", config.getId());
+      LOG.info("Zero anomalies found, skipping email alert for {}", config.getId());
       return;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org