You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:23:37 UTC
svn commit: r961218 [2/2] - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/
activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/wireformat/
activemq-broke...
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed Jul 7 04:23:35 2010
@@ -16,12 +16,11 @@
*/
package org.apache.activemq.transport.tcp;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportAcceptListener;
-import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.apollo.transport.TransportServer;
+import org.apache.activemq.apollo.transport.Transport;
+import org.apache.activemq.apollo.transport.TransportAcceptListener;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.wireformat.WireFormatFactory;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.hawtdispatch.DispatchSource;
@@ -42,7 +41,6 @@ import java.util.Map;
public class TcpTransportServer implements TransportServer {
-// protected WireFormatFactory wireFormatFactory;
private ServerSocketChannel channel;
private TransportAcceptListener listener;
private URI bindURI;
@@ -193,7 +191,6 @@ public class TcpTransportServer implemen
HashMap<String, Object> options = new HashMap<String, Object>();
// options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
// options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
-// options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
// options.put("trace", Boolean.valueOf(trace));
// options.put("soTimeout", Integer.valueOf(soTimeout));
// options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
@@ -221,364 +218,11 @@ public class TcpTransportServer implemen
this.transportOptions = transportOptions;
}
-
-
-// private static final Log LOG = LogFactory.getLog(TcpTransportServer.class);
-// protected ServerSocket serverSocket;
-// protected int backlog = 5000;
-// protected WireFormatFactory wireFormatFactory;
-// protected final TcpTransportFactory transportFactory;
-// protected long maxInactivityDuration = 30000;
-// protected long maxInactivityDurationInitalDelay = 10000;
-// protected int minmumWireFormatVersion;
-// protected boolean useQueueForAccept=true;
-//
-// /**
-// * trace=true -> the Transport stack where this TcpTransport
-// * object will be, will have a TransportLogger layer
-// * trace=false -> the Transport stack where this TcpTransport
-// * object will be, will NOT have a TransportLogger layer, and therefore
-// * will never be able to print logging messages.
-// * This parameter is most probably set in Connection or TransportConnector URIs.
-// */
-// protected boolean trace = false;
-//
-// protected int soTimeout = 0;
-// protected int socketBufferSize = 64 * 1024;
-// protected int connectionTimeout = 30000;
-//
-// /**
-// * Name of the LogWriter implementation to use.
-// * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
-// * This parameter is most probably set in Connection or TransportConnector URIs.
-//
-// protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
-// */
-//
-// /**
-// * Specifies if the TransportLogger will be manageable by JMX or not.
-// * Also, as long as there is at least 1 TransportLogger which is manageable,
-// * a TransportLoggerControl MBean will me created.
-// */
-// protected boolean dynamicManagement = false;
-// /**
-// * startLogging=true -> the TransportLogger object of the Transport stack
-// * will initially write messages to the log.
-// * startLogging=false -> the TransportLogger object of the Transport stack
-// * will initially NOT write messages to the log.
-// * This parameter only has an effect if trace == true.
-// * This parameter is most probably set in Connection or TransportConnector URIs.
-// */
-// protected boolean startLogging = true;
-// protected Map<String, Object> transportOptions;
-// protected final ServerSocketFactory serverSocketFactory;
-// protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
-// protected Thread socketHandlerThread;
-// /**
-// * The maximum number of sockets allowed for this server
-// */
-// protected int maximumConnections = Integer.MAX_VALUE;
-// protected int currentTransportCount=0;
-//
-// public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
-// super(location);
-// this.transportFactory = transportFactory;
-// this.serverSocketFactory = serverSocketFactory;
-//
-// }
-//
-// public void bind() throws IOException {
-// URI bind = getBindLocation();
-//
-// String host = bind.getHost();
-// host = (host == null || host.length() == 0) ? "localhost" : host;
-// InetAddress addr = InetAddress.getByName(host);
-//
-// try {
-//
-// this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
-// configureServerSocket(this.serverSocket);
-//
-// } catch (IOException e) {
-// throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
-// }
-// try {
-// setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
-// .getFragment()));
-// } catch (URISyntaxException e) {
-//
-// // it could be that the host name contains invalid characters such
-// // as _ on unix platforms
-// // so lets try use the IP address instead
-// try {
-// setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
-// } catch (URISyntaxException e2) {
-// throw IOExceptionSupport.create(e2);
-// }
-// }
-// }
-//
-// private void configureServerSocket(ServerSocket socket) throws SocketException {
-// socket.setSoTimeout(2000);
-// if (transportOptions != null) {
-// IntrospectionSupport.setProperties(socket, transportOptions);
-// }
-// }
-//
-// /**
-// * @return Returns the wireFormatFactory.
-// */
-// public WireFormatFactory getWireFormatFactory() {
-// return wireFormatFactory;
-// }
-//
-// /**
-// * @param wireFormatFactory The wireFormatFactory to set.
-// */
-// public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
-// this.wireFormatFactory = wireFormatFactory;
-// }
-//
-// public long getMaxInactivityDuration() {
-// return maxInactivityDuration;
-// }
-//
-// public void setMaxInactivityDuration(long maxInactivityDuration) {
-// this.maxInactivityDuration = maxInactivityDuration;
-// }
-//
-// public long getMaxInactivityDurationInitalDelay() {
-// return this.maxInactivityDurationInitalDelay;
-// }
-//
-// public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
-// this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
-// }
-//
-// public int getMinmumWireFormatVersion() {
-// return minmumWireFormatVersion;
-// }
-//
-// public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
-// this.minmumWireFormatVersion = minmumWireFormatVersion;
-// }
-//
-// public boolean isTrace() {
-// return trace;
-// }
-//
-// public void setTrace(boolean trace) {
-// this.trace = trace;
-// }
-////
-//// public String getLogWriterName() {
-//// return logWriterName;
-//// }
-////
-//// public void setLogWriterName(String logFormat) {
-//// this.logWriterName = logFormat;
-//// }
-//
-// public boolean isDynamicManagement() {
-// return dynamicManagement;
-// }
-//
-// public void setDynamicManagement(boolean useJmx) {
-// this.dynamicManagement = useJmx;
-// }
-//
-// public boolean isStartLogging() {
-// return startLogging;
-// }
-//
-//
-// public void setStartLogging(boolean startLogging) {
-// this.startLogging = startLogging;
-// }
-//
-// /**
-// * @return the backlog
-// */
-// public int getBacklog() {
-// return backlog;
-// }
-//
-// /**
-// * @param backlog the backlog to set
-// */
-// public void setBacklog(int backlog) {
-// this.backlog = backlog;
-// }
-//
-// /**
-// * @return the useQueueForAccept
-// */
-// public boolean isUseQueueForAccept() {
-// return useQueueForAccept;
-// }
-//
-// /**
-// * @param useQueueForAccept the useQueueForAccept to set
-// */
-// public void setUseQueueForAccept(boolean useQueueForAccept) {
-// this.useQueueForAccept = useQueueForAccept;
-// }
-//
-//
-// /**
-// * pull Sockets from the ServerSocket
-// */
-// public void run() {
-// while (!isStopped()) {
-// Socket socket = null;
-// try {
-// socket = serverSocket.accept();
-// if (socket != null) {
-// if (isStopped() || getAcceptListener() == null) {
-// socket.close();
-// } else {
-// if (useQueueForAccept) {
-// socketQueue.put(socket);
-// }else {
-// handleSocket(socket);
-// }
-// }
-// }
-// } catch (SocketTimeoutException ste) {
-// // expect this to happen
-// } catch (Exception e) {
-// if (!isStopping()) {
-// onAcceptError(e);
-// } else if (!isStopped()) {
-// LOG.warn("run()", e);
-// onAcceptError(e);
-// }
-// }
-// }
-// }
-//
-// /**
-// * Allow derived classes to override the Transport implementation that this
-// * transport server creates.
-// *
-// * @param socket
-// * @param format
-// * @return
-// * @throws IOException
-// */
-// protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
-// return new TcpTransport(format, socket);
-// }
-//
-
/**
* @return pretty print of this
*/
public String toString() {
return "" + bindURI;
}
-//
-// /**
-// * @param socket
-// * @param inetAddress
-// * @return real hostName
-// * @throws UnknownHostException
-// */
-//
-// protected void doStart() throws Exception {
-// if(useQueueForAccept) {
-// Runnable run = new Runnable() {
-// public void run() {
-// try {
-// while (!isStopped() && !isStopping()) {
-// Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
-// if (sock != null) {
-// handleSocket(sock);
-// }
-// }
-//
-// } catch (InterruptedException e) {
-// LOG.info("socketQueue interuppted - stopping");
-// if (!isStopping()) {
-// onAcceptError(e);
-// }
-// }
-//
-// }
-//
-// };
-// socketHandlerThread = new Thread(null, run,
-// "ActiveMQ Transport Server Thread Handler: " + toString(),
-// getStackSize());
-// socketHandlerThread.setDaemon(true);
-// //socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
-// socketHandlerThread.start();
-// }
-// super.doStart();
-//
-// }
-//
-// protected void doStop(ServiceStopper stopper) throws Exception {
-// super.doStop(stopper);
-// if (serverSocket != null) {
-// serverSocket.close();
-// }
-// }
-//
-// public InetSocketAddress getSocketAddress() {
-// return (InetSocketAddress)serverSocket.getLocalSocketAddress();
-// }
-//
-// public void setTransportOption(Map<String, Object> transportOptions) {
-// this.transportOptions = transportOptions;
-// }
-//
-//
-// public int getSoTimeout() {
-// return soTimeout;
-// }
-//
-// public void setSoTimeout(int soTimeout) {
-// this.soTimeout = soTimeout;
-// }
-//
-// public int getSocketBufferSize() {
-// return socketBufferSize;
-// }
-//
-// public void setSocketBufferSize(int socketBufferSize) {
-// this.socketBufferSize = socketBufferSize;
-// }
-//
-// public int getConnectionTimeout() {
-// return connectionTimeout;
-// }
-//
-// public void setConnectionTimeout(int connectionTimeout) {
-// this.connectionTimeout = connectionTimeout;
-// }
-//
-// /**
-// * @return the maximumConnections
-// */
-// public int getMaximumConnections() {
-// return maximumConnections;
-// }
-//
-// /**
-// * @param maximumConnections the maximumConnections to set
-// */
-// public void setMaximumConnections(int maximumConnections) {
-// this.maximumConnections = maximumConnections;
-// }
-//
-//
-// public void started(Service service) {
-// this.currentTransportCount++;
-// }
-//
-// public void stopped(Service service) {
-// this.currentTransportCount--;
-// }
}
\ No newline at end of file
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DefaultTransportListener.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DefaultTransportListener.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DefaultTransportListener.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DefaultTransportListener.java Wed Jul 7 04:23:35 2010
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.transport;
import java.io.IOException;
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryAgent.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgent.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryAgent.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryAgent.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgent.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgent.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryAgent.java Wed Jul 7 04:23:35 2010
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.discovery;
+package org.apache.activemq.apollo.transport;
import java.io.IOException;
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryAgentFactory.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryAgentFactory.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryAgentFactory.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryAgentFactory.java Wed Jul 7 04:23:35 2010
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.discovery;
+package org.apache.activemq.apollo.transport;
import java.io.IOException;
import java.net.URI;
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryEvent.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryEvent.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryEvent.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryEvent.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryEvent.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryEvent.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryEvent.java Wed Jul 7 04:23:35 2010
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.discovery;
+package org.apache.activemq.apollo.transport;
public class DiscoveryEvent {
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryListener.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryListener.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryListener.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryListener.java Wed Jul 7 04:23:35 2010
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.discovery;
+package org.apache.activemq.apollo.transport;
/**
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ObjectStreamProtocolCodec.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ObjectStreamProtocolCodec.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ObjectStreamProtocolCodec.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ObjectStreamProtocolCodec.java Wed Jul 7 04:23:35 2010
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.wireformat;
+package org.apache.activemq.apollo.transport;
import java.io.DataInput;
import java.io.DataInputStream;
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
@@ -38,7 +37,7 @@ import org.fusesource.hawtbuf.ByteArrayO
*
* @version $Revision: 1.1 $
*/
-public class ObjectStreamWireFormat implements WireFormat {
+public class ObjectStreamProtocolCodec implements ProtocolCodec {
public static final String WIREFORMAT_NAME = "object";
@@ -105,7 +104,7 @@ public class ObjectStreamWireFormat impl
throw new UnsupportedOperationException();
}
- public String getName() {
+ public String protocol() {
return WIREFORMAT_NAME;
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ObjectStreamProtocolCodecFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ObjectStreamProtocolCodecFactory.java?rev=961218&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ObjectStreamProtocolCodecFactory.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ObjectStreamProtocolCodecFactory.java Wed Jul 7 04:23:35 2010
@@ -0,0 +1,22 @@
+package org.apache.activemq.apollo.transport;
+
+import org.fusesource.hawtbuf.Buffer;
+
+public class ObjectStreamProtocolCodecFactory implements ProtocolCodecFactory {
+
+ public ProtocolCodec createProtocolCodec() {
+ return new ObjectStreamProtocolCodec();
+ }
+
+ public boolean isIdentifiable() {
+ return false;
+ }
+
+ public boolean matchesIdentification(Buffer byteSequence) {
+ throw new UnsupportedOperationException();
+ }
+
+ public int maxIdentificaionLength() {
+ throw new UnsupportedOperationException();
+ }
+}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodec.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodec.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodec.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodec.java Wed Jul 7 04:23:35 2010
@@ -14,12 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.wireformat;
+package org.apache.activemq.apollo.transport;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
@@ -27,44 +24,22 @@ import org.fusesource.hawtbuf.Buffer;
/**
- * Provides a mechanism to marshal commands into and out of packets
- * or into and out of streams, Channels and Datagrams.
+ * Interface to encode and decode commands in and out of a a non blocking channel.
*
- * @version $Revision: 1.1 $
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public interface WireFormat {
-
- enum BufferState {
- EMPTY,
- WAS_EMPTY,
- NOT_EMPTY,
- FULL,
- }
+public interface ProtocolCodec {
/**
- * @return The name of the wireformat
+ * @return The name of the protocol associated with the the channel codec.
*/
- String getName();
+ String protocol();
- /**
- * Packet based marshaling
- */
- Buffer marshal(Object command) throws IOException;
-
- /**
- * Packet based un-marshaling
- */
- Object unmarshal(Buffer packet) throws IOException;
-
- /**
- * Stream based marshaling
- */
- void marshal(Object command, DataOutput out) throws IOException;
-
- /**
- * Stream based un-marshaling
- */
- Object unmarshal(DataInput in) throws IOException;
+ ///////////////////////////////////////////////////////////////////
+ //
+ // Methods related with reading from the channel
+ //
+ ///////////////////////////////////////////////////////////////////
/**
* @param channel
@@ -87,13 +62,26 @@ public interface WireFormat {
*/
void unread(Buffer buffer);
-
/**
* @return The number of bytes received.
*/
public long getReadCounter();
+ ///////////////////////////////////////////////////////////////////
+ //
+ // Methods related with writing to the channel
+ //
+ ///////////////////////////////////////////////////////////////////
+
+
+ enum BufferState {
+ EMPTY,
+ WAS_EMPTY,
+ NOT_EMPTY,
+ FULL,
+ }
+
public void setWritableByteChannel(WritableByteChannel channel);
/**
@@ -114,19 +102,7 @@ public interface WireFormat {
/**
* @return The number of bytes written.
*/
- public long getWriteCounter() ;
-
-
-// void unmarshalStartPos(int pos);
-//
-// int unmarshalEndPos();
-// void unmarshalEndPos(int pos);
-//
-// /**
-// * For a unmarshal session is used for non-blocking
-// * unmarshalling.
-// */
-// Object unmarshalNB(ByteBuffer buffer) throws IOException;
+ public long getWriteCounter();
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodecFactory.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodecFactory.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodecFactory.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodecFactory.java Wed Jul 7 04:23:35 2010
@@ -14,38 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.wireformat;
+package org.apache.activemq.apollo.transport;
import org.fusesource.hawtbuf.Buffer;
-public interface WireFormatFactory {
+public interface ProtocolCodecFactory {
/**
* @return an instance of the wire format.
*
*/
- WireFormat createWireFormat();
+ ProtocolCodec createProtocolCodec();
/**
- * @return true if this wire format factory is isDiscriminatable. A discriminatable
- * WireFormat's will first write a header to the stream
+ * @return true if this wire format factory is identifiable. An identifiable
+ * protocol will first write a easy to identify header to the stream
*/
- boolean isDiscriminatable();
+ boolean isIdentifiable();
/**
* @return Returns the maximum length of the header used to discriminate the wire format if it
- * {@link #isDiscriminatable()}
- * @throws UnsupportedOperationException If {@link #isDiscriminatable()} is false
+ * {@link #isIdentifiable()}
+ * @throws UnsupportedOperationException If {@link #isIdentifiable()} is false
*/
- int maxWireformatHeaderLength();
+ int maxIdentificaionLength();
/**
- * Called to test if this wireformat matches the provided header.
+ * Called to test if this protocol matches the identification header.
*
* @param buffer The byte buffer representing the header data read so far.
- * @return true if the Buffer matches the wire format header.
+ * @return true if the Buffer matches the protocol format header.
*/
- boolean matchesWireformatHeader(Buffer buffer);
+ boolean matchesIdentification(Buffer buffer);
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/Transport.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/Transport.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/Transport.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/Transport.java Wed Jul 7 04:23:35 2010
@@ -14,15 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.transport;
import java.io.IOException;
import java.net.URI;
import org.apache.activemq.Service;
-import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtdispatch.DispatchQueue;
-import org.fusesource.hawtdispatch.Retained;
/**
* Represents an abstract connection. It can be a client side or server side connection.
@@ -109,11 +107,15 @@ public interface Transport extends Servi
boolean isConnected();
/**
- * @return The wireformat for the connection.
+ * @return The protocol codec for the transport.
*/
- WireFormat getWireformat();
+ ProtocolCodec getProtocolCodec();
- void setWireformat(WireFormat wireformat);
+ /**
+ * Sets the protocol codec for the transport
+ * @param protocolCodec
+ */
+ void setProtocolCodec(ProtocolCodec protocolCodec);
/**
* reconnect to another location
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java Wed Jul 7 04:23:35 2010
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.transport;
public interface TransportAcceptListener {
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFactory.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFactory.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFactory.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFactory.java Wed Jul 7 04:23:35 2010
@@ -14,14 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.transport;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
-import org.fusesource.hawtdispatch.DispatchQueue;
import java.io.IOException;
-import java.net.URI;
import java.util.concurrent.ConcurrentHashMap;
/**
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFactorySupport.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFactorySupport.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFactorySupport.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFactorySupport.java Wed Jul 7 04:23:35 2010
@@ -14,36 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.transport;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.wireformat.WireFormatFactory;
import java.io.IOException;
-import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
/**
* <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class TransportFactorySupport{
+public class TransportFactorySupport {
- private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
-
- private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
- private static final String THREAD_NAME_FILTER = "threadName";
+ private static final FactoryFinder PROTOCOL_CODEC_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
static public Transport configure(Transport transport, Map<String, String> options) throws IOException {
- WireFormat wf = createWireFormat(options);
- transport.setWireformat(wf);
+ ProtocolCodec wf = createProtocolCodec(options);
+ transport.setProtocolCodec(wf);
IntrospectionSupport.setProperties(transport, options);
return transport;
}
@@ -60,52 +53,52 @@ public class TransportFactorySupport{
return transport;
}
- static public WireFormat createWireFormat(Map<String, String> options) throws IOException {
- WireFormatFactory factory = createWireFormatFactory(options);
+ static public ProtocolCodec createProtocolCodec(Map<String, String> options) throws IOException {
+ ProtocolCodecFactory factory = createProtocolCodecFactory(options);
if( factory == null ) {
return null;
}
- WireFormat format = factory.createWireFormat();
- return format;
+ ProtocolCodec protocolCodec = factory.createProtocolCodec();
+ return protocolCodec;
}
- static public WireFormatFactory createWireFormatFactory(Map<String, String> options) throws IOException {
- String wireFormat = (String)options.remove("wireFormat");
- if (wireFormat == null) {
- wireFormat = getDefaultWireFormatType();
+ static public ProtocolCodecFactory createProtocolCodecFactory(Map<String, String> options) throws IOException {
+ String protocolName = (String)options.remove("protocol");
+ if (protocolName == null) {
+ protocolName = getDefaultProtocolName();
}
- if( "null".equals(wireFormat) ) {
+ if( "null".equals(protocolName) ) {
return null;
}
try {
- WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
- IntrospectionSupport.setProperties(wff, options, "wireFormat.");
+ ProtocolCodecFactory wff = (ProtocolCodecFactory) PROTOCOL_CODEC_FACTORY_FINDER.newInstance(protocolName);
+ IntrospectionSupport.setProperties(wff, options, "protocol.");
return wff;
} catch (Throwable e) {
- throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
+ throw IOExceptionSupport.create("Could not create protocol codec for: " + protocolName + ", reason: " + e, e);
}
}
- static public WireFormatFactory createWireFormatFactory(String location) throws IOException, URISyntaxException {
+ static public ProtocolCodecFactory createProtocolCodecFactory(String location) throws IOException, URISyntaxException {
URI uri = new URI(location);
Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
- String wireFormat = uri.getPath();
- if( "null".equals(wireFormat) ) {
+ String protocolName = uri.getPath();
+ if( "null".equals(protocolName) ) {
return null;
}
try {
- WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
+ ProtocolCodecFactory wff = (ProtocolCodecFactory) PROTOCOL_CODEC_FACTORY_FINDER.newInstance(protocolName);
IntrospectionSupport.setProperties(wff, options);
return wff;
} catch (Throwable e) {
- throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
+ throw IOExceptionSupport.create("Could not protocol codec for: " + protocolName + ", reason: " + e, e);
}
}
- static protected String getDefaultWireFormatType() {
- return "default";
+ static protected String getDefaultProtocolName() {
+ return "stomp";
}
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFilter.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFilter.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFilter.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFilter.java Wed Jul 7 04:23:35 2010
@@ -14,15 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.transport;
+
+import org.fusesource.hawtdispatch.DispatchQueue;
import java.io.IOException;
import java.net.URI;
-import org.apache.activemq.wireformat.WireFormat;
-import org.fusesource.hawtdispatch.DispatchQueue;
-import org.fusesource.hawtdispatch.Retained;
-
/**
* @version $Revision: 1.5 $
*/
@@ -151,7 +149,7 @@ public class TransportFilter implements
/**
* @return
- * @see org.apache.activemq.transport.Transport#isFaultTolerant()
+ * @see Transport#isFaultTolerant()
*/
public boolean isFaultTolerant() {
return next.isFaultTolerant();
@@ -173,11 +171,11 @@ public class TransportFilter implements
next.reconnect(uri);
}
- public WireFormat getWireformat() {
- return next.getWireformat();
+ public ProtocolCodec getProtocolCodec() {
+ return next.getProtocolCodec();
}
- public void setWireformat(WireFormat wireformat) {
- next.setWireformat(wireformat);
+ public void setProtocolCodec(ProtocolCodec protocolCodec) {
+ next.setProtocolCodec(protocolCodec);
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportListener.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportListener.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportListener.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportListener.java Wed Jul 7 04:23:35 2010
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.transport;
import java.io.IOException;
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportServer.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportServer.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportServer.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportServer.java Wed Jul 7 04:23:35 2010
@@ -14,13 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.transport;
import java.net.InetSocketAddress;
import java.net.URI;
import org.apache.activemq.Service;
-import org.apache.activemq.wireformat.WireFormatFactory;
import org.fusesource.hawtdispatch.DispatchQueue;
/**
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportServerSupport.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportServerSupport.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportServerSupport.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportServerSupport.java Wed Jul 7 04:23:35 2010
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.transport;
import java.net.URI;
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/package.html (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/package.html)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/package.html?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/package.html&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/package.html&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/package.html (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/package.html Wed Jul 7 04:23:35 2010
@@ -20,7 +20,7 @@
<body>
<p>
-An API for WireFormats which are used to turn object into bytes and bytes into objects.
+Transport related interfaces.
</p>
</body>
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransport.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransport.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransport.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransport.java Wed Jul 7 04:23:35 2010
@@ -14,12 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.pipe;
+package org.apache.activemq.apollo.transport.pipe;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportListener;
-import org.fusesource.hawtbuf.Buffer;
-import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.apollo.transport.ProtocolCodec;
+import org.apache.activemq.apollo.transport.Transport;
+import org.apache.activemq.apollo.transport.TransportListener;
import org.fusesource.hawtdispatch.*;
import java.io.EOFException;
@@ -41,7 +40,6 @@ public class PipeTransport implements Tr
private String remoteAddress;
private AtomicBoolean stopping = new AtomicBoolean();
private String name;
- private WireFormat wireformat;
private boolean marshal;
private boolean trace;
@@ -51,6 +49,7 @@ public class PipeTransport implements Tr
private long writeCounter = 0;
private long readCounter = 0;
+ private ProtocolCodec protocolCodec;
public PipeTransport(PipeTransportServer server) {
this.server = server;
@@ -88,13 +87,8 @@ public class PipeTransport implements Tr
if(o == EOF_TOKEN) {
throw new EOFException();
}
-
- if (wireformat != null && marshal) {
- listener.onTransportCommand(wireformat.unmarshal((Buffer) o));
- } else {
- readCounter ++;
- listener.onTransportCommand(o);
- }
+ readCounter ++;
+ listener.onTransportCommand(o);
}
// let the peer know that they have been processed.
@@ -247,11 +241,11 @@ public class PipeTransport implements Tr
this.listener = listener;
}
- public WireFormat getWireformat() {
- return wireformat;
+ public ProtocolCodec getProtocolCodec() {
+ return protocolCodec;
}
- public void setWireformat(WireFormat wireformat) {
- this.wireformat = wireformat;
+ public void setProtocolCodec(ProtocolCodec protocolCodec) {
+ this.protocolCodec = protocolCodec;
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportFactory.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportFactory.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportFactory.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportFactory.java Wed Jul 7 04:23:35 2010
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.pipe;
+package org.apache.activemq.apollo.transport.pipe;
import java.io.IOException;
import java.net.URI;
@@ -22,8 +22,11 @@ import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
-import static org.apache.activemq.transport.TransportFactorySupport.*;
-import org.apache.activemq.transport.*;
+import static org.apache.activemq.apollo.transport.TransportFactorySupport.*;
+
+import org.apache.activemq.apollo.transport.Transport;
+import org.apache.activemq.apollo.transport.TransportFactory;
+import org.apache.activemq.apollo.transport.TransportServer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportServer.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportServer.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportServer.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportServer.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportServer.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportServer.java Wed Jul 7 04:23:35 2010
@@ -14,11 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.pipe;
+package org.apache.activemq.apollo.transport.pipe;
-import org.apache.activemq.transport.TransportAcceptListener;
-import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.wireformat.WireFormatFactory;
+import org.apache.activemq.apollo.transport.TransportAcceptListener;
+import org.apache.activemq.apollo.transport.TransportServer;
+import org.apache.activemq.apollo.transport.ProtocolCodecFactory;
import org.fusesource.hawtdispatch.CustomDispatchSource;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;
@@ -38,7 +38,7 @@ public class PipeTransportServer impleme
protected URI connectURI;
protected TransportAcceptListener listener;
protected String name;
- protected WireFormatFactory wireFormatFactory;
+ protected ProtocolCodecFactory protocolCodecFactory;
protected boolean marshal;
protected final AtomicInteger connectionCounter = new AtomicInteger();
DispatchQueue dispatchQueue;
@@ -127,9 +127,9 @@ public class PipeTransportServer impleme
serverTransport.setRemoteAddress(remoteAddress);
serverTransport.setMarshal(marshal);
- if (wireFormatFactory != null) {
- clientTransport.setWireformat(wireFormatFactory.createWireFormat());
- serverTransport.setWireformat(wireFormatFactory.createWireFormat());
+ if (protocolCodecFactory != null) {
+ clientTransport.setProtocolCodec(protocolCodecFactory.createProtocolCodec());
+ serverTransport.setProtocolCodec(protocolCodecFactory.createProtocolCodec());
}
this.acceptSource.merge(serverTransport);
return clientTransport;
@@ -143,8 +143,8 @@ public class PipeTransportServer impleme
return new PipeTransport(this);
}
- public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
- this.wireFormatFactory = wireFormatFactory;
+ public void setProtocolCodecFactory(ProtocolCodecFactory protocolCodecFactory) {
+ this.protocolCodecFactory = protocolCodecFactory;
}
public boolean isMarshal() {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPoolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPoolFactory.scala?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPoolFactory.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPoolFactory.scala Wed Jul 7 04:23:35 2010
@@ -27,7 +27,7 @@ import org.apache.activemq.apollo.util.C
object DirectBufferPoolFactory {
val finder = ClassFinder[SPI]("META-INF/services/org.apache.activemq.apollo/direct-buffer-pools")
- var direct_buffer_pools_spis = List[SPI]()
+ var spis = List[SPI]()
trait SPI {
def create(config:String):DirectBufferPool
@@ -36,11 +36,9 @@ object DirectBufferPoolFactory {
finder.find.foreach{ clazz =>
try {
- val SPI = clazz.newInstance.asInstanceOf[SPI]
- direct_buffer_pools_spis ::= SPI
+ spis ::= clazz.newInstance.asInstanceOf[SPI]
} catch {
- case e:Throwable =>
- e.printStackTrace
+ case e:Throwable => e.printStackTrace
}
}
@@ -48,7 +46,7 @@ object DirectBufferPoolFactory {
if( config == null ) {
return null
}
- direct_buffer_pools_spis.foreach { spi=>
+ spis.foreach { spi=>
val rc = spi.create(config)
if( rc!=null ) {
return rc
@@ -62,7 +60,7 @@ object DirectBufferPoolFactory {
if( config == null ) {
return true
} else {
- direct_buffer_pools_spis.foreach { spi=>
+ spis.foreach { spi=>
if( spi.validate(config) ) {
return true
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala Wed Jul 7 04:23:35 2010
@@ -274,10 +274,10 @@ case class RuntimeResource(parent:Broker
result.id = connection.id
result.state = connection.serviceState.toString
result.state_since = connection.serviceState.since
- result.protocol = connection.protocol
+ result.protocol = connection.protocolHandler.protocol
result.transport = connection.transport.getTypeId
result.remote_address = connection.transport.getRemoteAddress
- val wf = connection.transport.getWireformat
+ val wf = connection.transport.getProtocolCodec
if( wf!=null ) {
result.write_counter = wf.getWriteCounter
result.read_counter = wf.getReadCounter