You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/05/28 13:11:36 UTC
svn commit: r660906 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
Author: rajdavies
Date: Wed May 28 04:11:35 2008
New Revision: 660906
URL: http://svn.apache.org/viewvc?rev=660906&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1752
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=660906&r1=660905&r2=660906&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed May 28 04:11:35 2008
@@ -63,6 +63,7 @@
protected final TcpTransportFactory transportFactory;
protected long maxInactivityDuration = 30000;
protected int minmumWireFormatVersion;
+ protected boolean useQueueForAccept=true;
/**
* trace=true -> the Transport stack where this TcpTransport
@@ -210,6 +211,35 @@
public void setStartLogging(boolean startLogging) {
this.startLogging = startLogging;
}
+
+ /**
+ * @return the backlog
+ */
+ public int getBacklog() {
+ return backlog;
+ }
+
+ /**
+ * @param backlog the backlog to set
+ */
+ public void setBacklog(int backlog) {
+ this.backlog = backlog;
+ }
+
+ /**
+ * @return the useQueueForAccept
+ */
+ public boolean isUseQueueForAccept() {
+ return useQueueForAccept;
+ }
+
+ /**
+ * @param useQueueForAccept the useQueueForAccept to set
+ */
+ public void setUseQueueForAccept(boolean useQueueForAccept) {
+ this.useQueueForAccept = useQueueForAccept;
+ }
+
/**
* pull Sockets from the ServerSocket
@@ -223,7 +253,11 @@
if (isStopped() || getAcceptListener() == null) {
socket.close();
} else {
- socketQueue.put(socket);
+ if (useQueueForAccept) {
+ socketQueue.put(socket);
+ }else {
+ handleSocket(socket);
+ }
}
}
} catch (SocketTimeoutException ste) {
@@ -274,33 +308,36 @@
}
protected void doStart() throws Exception {
- Runnable run = new Runnable() {
- public void run() {
- try {
- while (!isStopped() && !isStopping()) {
- Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
- if (sock != null) {
- handleSocket(sock);
+ if(useQueueForAccept) {
+ Runnable run = new Runnable() {
+ public void run() {
+ try {
+ while (!isStopped() && !isStopping()) {
+ Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
+ if (sock != null) {
+ handleSocket(sock);
+ }
+ }
+
+ } catch (InterruptedException e) {
+ LOG.info("socketQueue interuppted - stopping");
+ if (!isStopping()) {
+ onAcceptError(e);
}
}
-
- } catch (InterruptedException e) {
- LOG.info("socketQueue interuppted - stopping");
- if (!isStopping()) {
- onAcceptError(e);
- }
+
}
-
- }
-
- };
- socketHandlerThread = new Thread(null, run,
- "ActiveMQ Transport Server Thread Handler: " + toString(),
- getStackSize());
- socketHandlerThread.setDaemon(true);
- socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
+
+ };
+ socketHandlerThread = new Thread(null, run,
+ "ActiveMQ Transport Server Thread Handler: " + toString(),
+ getStackSize());
+ socketHandlerThread.setDaemon(true);
+ socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
+ socketHandlerThread.start();
+ }
super.doStart();
- socketHandlerThread.start();
+
}
protected void doStop(ServiceStopper stopper) throws Exception {
@@ -348,7 +385,5 @@
onAcceptError(e);
}
}
- }
-
-
+ }
}
\ No newline at end of file