You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@curator.apache.org by "Jordan Zimmerman (JIRA)" <ji...@apache.org> on 2016/06/02 21:12:59 UTC
[jira] [Created] (CURATOR-335) InterProcessSemaphoreV2 can deadlock
under network stress
Jordan Zimmerman created CURATOR-335:
----------------------------------------
Summary: InterProcessSemaphoreV2 can deadlock under network stress
Key: CURATOR-335
URL: https://issues.apache.org/jira/browse/CURATOR-335
Project: Apache Curator
Issue Type: Bug
Components: Recipes
Affects Versions: 2.10.0, 3.1.0
Reporter: Jordan Zimmerman
Assignee: Jordan Zimmerman
Priority: Critical
Fix For: 2.11.0, 3.2.0
Under network stress, InterProcessSemaphoreV2 can stop acquiring new leases. This test (by [~cammckenzie]) demonstrates the issue:
{code}
package org.apache.curator.framework.recipes.locks;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.KeeperException;
import org.testng.Assert;
import org.testng.ITestContext;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;
/**
 * Reproducer for CURATOR-335: under network stress, {@link InterProcessSemaphoreV2}
 * can stop acquiring new leases.
 *
 * <p>Several worker threads share one client and repeatedly acquire and return a
 * lease on a single-lease semaphore. The test stops the server, waits for the
 * client to report {@code LOST}, restarts the server, and then requires that at
 * least one further lease is acquired within 30 seconds.
 */
public class TestInterProcessMutexNotReconnecting extends BaseClassForTests
{
    /**
     * Runs the stop/restart scenario and fails if no lease is acquired after the
     * server comes back.
     *
     * @throws Exception if starting/stopping the server or the client fails, or if
     *                   the test thread is interrupted while waiting
     */
    @Test
    public void test() throws Exception
    {
        final String SEMAPHORE_PATH = "/test";
        final int MAX_SEMAPHORES = 1;
        final int NUM_CLIENTS = 10;
        // Upper bound on waiting for the LOST state so a missing event fails the test instead of hanging it.
        final long LOST_WAIT_MS = 60000;
        // Pause between checks of the lease counter after the restart (avoids a hot spin).
        final long POLL_INTERVAL_MS = 100;

        server.start();

        CuratorFramework client = null;
        ExecutorService executor = Executors.newFixedThreadPool(NUM_CLIENTS);

        // Number of successful acquisitions across all workers.
        final AtomicInteger counter = new AtomicInteger(0);
        // Cleared in the finally block to ask the workers to stop looping.
        final AtomicBoolean run = new AtomicBoolean(true);

        try
        {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), 5000, 5000, new RetryOneTime(1));
            client.start();

            final CuratorFramework lClient = client;

            for ( int i = 0; i < NUM_CLIENTS; ++i )
            {
                executor.execute(new Runnable()
                {
                    @Override
                    public void run()
                    {
                        while ( run.get() )
                        {
                            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(lClient, SEMAPHORE_PATH, MAX_SEMAPHORES);
                            System.err.println(Thread.currentThread() + "Acquiring");
                            Lease lease = null;
                            try
                            {
                                lease = semaphore.acquire();
                                System.err.println(Thread.currentThread() + "Acquired");
                                counter.incrementAndGet();
                                // Hold the lease for a while so the other workers have to queue.
                                Thread.sleep(2000);
                            }
                            catch ( InterruptedException e )
                            {
                                System.err.println("Interrupted");
                                Thread.currentThread().interrupt();
                                break;
                            }
                            catch ( KeeperException e )
                            {
                                // Expected while the server is down: back off, then try again.
                                try
                                {
                                    Thread.sleep(2000);
                                }
                                catch ( InterruptedException e2 )
                                {
                                    System.err.println("Interrupted");
                                    Thread.currentThread().interrupt();
                                    break;
                                }
                            }
                            catch ( Exception e )
                            {
                                e.printStackTrace();
                            }
                            finally
                            {
                                // Runs on every exit path, including the breaks above.
                                if ( lease != null )
                                {
                                    semaphore.returnLease(lease);
                                }
                            }
                        }
                    }
                });
            }

            final AtomicBoolean lost = new AtomicBoolean(false);
            client.getConnectionStateListenable().addListener(new ConnectionStateListener()
            {
                @Override
                public void stateChanged(CuratorFramework client, ConnectionState newState)
                {
                    System.err.println("New state : " + newState);
                    if ( newState == ConnectionState.LOST )
                    {
                        lost.set(true);
                    }
                }
            });

            // Let the workers cycle through the semaphore before pulling the server.
            Thread.sleep(2000);

            System.err.println("Stopping server");
            server.stop();
            System.err.println("Stopped server");

            long lostWaitStart = System.currentTimeMillis();
            while ( !lost.get() )
            {
                if ( (System.currentTimeMillis() - lostWaitStart) > LOST_WAIT_MS )
                {
                    Assert.fail("Connection was not reported LOST after stopping the server");
                }
                Thread.sleep(1000);
            }

            int preRestartCount = counter.get();

            System.err.println("Restarting server");
            server.restart();

            long startCheckTime = System.currentTimeMillis();
            while ( counter.get() <= preRestartCount )
            {
                if ( (System.currentTimeMillis() - startCheckTime) > 30000 )
                {
                    Assert.fail("Semaphores not reacquired after restart");
                }
                // Yield between polls instead of spinning against the worker threads.
                Thread.sleep(POLL_INTERVAL_MS);
            }
        }
        finally
        {
            run.set(false);
            executor.shutdownNow();
            try
            {
                // Give interrupted workers a chance to return their leases before the
                // client they share is closed. Bounded so a stuck worker cannot hang teardown.
                executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS);
            }
            catch ( InterruptedException e )
            {
                Thread.currentThread().interrupt();
            }
            CloseableUtils.closeQuietly(client);
        }
    }
}
{code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)