You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ro...@apache.org on 2014/05/28 19:38:57 UTC

git commit: OOZIE-1844 HA - Lock mechanism for CoordMaterializeTriggerService (puru via rohini)

Repository: oozie
Updated Branches:
  refs/heads/master 373a52ff2 -> 853a4af9f


OOZIE-1844 HA - Lock mechanism for CoordMaterializeTriggerService (puru via rohini)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/853a4af9
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/853a4af9
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/853a4af9

Branch: refs/heads/master
Commit: 853a4af9f06348e2bd95793e1d1b6c1e6a9949e1
Parents: 373a52f
Author: Rohini Palaniswamy <ro...@yahoo-inc.com>
Authored: Wed May 28 10:38:55 2014 -0700
Committer: Rohini Palaniswamy <ro...@yahoo-inc.com>
Committed: Wed May 28 10:38:55 2014 -0700

----------------------------------------------------------------------
 .../service/CoordMaterializeTriggerService.java | 81 +++++++++++++-------
 release-log.txt                                 |  1 +
 2 files changed, 55 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/853a4af9/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
index ef3c3f4..d2b5a6c 100644
--- a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
+++ b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
@@ -27,6 +27,7 @@ import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.lock.LockToken;
 import org.apache.oozie.util.XCallable;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.DateUtils;
@@ -73,6 +74,8 @@ public class CoordMaterializeTriggerService implements Service {
         private long delay = 0;
         private List<XCallable<Void>> callables;
         private List<XCallable<Void>> delayedCallables;
+        private XLog LOG = XLog.getLog(getClass());
+
 
         public CoordMaterializeTriggerRunnable(int materializationWindow, int lookupInterval) {
             this.materializationWindow = materializationWindow;
@@ -81,29 +84,54 @@ public class CoordMaterializeTriggerService implements Service {
 
         @Override
         public void run() {
-            runCoordJobMatLookup();
+            LockToken lock = null;
 
-            if (null != callables) {
-                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
-                if (ret == false) {
-                    XLog.getLog(getClass()).warn(
-                            "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
-                                    + "Most possibly command queue is full. Queue size is :"
-                                    + Services.get().get(CallableQueueService.class).queueSize());
+            // first check if there is some other running instance from the same service;
+            try {
+                lock = Services.get().get(MemoryLocksService.class)
+                        .getWriteLock(CoordMaterializeTriggerService.class.getName(), lockTimeout);
+
+                if (lock != null) {
+                    runCoordJobMatLookup();
+                    if (null != callables) {
+                        boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
+                        if (ret == false) {
+                            XLog.getLog(getClass()).warn(
+                                    "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
+                                            + "Most possibly command queue is full. Queue size is :"
+                                            + Services.get().get(CallableQueueService.class).queueSize());
+                        }
+                        callables = null;
+                    }
+                    if (null != delayedCallables) {
+                        boolean ret = Services.get().get(CallableQueueService.class)
+                                .queueSerial(delayedCallables, this.delay);
+                        if (ret == false) {
+                            XLog.getLog(getClass()).warn(
+                                    "Unable to queue the delayedCallables commands for CoordMaterializeTriggerRunnable. "
+                                            + "Most possibly Callable queue is full. Queue size is :"
+                                            + Services.get().get(CallableQueueService.class).queueSize());
+                        }
+                        delayedCallables = null;
+                        this.delay = 0;
+                    }
+                }
+
+                else {
+                    LOG.debug("Can't obtain lock, skipping");
                 }
-                callables = null;
             }
-            if (null != delayedCallables) {
-                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
-                if (ret == false) {
-                    XLog.getLog(getClass()).warn(
-                            "Unable to queue the delayedCallables commands for CoordMaterializeTriggerRunnable. "
-                                    + "Most possibly Callable queue is full. Queue size is :"
-                                    + Services.get().get(CallableQueueService.class).queueSize());
+            catch (Exception e) {
+                LOG.error("Exception", e);
+            }
+            finally {
+                if (lock != null) {
+                    lock.release();
+                    LOG.info("Released lock for [{0}]", CoordMaterializeTriggerService.class.getName());
                 }
-                delayedCallables = null;
-                this.delay = 0;
+
             }
+
         }
 
         /**
@@ -133,15 +161,14 @@ public class CoordMaterializeTriggerService implements Service {
                 LOG.info("CoordMaterializeTriggerService - Curr Date= " + DateUtils.formatDateOozieTZ(currDate)
                         + ", Num jobs to materialize = " + materializeJobs.size());
                 for (CoordinatorJobBean coordJob : materializeJobs) {
-                    if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(coordJob.getId())) {
-                        Services.get().get(InstrumentationService.class).get()
-                                .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1);
-                        queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
-                        coordJob.setLastModifiedTime(new Date());
-                        // TODO In place of calling single query, we should call bulk update.
-                        CoordJobQueryExecutor.getInstance().executeUpdate(
-                                CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, coordJob);
-                    }
+                    Services.get().get(InstrumentationService.class).get()
+                            .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1);
+                    queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
+                    coordJob.setLastModifiedTime(new Date());
+                    // TODO In place of calling single query, we should call bulk update.
+                    CoordJobQueryExecutor.getInstance().executeUpdate(
+                            CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, coordJob);
+
                 }
             }
             catch (JPAExecutorException jex) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/853a4af9/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 7d1d339..44e42d5 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1844 HA - Lock mechanism for CoordMaterializeTriggerService (puru via rohini)
 OOZIE-1834 sla should-start is supposed to be optional but it is not (rkanter)
 OOZIE-1838 jdbc.connections.active sampler does not show up (rkanter)
 OOZIE-1801 ZKLocksService instrumentation should say how many locks this server has (rkanter)