You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/05/28 13:11:36 UTC

svn commit: r660906 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java

Author: rajdavies
Date: Wed May 28 04:11:35 2008
New Revision: 660906

URL: http://svn.apache.org/viewvc?rev=660906&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1752

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=660906&r1=660905&r2=660906&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed May 28 04:11:35 2008
@@ -63,6 +63,7 @@
     protected final TcpTransportFactory transportFactory;
     protected long maxInactivityDuration = 30000;
     protected int minmumWireFormatVersion;
+    protected boolean useQueueForAccept=true; // true (default): accepted sockets are put on socketQueue; false: handleSocket is called directly in the accept loop
    
     /**
      * trace=true -> the Transport stack where this TcpTransport
@@ -210,6 +211,35 @@
     public void setStartLogging(boolean startLogging) {
         this.startLogging = startLogging;
     }
+    
+    /** Returns the {@code backlog} value currently held by this transport server.
+     * @return the backlog (presumably the accept backlog of the server socket; the field is declared outside this diff - TODO confirm)
+     */
+    public int getBacklog() {
+        return backlog;
+    }
+
+    /** Stores a new {@code backlog} value on this transport server; the value is assigned as-is, without validation.
+     * @param backlog the backlog to set (NOTE(review): presumably only takes effect if set before the server socket is bound - confirm)
+     */
+    public void setBacklog(int backlog) {
+        this.backlog = backlog;
+    }
+
+    /** Reports whether accepted sockets are put on {@code socketQueue} instead of being passed straight to {@code handleSocket} in the accept loop.
+     * @return {@code true} (the default) if accepted sockets are queued; {@code false} if they are handled inline
+     */
+    public boolean isUseQueueForAccept() {
+        return useQueueForAccept;
+    }
+
+    /** Selects whether accepted sockets are queued on {@code socketQueue} or handled inline in the accept loop.
+     * @param useQueueForAccept {@code true} to queue, {@code false} to handle inline. NOTE(review): doStart() only creates the queue-draining thread when this is true at start time, so it should presumably be set before start - confirm
+     */
+    public void setUseQueueForAccept(boolean useQueueForAccept) {
+        this.useQueueForAccept = useQueueForAccept;
+    }
+    
 
     /**
      * pull Sockets from the ServerSocket
@@ -223,7 +253,11 @@
                     if (isStopped() || getAcceptListener() == null) {
                         socket.close();
                     } else {
-                       socketQueue.put(socket);
+                        if (useQueueForAccept) {
+                            socketQueue.put(socket);
+                        }else {
+                            handleSocket(socket);
+                        }
                     }
                 }
             } catch (SocketTimeoutException ste) {
@@ -274,33 +308,36 @@
     }
     
     protected void doStart() throws Exception {
-        Runnable run = new Runnable() {
-            public void run() {
-                try {
-                    while (!isStopped() && !isStopping()) {
-                        Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
-                        if (sock != null) {
-                            handleSocket(sock);
+        if(useQueueForAccept) { // the queue-draining thread is only created when accepted sockets are queued
+            Runnable run = new Runnable() {
+                public void run() {
+                    try {
+                        while (!isStopped() && !isStopping()) {
+                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS); // 1s timeout so the stop flags in the loop condition are re-checked regularly
+                            if (sock != null) {
+                                handleSocket(sock);
+                            }
+                        }
+    
+                    } catch (InterruptedException e) {
+                        LOG.info("socketQueue interuppted - stopping");
+                        if (!isStopping()) { // an interrupt outside of shutdown is reported as an accept error
+                            onAcceptError(e);
                         }
                     }
-
-                } catch (InterruptedException e) {
-                    LOG.info("socketQueue interuppted - stopping");
-                    if (!isStopping()) {
-                        onAcceptError(e);
-                    }
+    
                 }
-
-            }
-
-        };
-        socketHandlerThread = new Thread(null, run,
-                "ActiveMQ Transport Server Thread Handler: " + toString(),
-                getStackSize());
-        socketHandlerThread.setDaemon(true);
-        socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
+    
+            };
+            socketHandlerThread = new Thread(null, run,
+                    "ActiveMQ Transport Server Thread Handler: " + toString(),
+                    getStackSize());
+            socketHandlerThread.setDaemon(true);
+            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
+            socketHandlerThread.start(); // now started before super.doStart(); the removed code started it afterwards
+        }
         super.doStart();
-        socketHandlerThread.start();
+        
     }
 
     protected void doStop(ServiceStopper stopper) throws Exception {
@@ -348,7 +385,5 @@
                 onAcceptError(e);
             }
         }
-    }
-    
-    
+    }    
 }
\ No newline at end of file