You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pinot.apache.org by GitBox <gi...@apache.org> on 2019/01/07 18:15:42 UTC

[GitHub] jihaozh closed pull request #3647: [TE] fix database connection close issue

jihaozh closed pull request #3647: [TE] fix database connection close issue
URL: https://github.com/apache/incubator-pinot/pull/3647
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, the full diff is reproduced
below; GitHub does not otherwise display it once the pull request is merged:

diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
index a4e9c7fa1f..93f97da20c 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
@@ -18,8 +18,10 @@
 
 import com.linkedin.thirdeye.anomaly.classification.classifier.AnomalyClassifierFactory;
 import com.linkedin.thirdeye.anomaly.utils.AnomalyUtils;
+import com.linkedin.thirdeye.anomaly.utils.ThirdeyeMetricsUtil;
 import com.linkedin.thirdeye.detector.email.filter.AlertFilterFactory;
 
+import com.linkedin.thirdeye.util.ThirdEyeUtils;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -86,6 +88,9 @@ public void start() throws Exception {
             TaskDTO anomalyTaskSpec = TaskDriver.this.acquireTask();
 
             if (anomalyTaskSpec != null) { // a task has acquired and we must finish executing it before termination
+              long tStart = System.nanoTime();
+              ThirdeyeMetricsUtil.taskCounter.inc();
+
               try {
                 LOG.info("Thread {} : Executing task: {} {}", Thread.currentThread().getId(), anomalyTaskSpec.getId(),
                     anomalyTaskSpec.getTaskInfo());
@@ -101,8 +106,12 @@ public void start() throws Exception {
                 LOG.info("Thread {} : DONE Executing task: {}", Thread.currentThread().getId(), anomalyTaskSpec.getId());
                 // update status to COMPLETED
                 updateStatusAndTaskEndTime(anomalyTaskSpec.getId(), TaskStatus.RUNNING, TaskStatus.COMPLETED, "");
+                ThirdeyeMetricsUtil.taskSuccessCounter.inc();
+
               } catch (Exception e) {
+                ThirdeyeMetricsUtil.taskExceptionCounter.inc();
                 LOG.error("Exception in electing and executing task", e);
+
                 try {
                   // update task status failed
                   updateStatusAndTaskEndTime(anomalyTaskSpec.getId(), TaskStatus.RUNNING, TaskStatus.FAILED,
@@ -110,6 +119,9 @@ public void start() throws Exception {
                 } catch (Exception e1) {
                   LOG.error("Error in updating failed status", e1);
                 }
+
+              } finally {
+                ThirdeyeMetricsUtil.taskDurationCounter.inc(System.nanoTime() - tStart);
               }
             }
           }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java
index c7cab9a9fe..876800d217 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java
@@ -16,8 +16,10 @@
 
 package com.linkedin.thirdeye.anomaly.utils;
 
+import com.linkedin.thirdeye.datasource.DAORegistry;
 import com.linkedin.thirdeye.tracking.RequestLog;
 import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
 import com.yammer.metrics.core.MetricsRegistry;
 import com.yammer.metrics.reporting.JmxReporter;
 
@@ -34,6 +36,26 @@
   private ThirdeyeMetricsUtil() {
   }
 
+  public static final Counter taskCounter =
+      metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "taskCounter");
+
+  public static final Counter taskSuccessCounter =
+      metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "taskSuccessCounter");
+
+  public static final Counter taskExceptionCounter =
+      metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "taskExceptionCounter");
+
+  public static final Counter taskDurationCounter =
+      metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "taskDurationCounter");
+
+  public static final Gauge<Integer> taskBacklogGauge =
+      metricsRegistry.newGauge(ThirdeyeMetricsUtil.class, "taskBacklogGauge", new Gauge<Integer>() {
+        @Override
+        public Integer value() {
+          return DAORegistry.getInstance().getTaskDAO().countWaiting();
+        }
+      });
+
   public static final Counter detectionTaskCounter =
       metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "detectionTaskCounter");
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
index a9e79e7e1f..8c935668ec 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
@@ -44,4 +44,5 @@ void updateStatusAndTaskEndTime(Long id, TaskStatus oldStatus, TaskStatus newSta
 
   int deleteRecordsOlderThanDaysWithStatus(int days, TaskStatus status);
 
+  int countWaiting();
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
index d7a133c995..42782aa2c8 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
@@ -18,6 +18,10 @@
 
 import com.google.inject.Singleton;
 import com.linkedin.thirdeye.anomaly.task.TaskConstants;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -33,16 +37,23 @@
 import com.linkedin.thirdeye.datalayer.dto.TaskDTO;
 import com.linkedin.thirdeye.datalayer.pojo.TaskBean;
 import com.linkedin.thirdeye.datalayer.util.Predicate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 @Singleton
 public class TaskManagerImpl extends AbstractManagerImpl<TaskDTO> implements TaskManager {
-
   private static final String FIND_BY_STATUS_ORDER_BY_CREATE_TIME_ASC =
       " WHERE status = :status order by startTime asc limit 10";
 
   private static final String FIND_BY_STATUS_ORDER_BY_CREATE_TIME_DESC =
       " WHERE status = :status order by startTime desc limit 10";
 
+  private static final String COUNT_WAITING_TASKS =
+      "SELECT COUNT(*) FROM task_index WHERE status = 'WAITING'";
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskManagerImpl.class);
+
   public TaskManagerImpl() {
     super(TaskDTO.class, TaskBean.class);
   }
@@ -158,4 +169,18 @@ public int deleteRecordsOlderThanDaysWithStatus(int days, TaskStatus status) {
     Predicate timeoutTimestampPredicate = Predicate.LT("updateTime", timeoutTimestamp);
     return findByPredicate(Predicate.AND(statusPredicate, daysTimestampPredicate, timeoutTimestampPredicate));
   }
+
+  @Override
+  public int countWaiting() {
+    // Returns the WAITING task count, or -1 on failure. NOTE: this aggregation should be
+    // supported by genericPojoDAO directly. try-with-resources closes all JDBC resources.
+    try (Connection connection = this.genericPojoDao.getConnection();
+        PreparedStatement statement = connection.prepareStatement(COUNT_WAITING_TASKS);
+        ResultSet rs = statement.executeQuery()) {
+      return rs.next() ? rs.getInt(1) : 0; // cursor starts before row 1; columns are 1-based
+    } catch (Exception e) {
+      LOG.warn("Could not retrieve task backlog size. Defaulting to -1.", e);
+      return -1;
+    }
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pinot.apache.org
For additional commands, e-mail: dev-help@pinot.apache.org