You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@curator.apache.org by "Jordan Zimmerman (JIRA)" <ji...@apache.org> on 2016/06/02 21:12:59 UTC

[jira] [Created] (CURATOR-335) InterProcessSemaphoreV2 can deadlock under network stress

Jordan Zimmerman created CURATOR-335:
----------------------------------------

             Summary: InterProcessSemaphoreV2 can deadlock under network stress
                 Key: CURATOR-335
                 URL: https://issues.apache.org/jira/browse/CURATOR-335
             Project: Apache Curator
          Issue Type: Bug
          Components: Recipes
    Affects Versions: 2.10.0, 3.1.0
            Reporter: Jordan Zimmerman
            Assignee: Jordan Zimmerman
            Priority: Critical
             Fix For: 2.11.0, 3.2.0


Under network stress, InterProcessSemaphoreV2 can stop acquiring new leases. The following test (by [~cammckenzie]) demonstrates the issue:

{code}
package org.apache.curator.framework.recipes.locks;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.KeeperException;
import org.testng.Assert;
import org.testng.ITestContext;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;

/**
 * Regression test for CURATOR-335: under network stress {@link InterProcessSemaphoreV2}
 * could stop acquiring new leases.
 * <p>
 * Several worker threads share one client and repeatedly acquire and return a
 * single-lease semaphore. The test proceeds in three steps:
 * <ol>
 * <li>Stop the server and wait until the client reports {@link ConnectionState#LOST}.</li>
 * <li>Restart the server.</li>
 * <li>Pass once any worker acquires a lease after the restart.</li>
 * </ol>
 */
public class TestInterProcessMutexNotReconnecting extends BaseClassForTests
{
    /** Maximum time to wait for the client to report LOST after the server is stopped. */
    private static final long LOST_TIMEOUT_MS = 60000;

    /** Maximum time to wait for a lease to be acquired again after the server restarts. */
    private static final long REACQUIRE_TIMEOUT_MS = 30000;

    /** Poll interval for the wait loops, so they do not spin a CPU core. */
    private static final long POLL_INTERVAL_MS = 100;

    /**
     * Checks that workers can acquire leases again after the connection is lost and the
     * server is restarted.
     *
     * @throws Exception if the server, client or executor fails unexpectedly
     */
    @Test
    public void test() throws Exception
    {
        final String SEMAPHORE_PATH = "/test";
        final int MAX_SEMAPHORES = 1;
        final int NUM_CLIENTS = 10;

        server.start();

        CuratorFramework client = null;

        ExecutorService executor = Executors.newFixedThreadPool(NUM_CLIENTS);

        // Total number of leases successfully acquired by all workers.
        final AtomicInteger counter = new AtomicInteger(0);
        // Cleared in the finally block to tell the workers to stop looping.
        final AtomicBoolean run = new AtomicBoolean(true);

        try
        {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), 5000, 5000, new RetryOneTime(1));
            client.start();

            final CuratorFramework lClient = client;

            for(int i = 0; i < NUM_CLIENTS; ++i)
            {
                executor.execute(new Runnable()
                {
                    @Override
                    public void run()
                    {
                        while(run.get())
                        {
                            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(lClient, SEMAPHORE_PATH, MAX_SEMAPHORES);
                            System.err.println(Thread.currentThread() + "Acquiring");
                            Lease lease = null;
                            try
                            {
                                lease = semaphore.acquire();
                                System.err.println(Thread.currentThread() + "Acquired");
                                counter.incrementAndGet();
                                // Hold the lease for a while so the other workers contend for it.
                                Thread.sleep(2000);
                            }
                            catch(InterruptedException e)
                            {
                                // executor.shutdownNow() interrupts the workers; restore the flag and exit.
                                System.err.println("Interrupted");
                                Thread.currentThread().interrupt();
                                break;
                            }
                            catch(KeeperException e)
                            {
                                // ZooKeeper error (e.g. while the server is down): back off before retrying.
                                try
                                {
                                    Thread.sleep(2000);
                                }
                                catch(InterruptedException e2)
                                {
                                    System.err.println("Interrupted");
                                    Thread.currentThread().interrupt();
                                    break;
                                }
                            }
                            catch(Exception e)
                            {
                                // Any other failure is logged and the loop retries immediately.
                                e.printStackTrace();
                            }
                            finally
                            {
                                if(lease != null)
                                {
                                    semaphore.returnLease(lease);
                                }
                            }
                        }
                    }
                });
            }

            final AtomicBoolean lost = new AtomicBoolean(false);
            client.getConnectionStateListenable().addListener(new ConnectionStateListener()
            {
                @Override
                public void stateChanged(CuratorFramework curatorClient, ConnectionState newState)
                {
                    System.err.println("New state : " + newState);

                    if(newState == ConnectionState.LOST)
                    {
                        lost.set(true);
                    }
                }
            });

            // Give the workers time to acquire some leases before disrupting the connection.
            Thread.sleep(2000);

            System.err.println("Stopping server");
            server.stop();
            System.err.println("Stopped server");

            // Bounded wait: fail rather than hang the build if LOST is never reported.
            long lostDeadline = System.currentTimeMillis() + LOST_TIMEOUT_MS;
            while(!lost.get())
            {
                if(System.currentTimeMillis() > lostDeadline)
                {
                    Assert.fail("Connection was not reported as LOST after stopping the server");
                }
                Thread.sleep(POLL_INTERVAL_MS);
            }

            int preRestartCount = counter.get();

            System.err.println("Restarting server");
            server.restart();

            // Poll (instead of busy-spinning) until some worker acquires a new lease.
            long startCheckTime = System.currentTimeMillis();
            while(counter.get() <= preRestartCount)
            {
                if((System.currentTimeMillis() - startCheckTime) > REACQUIRE_TIMEOUT_MS)
                {
                    Assert.fail("Semaphores not reacquired after restart");
                }
                Thread.sleep(POLL_INTERVAL_MS);
            }
        }
        finally
        {
            run.set(false);
            executor.shutdownNow();
            CloseableUtils.closeQuietly(client);
        }
    }
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)