You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by as...@apache.org on 2019/05/03 10:19:48 UTC

[oozie] branch master updated: OOZIE-3393 Add Oozie instrumentation delayed metric in CoordMaterializeTriggerService (zuston via asalamon74)

This is an automated email from the ASF dual-hosted git repository.

asalamon74 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git


The following commit(s) were added to refs/heads/master by this push:
     new e69a1fa  OOZIE-3393 Add Oozie instrumentation delayed metric in CoordMaterializeTriggerService (zuston via asalamon74)
e69a1fa is described below

commit e69a1fa808c8468ab38636bfb5539289e62de3e0
Author: Andras Salamon <as...@apache.org>
AuthorDate: Fri May 3 12:19:28 2019 +0200

    OOZIE-3393 Add Oozie instrumentation delayed metric in CoordMaterializeTriggerService (zuston via asalamon74)
---
 .../service/CoordMaterializeTriggerService.java    | 40 +++++++++++++++++++---
 .../apache/oozie/util/MetricsInstrumentation.java  |  2 +-
 .../TestCoordMaterializeTriggerService.java        | 34 ++++++++++++++----
 release-log.txt                                    |  1 +
 4 files changed, 66 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
index 1cbd474..1376943 100644
--- a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
+++ b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
@@ -18,6 +18,7 @@
 
 package org.apache.oozie.service;
 
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
@@ -25,21 +26,23 @@ import java.util.List;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.lock.LockToken;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.Instrumentable;
+import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.util.XCallable;
 import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.DateUtils;
 
 /**
  * The coordinator Materialization Lookup trigger service schedule lookup trigger command for every interval (default is
  * 5 minutes ). This interval could be configured through oozie configuration defined is either oozie-default.xml or
  * oozie-site.xml using the property name oozie.service.CoordMaterializeTriggerService.lookup.interval
  */
