You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2016/10/10 11:52:40 UTC

[41/50] [abbrv] oozie git commit: OOZIE-2501 ZK reentrant lock doesn't work for few cases

OOZIE-2501 ZK reentrant lock doesn't work for few cases


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d330d406
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d330d406
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d330d406

Branch: refs/heads/oya
Commit: d330d40665a3b42744db20dfc5d9a80ad5f9b439
Parents: e8a9b24
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Tue Sep 27 12:21:26 2016 -0700
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Tue Sep 27 12:21:26 2016 -0700

----------------------------------------------------------------------
 .../oozie/command/wf/ActionStartXCommand.java   |  10 ++
 .../java/org/apache/oozie/lock/MemoryLocks.java |  82 ++++++-------
 .../oozie/service/MemoryLocksService.java       |   9 +-
 .../apache/oozie/service/ZKLocksService.java    |  85 +++++---------
 .../org/apache/oozie/lock/TestMemoryLocks.java  |  60 ++++++++--
 .../oozie/service/TestZKLocksService.java       | 115 ++++++++++++++-----
 release-log.txt                                 |   1 +
 7 files changed, 218 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/d330d406/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
index 41f4430..edfac48 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
@@ -21,6 +21,7 @@ package org.apache.oozie.command.wf;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+
 import javax.servlet.jsp.el.ELException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +42,7 @@ import org.apache.oozie.client.SLAEvent.Status;
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.command.XCommand;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
@@ -399,4 +401,12 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command
         queue(new ActionStartXCommand(wfAction.getId(), wfAction.getType()), retryDelayMillis);
     }
 
