You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ro...@apache.org on 2014/05/28 19:38:57 UTC
git commit: OOZIE-1844 HA - Lock mechanism for
CoordMaterializeTriggerService (puru via rohini)
Repository: oozie
Updated Branches:
refs/heads/master 373a52ff2 -> 853a4af9f
OOZIE-1844 HA - Lock mechanism for CoordMaterializeTriggerService (puru via rohini)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/853a4af9
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/853a4af9
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/853a4af9
Branch: refs/heads/master
Commit: 853a4af9f06348e2bd95793e1d1b6c1e6a9949e1
Parents: 373a52f
Author: Rohini Palaniswamy <ro...@yahoo-inc.com>
Authored: Wed May 28 10:38:55 2014 -0700
Committer: Rohini Palaniswamy <ro...@yahoo-inc.com>
Committed: Wed May 28 10:38:55 2014 -0700
----------------------------------------------------------------------
.../service/CoordMaterializeTriggerService.java | 81 +++++++++++++-------
release-log.txt | 1 +
2 files changed, 55 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/853a4af9/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
index ef3c3f4..d2b5a6c 100644
--- a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
+++ b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
@@ -27,6 +27,7 @@ import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.lock.LockToken;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.DateUtils;
@@ -73,6 +74,8 @@ public class CoordMaterializeTriggerService implements Service {
private long delay = 0;
private List<XCallable<Void>> callables;
private List<XCallable<Void>> delayedCallables;
+ private XLog LOG = XLog.getLog(getClass());
+
public CoordMaterializeTriggerRunnable(int materializationWindow, int lookupInterval) {
this.materializationWindow = materializationWindow;
@@ -81,29 +84,54 @@ public class CoordMaterializeTriggerService implements Service {
@Override
public void run() {
- runCoordJobMatLookup();
+ LockToken lock = null;
- if (null != callables) {
- boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
- if (ret == false) {
- XLog.getLog(getClass()).warn(
- "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
- + "Most possibly command queue is full. Queue size is :"
- + Services.get().get(CallableQueueService.class).queueSize());
+ // first check if there is some other running instance from the same service;
+ try {
+ lock = Services.get().get(MemoryLocksService.class)
+ .getWriteLock(CoordMaterializeTriggerService.class.getName(), lockTimeout);
+
+ if (lock != null) {
+ runCoordJobMatLookup();
+ if (null != callables) {
+ boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
+ if (ret == false) {
+ XLog.getLog(getClass()).warn(
+ "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
+ + "Most possibly command queue is full. Queue size is :"
+ + Services.get().get(CallableQueueService.class).queueSize());
+ }
+ callables = null;
+ }
+ if (null != delayedCallables) {
+ boolean ret = Services.get().get(CallableQueueService.class)
+ .queueSerial(delayedCallables, this.delay);
+ if (ret == false) {
+ XLog.getLog(getClass()).warn(
+ "Unable to queue the delayedCallables commands for CoordMaterializeTriggerRunnable. "
+ + "Most possibly Callable queue is full. Queue size is :"
+ + Services.get().get(CallableQueueService.class).queueSize());
+ }
+ delayedCallables = null;
+ this.delay = 0;
+ }
+ }
+
+ else {
+ LOG.debug("Can't obtain lock, skipping");
}
- callables = null;
}
- if (null != delayedCallables) {
- boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
- if (ret == false) {
- XLog.getLog(getClass()).warn(
- "Unable to queue the delayedCallables commands for CoordMaterializeTriggerRunnable. "
- + "Most possibly Callable queue is full. Queue size is :"
- + Services.get().get(CallableQueueService.class).queueSize());
+ catch (Exception e) {
+ LOG.error("Exception", e);
+ }
+ finally {
+ if (lock != null) {
+ lock.release();
+ LOG.info("Released lock for [{0}]", CoordMaterializeTriggerService.class.getName());
}
- delayedCallables = null;
- this.delay = 0;
+
}
+
}
/**
@@ -133,15 +161,14 @@ public class CoordMaterializeTriggerService implements Service {
LOG.info("CoordMaterializeTriggerService - Curr Date= " + DateUtils.formatDateOozieTZ(currDate)
+ ", Num jobs to materialize = " + materializeJobs.size());
for (CoordinatorJobBean coordJob : materializeJobs) {
- if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(coordJob.getId())) {
- Services.get().get(InstrumentationService.class).get()
- .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1);
- queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
- coordJob.setLastModifiedTime(new Date());
- // TODO In place of calling single query, we should call bulk update.
- CoordJobQueryExecutor.getInstance().executeUpdate(
- CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, coordJob);
- }
+ Services.get().get(InstrumentationService.class).get()
+ .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1);
+ queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
+ coordJob.setLastModifiedTime(new Date());
+ // TODO In place of calling single query, we should call bulk update.
+ CoordJobQueryExecutor.getInstance().executeUpdate(
+ CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, coordJob);
+
}
}
catch (JPAExecutorException jex) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/853a4af9/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 7d1d339..44e42d5 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1844 HA - Lock mechanism for CoordMaterializeTriggerService (puru via rohini)
OOZIE-1834 sla should-start is supposed to be optional but it is not (rkanter)
OOZIE-1838 jdbc.connections.active sampler does not show up (rkanter)
OOZIE-1801 ZKLocksService instrumentation should say how many locks this server has (rkanter)