You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2011/07/01 18:30:54 UTC
svn commit: r1141980 - in /avro/trunk: ./
lang/java/ipc/src/main/java/org/apache/avro/ipc/
lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/
lang/java/ipc/src/test/java/org/apache/avro/
lang/java/ipc/src/test/java/org/apache/avro/ipc/
Author: cutting
Date: Fri Jul 1 16:30:54 2011
New Revision: 1141980
URL: http://svn.apache.org/viewvc?rev=1141980&view=rev
Log:
AVRO-842. Java: Fix Netty-based IPC client to provide better errors when attempting to use a closed connection. Contributed by James Baldassari.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1141980&r1=1141979&r2=1141980&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Jul 1 16:30:54 2011
@@ -74,6 +74,10 @@ Avro 1.5.2 (unreleased)
AVRO-776. Java: Fix SocketServer to close socket. (scottcarey)
+ AVRO-842. Java: Fix Netty-based IPC client to provide better
+ errors when attempting to use a closed connection.
+ (James Baldassari via cutting)
+
Avro 1.5.1 (3 May 2011)
NEW FEATURES
Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1141980&r1=1141979&r2=1141980&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java Fri Jul 1 16:30:54 2011
@@ -73,7 +73,6 @@ public class NettyTransceiver extends Tr
* Write lock must be acquired whenever modifying state.
*/
private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
- private boolean open = false; // Synchronized on stateLock
private Channel channel; // Synchronized on stateLock
private Protocol remote; // Synchronized on stateLock
@@ -83,12 +82,27 @@ public class NettyTransceiver extends Tr
remoteAddr = null;
}
- public NettyTransceiver(InetSocketAddress addr) {
+ /**
+ * Creates a NettyTransceiver, and attempts to connect to the given address.
+ * @param addr the address to connect to.
+ * @throws IOException if an error occurs connecting to the given address.
+ */
+ public NettyTransceiver(InetSocketAddress addr) throws IOException {
this(addr, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
}
- public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) {
+ /**
+ * Creates a NettyTransceiver, and attempts to connect to the given address.
+ * @param addr the address to connect to.
+ * @param channelFactory the factory to use to create a new Netty Channel.
+ * @throws IOException if an error occurs connecting to the given address.
+ */
+ public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) throws IOException {
+ if (channelFactory == null) {
+ throw new NullPointerException("channelFactory is null");
+ }
+
// Set up.
this.channelFactory = channelFactory;
bootstrap = new ClientBootstrap(channelFactory);
@@ -109,43 +123,71 @@ public class NettyTransceiver extends Tr
bootstrap.setOption("tcpNoDelay", true);
// Make a new connection.
- connect();
+ stateLock.readLock().lock();
+ try {
+ getChannel();
+ } finally {
+ stateLock.readLock().unlock();
+ }
}
/**
- * Connects to the remote peer if not already connected.
+ * Tests whether the given channel is ready for writing.
+ * @return true if the channel is open and ready; false otherwise.
*/
- private void connect() {
- stateLock.writeLock().lock();
- try {
- if (!open || (channel == null) || !channel.isOpen() || !channel.isBound() || !channel.isConnected()) {
+ private static boolean isChannelReady(Channel channel) {
+ return (channel != null) &&
+ channel.isOpen() && channel.isBound() && channel.isConnected();
+ }
+
+ /**
+ * Gets the Netty channel. If the channel is not connected, first attempts
+ * to connect.
+ * NOTE: The stateLock read lock *must* be acquired before calling this
+ * method.
+ * @return the Netty channel
+ * @throws IOException if an error occurs connecting the channel.
+ */
+ private Channel getChannel() throws IOException {
+ if (!isChannelReady(channel)) {
+ // Need to reconnect
+ // Upgrade to write lock
+ stateLock.readLock().unlock();
+ stateLock.writeLock().lock();
+ try {
LOG.info("Connecting to " + remoteAddr);
ChannelFuture channelFuture = bootstrap.connect(remoteAddr);
channelFuture.awaitUninterruptibly();
if (!channelFuture.isSuccess()) {
channelFuture.getCause().printStackTrace();
- throw new RuntimeException(channelFuture.getCause());
+ throw new IOException("Error connecting to " + remoteAddr,
+ channelFuture.getCause());
}
channel = channelFuture.getChannel();
- open = true;
+ } finally {
+ // Downgrade to read lock:
+ stateLock.readLock().lock();
+ stateLock.writeLock().unlock();
}
- } finally {
- stateLock.writeLock().unlock();
}
+ return channel;
}
/**
* Closes the connection to the remote peer if connected.
*/
private void disconnect() {
- disconnect(false);
+ disconnect(false, false);
}
/**
* Closes the connection to the remote peer if connected.
* @param awaitCompletion if true, will block until the close has completed.
+ * @param cancelPendingRequests if true, will drain the requests map and
+ * send an IOException to all Callbacks.
*/
- private void disconnect(boolean awaitCompletion) {
+ private void disconnect(boolean awaitCompletion, boolean cancelPendingRequests) {
+ Map<Integer, Callback<List<ByteBuffer>>> requestsToCancel = null;
stateLock.writeLock().lock();
try {
if (channel != null) {
@@ -156,11 +198,25 @@ public class NettyTransceiver extends Tr
}
channel = null;
remote = null;
- open = false;
+ if (cancelPendingRequests) {
+ // Remove all pending requests (will be canceled after relinquishing
+ // write lock).
+ requestsToCancel =
+ new ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>>(requests);
+ requests.clear();
+ }
}
} finally {
stateLock.writeLock().unlock();
}
+
+ if ((requestsToCancel != null) && !requestsToCancel.isEmpty()) {
+ LOG.warn("Removing " + requestsToCancel.size() + " pending request(s).");
+ for (Callback<List<ByteBuffer>> request : requestsToCancel.values()) {
+ request.handleError(
+ new IOException(getClass().getSimpleName() + " closed"));
+ }
+ }
}
/**
@@ -182,22 +238,20 @@ public class NettyTransceiver extends Tr
}
public void close() {
- stateLock.writeLock().lock();
try {
- // Close the connection.
- disconnect(true);
+ // Close the connection:
+ disconnect(true, true);
+ } finally {
// Shut down all thread pools to exit.
channelFactory.releaseExternalResources();
- } finally {
- stateLock.writeLock().unlock();
}
}
@Override
- public String getRemoteName() {
+ public String getRemoteName() throws IOException {
stateLock.readLock().lock();
try {
- return channel.getRemoteAddress().toString();
+ return getChannel().getRemoteAddress().toString();
} finally {
stateLock.readLock().unlock();
}
@@ -207,7 +261,8 @@ public class NettyTransceiver extends Tr
* Override as non-synchronized method because the method is thread safe.
*/
@Override
- public List<ByteBuffer> transceive(List<ByteBuffer> request) {
+ public List<ByteBuffer> transceive(List<ByteBuffer> request)
+ throws IOException {
try {
CallFuture<List<ByteBuffer>> transceiverFuture = new CallFuture<List<ByteBuffer>>();
transceive(request, transceiverFuture);
@@ -222,7 +277,8 @@ public class NettyTransceiver extends Tr
}
@Override
- public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) {
+ public void transceive(List<ByteBuffer> request,
+ Callback<List<ByteBuffer>> callback) throws IOException {
stateLock.readLock().lock();
try {
int serial = serialGenerator.incrementAndGet();
@@ -242,24 +298,12 @@ public class NettyTransceiver extends Tr
/**
* Writes a NettyDataPack, reconnecting to the remote peer if necessary.
* @param dataPack the data pack to write.
+ * @throws IOException if an error occurs connecting to the remote peer.
*/
- private void writeDataPack(NettyDataPack dataPack) {
+ private void writeDataPack(NettyDataPack dataPack) throws IOException {
stateLock.readLock().lock();
try {
- while ((channel == null) || !channel.isOpen() || !channel.isBound() || !channel.isConnected()) {
- // Need to reconnect
- // Upgrade to write lock
- stateLock.readLock().unlock();
- stateLock.writeLock().lock();
- try {
- connect();
- } finally {
- // Downgrade to read lock:
- stateLock.readLock().lock();
- stateLock.writeLock().unlock();
- }
- }
- channel.write(dataPack);
+ getChannel().write(dataPack);
} finally {
stateLock.readLock().unlock();
}
@@ -314,28 +358,7 @@ public class NettyTransceiver extends Tr
if ((cse.getState() == ChannelState.OPEN) && (Boolean.FALSE.equals(cse.getValue()))) {
// Server closed connection; disconnect client side
LOG.info("Remote peer " + remoteAddr + " closed connection.");
- stateLock.readLock().lock();
- boolean readLockAcquired = true;
- try {
- // Only disconnect if open to prevent deadlock on close()
- if (open) {
- // Upgrade to write lock:
- stateLock.readLock().unlock();
- readLockAcquired = false;
- stateLock.writeLock().lock();
- try {
- if (open) {
- disconnect();
- }
- } finally {
- stateLock.writeLock().unlock();
- }
- }
- } finally {
- if (readLockAcquired) {
- stateLock.readLock().unlock();
- }
- }
+ disconnect();
}
}
super.handleUpstream(ctx, e);
Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java?rev=1141980&r1=1141979&r2=1141980&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java Fri Jul 1 16:30:54 2011
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.Reentr
import java.util.List;
import java.util.Map;
+import org.apache.avro.AvroRemoteException;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Protocol;
import org.apache.avro.Schema;
@@ -108,7 +109,7 @@ public abstract class Requestor {
if (e.getCause() instanceof Exception) {
throw (Exception)e.getCause();
} else {
- throw new RuntimeException(e.getCause());
+ throw new AvroRemoteException(e.getCause());
}
}
}
@@ -247,7 +248,7 @@ public abstract class Requestor {
return established;
}
- private void setRemote(HandshakeResponse handshake) {
+ private void setRemote(HandshakeResponse handshake) throws IOException {
remote = Protocol.parse(handshake.serverProtocol.toString());
MD5 remoteHash = (MD5)handshake.serverHash;
REMOTE_HASHES.put(transceiver.getRemoteName(), remoteHash);
Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java?rev=1141980&r1=1141979&r2=1141980&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java Fri Jul 1 16:30:54 2011
@@ -30,7 +30,7 @@ import org.apache.avro.Protocol;
public abstract class Transceiver implements Closeable {
private final ReentrantLock channelLock = new ReentrantLock();
- public abstract String getRemoteName();
+ public abstract String getRemoteName() throws IOException;
/**
* Acquires an exclusive lock on the transceiver's channel.
Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java?rev=1141980&r1=1141979&r2=1141980&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java Fri Jul 1 16:30:54 2011
@@ -25,6 +25,7 @@ import java.lang.reflect.InvocationHandl
import java.lang.reflect.Type;
import java.util.Arrays;
+import org.apache.avro.AvroRemoteException;
import org.apache.avro.Protocol;
import org.apache.avro.Schema;
import org.apache.avro.AvroRuntimeException;
@@ -55,19 +56,36 @@ public class SpecificRequestor extends R
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
- // Check if this is a callback-based RPC:
- Type[] parameterTypes = method.getParameterTypes();
- if ((parameterTypes.length > 0) &&
- (parameterTypes[parameterTypes.length - 1] instanceof Class) &&
- Callback.class.isAssignableFrom(((Class<?>)parameterTypes[parameterTypes.length - 1]))) {
- // Extract the Callback from the end of of the argument list
- Object[] finalArgs = Arrays.copyOf(args, args.length - 1);
- Callback<?> callback = (Callback<?>)args[args.length - 1];
- request(method.getName(), finalArgs, callback);
- return null;
- }
- else {
- return request(method.getName(), args);
+ try {
+ // Check if this is a callback-based RPC:
+ Type[] parameterTypes = method.getParameterTypes();
+ if ((parameterTypes.length > 0) &&
+ (parameterTypes[parameterTypes.length - 1] instanceof Class) &&
+ Callback.class.isAssignableFrom(((Class<?>)parameterTypes[parameterTypes.length - 1]))) {
+ // Extract the Callback from the end of of the argument list
+ Object[] finalArgs = Arrays.copyOf(args, args.length - 1);
+ Callback<?> callback = (Callback<?>)args[args.length - 1];
+ request(method.getName(), finalArgs, callback);
+ return null;
+ }
+ else {
+ return request(method.getName(), args);
+ }
+ } catch (Exception e) {
+ // Check if this is a declared Exception:
+ for (Class<?> exceptionClass : method.getExceptionTypes()) {
+ if (exceptionClass.isAssignableFrom(e.getClass())) {
+ throw e;
+ }
+ }
+
+ // Next, check for RuntimeExceptions:
+ if (e instanceof RuntimeException) {
+ throw e;
+ }
+
+ // Not an expected Exception, so wrap it in AvroRemoteException:
+ throw new AvroRemoteException(e);
}
}
Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java?rev=1141980&r1=1141979&r2=1141980&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java Fri Jul 1 16:30:54 2011
@@ -34,7 +34,6 @@ import org.junit.Test;
import java.net.URL;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
-import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
public class TestProtocolHttp extends TestProtocolSpecific {
@@ -62,7 +61,7 @@ public class TestProtocolHttp extends Te
Simple proxy = SpecificRequestor.getClient(Simple.class, client);
try {
proxy.hello("foo");
- } catch (UndeclaredThrowableException e) {
+ } catch (AvroRemoteException e) {
throw e.getCause();
} finally {
s.close();
Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java?rev=1141980&r1=1141979&r2=1141980&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java Fri Jul 1 16:30:54 2011
@@ -25,6 +25,7 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -256,6 +257,119 @@ public class TestNettyServerWithCallback
Assert.assertFalse("Expected ack flag to be cleared", ackFlag.get());
}
+ @Test
+ public void testSendAfterChannelClose() throws Exception {
+ // Start up a second server so that closing the server doesn't
+ // interfere with the other unit tests:
+ Server server2 = new NettyServer(new SpecificResponder(Simple.class, simpleService),
+ new InetSocketAddress(0));
+ server2.start();
+ try {
+ int serverPort = server2.getPort();
+ System.out.println("server2 port : " + serverPort);
+
+ Transceiver transceiver2 = new NettyTransceiver(new InetSocketAddress(
+ serverPort));
+ try {
+ Simple.Callback simpleClient2 =
+ SpecificRequestor.getClient(Simple.Callback.class, transceiver2);
+
+ // Verify that connection works:
+ Assert.assertEquals(3, simpleClient2.add(1, 2));
+
+ // Try again with callbacks:
+ CallFuture<Integer> addFuture = new CallFuture<Integer>();
+ simpleClient2.add(1, 2, addFuture);
+ Assert.assertEquals(new Integer(3), addFuture.get());
+
+ // Shut down server:
+ server2.close();
+
+ // Send a new RPC, and verify that it throws an Exception that
+ // can be detected by the client:
+ boolean ioeCaught = false;
+ try {
+ simpleClient2.add(1, 2);
+ Assert.fail("Send after server close should have thrown Exception");
+ } catch (AvroRemoteException e) {
+ ioeCaught = e.getCause() instanceof IOException;
+ Assert.assertTrue("Expected IOException", ioeCaught);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ Assert.assertTrue("Expected IOException", ioeCaught);
+
+ // Send a new RPC with callback, and verify that the correct Exception
+ // is thrown:
+ ioeCaught = false;
+ try {
+ addFuture = new CallFuture<Integer>();
+ simpleClient2.add(1, 2, addFuture);
+ addFuture.get();
+ Assert.fail("Send after server close should have thrown Exception");
+ } catch (IOException e) {
+ ioeCaught = true;
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ Assert.assertTrue("Expected IOException", ioeCaught);
+ } finally {
+ transceiver2.close();
+ }
+ } finally {
+ server2.close();
+ }
+ }
+
+ @Test
+ public void cancelPendingRequestsOnTransceiverClose() throws Exception {
+ // Start up a second server so that closing the server doesn't
+ // interfere with the other unit tests:
+ BlockingSimpleImpl blockingSimpleImpl = new BlockingSimpleImpl();
+ Server server2 = new NettyServer(new SpecificResponder(Simple.class,
+ blockingSimpleImpl), new InetSocketAddress(0));
+ server2.start();
+ try {
+ int serverPort = server2.getPort();
+ System.out.println("server2 port : " + serverPort);
+
+ CallFuture<Integer> addFuture = new CallFuture<Integer>();
+ Transceiver transceiver2 = new NettyTransceiver(new InetSocketAddress(
+ serverPort));
+ try {
+ Simple.Callback simpleClient2 =
+ SpecificRequestor.getClient(Simple.Callback.class, transceiver2);
+
+ // The first call has to block for the handshake:
+ Assert.assertEquals(3, simpleClient2.add(1, 2));
+
+ // Now acquire the semaphore so that the server will block:
+ blockingSimpleImpl.acquirePermit();
+ simpleClient2.add(1, 2, addFuture);
+ } finally {
+ // When the transceiver is closed, the CallFuture should get
+ // an IOException
+ transceiver2.close();
+ }
+ boolean ioeThrown = false;
+ try {
+ addFuture.get();
+ } catch (ExecutionException e) {
+ ioeThrown = e.getCause() instanceof IOException;
+ Assert.assertTrue(e.getCause() instanceof IOException);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Unexpected Exception: " + e.toString());
+ }
+ Assert.assertTrue("Expected IOException to be thrown", ioeThrown);
+ } finally {
+ blockingSimpleImpl.releasePermit();
+ server2.close();
+ }
+ }
+
@Ignore
@Test
public void performanceTest() throws Exception {
@@ -342,4 +456,92 @@ public class TestNettyServerWithCallback
ackLatch.get().countDown();
}
}
+
+ /**
+ * A SimpleImpl that requires a semaphore permit before executing any method.
+ */
+ private static class BlockingSimpleImpl extends SimpleImpl {
+ private final Semaphore semaphore = new Semaphore(1);
+
+ /**
+ * Creates a BlockingSimpleImpl.
+ */
+ public BlockingSimpleImpl() {
+ super(new AtomicBoolean());
+ }
+
+ @Override
+ public CharSequence hello(CharSequence greeting) throws AvroRemoteException {
+ acquirePermit();
+ try {
+ return super.hello(greeting);
+ } finally {
+ releasePermit();
+ }
+ }
+
+ @Override
+ public TestRecord echo(TestRecord record) throws AvroRemoteException {
+ acquirePermit();
+ try {
+ return super.echo(record);
+ } finally {
+ releasePermit();
+ }
+ }
+
+ @Override
+ public int add(int arg1, int arg2) throws AvroRemoteException {
+ acquirePermit();
+ try {
+ return super.add(arg1, arg2);
+ } finally {
+ releasePermit();
+ }
+ }
+
+ @Override
+ public ByteBuffer echoBytes(ByteBuffer data) throws AvroRemoteException {
+ acquirePermit();
+ try {
+ return super.echoBytes(data);
+ } finally {
+ releasePermit();
+ }
+ }
+
+ @Override
+ public Void error() throws AvroRemoteException, TestError {
+ acquirePermit();
+ try {
+ return super.error();
+ } finally {
+ releasePermit();
+ }
+ }
+
+ @Override
+ public void ack() {
+ acquirePermit();
+ try {
+ super.ack();
+ } finally {
+ releasePermit();
+ }
+ }
+
+ /**
+ * Acquires a single permit from the semaphore.
+ */
+ public void acquirePermit() {
+ semaphore.acquireUninterruptibly();
+ }
+
+ /**
+ * Releases a single permit to the semaphore.
+ */
+ public void releasePermit() {
+ semaphore.release();
+ }
+ }
}