You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2016/10/21 13:39:33 UTC

svn commit: r1766024 - in /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport: NetworkConnectionScheduler.java NonBlockingNetworkTransport.java SelectorThread.java

Author: orudyy
Date: Fri Oct 21 13:39:33 2016
New Revision: 1766024

URL: http://svn.apache.org/viewvc?rev=1766024&view=rev
Log:
QPID-7368: Address review comments

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1766024&r1=1766023&r2=1766024&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Fri Oct 21 13:39:33 2016
@@ -22,20 +22,16 @@ package org.apache.qpid.server.transport
 
 import java.io.IOException;
 import java.nio.channels.ServerSocketChannel;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.configuration.CommonProperties;
 import org.apache.qpid.transport.TransportException;
 
 public class NetworkConnectionScheduler
@@ -208,31 +204,7 @@ public class NetworkConnectionScheduler
 
     public void cancelAcceptingSocket(final ServerSocketChannel serverSocket)
     {
-        Future<Void> result = cancelAcceptingSocketAsync(serverSocket);
-        try
-        {
-            result.get(Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
-                                          CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT),
-                       TimeUnit.MILLISECONDS);
-        }
-        catch (InterruptedException e)
-        {
-            LOGGER.warn("Cancellation of accepting socket was interrupted");
-            Thread.currentThread().interrupt();
-        }
-        catch (ExecutionException e)
-        {
-            LOGGER.warn("Cancellation of accepting socket failed", e.getCause());
-        }
-        catch (TimeoutException e)
-        {
-            LOGGER.warn("Cancellation of accepting socket timed out");
-        }
-    }
-
-    private Future<Void> cancelAcceptingSocketAsync(final ServerSocketChannel serverSocket)
-    {
-        return _selectorThread.cancelAcceptingSocket(serverSocket);
+        _selectorThread.cancelAcceptingSocket(serverSocket);
     }
 
     public void addConnection(final NonBlockingConnection connection)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1766024&r1=1766023&r2=1766024&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Fri Oct 21 13:39:33 2016
@@ -34,7 +34,6 @@ import org.apache.qpid.server.model.port
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.configuration.CommonProperties;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.TransportEncryption;
 
@@ -44,12 +43,10 @@ public class NonBlockingNetworkTransport
 {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingNetworkTransport.class);
-    private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
-                                                          CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
+
     private final Set<TransportEncryption> _encryptionSet;
     private final MultiVersionProtocolEngineFactory _factory;
     private final ServerSocketChannel _serverSocket;
-    private final int _timeout;
     private final NetworkConnectionScheduler _scheduler;
     private final AmqpPort<?> _port;
     private final InetSocketAddress _address;
@@ -63,7 +60,6 @@ public class NonBlockingNetworkTransport
         {
 
             _factory = factory;
-            _timeout = TIMEOUT;
 
             String bindingAddress = port.getBindingAddress();
             if (WILDCARD_ADDRESS.equals(bindingAddress))

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1766024&r1=1766023&r2=1766024&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Fri Oct 21 13:39:33 2016
@@ -36,8 +36,11 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -45,10 +48,15 @@ import com.google.common.util.concurrent
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.configuration.CommonProperties;
+
 
 class SelectorThread extends Thread
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class);
+    private static final long ACCEPT_CANCELATION_TIMEOUT =
+            Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
+                               CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
 
     static final String IO_THREAD_NAME_PREFIX  = "IO-";
     private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>();
@@ -433,7 +441,29 @@ class SelectorThread extends Thread
         _selectionTasks[0].wakeup();
     }
 
-    public Future<Void> cancelAcceptingSocket(final ServerSocketChannel socketChannel)
+    public void cancelAcceptingSocket(final ServerSocketChannel socketChannel)
+    {
+        Future<Void> result = cancelAcceptingSocketAsync(socketChannel);
+        try
+        {
+            result.get(ACCEPT_CANCELATION_TIMEOUT, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            LOGGER.warn("Cancellation of accepting socket was interrupted");
+            Thread.currentThread().interrupt();
+        }
+        catch (ExecutionException e)
+        {
+            LOGGER.warn("Cancellation of accepting socket failed", e.getCause());
+        }
+        catch (TimeoutException e)
+        {
+            LOGGER.warn("Cancellation of accepting socket timed out");
+        }
+    }
+
+    private Future<Void> cancelAcceptingSocketAsync(final ServerSocketChannel socketChannel)
     {
         final SettableFuture<Void> cancellationResult = SettableFuture.create();
         _tasks.add(new Runnable()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org