You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@oozie.apache.org by "Abhishek Bafna (JIRA)" <ji...@apache.org> on 2016/09/22 16:41:20 UTC
[jira] [Commented] (OOZIE-2501) ZK reentrant lock doesn't work for
few cases
[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15513773#comment-15513773 ]
Abhishek Bafna commented on OOZIE-2501:
---------------------------------------
Ping: [~rohini], [~rkanter] [~puru] [~jaydeepvishwakarma].
Pinging for Oozie 4.3.0 Release perspective.
> ZK reentrant lock doesn't work for few cases
> --------------------------------------------
>
> Key: OOZIE-2501
> URL: https://issues.apache.org/jira/browse/OOZIE-2501
> Project: Oozie
> Issue Type: Bug
> Reporter: Purshotam Shah
> Assignee: Purshotam Shah
> Priority: Blocker
> Fix For: 4.3.0
>
> Attachments: OOZIE-2501-V2.patch, OOZIE-2501-V4.patch
>
>
> We will have an issue when oozie trying to acquire a lock and at the same time, some other thread is releasing the same lock .
> acquireLock will wait for 5 sec to acquire the lock. It will bypass the synchronized block and get lockEntry from the hashmap.
> While it waiting for 5 sec to acquire the lock, other thread releases the lock and may execute the release code which will remove lockEntry from the map.
> If some other command from same thread tries to acquire the lock, it will create a new InterProcessReadWriteLock object and use that for acquiring the lock.
> Logic for lock acquiring.
> {code}
> public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
> InterProcessReadWriteLock lockEntry;
> synchronized (zkLocks) {
> if (zkLocks.containsKey(resource)) {
> lockEntry = zkLocks.get(resource);
> }
> else {
> lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
> zkLocks.put(resource, lockEntry);
> }
> }
> InterProcessMutex writeLock = lockEntry.writeLock();
> return acquireLock(wait, writeLock, resource);
> }
> {code}
> Logic for lock releasing
> {code}
> public void release() {
> try {
> lock.release();
> if (zkLocks.get(resource) == null) {
> return;
> }
> if (!isLockHeld()) {
> synchronized (zkLocks) {
> if (zkLocks.get(resource) != null) {
> if (!isLockHeld()) {
> zkLocks.remove(resource);
> }
> }
> }
> }
> }
> catch (Exception ex) {
> LOG.warn("Could not release lock: " + ex.getMessage(), ex);
> }
> }
> {code}
> Curator code to acquire lock.
> {code}
> private boolean internalLock(long time, TimeUnit unit) throws Exception
> {
> /*
> Note on concurrency: a given lockData instance
> can be only acted on by a single thread so locking isn't necessary
> */
> Thread currentThread = Thread.currentThread();
> LockData lockData = threadData.get(currentThread);
> if ( lockData != null )
> {
> // re-entering
> lockData.lockCount.incrementAndGet();
> return true;
> }
> String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
> if ( lockPath != null )
> {
> LockData newLockData = new LockData(currentThread, lockPath);
> threadData.put(currentThread, newLockData);
> return true;
> }
> return false;
> }
> {code}
> The approach we have followed is to use map with weakvalue. Once the lock is unreachable. GC will remove it from the map. We don't have to explicitly remove it.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)