You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@oozie.apache.org by an...@apache.org on 2018/05/24 09:47:16 UTC

oozie git commit: OOZIE-3238 Flaky test TestStatusTransitService#testBundleStatusTransitWithLock (pbacsko via gezapeti, andras.piros)

Repository: oozie
Updated Branches:
  refs/heads/master 5334d0641 -> 830f0b541


OOZIE-3238 Flaky test TestStatusTransitService#testBundleStatusTransitWithLock (pbacsko via gezapeti, andras.piros)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/830f0b54
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/830f0b54
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/830f0b54

Branch: refs/heads/master
Commit: 830f0b5419be3bee527d4f1983782525e406e76c
Parents: 5334d06
Author: Andras Piros <an...@cloudera.com>
Authored: Thu May 24 11:46:31 2018 +0200
Committer: Andras Piros <an...@cloudera.com>
Committed: Thu May 24 11:46:31 2018 +0200

----------------------------------------------------------------------
 .../oozie/service/TestStatusTransitService.java | 60 ++++++++++----------
 release-log.txt                                 |  1 +
 2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/830f0b54/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java b/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
index a92cd87..9315bad 100644
--- a/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
@@ -19,6 +19,7 @@ package org.apache.oozie.service;
 
 import java.util.Date;
 
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.BundleActionBean;
@@ -58,6 +59,8 @@ import org.apache.oozie.lock.LockToken;
 import org.apache.oozie.service.StatusTransitService.StatusTransitRunnable;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.LockerCoordinator;
+import org.apache.oozie.util.XLog;
 import org.apache.oozie.workflow.WorkflowApp;
 import org.apache.oozie.workflow.WorkflowInstance;
 import org.apache.oozie.workflow.lite.EndNodeDef;
@@ -1576,22 +1579,22 @@ public class TestStatusTransitService extends XDataTestCase {
         addRecordToBundleActionTable(bundleId, "action2-C", 0, Job.Status.RUNNING);
         addRecordToBundleActionTable(bundleId, "action3-C", 0, Job.Status.DONEWITHERROR);
 
-        JobLock lockThread = new JobLock(jobId);
+        LockerCoordinator coordinator = new LockerCoordinator();
+        JobLock lockThread = new JobLock(jobId, coordinator);
         new Thread(lockThread).start();
+        coordinator.awaitLockAcquire();
 
-        sleep(1000);
         Runnable runnable = new StatusTransitRunnable();
         runnable.run();
         bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId);
         assertEquals(Job.Status.RUNNING, bundleJob.getStatus());
-        synchronized (lockThread) {
-            lockThread.notifyAll();
-        }
-        sleep(1000);
+
+        coordinator.signalLockerContinue();
+        coordinator.awaitTermination();
+
         runnable.run();
         bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId);
         assertEquals(Job.Status.RUNNINGWITHERROR, bundleJob.getStatus());
-
     }
 
     public void testCoordStatusTransitWithLock() throws Exception {
@@ -1610,21 +1613,22 @@ public class TestStatusTransitService extends XDataTestCase {
                 "KILLED", 0);
 
         final CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
-        JobLock lockThread = new JobLock(coordJob.getId());
+        LockerCoordinator coordinator = new LockerCoordinator();
+        JobLock lockThread = new JobLock(coordJob.getId(), coordinator);
         new Thread(lockThread).start();
+        coordinator.awaitLockAcquire();
+
         Runnable runnable = new StatusTransitRunnable();
         runnable.run();
-        sleep(1000);
         coordJob = jpaService.execute(coordJobGetCmd);
         assertEquals(CoordinatorJob.Status.RUNNING, coordJob.getStatus());
 
-        synchronized (lockThread) {
-            lockThread.notifyAll();
-        }
+        coordinator.signalLockerContinue();
+        coordinator.awaitTermination();
+
         runnable.run();
         coordJob = jpaService.execute(coordJobGetCmd);
         assertEquals(CoordinatorJob.Status.RUNNINGWITHERROR, coordJob.getStatus());
-
     }
 
     public void testBundleStatusCoordSubmitFails() throws Exception {
@@ -1682,32 +1686,28 @@ public class TestStatusTransitService extends XDataTestCase {
     }
 
   static class JobLock implements Runnable {
-        String jobId;
+        private static XLog log = new XLog(LogFactory.getLog(JobLock.class));
+        private final String jobId;
+        private final LockerCoordinator coordinator;
 
-        public JobLock(String jobId) {
+        public JobLock(String jobId, LockerCoordinator coordinator) {
             this.jobId = jobId;
-        }
-
-        LockToken lock = null;
-
-        public void acquireLock() throws InterruptedException {
-            lock = Services.get().get(MemoryLocksService.class).getWriteLock(jobId, 0);
-        }
-
-        public void release() {
-            lock.release();
+            this.coordinator = coordinator;
         }
 
         @Override
         public void run() {
             try {
-                acquireLock();
-                synchronized (this) {
-                    this.wait();
-                }
-                release();
+                LockToken lock = Services.get().get(MemoryLocksService.class).getWriteLock(jobId, 0);
+                coordinator.lockAcquireDone();
+                coordinator.awaitContinueSignal();
+                lock.release();
             }
             catch (InterruptedException e) {
+                log.error("InterruptedException caught", e);
+            }
+            finally {
+                coordinator.terminated();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/830f0b54/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 4e60878..96c3ce7 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-3238 Flaky test TestStatusTransitService#testBundleStatusTransitWithLock (pbacsko via gezapeti, andras.piros)
 OOZIE-3185 Upgrade org.apache.derby to 10.11.1.1 (PandaMonkey via andras.piros)
 OOZIE-2883 ProxyUserService: invalid configuration error message is misleading (yangfang via andras.piros)
 OOZIE-3237 Flaky test TestZKLocksService#testWriteReadLockThreads (pbacsko via andras.piros)