You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@oozie.apache.org by "Purshotam Shah (JIRA)" <ji...@apache.org> on 2016/05/25 18:44:13 UTC

[jira] [Updated] (OOZIE-2501) ZK reentrant lock doesn't work for few cases

     [ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Purshotam Shah updated OOZIE-2501:
----------------------------------
    Description: 
We will have an issue when Oozie is trying to acquire a lock while, at the same time, some other thread is releasing the same lock.

acquireLock will wait for 5 sec to acquire the lock. It will bypass the synchronized block and get lockEntry from the hashmap.

While it is waiting for 5 sec to acquire the lock, another thread releases the lock and may execute the release code, which will remove lockEntry from the map.

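If some other command from the same thread then tries to acquire the lock, it will create a new InterProcessReadWriteLock object and use that for acquiring the lock. Because Curator tracks re-entrancy per lock object (see threadData in the Curator code below), the new object does not know that this thread already holds the lock, so the acquisition is not treated as re-entrant.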


Logic for lock acquiring.
{code}
 public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
        InterProcessReadWriteLock lockEntry;
        synchronized (zkLocks) {
            if (zkLocks.containsKey(resource)) {
                lockEntry = zkLocks.get(resource);
            }
            else {
                lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
                zkLocks.put(resource, lockEntry);
            }
        }
        InterProcessMutex writeLock = lockEntry.writeLock();
        return acquireLock(wait, writeLock, resource);
    }
{code}
Logic for lock releasing

{code}
public void release() {
            try {
                lock.release();
                if (zkLocks.get(resource) == null) {
                    return;
                }
                if (!isLockHeld()) {
                    synchronized (zkLocks) {
                        if (zkLocks.get(resource) != null) {
                            if (!isLockHeld()) {
                                zkLocks.remove(resource);
                            }
                        }
                    }
                }
            }
            catch (Exception ex) {
                LOG.warn("Could not release lock: " + ex.getMessage(), ex);
            }

        }
{code}

Curator code to acquire lock.

{code}
private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */

        Thread          currentThread = Thread.currentThread();

        LockData        lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }

        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData        newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }
{code}


The approach we have followed is to use a map with weak values. Once the lock is unreachable, GC will remove it from the map, so we don't have to remove it explicitly.

  was:
We will have an issue when oozie trying to acquire a lock and at the same time, some other thread is releasing the same lock .

acquireLock will wait for 5 sec to acquire the lock. It will bypass the synchronized block and get lockEntry from the hashmap.

While it waiting for 5 sec to acquire the lock, other thread releases the lock and may execute the release code which will remove  lockEntry from the map.

If some other command from same thread tries to acquire the lock, it will create a new InterProcessReadWriteLock object and use that for acquiring the lock. 


Logic for lock acquiring.
{code}
 public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
        InterProcessReadWriteLock lockEntry;
        synchronized (zkLocks) {
            if (zkLocks.containsKey(resource)) {
                lockEntry = zkLocks.get(resource);
            }
            else {
                lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
                zkLocks.put(resource, lockEntry);
            }
        }
        InterProcessMutex writeLock = lockEntry.writeLock();
        return acquireLock(wait, writeLock, resource);
    }
{code}
Logic for lock releasing

{code}
public void release() {
            try {
                lock.release();
                if (zkLocks.get(resource) == null) {
                    return;
                }
                if (!isLockHeld()) {
                    synchronized (zkLocks) {
                        if (zkLocks.get(resource) != null) {
                            if (!isLockHeld()) {
                                zkLocks.remove(resource);
                            }
                        }
                    }
                }
            }
            catch (Exception ex) {
                LOG.warn("Could not release lock: " + ex.getMessage(), ex);
            }

        }
{code}

Curator code to acquire lock.

{code}
private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */

        Thread          currentThread = Thread.currentThread();

        LockData        lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }

        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData        newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }
{code}



> ZK reentrant lock doesn't work for few cases
> --------------------------------------------
>
>                 Key: OOZIE-2501
>                 URL: https://issues.apache.org/jira/browse/OOZIE-2501
>             Project: Oozie
>          Issue Type: Bug
>            Reporter: Purshotam Shah
>            Assignee: Purshotam Shah
>
> We will have an issue when Oozie is trying to acquire a lock while, at the same time, some other thread is releasing the same lock.
> acquireLock will wait for 5 sec to acquire the lock. It will bypass the synchronized block and get lockEntry from the hashmap.
> While it is waiting for 5 sec to acquire the lock, another thread releases the lock and may execute the release code, which will remove lockEntry from the map.
> If some other command from the same thread then tries to acquire the lock, it will create a new InterProcessReadWriteLock object and use that for acquiring the lock.
> Logic for lock acquiring.
> {code}
>  public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
>         InterProcessReadWriteLock lockEntry;
>         synchronized (zkLocks) {
>             if (zkLocks.containsKey(resource)) {
>                 lockEntry = zkLocks.get(resource);
>             }
>             else {
>                 lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
>                 zkLocks.put(resource, lockEntry);
>             }
>         }
>         InterProcessMutex writeLock = lockEntry.writeLock();
>         return acquireLock(wait, writeLock, resource);
>     }
> {code}
> Logic for lock releasing
> {code}
> public void release() {
>             try {
>                 lock.release();
>                 if (zkLocks.get(resource) == null) {
>                     return;
>                 }
>                 if (!isLockHeld()) {
>                     synchronized (zkLocks) {
>                         if (zkLocks.get(resource) != null) {
>                             if (!isLockHeld()) {
>                                 zkLocks.remove(resource);
>                             }
>                         }
>                     }
>                 }
>             }
>             catch (Exception ex) {
>                 LOG.warn("Could not release lock: " + ex.getMessage(), ex);
>             }
>         }
> {code}
> Curator code to acquire lock.
> {code}
> private boolean internalLock(long time, TimeUnit unit) throws Exception
>     {
>         /*
>            Note on concurrency: a given lockData instance
>            can be only acted on by a single thread so locking isn't necessary
>         */
>         Thread          currentThread = Thread.currentThread();
>         LockData        lockData = threadData.get(currentThread);
>         if ( lockData != null )
>         {
>             // re-entering
>             lockData.lockCount.incrementAndGet();
>             return true;
>         }
>         String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
>         if ( lockPath != null )
>         {
>             LockData        newLockData = new LockData(currentThread, lockPath);
>             threadData.put(currentThread, newLockData);
>             return true;
>         }
>         return false;
>     }
> {code}
> The approach we have followed is to use a map with weak values. Once the lock is unreachable, GC will remove it from the map, so we don't have to remove it explicitly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)