You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2016/10/21 13:39:33 UTC
svn commit: r1766024 - in
/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport:
NetworkConnectionScheduler.java NonBlockingNetworkTransport.java
SelectorThread.java
Author: orudyy
Date: Fri Oct 21 13:39:33 2016
New Revision: 1766024
URL: http://svn.apache.org/viewvc?rev=1766024&view=rev
Log:
QPID-7368: Address review comments
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1766024&r1=1766023&r2=1766024&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Fri Oct 21 13:39:33 2016
@@ -22,20 +22,16 @@ package org.apache.qpid.server.transport
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.transport.TransportException;
public class NetworkConnectionScheduler
@@ -208,31 +204,7 @@ public class NetworkConnectionScheduler
public void cancelAcceptingSocket(final ServerSocketChannel serverSocket)
{
- Future<Void> result = cancelAcceptingSocketAsync(serverSocket);
- try
- {
- result.get(Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
- CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT),
- TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- LOGGER.warn("Cancellation of accepting socket was interrupted");
- Thread.currentThread().interrupt();
- }
- catch (ExecutionException e)
- {
- LOGGER.warn("Cancellation of accepting socket failed", e.getCause());
- }
- catch (TimeoutException e)
- {
- LOGGER.warn("Cancellation of accepting socket timed out");
- }
- }
-
- private Future<Void> cancelAcceptingSocketAsync(final ServerSocketChannel serverSocket)
- {
- return _selectorThread.cancelAcceptingSocket(serverSocket);
+ _selectorThread.cancelAcceptingSocket(serverSocket);
}
public void addConnection(final NonBlockingConnection connection)
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1766024&r1=1766023&r2=1766024&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Fri Oct 21 13:39:33 2016
@@ -34,7 +34,6 @@ import org.apache.qpid.server.model.port
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.TransportEncryption;
@@ -44,12 +43,10 @@ public class NonBlockingNetworkTransport
{
private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingNetworkTransport.class);
- private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
- CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
+
private final Set<TransportEncryption> _encryptionSet;
private final MultiVersionProtocolEngineFactory _factory;
private final ServerSocketChannel _serverSocket;
- private final int _timeout;
private final NetworkConnectionScheduler _scheduler;
private final AmqpPort<?> _port;
private final InetSocketAddress _address;
@@ -63,7 +60,6 @@ public class NonBlockingNetworkTransport
{
_factory = factory;
- _timeout = TIMEOUT;
String bindingAddress = port.getBindingAddress();
if (WILDCARD_ADDRESS.equals(bindingAddress))
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1766024&r1=1766023&r2=1766024&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Fri Oct 21 13:39:33 2016
@@ -36,8 +36,11 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -45,10 +48,15 @@ import com.google.common.util.concurrent
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.configuration.CommonProperties;
+
class SelectorThread extends Thread
{
private static final Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class);
+ private static final long ACCEPT_CANCELATION_TIMEOUT =
+ Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
+ CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
static final String IO_THREAD_NAME_PREFIX = "IO-";
private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>();
@@ -433,7 +441,29 @@ class SelectorThread extends Thread
_selectionTasks[0].wakeup();
}
- public Future<Void> cancelAcceptingSocket(final ServerSocketChannel socketChannel)
+ public void cancelAcceptingSocket(final ServerSocketChannel socketChannel)
+ {
+ Future<Void> result = cancelAcceptingSocketAsync(socketChannel);
+ try
+ {
+ result.get(ACCEPT_CANCELATION_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ LOGGER.warn("Cancellation of accepting socket was interrupted");
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e)
+ {
+ LOGGER.warn("Cancellation of accepting socket failed", e.getCause());
+ }
+ catch (TimeoutException e)
+ {
+ LOGGER.warn("Cancellation of accepting socket timed out");
+ }
+ }
+
+ private Future<Void> cancelAcceptingSocketAsync(final ServerSocketChannel socketChannel)
{
final SettableFuture<Void> cancellationResult = SettableFuture.create();
_tasks.add(new Runnable()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org