You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2016/01/26 19:10:08 UTC
oozie git commit: OOZIE-1922 MemoryLocksService fails if lock is
acquired multiple times in same thread and released
Repository: oozie
Updated Branches:
refs/heads/master 9c365b569 -> 70a5ffe4b
OOZIE-1922 MemoryLocksService fails if lock is acquired multiple times in same thread and released
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/70a5ffe4
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/70a5ffe4
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/70a5ffe4
Branch: refs/heads/master
Commit: 70a5ffe4b029896df81aa49cd08bbaf9b0355a36
Parents: 9c365b5
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Tue Jan 26 10:10:01 2016 -0800
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Tue Jan 26 10:10:01 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/oozie/lock/MemoryLocks.java | 12 +-
.../oozie/service/MemoryLocksService.java | 7 ++
.../apache/oozie/service/ZKLocksService.java | 21 +++-
.../org/apache/oozie/lock/TestMemoryLocks.java | 116 +++++++++++++++++++
.../oozie/service/TestZKLocksService.java | 75 ++++++------
release-log.txt | 1 +
6 files changed, 182 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/70a5ffe4/core/src/main/java/org/apache/oozie/lock/MemoryLocks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/lock/MemoryLocks.java b/core/src/main/java/org/apache/oozie/lock/MemoryLocks.java
index ee564b3..7d65ac0 100644
--- a/core/src/main/java/org/apache/oozie/lock/MemoryLocks.java
+++ b/core/src/main/java/org/apache/oozie/lock/MemoryLocks.java
@@ -52,13 +52,17 @@ public class MemoryLocks {
*/
@Override
public void release() {
- int val = rwLock.getQueueLength();
- if (val == 0) {
+ lock.unlock();
+ if (!isLockHeld()) {
synchronized (locks) {
- locks.remove(resource);
+ if (!isLockHeld()) {
+ locks.remove(resource);
+ }
}
}
- lock.unlock();
+ }
+ private boolean isLockHeld(){
+ return rwLock.hasQueuedThreads() || rwLock.isWriteLocked() || rwLock.getReadLockCount() > 0;
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/70a5ffe4/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java b/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java
index e3eccdb..d7c6a89 100644
--- a/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java
+++ b/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java
@@ -23,6 +23,8 @@ import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.lock.LockToken;
import org.apache.oozie.lock.MemoryLocks;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Service that provides in-memory locks. Assumes no other Oozie servers are using the database.
*/
@@ -95,4 +97,9 @@ public class MemoryLocksService implements Service, Instrumentable {
public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
return locks.getWriteLock(resource, wait);
}
+
+ @VisibleForTesting
+ public MemoryLocks getMemoryLocks() {
+ return locks;
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/70a5ffe4/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
index e3a6bcf..35fc8a6 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
@@ -197,13 +197,16 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
public void release() {
try {
lock.release();
- int val = lock.getParticipantNodes().size();
- //TODO this might break, when count is zero and before we remove lock, same thread may ask for same lock.
- // Hashmap will return the lock, but eventually release will remove it from hashmap and a immediate getlock will
- //create a new instance. Will fix this as part of OOZIE-1922
- if (val == 0) {
+ if (zkLocks.get(resource) == null) {
+ return;
+ }
+ if (!isLockHeld()) {
synchronized (zkLocks) {
- zkLocks.remove(resource);
+ if (zkLocks.get(resource) != null) {
+ if (!isLockHeld()) {
+ zkLocks.remove(resource);
+ }
+ }
}
}
}
@@ -212,6 +215,12 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
}
}
+
+ private boolean isLockHeld() {
+ return zkLocks.get(resource).readLock().isAcquiredInThisProcess()
+ || zkLocks.get(resource).writeLock().isAcquiredInThisProcess();
+ }
+
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/oozie/blob/70a5ffe4/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java b/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java
index 0efe310..61fec19 100644
--- a/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java
+++ b/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java
@@ -18,6 +18,10 @@
package org.apache.oozie.lock;
+import java.util.UUID;
+import org.apache.oozie.service.MemoryLocksService;
+import org.apache.oozie.service.ServiceException;
+import org.apache.oozie.service.Services;
import org.apache.oozie.test.XTestCase;
import org.apache.oozie.util.XLog;
@@ -219,4 +223,116 @@ public class TestMemoryLocks extends XTestCase {
assertEquals("a:1-L a:1-U a:2-L a:2-U", sb.toString().trim());
}
+ public class SameThreadWriteLocker implements Runnable {
+ protected String name;
+ private String nameIndex;
+ private StringBuffer sb;
+ protected long timeout;
+
+ public SameThreadWriteLocker(String name, int nameIndex, long timeout, StringBuffer buffer) {
+ this.name = name;
+ this.nameIndex = name + ":" + nameIndex;
+ this.sb = buffer;
+ this.timeout = timeout;
+ }
+
+ public void run() {
+ try {
+ log.info("Getting lock [{0}]", nameIndex);
+ MemoryLocks.MemoryLockToken token = getLock();
+ MemoryLocks.MemoryLockToken token2 = getLock();
+
+ if (token != null) {
+ log.info("Got lock [{0}]", nameIndex);
+ sb.append(nameIndex + "-L1 ");
+ if (token2 != null) {
+ sb.append(nameIndex + "-L2 ");
+ }
+ sb.append(nameIndex + "-U1 ");
+ token.release();
+ synchronized (this) {
+ wait();
+ }
+ sb.append(nameIndex + "-U2 ");
+ token2.release();
+ log.info("Release lock [{0}]", nameIndex);
+ }
+ else {
+ sb.append(nameIndex + "-N ");
+ log.info("Did not get lock [{0}]", nameIndex);
+ }
+ }
+ catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ public void finish() {
+ synchronized (this) {
+ notify();
+ }
+ }
+
+ protected MemoryLocks.MemoryLockToken getLock() throws InterruptedException {
+ return locks.getWriteLock(name, timeout);
+ }
+
+ }
+
+ public void testWriteLockSameThreadNoWait() throws Exception {
+ StringBuffer sb = new StringBuffer("");
+ SameThreadWriteLocker l1 = new SameThreadWriteLocker("a", 1, 0, sb);
+ Locker l2 = new WriteLocker("a", 2, 0, sb);
+
+ new Thread(l1).start();
+ Thread.sleep(500);
+ new Thread(l2).start();
+ Thread.sleep(500);
+ l1.finish();
+ Thread.sleep(500);
+ l2.finish();
+ Thread.sleep(500);
+ assertEquals("a:1-L1 a:1-L2 a:1-U1 a:2-N a:1-U2", sb.toString().trim());
+ }
+
+ public void testWriteLockSameThreadWait() throws Exception {
+ StringBuffer sb = new StringBuffer("");
+ SameThreadWriteLocker l1 = new SameThreadWriteLocker("a", 1, 0, sb);
+ Locker l2 = new WriteLocker("a", 2, 1000, sb);
+
+ new Thread(l1).start();
+ Thread.sleep(500);
+ new Thread(l2).start();
+ Thread.sleep(500);
+ l1.finish();
+ Thread.sleep(500);
+ l2.finish();
+ Thread.sleep(500);
+ assertEquals("a:1-L1 a:1-L2 a:1-U1 a:1-U2 a:2-L a:2-U", sb.toString().trim());
+ }
+
+ public void testLockReentrant() throws ServiceException, InterruptedException {
+ final String path = UUID.randomUUID().toString();
+ MemoryLocksService lockService = new MemoryLocksService();
+ try {
+ lockService.init(Services.get());
+ LockToken lock = lockService.getWriteLock(path, 5000);
+ lock = (LockToken) lockService.getWriteLock(path, 5000);
+ lock = (LockToken) lockService.getWriteLock(path, 5000);
+ assertEquals(lockService.getMemoryLocks().size(), 1);
+ lock.release();
+ assertEquals(lockService.getMemoryLocks().size(), 1);
+ lock.release();
+ assertEquals(lockService.getMemoryLocks().size(), 1);
+ lock.release();
+ assertEquals(lockService.getMemoryLocks().size(), 0);
+ }
+ catch (Exception e) {
+ fail("Reentrant property, it should have acquired lock");
+ }
+ finally {
+ lockService.destroy();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/70a5ffe4/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
index 02cc137..70aa4d7 100644
--- a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
@@ -464,49 +464,16 @@ public class TestZKLocksService extends ZKXTestCase {
public void testReentrantMultipleThread() throws ServiceException, InterruptedException {
final String path = UUID.randomUUID().toString();
final ZKLocksService zkls = new ZKLocksService();
- final LockToken[] locks = new LockToken[2];
-
+ zkls.init(Services.get());
try {
- zkls.init(Services.get());
- Thread t1 = new Thread() {
- public void run() {
- try {
- locks[0] = zkls.getWriteLock(path, 5000);
- }
- catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- };
- Thread t2 = new Thread() {
- public void run() {
- try {
- locks[1] = zkls.getWriteLock(path, 5000);
- }
- catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- };
+ ThreadLock t1 = new ThreadLock(zkls, path);
+ ThreadLock t2 = new ThreadLock(zkls, path);
t1.start();
- t2.start();
t1.join();
+ assertFalse(zkls.getLocks().containsKey(path));
+ t2.start();
t2.join();
-
- if (locks[0] != null) {
- assertNull(locks[1]);
- }
- if (locks[1] != null) {
- assertNull(locks[0]);
- }
-
- if (locks[0] != null) {
- locks[0].release();
- }
- if (locks[1] != null) {
- locks[1].release();
- }
- assertTrue(zkls.getLocks().containsKey(path));
+ assertFalse(zkls.getLocks().containsKey(path));
}
finally {
zkls.destroy();
@@ -514,8 +481,9 @@ public class TestZKLocksService extends ZKXTestCase {
}
public void testLockReaper() throws Exception {
- Services.get().getConf().set(ZKLocksService.REAPING_THRESHOLD, "1");
+ ConfigurationService.set(ZKLocksService.REAPING_THRESHOLD, "1");
ZKLocksService zkls = new ZKLocksService();
+
try {
zkls.init(Services.get());
for (int i = 0; i < 10; ++i) {
@@ -531,4 +499,31 @@ public class TestZKLocksService extends ZKXTestCase {
zkls.destroy();
}
}
+
+ static class ThreadLock extends Thread {
+ ZKLocksService zkls;
+ String path;
+ LockToken lock = null;
+
+ public ThreadLock(ZKLocksService zkls, String path) {
+ this.zkls = zkls;
+ this.path = path;
+
+ }
+
+ public void run() {
+ try {
+ lock = zkls.getWriteLock(path, 5000);
+ if (lock != null) {
+ lock = zkls.getWriteLock(path, 5000);
+ Thread.sleep(1000);
+ lock.release();
+ Thread.sleep(1000);
+ lock.release();
+ }
+ }
+ catch (InterruptedException e) {
+ }
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/70a5ffe4/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 1b675bb..6dac28b 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.3.0 release (trunk - unreleased)
+OOZIE-1922 MemoryLocksService fails if lock is acquired multiple times in same thread and released (puru)
OOZIE-2432 TestPurgeXCommand fails (fdenes via rkanter)
OOZIE-2434 inconsistent coord action status and workflow job status (satishsaley via puru)
OOZIE-2438 Oozie client "jobs -filter" diagnostic message clarification (satishsaley via puru)