+    protected void queue(XCommand<?> command, long msDelay) {
+        // ActionStartXCommand is synchronously called from SignalXCommand passing wfJob so that it doesn't have to
+        //reload wfJob again. We need set wfJob to null, so that it get reloaded when the requeued command executes.
+        if (command instanceof ActionStartXCommand) {
+            ((ActionStartXCommand)command).wfJob = null;
+        }
+        super.queue(command, msDelay);
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/d330d406/core/src/main/java/org/apache/oozie/lock/MemoryLocks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/lock/MemoryLocks.java b/core/src/main/java/org/apache/oozie/lock/MemoryLocks.java
index 7d65ac0..1ef1e41 100644
--- a/core/src/main/java/org/apache/oozie/lock/MemoryLocks.java
+++ b/core/src/main/java/org/apache/oozie/lock/MemoryLocks.java
@@ -18,33 +18,32 @@
 
 package org.apache.oozie.lock;
 
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.Lock;
+import org.apache.oozie.service.MemoryLocksService.Type;
+
+import com.google.common.collect.MapMaker;
 
 /**
  * In memory resource locking that provides READ/WRITE lock capabilities.
  */
 public class MemoryLocks {
-    final private HashMap<String, ReentrantReadWriteLock> locks = new HashMap<String, ReentrantReadWriteLock>();
 
-    private static enum Type {
-        READ, WRITE
-    }
+    final private ConcurrentMap<String, ReentrantReadWriteLock> locks = new MapMaker().weakValues().makeMap();
 
     /**
      * Implementation of {@link LockToken} for in memory locks.
      */
     class MemoryLockToken implements LockToken {
-        private final ReentrantReadWriteLock rwLock;
-        private final java.util.concurrent.locks.Lock lock;
-        private final String resource;
+        private final ReentrantReadWriteLock lockEntry;
+        private final Type type;
+
+        public MemoryLockToken(ReentrantReadWriteLock lockEntry, Type type) {
+            this.lockEntry = lockEntry;
+            this.type = type;
 
-        private MemoryLockToken(ReentrantReadWriteLock rwLock, java.util.concurrent.locks.Lock lock, String resource) {
-            this.rwLock = rwLock;
-            this.lock = lock;
-            this.resource = resource;
         }
 
         /**
@@ -52,18 +51,15 @@ public class MemoryLocks {
          */
         @Override
         public void release() {
-            lock.unlock();
-            if (!isLockHeld()) {
-                synchronized (locks) {
-                    if (!isLockHeld()) {
-                        locks.remove(resource);
-                    }
-                }
+            switch (type) {
+                case WRITE:
+                    lockEntry.writeLock().unlock();
+                    break;
+                case READ:
+                    lockEntry.readLock().unlock();
+                    break;
             }
         }
-        private boolean isLockHeld(){
-            return rwLock.hasQueuedThreads() || rwLock.isWriteLocked() || rwLock.getReadLockCount() > 0;
-        }
     }
 
     /**
@@ -76,41 +72,23 @@ public class MemoryLocks {
     }
 
     /**
-     * Obtain a READ lock for a source.
+     * Obtain a lock for a source.
      *
      * @param resource resource name.
+     * @param type lock type.
      * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
      * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
      * @throws InterruptedException thrown if the thread was interrupted while waiting.
      */
-    public MemoryLockToken getReadLock(String resource, long wait) throws InterruptedException {
-        return getLock(resource, Type.READ, wait);
-    }
-
-    /**
-     * Obtain a WRITE lock for a source.
-     *
-     * @param resource resource name.
-     * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
-     * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
-     * @throws InterruptedException thrown if the thread was interrupted while waiting.
-     */
-    public MemoryLockToken getWriteLock(String resource, long wait) throws InterruptedException {
-        return getLock(resource, Type.WRITE, wait);
-    }
-
-    private MemoryLockToken getLock(String resource, Type type, long wait) throws InterruptedException {
-        ReentrantReadWriteLock lockEntry;
-        synchronized (locks) {
-            if (locks.containsKey(resource)) {
-                lockEntry = locks.get(resource);
-            }
-            else {
-                lockEntry = new ReentrantReadWriteLock(true);
-                locks.put(resource, lockEntry);
+    public MemoryLockToken getLock(final String resource, Type type, long wait) throws InterruptedException {
+        ReentrantReadWriteLock lockEntry = locks.get(resource);
+        if (lockEntry == null) {
+            ReentrantReadWriteLock newLock = new ReentrantReadWriteLock(true);
+            lockEntry = locks.putIfAbsent(resource, newLock);
+            if (lockEntry == null) {
+                lockEntry = newLock;
             }
         }
-
         Lock lock = (type.equals(Type.READ)) ? lockEntry.readLock() : lockEntry.writeLock();
 
         if (wait == -1) {
@@ -133,6 +111,10 @@ public class MemoryLocks {
                 locks.put(resource, lockEntry);
             }
         }
-        return new MemoryLockToken(lockEntry, lock, resource);
+        return new MemoryLockToken(lockEntry, type);
+    }
+
+    public ConcurrentMap<String, ReentrantReadWriteLock> getLockMap(){
+        return locks;
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/d330d406/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java b/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java
index d7c6a89..2ab2abc 100644
--- a/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java
+++ b/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java
@@ -29,6 +29,11 @@ import com.google.common.annotations.VisibleForTesting;
  * Service that provides in-memory locks.  Assumes no other Oozie servers are using the database.
  */
 public class MemoryLocksService implements Service, Instrumentable {
+
+    public static enum Type {
+        READ, WRITE
+    }
+
     protected static final String INSTRUMENTATION_GROUP = "locks";
     private MemoryLocks locks;
 
@@ -83,7 +88,7 @@ public class MemoryLocksService implements Service, Instrumentable {
      * @throws InterruptedException thrown if the thread was interrupted while waiting.
      */
     public LockToken getReadLock(String resource, long wait) throws InterruptedException {
-        return locks.getReadLock(resource, wait);
+        return locks.getLock(resource, Type.READ, wait);
     }
 
     /**
@@ -95,7 +100,7 @@ public class MemoryLocksService implements Service, Instrumentable {
      * @throws InterruptedException thrown if the thread was interrupted while waiting.
      */
     public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
-        return locks.getWriteLock(resource, wait);
+        return locks.getLock(resource, Type.WRITE, wait);
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/oozie/blob/d330d406/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
index 952b90d..8acbad9 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
@@ -17,7 +17,7 @@
  */
 package org.apache.oozie.service;
 
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
@@ -39,6 +39,7 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.utils.ThreadUtils;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.MapMaker;
 
 /**
  * Service that provides distributed locks via ZooKeeper.  Requires that a ZooKeeper ensemble is available.  The locks will be
@@ -51,7 +52,8 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
     private static XLog LOG = XLog.getLog(ZKLocksService.class);
     public static final String LOCKS_NODE = "/locks";
 
-    final private HashMap<String, InterProcessReadWriteLock> zkLocks = new HashMap<String, InterProcessReadWriteLock>();
+    private ConcurrentMap<String, InterProcessReadWriteLock> zkLocks = new MapMaker().weakValues().makeMap();
+
 
     private static final String REAPING_LEADER_PATH = ZKUtils.ZK_BASE_SERVICES_PATH + "/locksChildReaperLeaderPath";
     public static final String REAPING_THRESHOLD = CONF_PREFIX + "ZKLocksService.locks.reaper.threshold";
@@ -123,18 +125,7 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
      */
     @Override
     public LockToken getReadLock(String resource, long wait) throws InterruptedException {
-        InterProcessReadWriteLock lockEntry;
-        synchronized (zkLocks) {
-            if (zkLocks.containsKey(resource)) {
-                lockEntry = zkLocks.get(resource);
-            }
-            else {
-                lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
-                zkLocks.put(resource, lockEntry);
-            }
-        }
-        InterProcessMutex readLock = lockEntry.readLock();
-        return acquireLock(wait, readLock, resource);
+        return acquireLock(resource, Type.READ, wait);
     }
 
     /**
@@ -147,29 +138,27 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
      */
     @Override
     public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
-        InterProcessReadWriteLock lockEntry;
-        synchronized (zkLocks) {
-            if (zkLocks.containsKey(resource)) {
-                lockEntry = zkLocks.get(resource);
-            }
-            else {
-                lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
-                zkLocks.put(resource, lockEntry);
-            }
-        }
-        InterProcessMutex writeLock = lockEntry.writeLock();
-        return acquireLock(wait, writeLock, resource);
+        return acquireLock(resource, Type.WRITE, wait);
     }
 
-    private LockToken acquireLock(long wait, InterProcessMutex lock, String resource) {
+    private LockToken acquireLock(final String resource, Type type, long wait) throws InterruptedException {
+        InterProcessReadWriteLock lockEntry = zkLocks.get(resource);
+        if (lockEntry == null) {
+            InterProcessReadWriteLock newLock = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
+            lockEntry = zkLocks.putIfAbsent(resource, newLock);
+            if (lockEntry == null) {
+                lockEntry = newLock;
+            }
+        }
+        InterProcessMutex lock = (type.equals(Type.READ)) ? lockEntry.readLock() : lockEntry.writeLock();
         ZKLockToken token = null;
         try {
             if (wait == -1) {
                 lock.acquire();
-                token = new ZKLockToken(lock, resource);
+                token = new ZKLockToken(lockEntry, type);
             }
             else if (lock.acquire(wait, TimeUnit.MILLISECONDS)) {
-                token = new ZKLockToken(lock, resource);
+                token = new ZKLockToken(lockEntry, type);
             }
         }
         catch (Exception ex) {
@@ -183,12 +172,12 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
      * Implementation of {@link LockToken} for zookeeper locks.
      */
     class ZKLockToken implements LockToken {
-        private final InterProcessMutex lock;
-        private final String resource;
+        private final InterProcessReadWriteLock lockEntry;
+        private final Type type;
 
-        private ZKLockToken(InterProcessMutex lock, String resource) {
-            this.lock = lock;
-            this.resource = resource;
+        private ZKLockToken(InterProcessReadWriteLock lockEntry, Type type) {
+            this.lockEntry = lockEntry;
+            this.type = type;
         }
 
         /**
@@ -197,35 +186,23 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
         @Override
         public void release() {
             try {
-                lock.release();
-                if (zkLocks.get(resource) == null) {
-                    return;
-                }
-                if (!isLockHeld()) {
-                    synchronized (zkLocks) {
-                        if (zkLocks.get(resource) != null) {
-                            if (!isLockHeld()) {
-                                zkLocks.remove(resource);
-                            }
-                        }
-                    }
+                switch (type) {
+                    case WRITE:
+                        lockEntry.writeLock().release();
+                        break;
+                    case READ:
+                        lockEntry.readLock().release();
+                        break;
                 }
             }
             catch (Exception ex) {
                 LOG.warn("Could not release lock: " + ex.getMessage(), ex);
             }
-
         }
-
-        private boolean isLockHeld() {
-            return zkLocks.get(resource).readLock().isAcquiredInThisProcess()
-                    || zkLocks.get(resource).writeLock().isAcquiredInThisProcess();
-        }
-
     }
 
     @VisibleForTesting
-    public HashMap<String, InterProcessReadWriteLock> getLocks(){
+    public ConcurrentMap<String, InterProcessReadWriteLock> getLocks(){
         return zkLocks;
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/d330d406/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java b/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java
index f0a87e5..8c7b58e 100644
--- a/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java
+++ b/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.oozie.service.MemoryLocksService;
+import org.apache.oozie.service.MemoryLocksService.Type;
 import org.apache.oozie.service.ServiceException;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XTestCase;
@@ -31,6 +32,7 @@ import org.apache.oozie.util.XLog;
 public class TestMemoryLocks extends XTestCase {
     private static final int LATCH_TIMEOUT = 10;
     private XLog log = XLog.getLog(getClass());
+    public static final int DEFAULT_LOCK_TIMEOUT = 5 * 1000;
 
     private MemoryLocks locks;
 
@@ -118,7 +120,7 @@ public class TestMemoryLocks extends XTestCase {
         }
 
         protected MemoryLocks.MemoryLockToken getLock() throws InterruptedException {
-            return locks.getReadLock(name, timeout);
+            return locks.getLock(name, Type.READ, timeout);
         }
     }
 
@@ -129,7 +131,7 @@ public class TestMemoryLocks extends XTestCase {
         }
 
         protected MemoryLocks.MemoryLockToken getLock() throws InterruptedException {
-            return locks.getWriteLock(name, timeout);
+            return locks.getLock(name, Type.WRITE, timeout);
         }
     }
 
@@ -323,7 +325,7 @@ public class TestMemoryLocks extends XTestCase {
         }
 
         protected MemoryLocks.MemoryLockToken getLock() throws InterruptedException {
-            return locks.getWriteLock(name, timeout);
+            return locks.getLock(name, Type.WRITE, timeout);
         }
     }
 
@@ -372,16 +374,16 @@ public class TestMemoryLocks extends XTestCase {
         MemoryLocksService lockService = new MemoryLocksService();
         try {
             lockService.init(Services.get());
-            LockToken lock = lockService.getWriteLock(path, 5000);
-            lock = (LockToken) lockService.getWriteLock(path, 5000);
-            lock = (LockToken) lockService.getWriteLock(path, 5000);
+            LockToken lock = lockService.getWriteLock(path, DEFAULT_LOCK_TIMEOUT);
+            lock = (LockToken) lockService.getWriteLock(path, DEFAULT_LOCK_TIMEOUT);
+            lock = (LockToken) lockService.getWriteLock(path, DEFAULT_LOCK_TIMEOUT);
             assertEquals(lockService.getMemoryLocks().size(), 1);
             lock.release();
             assertEquals(lockService.getMemoryLocks().size(), 1);
             lock.release();
             assertEquals(lockService.getMemoryLocks().size(), 1);
             lock.release();
-            assertEquals(lockService.getMemoryLocks().size(), 0);
+            checkLockRelease(path, lockService);
         }
         catch (Exception e) {
             fail("Reentrant property, it should have acquired lock");
@@ -391,4 +393,48 @@ public class TestMemoryLocks extends XTestCase {
         }
     }
 
+    public void testLocksAreGarbageCollected() throws ServiceException, InterruptedException {
+        String path = new String("a");
+        String path1 = new String("a");
+        MemoryLocksService lockService = new MemoryLocksService();
+        lockService.init(Services.get());
+        LockToken lock = lockService.getWriteLock(path, DEFAULT_LOCK_TIMEOUT);
+        int oldHash = lockService.getMemoryLocks().getLockMap().get(path).hashCode();
+        lock.release();
+        lock = lockService.getWriteLock(path1, DEFAULT_LOCK_TIMEOUT);
+        int newHash = lockService.getMemoryLocks().getLockMap().get(path1).hashCode();
+        assertTrue(oldHash == newHash);
+        lock.release();
+        lock = null;
+        System.gc();
+        path = "a";
+        lock = lockService.getWriteLock(path, DEFAULT_LOCK_TIMEOUT);
+        newHash = lockService.getMemoryLocks().getLockMap().get(path).hashCode();
+        assertFalse(oldHash == newHash);
+
+    }
+
+    public void testLocksAreReused() throws ServiceException, InterruptedException {
+        String path = "a";
+        MemoryLocksService lockService = new MemoryLocksService();
+        lockService.init(Services.get());
+        LockToken lock = lockService.getWriteLock(path, DEFAULT_LOCK_TIMEOUT);
+        int oldHash = System.identityHashCode(lockService.getMemoryLocks().getLockMap().get(path));
+        System.gc();
+        lock.release();
+        lock = lockService.getWriteLock(path, DEFAULT_LOCK_TIMEOUT);
+        assertEquals(lockService.getMemoryLocks().size(), 1);
+        int newHash = System.identityHashCode(lockService.getMemoryLocks().getLockMap().get(path));
+        assertTrue(oldHash == newHash);
+    }
+
+    private void checkLockRelease(String path, MemoryLocksService lockService) {
+        if (lockService.getMemoryLocks().getLockMap().get(path) == null) {
+            // good lock is removed from memory after gc.
+        }
+        else {
+            assertFalse(lockService.getMemoryLocks().getLockMap().get(path).isWriteLocked());
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/d330d406/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
index d1acadf..d04f04e 100644
--- a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
@@ -21,6 +21,7 @@ package org.apache.oozie.service;
 import java.util.UUID;
 
 import org.apache.oozie.lock.LockToken;
+import org.apache.oozie.lock.TestMemoryLocks;
 import org.apache.oozie.service.ZKLocksService.ZKLockToken;
 import org.apache.oozie.test.ZKXTestCase;
 import org.apache.oozie.util.XLog;
@@ -132,7 +133,7 @@ public class TestZKLocksService extends ZKXTestCase {
         ZKLocksService zkls = new ZKLocksService();
         try {
             zkls.init(Services.get());
-            _testWaitWriteLock(zkls, zkls);
+            checkWaitWriteLock(zkls, zkls);
         }
         finally {
             zkls.destroy();
@@ -146,7 +147,7 @@ public class TestZKLocksService extends ZKXTestCase {
         try {
             zkls1.init(Services.get());
             zkls2.init(Services.get());
-            _testWaitWriteLock(zkls1, zkls2);
+            checkWaitWriteLock(zkls1, zkls2);
         }
         finally {
             zkls1.destroy();
@@ -154,7 +155,7 @@ public class TestZKLocksService extends ZKXTestCase {
         }
     }
 
-    public void _testWaitWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
+    public void checkWaitWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
         StringBuffer sb = new StringBuffer("");
         Locker l1 = new WriteLocker("a", 1, -1, sb, zkls1);
         Locker l2 = new WriteLocker("a", 2, -1, sb, zkls2);
@@ -174,7 +175,7 @@ public class TestZKLocksService extends ZKXTestCase {
         ZKLocksService zkls = new ZKLocksService();
         try {
             zkls.init(Services.get());
-            _testNoWaitWriteLock(zkls, zkls);
+            checkNoWaitWriteLock(zkls, zkls);
         }
         finally {
             zkls.destroy();
@@ -188,7 +189,7 @@ public class TestZKLocksService extends ZKXTestCase {
         try {
             zkls1.init(Services.get());
             zkls2.init(Services.get());
-            _testNoWaitWriteLock(zkls1, zkls2);
+            checkNoWaitWriteLock(zkls1, zkls2);
         }
         finally {
             zkls1.destroy();
@@ -196,7 +197,7 @@ public class TestZKLocksService extends ZKXTestCase {
         }
     }
 
-    public void _testNoWaitWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
+    public void checkNoWaitWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
         StringBuffer sb = new StringBuffer("");
         Locker l1 = new WriteLocker("a", 1, 0, sb, zkls1);
         Locker l2 = new WriteLocker("a", 2, 0, sb, zkls2);
@@ -216,7 +217,7 @@ public class TestZKLocksService extends ZKXTestCase {
         ZKLocksService zkls = new ZKLocksService();
         try {
             zkls.init(Services.get());
-            _testTimeoutWaitingWriteLock(zkls, zkls);
+            checkTimeoutWaitingWriteLock(zkls, zkls);
         }
         finally {
             zkls.destroy();
@@ -230,7 +231,7 @@ public class TestZKLocksService extends ZKXTestCase {
         try {
             zkls1.init(Services.get());
             zkls2.init(Services.get());
-            _testTimeoutWaitingWriteLock(zkls1, zkls2);
+            checkTimeoutWaitingWriteLock(zkls1, zkls2);
         }
         finally {
             zkls1.destroy();
@@ -238,7 +239,7 @@ public class TestZKLocksService extends ZKXTestCase {
         }
     }
 
-    public void _testTimeoutWaitingWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
+    public void checkTimeoutWaitingWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
         StringBuffer sb = new StringBuffer("");
         Locker l1 = new WriteLocker("a", 1, 0, sb, zkls1);
         Locker l2 = new WriteLocker("a", 2, (long) (WAITFOR_RATIO * 2000), sb, zkls2);
@@ -258,7 +259,7 @@ public class TestZKLocksService extends ZKXTestCase {
         ZKLocksService zkls = new ZKLocksService();
         try {
             zkls.init(Services.get());
-            _testTimeoutTimingOutWriteLock(zkls, zkls);
+            checkTimeoutTimingOutWriteLock(zkls, zkls);
         }
         finally {
             zkls.destroy();
@@ -272,7 +273,7 @@ public class TestZKLocksService extends ZKXTestCase {
         try {
             zkls1.init(Services.get());
             zkls2.init(Services.get());
-            _testTimeoutTimingOutWriteLock(zkls1, zkls2);
+            checkTimeoutTimingOutWriteLock(zkls1, zkls2);
         }
         finally {
             zkls1.destroy();
@@ -280,7 +281,7 @@ public class TestZKLocksService extends ZKXTestCase {
         }
     }
 
-    public void _testTimeoutTimingOutWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
+    public void checkTimeoutTimingOutWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
         StringBuffer sb = new StringBuffer("");
         Locker l1 = new WriteLocker("a", 1, 0, sb, zkls1);
         Locker l2 = new WriteLocker("a", 2, 50, sb, zkls2);
@@ -300,7 +301,7 @@ public class TestZKLocksService extends ZKXTestCase {
         ZKLocksService zkls = new ZKLocksService();
         try {
             zkls.init(Services.get());
-            _testReadLock(zkls, zkls);
+            checkReadLock(zkls, zkls);
         }
         finally {
             zkls.destroy();
@@ -314,7 +315,7 @@ public class TestZKLocksService extends ZKXTestCase {
         try {
             zkls1.init(Services.get());
             zkls2.init(Services.get());
-            _testReadLock(zkls1, zkls2);
+            checkReadLock(zkls1, zkls2);
         }
         finally {
             zkls1.destroy();
@@ -322,7 +323,7 @@ public class TestZKLocksService extends ZKXTestCase {
         }
     }
 
-    public void _testReadLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
+    public void checkReadLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
         StringBuffer sb = new StringBuffer("");
         Locker l1 = new ReadLocker("a", 1, -1, sb, zkls1);
         Locker l2 = new ReadLocker("a", 2, -1, sb, zkls2);
@@ -342,7 +343,7 @@ public class TestZKLocksService extends ZKXTestCase {
         ZKLocksService zkls = new ZKLocksService();
         try {
             zkls.init(Services.get());
-            _testReadWriteLock(zkls, zkls);
+            checkReadWriteLock(zkls, zkls);
         }
         finally {
             zkls.destroy();
@@ -356,7 +357,7 @@ public class TestZKLocksService extends ZKXTestCase {
         try {
             zkls1.init(Services.get());
             zkls2.init(Services.get());
-            _testReadWriteLock(zkls1, zkls2);
+            checkReadWriteLock(zkls1, zkls2);
         }
         finally {
             zkls1.destroy();
@@ -364,7 +365,7 @@ public class TestZKLocksService extends ZKXTestCase {
         }
     }
 
-    public void _testReadWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
+    public void checkReadWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
         StringBuffer sb = new StringBuffer("");
         Locker l1 = new ReadLocker("a", 1, -1, sb, zkls1);
         Locker l2 = new WriteLocker("a", 2, -1, sb, zkls2);
@@ -384,7 +385,7 @@ public class TestZKLocksService extends ZKXTestCase {
         ZKLocksService zkls = new ZKLocksService();
         try {
             zkls.init(Services.get());
-            _testWriteReadLock(zkls, zkls);
+            checkWriteReadLock(zkls, zkls);
         }
         finally {
             zkls.destroy();
@@ -398,7 +399,7 @@ public class TestZKLocksService extends ZKXTestCase {
         try {
             zkls1.init(Services.get());
             zkls2.init(Services.get());
-            _testWriteReadLock(zkls1, zkls2);
+            checkWriteReadLock(zkls1, zkls2);
         }
         finally {
             zkls1.destroy();
@@ -406,7 +407,7 @@ public class TestZKLocksService extends ZKXTestCase {
         }
     }
 
-    public void _testWriteReadLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
+    public void checkWriteReadLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
         StringBuffer sb = new StringBuffer("");
         Locker l1 = new WriteLocker("a", 1, -1, sb, zkls1);
         Locker l2 = new ReadLocker("a", 2, -1, sb, zkls2);
@@ -427,10 +428,10 @@ public class TestZKLocksService extends ZKXTestCase {
         ZKLocksService zkls = new ZKLocksService();
         try {
             zkls.init(Services.get());
-            ZKLockToken lock = (ZKLockToken) zkls.getWriteLock(path, 5000);
+            ZKLockToken lock = (ZKLockToken) zkls.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
             assertTrue(zkls.getLocks().containsKey(path));
             lock.release();
-            assertFalse(zkls.getLocks().containsKey(path));
+            checkLockRelease(path, zkls);
         }
         finally {
             zkls.destroy();
@@ -442,16 +443,16 @@ public class TestZKLocksService extends ZKXTestCase {
         ZKLocksService zkls = new ZKLocksService();
         try {
             zkls.init(Services.get());
-            ZKLockToken lock = (ZKLockToken) zkls.getWriteLock(path, 5000);
-            lock = (ZKLockToken) zkls.getWriteLock(path, 5000);
-            lock = (ZKLockToken) zkls.getWriteLock(path, 5000);
+            ZKLockToken lock = (ZKLockToken) zkls.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
+            lock = (ZKLockToken) zkls.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
+            lock = (ZKLockToken) zkls.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
             assertTrue(zkls.getLocks().containsKey(path));
             lock.release();
             assertTrue(zkls.getLocks().containsKey(path));
             lock.release();
             assertTrue(zkls.getLocks().containsKey(path));
             lock.release();
-            assertFalse(zkls.getLocks().containsKey(path));
+            checkLockRelease(path, zkls);
         }
         catch (Exception e) {
             fail("Reentrant property, it should have acquired lock");
@@ -470,10 +471,10 @@ public class TestZKLocksService extends ZKXTestCase {
             ThreadLock t2 = new ThreadLock(zkls, path);
             t1.start();
             t1.join();
-            assertFalse(zkls.getLocks().containsKey(path));
+            checkLockRelease(path, zkls);
             t2.start();
             t2.join();
-            assertFalse(zkls.getLocks().containsKey(path));
+            checkLockRelease(path, zkls);
         }
         finally {
             zkls.destroy();
@@ -507,6 +508,58 @@ public class TestZKLocksService extends ZKXTestCase {
         }
     }
 
+    public void testLocksAreGarbageCollected() throws ServiceException, InterruptedException {
+        String path = new String("a");
+        String path1 = new String("a");
+        ZKLocksService lockService = new ZKLocksService();
+        try {
+            lockService.init(Services.get());
+            LockToken lock = lockService.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
+            lock.release();
+            assertEquals(lockService.getLocks().size(), 1);
+            int oldHash = lockService.getLocks().get(path).hashCode();
+            lock = lockService.getWriteLock(path1, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
+            int newHash = lockService.getLocks().get(path1).hashCode();
+            assertTrue(oldHash == newHash);
+            lock = null;
+            System.gc();
+            lock = lockService.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
+            newHash = lockService.getLocks().get(path).hashCode();
+            assertFalse(oldHash == newHash);
+        }
+        finally {
+            lockService.destroy();
+        }
+    }
+
+    public void testLocksAreReused() throws ServiceException, InterruptedException {
+        String path = "a";
+        ZKLocksService lockService = new ZKLocksService();
+        try {
+            lockService.init(Services.get());
+            LockToken lock = lockService.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
+            int oldHash = System.identityHashCode(lockService.getLocks().get(path));
+            System.gc();
+            lock.release();
+            lock = lockService.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
+            assertEquals(lockService.getLocks().size(), 1);
+            int newHash = System.identityHashCode(lockService.getLocks().get(path));
+            assertTrue(oldHash == newHash);
+        }
+        finally {
+            lockService.destroy();
+        }
+    }
+
+    private void checkLockRelease(String path, ZKLocksService zkls) {
+        if (zkls.getLocks().get(path) == null) {
+            // good, lock is removed from memory after gc.
+        }
+        else {
+            assertFalse(zkls.getLocks().get(path).writeLock().isAcquiredInThisProcess());
+        }
+    }
+
     static class ThreadLock extends Thread {
         ZKLocksService zkls;
         String path;
@@ -520,9 +573,9 @@ public class TestZKLocksService extends ZKXTestCase {
 
         public void run() {
             try {
-                lock = zkls.getWriteLock(path, 5000);
+                lock = zkls.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
                 if (lock != null) {
-                    lock = zkls.getWriteLock(path, 5000);
+                    lock = zkls.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
                     Thread.sleep(1000);
                     lock.release();
                     Thread.sleep(1000);

http://git-wip-us.apache.org/repos/asf/oozie/blob/d330d406/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 10a183a..b03a61a 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -3,6 +3,7 @@
 
 -- Oozie 4.3.0 release
 
+OOZIE-2501 ZK reentrant lock doesn't work for few cases (puru)
 OOZIE-2582 Populating external child Ids for action failures (abhishekbafna via rohini)
 OOZIE-2678 Oozie job -kill doesn't work with tez jobs (abhishekbafna via rohini)
 OOZIE-2676 Make hadoop-2 as the default profile (gezapeti via rkanter)