You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/03 21:08:27 UTC
svn commit: r781510 - in /activemq/sandbox/activemq-flow: activemq-bio/
activemq-bio/src/main/java/org/apache/activemq/
activemq-bio/src/main/java/org/apache/activemq/transport/tcp/
activemq-broker/src/main/java/org/apache/activemq/broker/ activemq-bro...
Author: chirino
Date: Wed Jun 3 19:08:26 2009
New Revision: 781510
URL: http://svn.apache.org/viewvc?rev=781510&view=rev
Log:
applied patch at: http://issues.apache.org/activemq/browse/AMQ-2279
Thx colin!
Modified:
activemq/sandbox/activemq-flow/activemq-bio/pom.xml
activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/ActiveMQSslConnectionFactory.java
activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ResponseHolder.java
activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java
activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
activemq/sandbox/activemq-flow/activemq-openwire/pom.xml
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/TreeMap.java
activemq/sandbox/activemq-flow/webgen/README
activemq/sandbox/activemq-flow/webgen/src/architecture.page
activemq/sandbox/activemq-flow/webgen/src/metainfo
activemq/sandbox/activemq-flow/webgen/src/todo.page
Modified: activemq/sandbox/activemq-flow/activemq-bio/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/pom.xml?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-bio/pom.xml Wed Jun 3 19:08:26 2009
@@ -38,8 +38,14 @@
<artifactId>activemq-transport</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-client</artifactId>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
</dependency>
<!-- In case we want to look at mina..
@@ -58,17 +64,7 @@
</dependency>
-->
- <!-- Testing Dependencies -->
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <scope>test</scope>
- </dependency>
+ <!-- Testing Dependencies -->
</dependencies>
Modified: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/ActiveMQSslConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/ActiveMQSslConnectionFactory.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/ActiveMQSslConnectionFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/ActiveMQSslConnectionFactory.java Wed Jun 3 19:08:26 2009
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq;
-
-import java.security.SecureRandom;
-import javax.jms.JMSException;
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.TrustManager;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.tcp.SslTransportFactory;
-import org.apache.activemq.util.JMSExceptionSupport;
-
-/**
- * An ActiveMQConnectionFactory that allows access to the key and trust managers
- * used for SslConnections. There is no reason to use this class unless SSL is
- * being used AND the key and trust managers need to be specified from within
- * code. In fact, if the URI passed to this class does not have an "ssl" scheme,
- * this class will pass all work on to its superclass.
- *
- * @author sepandm@gmail.com
- */
-public class ActiveMQSslConnectionFactory extends ActiveMQConnectionFactory {
- // The key and trust managers used to initialize the used SSLContext.
- protected KeyManager[] keyManager;
- protected TrustManager[] trustManager;
- protected SecureRandom secureRandom;
-
- /**
- * Sets the key and trust managers used when creating SSL connections.
- *
- * @param km The KeyManagers used.
- * @param tm The TrustManagers used.
- * @param random The SecureRandom number used.
- */
- public void setKeyAndTrustManagers(final KeyManager[] km, final TrustManager[] tm, final SecureRandom random) {
- keyManager = km;
- trustManager = tm;
- secureRandom = random;
- }
-
- /**
- * Overriding to make special considerations for SSL connections. If we are
- * not using SSL, the superclass's method is called. If we are using SSL, an
- * SslConnectionFactory is used and it is given the needed key and trust
- * managers.
- *
- * @author sepandm@gmail.com
- */
- protected Transport createTransport() throws JMSException {
- // If the given URI is non-ssl, let superclass handle it.
- if (!brokerURL.getScheme().equals("ssl")) {
- return super.createTransport();
- }
-
- try {
- SslTransportFactory sslFactory = new SslTransportFactory();
- sslFactory.setKeyAndTrustManagers(keyManager, trustManager, secureRandom);
- return sslFactory.doConnect(brokerURL);
- } catch (Exception e) {
- throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
- }
- }
-}
Modified: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ResponseHolder.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ResponseHolder.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ResponseHolder.java (original)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ResponseHolder.java Wed Jun 3 19:08:26 2009
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.tcp;
-
-import org.apache.activemq.command.Response;
-
-/**
- * ResponseHolder utility
- *
- * @version $Revision: 1.1.1.1 $
- */
-public class ResponseHolder {
- protected Response response;
- protected Object lock = new Object();
- protected boolean notified;
-
- /**
- * Construct a receipt holder
- */
- public ResponseHolder() {
- }
-
- /**
- * Set the Response for this holder
- *
- * @param r
- */
- public void setResponse(Response r) {
- synchronized (lock) {
- this.response = r;
- notified = true;
- lock.notify();
- }
- }
-
- /**
- * Get the Response
- *
- * @return the Response or null if it is closed
- */
- public Response getResponse() {
- return getResponse(0);
- }
-
- /**
- * wait upto <Code>timeout</Code> timeout ms to get a receipt
- *
- * @param timeout
- * @return
- */
- public Response getResponse(int timeout) {
- synchronized (lock) {
- if (!notified) {
- try {
- lock.wait(timeout);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
- return this.response;
- }
-
- /**
- * close this holder
- */
- public void close() {
- synchronized (lock) {
- notified = true;
- lock.notifyAll();
- }
- }
-}
Modified: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java Wed Jun 3 19:08:26 2009
@@ -27,8 +27,8 @@
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionInfo;
+//import org.apache.activemq.command.Command;
+//import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.wireformat.WireFormat;
/**
@@ -45,20 +45,27 @@
/**
* Connect to a remote node such as a Broker.
*
- * @param wireFormat The WireFormat to be used.
- * @param socketFactory The socket factory to be used. Forcing SSLSockets
- * for obvious reasons.
- * @param remoteLocation The remote location.
- * @param localLocation The local location.
- * @param needClientAuth If set to true, the underlying socket will need
- * client certificate authentication.
- * @throws UnknownHostException If TcpTransport throws.
- * @throws IOException If TcpTransport throws.
+ * @param wireFormat
+ * The WireFormat to be used.
+ * @param socketFactory
+ * The socket factory to be used. Forcing SSLSockets for obvious
+ * reasons.
+ * @param remoteLocation
+ * The remote location.
+ * @param localLocation
+ * The local location.
+ * @param needClientAuth
+ * If set to true, the underlying socket will need client
+ * certificate authentication.
+ * @throws UnknownHostException
+ * If TcpTransport throws.
+ * @throws IOException
+ * If TcpTransport throws.
*/
public SslTransport(WireFormat wireFormat, SSLSocketFactory socketFactory, URI remoteLocation, URI localLocation, boolean needClientAuth) throws IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
if (this.socket != null) {
- ((SSLSocket)this.socket).setNeedClientAuth(needClientAuth);
+ ((SSLSocket) this.socket).setNeedClientAuth(needClientAuth);
}
}
@@ -66,9 +73,12 @@
* Initialize from a ServerSocket. No access to needClientAuth is given
* since it is already set within the provided socket.
*
- * @param wireFormat The WireFormat to be used.
- * @param socket The Socket to be used. Forcing SSL.
- * @throws IOException If TcpTransport throws.
+ * @param wireFormat
+ * The WireFormat to be used.
+ * @param socket
+ * The Socket to be used. Forcing SSL.
+ * @throws IOException
+ * If TcpTransport throws.
*/
public SslTransport(WireFormat wireFormat, SSLSocket socket) throws IOException {
super(wireFormat, socket);
@@ -78,30 +88,35 @@
* Overriding in order to add the client's certificates to ConnectionInfo
* Commmands.
*
- * @param command The Command coming in.
- */
+ * @param command
+ * The Command coming in.
+
public void doConsume(Object command) {
// The instanceof can be avoided, but that would require modifying the
// Command clas tree and that would require too much effort right
// now.
- if (command instanceof ConnectionInfo) {
- ConnectionInfo connectionInfo = (ConnectionInfo)command;
-
- SSLSocket sslSocket = (SSLSocket)this.socket;
-
- SSLSession sslSession = sslSocket.getSession();
-
- X509Certificate[] clientCertChain;
- try {
- clientCertChain = (X509Certificate[])sslSession.getPeerCertificates();
- } catch (SSLPeerUnverifiedException e) {
- clientCertChain = null;
- }
-
- connectionInfo.setTransportContext(clientCertChain);
- }
+ // if (command instanceof ConnectionInfo) {
+ // ConnectionInfo connectionInfo = (ConnectionInfo)command;
+ //
+ // SSLSocket sslSocket = (SSLSocket)this.socket;
+ //
+ // SSLSession sslSession = sslSocket.getSession();
+ //
+ // X509Certificate[] clientCertChain;
+ // try {
+ // clientCertChain = (X509Certificate[])sslSession.getPeerCertificates();
+ // } catch (SSLPeerUnverifiedException e) {
+ // clientCertChain = null;
+ // }
+ //
+ // connectionInfo.setTransportContext(clientCertChain);
+ // }
super.doConsume(command);
+ }*/
+
+ public X509Certificate[] getPeerCertificateChain() throws SSLPeerUnverifiedException {
+ return (X509Certificate[]) ((SSLSocket) this.socket).getSession().getPeerCertificates();
}
/**
Modified: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java Wed Jun 3 19:08:26 2009
@@ -32,11 +32,8 @@
import javax.net.ssl.TrustManager;
import org.apache.activemq.broker.SslContext;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.WireFormatNegotiator;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
@@ -100,12 +97,11 @@
// }
// }
- transport = new InactivityMonitor(transport, format);
+ boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true"));
+ sslTransport.setUseInactivityMonitor(useInactivityMonitor && isUseInactivityMonitor(transport));
+
- // Only need the WireFormatNegotiator if using openwire
- if (format instanceof OpenWireFormat) {
- transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, sslTransport.getMinmumWireFormatVersion());
- }
+ transport = format.createTransportFilters(transport, options);
return transport;
}
Modified: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jun 3 19:08:26 2009
@@ -39,19 +39,20 @@
import javax.net.SocketFactory;
import org.apache.activemq.Service;
-import org.apache.activemq.transport.Transport;
-//import org.apache.activemq.transport.TransportLoggerFactory;
+import org.apache.activemq.transport.Transport; //import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* An implementation of the {@link Transport} interface using raw tcp/ip
*
- * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
+ * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging
+ * improvement modifications)
* @version $Revision$
*/
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
@@ -65,48 +66,50 @@
protected int soTimeout;
protected int socketBufferSize = 64 * 1024;
protected int ioBufferSize = 8 * 1024;
- protected boolean closeAsync=true;
+ protected boolean closeAsync = true;
protected Socket socket;
protected DataOutputStream dataOut;
protected DataInputStream dataIn;
protected TcpBufferedOutputStream buffOut = null;
+
+ private static final boolean ASYNC_WRITE = false;
/**
- * trace=true -> the Transport stack where this TcpTransport
- * object will be, will have a TransportLogger layer
- * trace=false -> the Transport stack where this TcpTransport
- * object will be, will NOT have a TransportLogger layer, and therefore
- * will never be able to print logging messages.
- * This parameter is most probably set in Connection or TransportConnector URIs.
+ * trace=true -> the Transport stack where this TcpTransport object will be,
+ * will have a TransportLogger layer trace=false -> the Transport stack
+ * where this TcpTransport object will be, will NOT have a TransportLogger
+ * layer, and therefore will never be able to print logging messages. This
+ * parameter is most probably set in Connection or TransportConnector URIs.
*/
protected boolean trace = false;
-// /**
-// * Name of the LogWriter implementation to use.
-// * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
-// * This parameter is most probably set in Connection or TransportConnector URIs.
-// */
-// protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
- /**
- * Specifies if the TransportLogger will be manageable by JMX or not.
- * Also, as long as there is at least 1 TransportLogger which is manageable,
- * a TransportLoggerControl MBean will me created.
+ /**
+ * Name of the LogWriter implementation to use. Names are mapped to classes
+ * in the
+ * resources/META-INF/services/org/apache/activemq/transport/logwriters
+ * directory. This parameter is most probably set in Connection or
+ * TransportConnector URIs.
+ */
+ // protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
+ /**
+ * Specifies if the TransportLogger will be manageable by JMX or not. Also,
+ * as long as there is at least 1 TransportLogger which is manageable, a
+ * TransportLoggerControl MBean will me created.
*/
protected boolean dynamicManagement = false;
/**
* startLogging=true -> the TransportLogger object of the Transport stack
- * will initially write messages to the log.
- * startLogging=false -> the TransportLogger object of the Transport stack
- * will initially NOT write messages to the log.
- * This parameter only has an effect if trace == true.
- * This parameter is most probably set in Connection or TransportConnector URIs.
+ * will initially write messages to the log. startLogging=false -> the
+ * TransportLogger object of the Transport stack will initially NOT write
+ * messages to the log. This parameter only has an effect if trace == true.
+ * This parameter is most probably set in Connection or TransportConnector
+ * URIs.
*/
protected boolean startLogging = true;
/**
- * Specifies the port that will be used by the JMX server to manage
- * the TransportLoggers.
- * This should only be set in an URI by a client (producer or consumer) since
- * a broker will already create a JMX server.
- * It is useful for people who test a broker and clients in the same machine
- * and want to control both via JMX; a different port will be needed.
+ * Specifies the port that will be used by the JMX server to manage the
+ * TransportLoggers. This should only be set in an URI by a client (producer
+ * or consumer) since a broker will already create a JMX server. It is
+ * useful for people who test a broker and clients in the same machine and
+ * want to control both via JMX; a different port will be needed.
*/
protected int jmxPort = 1099;
protected boolean useLocalHost = true;
@@ -119,18 +122,20 @@
private Boolean tcpNoDelay;
private Thread runnerThread;
+ protected boolean useActivityMonitor;
+
/**
* Connect to a remote Node - e.g. a Broker
*
* @param wireFormat
* @param socketFactory
* @param remoteLocation
- * @param localLocation - e.g. local InetAddress and local port
+ * @param localLocation
+ * - e.g. local InetAddress and local port
* @throws IOException
* @throws UnknownHostException
*/
- public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
- URI localLocation) throws UnknownHostException, IOException {
+ public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
this.wireFormat = wireFormat;
this.socketFactory = socketFactory;
try {
@@ -158,13 +163,48 @@
setDaemon(true);
}
+ LinkedBlockingQueue<Object> outbound = new LinkedBlockingQueue<Object>();
+ private Thread onewayThread;
+
/**
* A one way asynchronous send
*/
public void oneway(Object command) throws IOException {
checkStarted();
- wireFormat.marshal(command, dataOut);
- dataOut.flush();
+ try {
+ if (ASYNC_WRITE) {
+ outbound.put(command);
+ } else {
+ wireFormat.marshal(command, dataOut);
+ dataOut.flush();
+ }
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ }
+
+ protected void sendOneways() {
+ try {
+ LOG.debug("Started oneway thead");
+ while (!isStopped()) {
+ Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
+ if (command != null) {
+ try {
+ // int count=0;
+ while (command != null) {
+ wireFormat.marshal(command, dataOut);
+ // count++;
+ command = outbound.poll();
+ }
+ // System.out.println(count);
+ dataOut.flush();
+ } catch (IOException e) {
+ getTransportListener().onException(e);
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ }
}
/**
@@ -179,7 +219,7 @@
*/
public void run() {
LOG.trace("TCP consumer thread for " + this + " starting");
- this.runnerThread=Thread.currentThread();
+ this.runnerThread = Thread.currentThread();
try {
while (!isStopped()) {
doRun();
@@ -187,12 +227,12 @@
} catch (IOException e) {
stoppedLatch.get().countDown();
onException(e);
- } catch (Throwable e){
+ } catch (Throwable e) {
stoppedLatch.get().countDown();
- IOException ioe=new IOException("Unexpected error occured");
+ IOException ioe = new IOException("Unexpected error occured");
ioe.initCause(e);
onException(ioe);
- }finally {
+ } finally {
stoppedLatch.get().countDown();
}
}
@@ -220,14 +260,22 @@
public void setTrace(boolean trace) {
this.trace = trace;
}
-
-// public String getLogWriterName() {
-// return logWriterName;
-// }
-//
-// public void setLogWriterName(String logFormat) {
-// this.logWriterName = logFormat;
-// }
+
+ void setUseInactivityMonitor(boolean val) {
+ useActivityMonitor = val;
+ }
+
+ public boolean isUseInactivityMonitor() {
+ return useActivityMonitor;
+ }
+
+ // public String getLogWriterName() {
+ // return logWriterName;
+ // }
+ //
+ // public void setLogWriterName(String logFormat) {
+ // this.logWriterName = logFormat;
+ // }
public boolean isDynamicManagement() {
return dynamicManagement;
@@ -252,7 +300,7 @@
public void setJmxPort(int jmxPort) {
this.jmxPort = jmxPort;
}
-
+
public int getMinmumWireFormatVersion() {
return minmumWireFormatVersion;
}
@@ -337,12 +385,13 @@
}
/**
- * @param ioBufferSize the ioBufferSize to set
+ * @param ioBufferSize
+ * the ioBufferSize to set
*/
public void setIoBufferSize(int ioBufferSize) {
this.ioBufferSize = ioBufferSize;
}
-
+
/**
* @return the closeAsync
*/
@@ -351,7 +400,8 @@
}
/**
- * @param closeAsync the closeAsync to set
+ * @param closeAsync
+ * the closeAsync to set
*/
public void setCloseAsync(boolean closeAsync) {
this.closeAsync = closeAsync;
@@ -399,6 +449,16 @@
protected void doStart() throws Exception {
connect();
+ if (ASYNC_WRITE) {
+ onewayThread = new Thread() {
+ @Override
+ public void run() {
+ sendOneways();
+ }
+ };
+ onewayThread.start();
+ }
+
stoppedLatch.set(new CountDownLatch(1));
super.doStart();
}
@@ -413,8 +473,7 @@
InetSocketAddress remoteAddress = null;
if (localLocation != null) {
- localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
- localLocation.getPort());
+ localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
}
if (remoteLocation != null) {
@@ -442,8 +501,7 @@
// For SSL sockets.. you can't create an unconnected socket :(
// This means the timout option are not supported either.
if (localAddress != null) {
- socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
- localAddress.getAddress(), localAddress.getPort());
+ socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
} else {
socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
}
@@ -463,31 +521,34 @@
// closeStreams();
if (socket != null) {
if (closeAsync) {
- //closing the socket can hang also
+ // closing the socket can hang also
final CountDownLatch latch = new CountDownLatch(1);
-
+
SOCKET_CLOSE.execute(new Runnable() {
-
+
public void run() {
try {
socket.close();
} catch (IOException e) {
- LOG.debug("Caught exception closing socket",e);
- }finally {
+ LOG.debug("Caught exception closing socket", e);
+ } finally {
latch.countDown();
}
}
-
+
});
- latch.await(1,TimeUnit.SECONDS);
- }else {
+ latch.await(1, TimeUnit.SECONDS);
+ } else {
try {
socket.close();
} catch (IOException e) {
- LOG.debug("Caught exception closing socket",e);
+ LOG.debug("Caught exception closing socket", e);
}
+
+ }
+ if (ASYNC_WRITE) {
+ onewayThread.join();
}
-
}
}
@@ -499,7 +560,7 @@
super.stop();
CountDownLatch countDownLatch = stoppedLatch.get();
if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
- countDownLatch.await(1,TimeUnit.SECONDS);
+ countDownLatch.await(1, TimeUnit.SECONDS);
}
}
@@ -529,22 +590,21 @@
}
return null;
}
-
+
@Override
public <T> T narrow(Class<T> target) {
if (target == Socket.class) {
return target.cast(socket);
- } else if ( target == TcpBufferedOutputStream.class) {
+ } else if (target == TcpBufferedOutputStream.class) {
return target.cast(buffOut);
}
return super.narrow(target);
}
-
static {
- SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+ SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "TcpSocketClose: "+runnable);
+ Thread thread = new Thread(runnable, "TcpSocketClose: " + runnable);
thread.setPriority(Thread.MAX_PRIORITY);
thread.setDaemon(true);
return thread;
Modified: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Wed Jun 3 19:08:26 2009
@@ -26,13 +26,10 @@
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
//import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.WireFormatNegotiator;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
@@ -87,29 +84,26 @@
Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
tcpTransport.setSocketOptions(socketOptions);
-// if (tcpTransport.isTrace()) {
+ if (tcpTransport.isTrace()) {
+ throw new UnsupportedOperationException("Trace not implemented");
// try {
// transport = TransportLoggerFactory.getInstance().createTransportLogger(transport, tcpTransport.getLogWriterName(),
// tcpTransport.isDynamicManagement(), tcpTransport.isStartLogging(), tcpTransport.getJmxPort());
// } catch (Throwable e) {
// LOG.error("Could not create TransportLogger object for: " + tcpTransport.getLogWriterName() + ", reason: " + e, e);
// }
-// }
-
- boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true"));
- if (useInactivityMonitor && isUseInactivityMonitor(transport)) {
- transport = new InactivityMonitor(transport, format);
- }
-
- // Only need the WireFormatNegotiator if using openwire
- if (format instanceof OpenWireFormat) {
- transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
}
+
+ boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true"));
+ tcpTransport.setUseInactivityMonitor(useInactivityMonitor && isUseInactivityMonitor(transport));
+
+ transport = format.createTransportFilters(transport, options);
+
return transport;
}
- private String getOption(Map options, String key, String def) {
+ protected String getOption(Map options, String key, String def) {
String rc = (String) options.remove(key);
if( rc == null ) {
rc = def;
Modified: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed Jun 3 19:08:26 2009
@@ -35,9 +35,7 @@
import javax.net.ServerSocketFactory;
import org.apache.activemq.Service;
-import org.apache.activemq.ThreadPriorities;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.openwire.OpenWireFormatFactory;
+//import org.apache.activemq.ThreadPriorities;
import org.apache.activemq.transport.Transport;
//import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportServer;
@@ -64,7 +62,7 @@
private static final Log LOG = LogFactory.getLog(TcpTransportServer.class);
protected ServerSocket serverSocket;
protected int backlog = 5000;
- protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
+ protected WireFormatFactory wireFormatFactory;
protected final TcpTransportFactory transportFactory;
protected long maxInactivityDuration = 30000;
protected long maxInactivityDurationInitalDelay = 10000;
@@ -85,12 +83,14 @@
protected int socketBufferSize = 64 * 1024;
protected int connectionTimeout = 30000;
-// /**
-// * Name of the LogWriter implementation to use.
-// * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
-// * This parameter is most probably set in Connection or TransportConnector URIs.
-// */
-// protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
+ /**
+ * Name of the LogWriter implementation to use.
+ * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
+ * This parameter is most probably set in Connection or TransportConnector URIs.
+
+ protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
+ */
+
/**
* Specifies if the TransportLogger will be manageable by JMX or not.
* Also, as long as there is at least 1 TransportLogger which is manageable,
@@ -175,15 +175,6 @@
this.wireFormatFactory = wireFormatFactory;
}
- /**
- * Associates a broker info with the transport server so that the transport
- * can do discovery advertisements of the broker.
- *
- * @param brokerInfo
- */
- public void setBrokerInfo(BrokerInfo brokerInfo) {
- }
-
public long getMaxInactivityDuration() {
return maxInactivityDuration;
}
@@ -215,7 +206,7 @@
public void setTrace(boolean trace) {
this.trace = trace;
}
-
+//
// public String getLogWriterName() {
// return logWriterName;
// }
@@ -369,7 +360,7 @@
"ActiveMQ Transport Server Thread Handler: " + toString(),
getStackSize());
socketHandlerThread.setDaemon(true);
- socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
+ //socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
socketHandlerThread.start();
}
super.doStart();
@@ -436,29 +427,29 @@
}
- public int getSoTimeout() {
- return soTimeout;
- }
-
- public void setSoTimeout(int soTimeout) {
- this.soTimeout = soTimeout;
- }
-
- public int getSocketBufferSize() {
- return socketBufferSize;
- }
-
- public void setSocketBufferSize(int socketBufferSize) {
- this.socketBufferSize = socketBufferSize;
- }
-
- public int getConnectionTimeout() {
- return connectionTimeout;
- }
-
- public void setConnectionTimeout(int connectionTimeout) {
- this.connectionTimeout = connectionTimeout;
- }
+ public int getSoTimeout() {
+ return soTimeout;
+ }
+
+ public void setSoTimeout(int soTimeout) {
+ this.soTimeout = soTimeout;
+ }
+
+ public int getSocketBufferSize() {
+ return socketBufferSize;
+ }
+
+ public void setSocketBufferSize(int socketBufferSize) {
+ this.socketBufferSize = socketBufferSize;
+ }
+
+ public int getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ public void setConnectionTimeout(int connectionTimeout) {
+ this.connectionTimeout = connectionTimeout;
+ }
/**
* @return the maximumConnections
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java Wed Jun 3 19:08:26 2009
@@ -117,8 +117,9 @@
private static final int DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD = 100 * 1024 * 1;
private static final int DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD = 1;
// Be default we don't page out elements to disk.
- private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
-
+ //private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
+ private static final int DEFAULT_DURABLE_QUEUE_SIZE = 1024 * 1024 * 10;
+
private static final PersistencePolicy<MessageDelivery> DURABLE_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
private static final boolean PAGING_ENABLED = DEFAULT_DURABLE_QUEUE_SIZE > DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Wed Jun 3 19:08:26 2009
@@ -51,7 +51,7 @@
protected final boolean USE_KAHA_DB = true;
protected final boolean PURGE_STORE = true;
- protected final boolean PERSISTENT = true;
+ protected final boolean PERSISTENT = false;
protected final boolean DURABLE = true;
// Set to put senders and consumers on separate brokers.
@@ -479,7 +479,7 @@
store = StoreFactory.createStore("memory");
}
- store.setStoreDirectory(new File("sub/test-data/broker-test/" + broker.getName()));
+ store.setStoreDirectory(new File("test-data/broker-test/" + broker.getName()));
store.setDeleteAllMessages(PURGE_STORE);
return store;
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/pom.xml?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/pom.xml Wed Jun 3 19:08:26 2009
@@ -45,6 +45,10 @@
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-bio</artifactId>
+ </dependency>
<!-- Testing Dependencies -->
<dependency>
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Wed Jun 3 19:08:26 2009
@@ -219,7 +219,11 @@
}
public Response processKeepAlive(KeepAliveInfo info) throws Exception {
- return ack(command);
+ if (info.isResponseRequired()) {
+ info.setResponseRequired(false);
+ connection.write(info);
+ }
+ return null;
}
public Response processFlush(FlushCommand info) throws Exception {
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java Wed Jun 3 19:08:26 2009
@@ -27,6 +27,9 @@
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.InactivityMonitor;
+import org.apache.activemq.transport.WireFormatNegotiator;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.DataByteArrayInputStream;
@@ -62,7 +65,7 @@
private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
private WireFormatInfo preferedWireFormatInfo;
-
+
private AtomicBoolean receivingMessage = new AtomicBoolean(false);
public OpenWireFormat() {
@@ -74,10 +77,8 @@
}
public int hashCode() {
- return version ^ (cacheEnabled ? 0x10000000 : 0x20000000)
- ^ (stackTraceEnabled ? 0x01000000 : 0x02000000)
- ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000)
- ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000);
+ return version ^ (cacheEnabled ? 0x10000000 : 0x20000000) ^ (stackTraceEnabled ? 0x01000000 : 0x02000000) ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000)
+ ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000);
}
public OpenWireFormat copy() {
@@ -96,16 +97,14 @@
if (object == null) {
return false;
}
- OpenWireFormat o = (OpenWireFormat)object;
- return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled
- && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled
- && o.sizePrefixDisabled == sizePrefixDisabled;
+ OpenWireFormat o = (OpenWireFormat) object;
+ return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled
+ && o.sizePrefixDisabled == sizePrefixDisabled;
}
-
public String toString() {
- return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
- + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + "}";
+ return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled=" + tightEncodingEnabled
+ + ", sizePrefixDisabled=" + sizePrefixDisabled + "}";
// return "OpenWireFormat{id="+id+",
// tightEncodingEnabled="+tightEncodingEnabled+"}";
}
@@ -120,12 +119,12 @@
runMarshallCacheEvictionSweep();
}
-// MarshallAware ma = null;
-// // If not using value caching, then the marshaled form is always the
-// // same
-// if (!cacheEnabled && ((DataStructure)command).isMarshallAware()) {
-// ma = (MarshallAware)command;
-// }
+ // MarshallAware ma = null;
+ // // If not using value caching, then the marshaled form is always the
+ // // same
+ // if (!cacheEnabled && ((DataStructure)command).isMarshallAware()) {
+ // ma = (MarshallAware)command;
+ // }
ByteSequence sequence = null;
// if( ma!=null ) {
@@ -137,9 +136,9 @@
int size = 1;
if (command != null) {
- DataStructure c = (DataStructure)command;
+ DataStructure c = (DataStructure) command;
byte type = c.getDataStructureType();
- DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null) {
throw new IOException("Unknown data type: " + type);
}
@@ -162,8 +161,8 @@
bytesOut.restart();
if (!sizePrefixDisabled) {
bytesOut.writeInt(0); // we don't know the final size
- // yet but write this here for
- // now.
+ // yet but write this here for
+ // now.
}
bytesOut.writeByte(type);
dsm.looseMarshal(this, c, bytesOut);
@@ -220,9 +219,9 @@
int size = 1;
if (o != null) {
- DataStructure c = (DataStructure)o;
+ DataStructure c = (DataStructure) o;
byte type = c.getDataStructureType();
- DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null) {
throw new IOException("Unknown data type: " + type);
}
@@ -260,7 +259,7 @@
} else {
if (!sizePrefixDisabled) {
- dataOut.writeInt(size);
+ dataOut.writeInt(size);
}
dataOut.writeByte(NULL_TYPE);
}
@@ -285,9 +284,9 @@
public int tightMarshal1(Object o, BooleanStream bs) throws IOException {
int size = 1;
if (o != null) {
- DataStructure c = (DataStructure)o;
+ DataStructure c = (DataStructure) o;
byte type = c.getDataStructureType();
- DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null) {
throw new IOException("Unknown data type: " + type);
}
@@ -308,9 +307,9 @@
}
if (o != null) {
- DataStructure c = (DataStructure)o;
+ DataStructure c = (DataStructure) o;
byte type = c.getDataStructureType();
- DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null) {
throw new IOException("Unknown data type: " + type);
}
@@ -332,21 +331,13 @@
try {
mfClass = Class.forName(mfName, false, getClass().getClassLoader());
} catch (ClassNotFoundException e) {
- throw (IllegalArgumentException)new IllegalArgumentException("Invalid version: " + version
- + ", could not load " + mfName)
- .initCause(e);
+ throw (IllegalArgumentException) new IllegalArgumentException("Invalid version: " + version + ", could not load " + mfName).initCause(e);
}
try {
- Method method = mfClass.getMethod("createMarshallerMap", new Class[] {OpenWireFormat.class});
- dataMarshallers = (DataStreamMarshaller[])method.invoke(null, new Object[] {this});
+ Method method = mfClass.getMethod("createMarshallerMap", new Class[] { OpenWireFormat.class });
+ dataMarshallers = (DataStreamMarshaller[]) method.invoke(null, new Object[] { this });
} catch (Throwable e) {
- throw (IllegalArgumentException)new IllegalArgumentException(
- "Invalid version: "
- + version
- + ", "
- + mfName
- + " does not properly implement the createMarshallerMap method.")
- .initCause(e);
+ throw (IllegalArgumentException) new IllegalArgumentException("Invalid version: " + version + ", " + mfName + " does not properly implement the createMarshallerMap method.").initCause(e);
}
this.version = version;
}
@@ -355,7 +346,7 @@
byte dataType = dis.readByte();
receivingMessage.set(true);
if (dataType != NULL_TYPE) {
- DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
if (dsm == null) {
throw new IOException("Unknown data type: " + dataType);
}
@@ -396,15 +387,14 @@
}
byte type = o.getDataStructureType();
- DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null) {
throw new IOException("Unknown data type: " + type);
}
return 1 + dsm.tightMarshal1(this, o, bs);
}
- public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs)
- throws IOException {
+ public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs) throws IOException {
if (!bs.readBoolean()) {
return;
}
@@ -423,7 +413,7 @@
} else {
- DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null) {
throw new IOException("Unknown data type: " + type);
}
@@ -436,7 +426,7 @@
if (bs.readBoolean()) {
byte dataType = dis.readByte();
- DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
if (dsm == null) {
throw new IOException("Unknown data type: " + dataType);
}
@@ -469,7 +459,7 @@
if (dis.readBoolean()) {
byte dataType = dis.readByte();
- DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
if (dsm == null) {
throw new IOException("Unknown data type: " + dataType);
}
@@ -487,7 +477,7 @@
if (o != null) {
byte type = o.getDataStructureType();
dataOut.writeByte(type);
- DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null) {
throw new IOException("Unknown data type: " + type);
}
@@ -529,7 +519,7 @@
} else {
// Use -1 to indicate that the value was not cached due to cache
// being full.
- return new Short((short)-1);
+ return new Short((short) -1);
}
}
@@ -595,9 +585,9 @@
public WireFormatInfo getPreferedWireFormatInfo() {
return preferedWireFormatInfo;
}
-
+
public boolean inReceive() {
- return receivingMessage.get();
+ return receivingMessage.get();
}
public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
@@ -618,12 +608,10 @@
this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
info.setCacheEnabled(this.cacheEnabled);
- this.tightEncodingEnabled = info.isTightEncodingEnabled()
- && preferedWireFormatInfo.isTightEncodingEnabled();
+ this.tightEncodingEnabled = info.isTightEncodingEnabled() && preferedWireFormatInfo.isTightEncodingEnabled();
info.setTightEncodingEnabled(this.tightEncodingEnabled);
- this.sizePrefixDisabled = info.isSizePrefixDisabled()
- && preferedWireFormatInfo.isSizePrefixDisabled();
+ this.sizePrefixDisabled = info.isSizePrefixDisabled() && preferedWireFormatInfo.isSizePrefixDisabled();
info.setSizePrefixDisabled(this.sizePrefixDisabled);
if (cacheEnabled) {
@@ -656,4 +644,14 @@
}
return version2;
}
+
+ public Transport createTransportFilters(Transport transport, Map options) {
+
+ if (transport.isUseInactivityMonitor()) {
+ transport = new InactivityMonitor(transport, this);
+ }
+
+ transport = new WireFormatNegotiator(transport, this, 1);
+ return transport;
+ }
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed Jun 3 19:08:26 2009
@@ -655,6 +655,11 @@
return true;
}
+ public boolean isUseInactivityMonitor() {
+ //this is up to the underlying transport:
+ return false;
+ }
+
final boolean doReconnect() {
Exception failure = null;
synchronized (reconnectMutex) {
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java Wed Jun 3 19:08:26 2009
@@ -275,7 +275,7 @@
private boolean paused;
- public Cursor(CursoredQueue<V> queue, String name, boolean skipAcquired, boolean pageInElements, IFlowController<QueueElement<V>> memoryController) {
+ private Cursor(CursoredQueue<V> queue, String name, boolean skipAcquired, boolean pageInElements, IFlowController<QueueElement<V>> memoryController) {
this.name = name;
this.queue = queue.queue;
this.loader = queue.loader;
@@ -718,6 +718,8 @@
int softRefs = 0;
// Indicates whether this element is loaded or a placeholder:
+ // A place holder indicates that one or more elements is paged
+ // out at and above this sequence number.
boolean loaded = true;
// Indicates that we have requested a save for the element
@@ -765,6 +767,7 @@
public final void addHardRef() {
hardRefs++;
+
// Page in the element (providing it wasn't removed):
if (elem == null && !deleted) {
// If this is the first request for this
@@ -877,6 +880,7 @@
// If deleted unlink this element from the queue, and link
// together adjacent paged out entries:
if (deleted) {
+ loaded = false;
unlink();
// If both next and previous entries are unloaded,
// then collapse them:
@@ -1363,8 +1367,6 @@
// in memory:
if (persistencePolicy.isPagingEnabled()) {
- // Otherwise check with any other open cursor to see if
- // it can hang on to the element:
Collection<Cursor<V>> active = null;
active = reservedBlocks.get(qe.restoreBlock);
@@ -1423,11 +1425,24 @@
QueueElement<V> tail = queue.getTail();
// If we're at the end of the queue we don't need to
// load if we never paged out the block:
- if (tail != null && tail.isLoaded() && tail.restoreBlock == block && tail.isFirstInBlock()) {
- load = false;
- } else {
- load |= pageOutPlaceHolders;
+ // if (tail != null && tail.isLoaded() && tail.restoreBlock == block && tail.isFirstInBlock()) {
+ // load = false;
+ // } else {
+ //Otherwise add a soft ref for all element in the block that are already
+ //loaded.
+ QueueElement<V> qe = queue.upper(RESTORE_BLOCK_SIZE * block, true);
+ while (qe != null && qe.restoreBlock == block) {
+ QueueElement<V> next = qe.getNext();
+ if (qe.isLoaded()) {
+ qe.addSoftRef();
+ } else {
+ //If we find an unloaded element
+ //we'll need to load:
+ load = true;
+ }
+ qe = next;
}
+ // }
}
}
cursors.add(cursor);
@@ -1460,11 +1475,7 @@
QueueElement<V> qe = queue.upper(RESTORE_BLOCK_SIZE * block, true);
while (qe != null && qe.restoreBlock == block) {
QueueElement<V> next = qe.getNext();
- // If we page out place holders, release the
- // soft ref we added when we loaded the element:
- if (pageOutPlaceHolders) {
- qe.releaseSoftRef();
- }
+ qe.releaseSoftRef();
qe = next;
}
}
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java Wed Jun 3 19:08:26 2009
@@ -5,19 +5,21 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
+import java.util.Map;
import org.apache.activemq.flow.Commands.Destination.DestinationBean;
import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
import org.apache.activemq.flow.Commands.FlowControl.FlowControlBean;
import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
import org.apache.activemq.flow.Commands.Message.MessageBean;
+import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.StatefulWireFormat;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
-import org.apache.activemq.util.DataByteArrayInputStream;
-import org.apache.activemq.util.DataByteArrayOutputStream;
public class Proto2WireFormatFactory implements WireFormatFactory {
@@ -262,6 +264,10 @@
DataByteArrayInputStream is = new DataByteArrayInputStream(data);
return unmarshal(is);
}
+
+ public Transport createTransportFilters(Transport transport, Map options) {
+ return transport;
+ }
}
public WireFormat createWireFormat() {
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java Wed Jun 3 19:08:26 2009
@@ -5,18 +5,20 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
+import java.util.Map;
import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
import org.apache.activemq.flow.Commands.Message.MessageBuffer;
import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.StatefulWireFormat;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
-import org.apache.activemq.util.DataByteArrayInputStream;
-import org.apache.activemq.util.DataByteArrayOutputStream;
public class ProtoWireFormatFactory implements WireFormatFactory {
@@ -234,6 +236,10 @@
DataByteArrayInputStream is = new DataByteArrayInputStream(data);
return unmarshal(is);
}
+
+ public Transport createTransportFilters(Transport transport, Map options) {
+ return transport;
+ }
}
public WireFormat createWireFormat() {
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java Wed Jun 3 19:08:26 2009
@@ -7,7 +7,9 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.util.Map;
+import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
@@ -57,6 +59,10 @@
public Object unmarshal(ByteSequence data) throws IOException {
return null;
}
+
+ public Transport createTransportFilters(Transport transport, Map options) {
+ return transport;
+ }
}
public WireFormat createWireFormat() {
Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java Wed Jun 3 19:08:26 2009
@@ -138,6 +138,15 @@
boolean isFaultTolerant();
/**
+ * Indicates that the transport needs inactivity monitoring. This
+ * is true for transports like tcp that may not otherwise detect
+ * a transport failure in a timely fashion.
+ *
+ * @return true if the transport requires inactivity monitoring.
+ */
+ boolean isUseInactivityMonitor();
+
+ /**
* @return true if the transport is disposed
*/
boolean isDisposed();
Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java Wed Jun 3 19:08:26 2009
@@ -137,4 +137,8 @@
public void reconnect(URI uri) throws IOException {
next.reconnect(uri);
}
+
+ public boolean isUseInactivityMonitor() {
+ return next.isUseInactivityMonitor();
+ }
}
Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java Wed Jun 3 19:08:26 2009
@@ -18,7 +18,7 @@
import java.net.URI;
-import org.apache.activemq.ThreadPriorities;
+//import org.apache.activemq.ThreadPriorities;
import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -72,7 +72,7 @@
LOG.info("Listening for connections at: " + getConnectURI());
runner = new Thread(null, this, "ActiveMQ Transport Server: " + toString(), stackSize);
runner.setDaemon(daemon);
- runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT);
+ //runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT);
runner.start();
}
Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java Wed Jun 3 19:08:26 2009
@@ -106,18 +106,21 @@
public boolean isFaultTolerant() {
return false;
}
-
-
- public void reconnect(URI uri) throws IOException {
- throw new IOException("Not supported");
- }
-
- public boolean isDisposed() {
- return isStopped();
- }
-
- public boolean isConnected() {
- return isStarted();
- }
+
+ public boolean isUseInactivityMonitor() {
+ return false;
+ }
+
+ public void reconnect(URI uri) throws IOException {
+ throw new IOException("Not supported");
+ }
+
+ public boolean isDisposed() {
+ return isStopped();
+ }
+
+ public boolean isConnected() {
+ return isStarted();
+ }
}
Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Wed Jun 3 19:08:26 2009
@@ -29,7 +29,6 @@
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
-
public class PipeTransportFactory extends TransportFactory {
private final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
@@ -155,6 +154,10 @@
return false;
}
+ public boolean isUseInactivityMonitor() {
+ return false;
+ }
+
public <T> T narrow(Class<T> target) {
if (target.isAssignableFrom(getClass())) {
return target.cast(this);
Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Wed Jun 3 19:08:26 2009
@@ -23,7 +23,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Map;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
@@ -136,6 +139,9 @@
maxHeaderLength = Math.max( maxHeaderLength, wff.maxWireformatHeaderLength());
}
}
+ public Transport createTransportFilters(Transport transport, Map options) {
+ return transport;
+ }
}
public WireFormat createWireFormat() {
Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java Wed Jun 3 19:08:26 2009
@@ -24,7 +24,9 @@
import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
+import java.util.Map;
+import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
@@ -79,7 +81,8 @@
// TODO implement the inactivity monitor
return false;
}
-
-
+ public Transport createTransportFilters(Transport transport, Map options) {
+ return transport;
+ }
}
Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java Wed Jun 3 19:08:26 2009
@@ -19,7 +19,10 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Map;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.util.ByteSequence;
@@ -66,4 +69,13 @@
*/
boolean inReceive();
+ /**
+ * Creates any transport filters appropriate for the given wire format:
+ *
+ * @param transport The transport to filter.
+ * @param options The options with which the transport was created.
+ * @return Either the given transport or a Transport filter wrapping the onw provided.
+ */
+ Transport createTransportFilters(Transport transport, Map options);
+
}
Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/TreeMap.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/TreeMap.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/TreeMap.java (original)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/TreeMap.java Wed Jun 3 19:08:26 2009
@@ -701,7 +701,7 @@
while (currentNode != root && isBlack(currentNode)) {
if (isLeftChild(currentNode)) {
- TreeMapNode<K, V> siblingNode = getRight(currentNode.parent);
+ TreeMapNode<K, V> siblingNode = getRight(parent(currentNode));
if (isRed(siblingNode)) {
color(siblingNode, BLACK);
@@ -714,7 +714,7 @@
if (isBlack(getLeft(siblingNode)) && isBlack(getRight(siblingNode))) {
color(siblingNode, RED);
- currentNode = currentNode.parent;
+ currentNode = parent(currentNode);
} else {
if (isBlack(getRight(siblingNode))) {
color(getLeft(siblingNode), BLACK);
@@ -732,7 +732,7 @@
currentNode = root;
}
} else {
- TreeMapNode<K, V> siblingNode = getRight(currentNode);
+ TreeMapNode<K, V> siblingNode = getLeft(parent(currentNode));
if (isRed(siblingNode)) {
color(siblingNode, BLACK);
Modified: activemq/sandbox/activemq-flow/webgen/README
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/webgen/README?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/webgen/README (original)
+++ activemq/sandbox/activemq-flow/webgen/README Wed Jun 3 19:08:26 2009
@@ -10,10 +10,17 @@
Installing
-----------------------------
-You first need ruby and rubygems installed on your system. Then you can
-install webgen with the following command
+You first need ruby and rubygems installed on your system.
+You should have gems version of 1.3.4 or greater. You can check this via
-`sudo gem install webgen coderay feedtools haml`
+gem --version
+
+To update gems do
+gem update --system
+
+Then you can install webgen with the following command.
+
+`sudo gem install webgen coderay feedtools haml RedCloth`
Building
-----------------------------
Modified: activemq/sandbox/activemq-flow/webgen/src/architecture.page
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/webgen/src/architecture.page?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/webgen/src/architecture.page (original)
+++ activemq/sandbox/activemq-flow/webgen/src/architecture.page Wed Jun 3 19:08:26 2009
@@ -0,0 +1,11 @@
+
+---
+title: Architectural Overview
+--- pipeline:textile
+
+h2. Overview
+This page explores some of the details around core components.
+
+h2. Flow Control
+
+TODO: I've got some more background notes that I need to
Modified: activemq/sandbox/activemq-flow/webgen/src/metainfo
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/webgen/src/metainfo?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/webgen/src/metainfo (original)
+++ activemq/sandbox/activemq-flow/webgen/src/metainfo Wed Jun 3 19:08:26 2009
@@ -15,5 +15,5 @@
# Define the project properties here, these can be accessed in the
# pages using the {var:} syntax.
# -------------------------------------------------------------------
- project_name: "ActiveMQ Puma"
+ project_name: "ActiveMQ Apollo"
project_slogan: 'The Next Generation of Messaging Middleware'
Modified: activemq/sandbox/activemq-flow/webgen/src/todo.page
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/webgen/src/todo.page?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/webgen/src/todo.page (original)
+++ activemq/sandbox/activemq-flow/webgen/src/todo.page Wed Jun 3 19:08:26 2009
@@ -0,0 +1,8 @@
+---
+title: TODO
+--- pipeline:textile
+
+h2. Tasks
+* InactivityMonitor needs to get inserted for OpenWire on the server side of MultiWireFormat negotiation.
+* BIO SSL Transport does not currently pass Client Certificate chain in OpenWire ConnectionInfo command. This used to be done by the SSL transport but this shouldn't have OpenWire dependencies; instead protocol handler should be able to retrieve it from the underlying transport as needed.
+