-public class CoordMaterializeTriggerService implements Service {
+public class CoordMaterializeTriggerService implements Service, Instrumentable {
     public static final String CONF_PREFIX = Service.CONF_PREFIX + "CoordMaterializeTriggerService.";
     /**
      * Time interval, in seconds, at which the Job materialization service will be scheduled to run.
@@ -62,6 +65,11 @@ public class CoordMaterializeTriggerService implements Service {
 
     private static final String INSTRUMENTATION_GROUP = "coord_job_mat";
     private static final String INSTR_MAT_JOBS_COUNTER = "jobs";
+    private static final String INSTR_MAT_QUEUE_SIZE = "mat_queue_size";
+    private static final String INSTR_MAT_DELAYED_SIZE = "mat_delayed_size";
+
+    private static volatile int currentMaterializedJobsCount = 0; // volatile: getters in instrument() must see updates
+    private static volatile int currentMaterializedDelayedJobsCount = 0; // volatile: same visibility requirement
 
     /**
      * This runnable class will run in every "interval" to queue CoordMaterializeTransitionXCommand.
@@ -163,6 +171,8 @@ public class CoordMaterializeTriggerService implements Service {
                         CoordJobQuery.GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION, currDate, limit);
                 LOG.info("CoordMaterializeTriggerService - Curr Date= " + DateUtils.formatDateOozieTZ(currDate)
                         + ", Num jobs to materialize = " + materializeJobs.size());
+                final long now = System.currentTimeMillis();
+                int delayedCount = 0;
                 for (CoordinatorJobBean coordJob : materializeJobs) {
                     Services.get().get(InstrumentationService.class).get()
                             .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1);
@@ -170,7 +180,13 @@ public class CoordMaterializeTriggerService implements Service {
                     coordJob.setLastModifiedTime(new Date());
                     updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME,
                             coordJob));
+                    Timestamp startTime = coordJob.getNextMaterializedTimestamp();
+                    if (startTime != null && startTime.getTime() < now) { // due time already passed => delayed
+                        delayedCount++;
+                    }
                 }
+                currentMaterializedJobsCount = materializeJobs.size();
+                currentMaterializedDelayedJobsCount = delayedCount;
             }
             catch (JPAExecutorException jex) {
                 LOG.warn("JPAExecutorException while attempting to materialize coordinator jobs", jex);
@@ -227,4 +243,20 @@ public class CoordMaterializeTriggerService implements Service {
         return CoordMaterializeTriggerService.class;
     }
 
+    @Override
+    public void instrument(Instrumentation instr) {
+        instr.addVariable(INSTRUMENTATION_GROUP, INSTR_MAT_QUEUE_SIZE, new Instrumentation.Variable<Integer>() {
+            @Override
+            public Integer getValue() {
+                return currentMaterializedJobsCount;
+            }
+        });
+        instr.addVariable(INSTRUMENTATION_GROUP, INSTR_MAT_DELAYED_SIZE, new Instrumentation.Variable<Integer>() {
+            @Override
+            public Integer getValue() {
+                return currentMaterializedDelayedJobsCount;
+            }
+        });
+    }
+
 }
diff --git a/core/src/main/java/org/apache/oozie/util/MetricsInstrumentation.java b/core/src/main/java/org/apache/oozie/util/MetricsInstrumentation.java
index 6b19d60..98af977 100644
--- a/core/src/main/java/org/apache/oozie/util/MetricsInstrumentation.java
+++ b/core/src/main/java/org/apache/oozie/util/MetricsInstrumentation.java
@@ -395,7 +395,7 @@ public class MetricsInstrumentation extends Instrumentation {
      * @return the MetricRegistry
      */
     @VisibleForTesting
-    MetricRegistry getMetricRegistry() {
+    public MetricRegistry getMetricRegistry() {
         return metricRegistry;
     }
 
diff --git a/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java b/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java
index e5acce6..0b8f7ac 100644
--- a/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java
@@ -21,7 +21,7 @@ package org.apache.oozie.service;
 import java.io.IOException;
 import java.io.Reader;
 import java.util.Date;
-import java.util.List;
+import java.util.SortedMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -30,23 +30,22 @@ import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.CoordinatorJob.Execution;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
-import org.apache.oozie.client.Job;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.service.CoordMaterializeTriggerService.CoordMaterializeTriggerRunnable;
 import org.apache.oozie.service.UUIDService.ApplicationType;
 import org.apache.oozie.test.XDataTestCase;
-import org.apache.oozie.test.XTestCase;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.MetricsInstrumentation;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.XmlUtils;
 
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+
 public class TestCoordMaterializeTriggerService extends XDataTestCase {
     private Services services;
     JPAService jpaService;
@@ -194,6 +193,29 @@ public class TestCoordMaterializeTriggerService extends XDataTestCase {
         });
     }
 
+    /**
+     * Verifies that the mat_queue_size and mat_delayed_size gauges are registered after a materialization lookup runs.
+     * @throws Exception if the job record cannot be created or the expected job status is not reached
+     */
+    public void testCoordMaterializeTriggerService4() throws Exception {
+        Date start = DateUtils.parseDateOozieTZ("2009-02-01T01:00Z");
+        Date end = DateUtils.parseDateOozieTZ("2009-02-20T23:59Z");
+        final CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, start, end, false, false, 0);
+
+        waitForStatus(30000, job, CoordinatorJob.Status.PREP);
+        Runnable runnable = new CoordMaterializeTriggerRunnable(3600, 300);
+        runnable.run();
+        waitForStatus(10000, job, CoordinatorJob.Status.RUNNING);
+
+        MetricsInstrumentation instr = (MetricsInstrumentation) Services.get().get(InstrumentationService.class).get();
+        String group = "coord_job_mat";
+        String matQueueSizeKey = MetricRegistry.name(group, "mat_queue_size");
+        String matDelayedSizeKey = MetricRegistry.name(group, "mat_delayed_size");
+        SortedMap<String, Gauge> gaugeSortedMap = instr.getMetricRegistry().getGauges();
+        assertNotNull("The mat_queue_size gauge should be registered.", gaugeSortedMap.get(matQueueSizeKey));
+        assertNotNull("The mat_delayed_size gauge should be registered.", gaugeSortedMap.get(matDelayedSizeKey));
+    }
+
     public void testMaxMatThrottleNotPickedMultipleJobs() throws Exception {
         Services.get().destroy();
         setSystemProperty(CoordMaterializeTriggerService.CONF_MATERIALIZATION_SYSTEM_LIMIT, "3");
diff --git a/release-log.txt b/release-log.txt
index c890d52..7abf8ae 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.2.0 release (trunk - unreleased)
 
+OOZIE-3393 Add Oozie instrumentation delayed metric in CoordMaterializeTriggerService (zuston via asalamon74)
 OOZIE-3477 Fix parameter checking in WorkflowStore.getWorkflowCountWithStatusInLastNSeconds (zuston via asalamon74)
 OOZIE-3470 PurgeXCommand coordActionDel variable assignment should be standardized (zuston via asalamon74)
 OOZIE-3463 Migrate from com.google.common.base.Charsets to java.nio.charset.StandardCharsets (nobigo via kmarton)