You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/08 15:44:36 UTC
svn commit: r384225 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp:
CommandChannel.java CommandProcessor.java UdpTransport.java
UdpTransportFactory.java UdpTransportServer.java
UdpTransportServerClient.java
Author: jstrachan
Date: Wed Mar 8 06:44:35 2006
New Revision: 384225
URL: http://svn.apache.org/viewcvs?rev=384225&view=rev
Log:
added spike of a UDP server together with applying the transport refactorings to the UDP transport
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java (with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java (with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java (with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=384225&r1=384224&r2=384225&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java Wed Mar 8 06:44:35 2006
@@ -29,6 +29,7 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
@@ -87,10 +88,12 @@
bufferPool.stop();
}
- public Command read() throws IOException {
+ public void read(CommandProcessor processor) throws IOException {
+ Command answer = null;
+ SocketAddress address = null;
synchronized (readLock) {
readBuffer.clear();
- SocketAddress address = channel.receive(readBuffer);
+ address = channel.receive(readBuffer);
readBuffer.flip();
if (log.isDebugEnabled()) {
@@ -122,11 +125,18 @@
header.setCommand(command);
}
- return readStack.read(header);
+ answer = readStack.read(header);
+ }
+ if (answer != null) {
+ processor.process(answer, address);
}
}
public void write(Command command) throws IOException {
+ write(command, targetAddress);
+ }
+
+ public void write(Command command, SocketAddress address) throws IOException {
synchronized (writeLock) {
header.incrementCounter();
int size = wireFormat.tightMarshal1(command, bs);
@@ -146,7 +156,7 @@
byte[] data = buffer.toByteArray();
writeBuffer.put(data);
- sendWriteBuffer();
+ sendWriteBuffer(address);
}
else {
header.setPartial(true);
@@ -171,15 +181,15 @@
// now the data
writeBuffer.put(data, offset, chunkSize);
offset += chunkSize;
- sendWriteBuffer();
+ sendWriteBuffer(address);
}
}
}
}
- protected void sendWriteBuffer() throws IOException {
+ protected void sendWriteBuffer(SocketAddress address) throws IOException {
writeBuffer.flip();
- channel.send(writeBuffer, targetAddress);
+ channel.send(writeBuffer, address);
}
// Properties
@@ -214,5 +224,6 @@
public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
this.headerMarshaller = headerMarshaller;
}
+
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java?rev=384225&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java Wed Mar 8 06:44:35 2006
@@ -0,0 +1,32 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.apache.activemq.command.Command;
+
+import java.net.SocketAddress;
+
+/**
+ * A callback used to process inbound commands, each handed over with a socket address
+ *
+ * @version $Revision$
+ */
+public interface CommandProcessor {
+ /** Handles one inbound command; address is the socket address passed along with that command. */
+ void process(Command command, SocketAddress address);
+
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=384225&r1=384224&r2=384225&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java Wed Mar 8 06:44:35 2006
@@ -28,12 +28,10 @@
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.channels.AsynchronousCloseException;
@@ -58,6 +56,10 @@
private boolean trace = false;
private boolean useLocalHost = true;
private int port;
+ private CommandProcessor commandProcessor = new CommandProcessor() {
+ public void process(Command command, SocketAddress address) {
+ doConsume(command);
+ }};
protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
this.wireFormat = wireFormat;
@@ -72,16 +74,23 @@
this(wireFormat);
this.targetAddress = socketAddress;
}
-
+
/**
* A one way asynchronous send
*/
public void oneway(Command command) throws IOException {
+ oneway(command, targetAddress);
+ }
+
+ /**
+ * A one way asynchronous send of the command to the given address rather than the default target address
+ */
+ public void oneway(Command command, InetSocketAddress address) throws IOException {
if (log.isDebugEnabled()) {
- log.debug("Sending oneway from port: " + port + " to target: " + targetAddress);
+ log.debug("Sending oneway from port: " + port + " to target: " + address);
}
checkStarted(command);
- commandChannel.write(command);
+ commandChannel.write(command, address);
}
/**
@@ -96,10 +105,9 @@
*/
public void run() {
log.trace("Consumer thread starting for: " + toString());
- while (!isClosed()) {
+ while (!isStopped()) {
try {
- Command command = commandChannel.read();
- doConsume(command);
+ commandChannel.read(commandProcessor);
}
/*
* catch (SocketTimeoutException e) { } catch
@@ -209,7 +217,14 @@
// Implementation methods
// -------------------------------------------------------------------------
+ protected CommandProcessor getCommandProcessor() {
+ return commandProcessor;
+ }
+ protected void setCommandProcessor(CommandProcessor commandProcessor) {
+ this.commandProcessor = commandProcessor;
+ }
+
/**
* Creates an address from the given URI
*/
@@ -257,4 +272,5 @@
channel.close();
}
}
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java?rev=384225&r1=384224&r2=384225&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java Wed Mar 8 06:44:35 2006
@@ -16,57 +16,46 @@
*/
package org.apache.activemq.transport.udp;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.Map;
-import javax.net.ServerSocketFactory;
-import javax.net.SocketFactory;
import org.activeio.command.WireFormat;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.InactivityMonitor;
-import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportLogger;
import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.WireFormatNegotiator;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.URISupport;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.Map;
public class UdpTransportFactory extends TransportFactory {
- private static final Log log = LogFactory.getLog(UdpTransportFactory.class);
public TransportServer doBind(String brokerId, final URI location) throws IOException {
- throw new IOException("TransportServer not supported for UDP");
- /*
try {
- Map options = new HashMap(URISupport.parseParamters(location));
-
- return null;
- UdpTransportServer server = new UdpTransportServer(location);
- server.setWireFormatFactory(createWireFormatFactory(options));
- IntrospectionSupport.setProperties(server, options);
-
+ UdpTransport transport = (UdpTransport) doConnect(location);
+ UdpTransportServer server = new UdpTransportServer(transport);
return server;
}
catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
- */
+ catch (Exception e) {
+ throw IOExceptionSupport.create(e);
+ }
}
public Transport configure(Transport transport, WireFormat format, Map options) {
IntrospectionSupport.setProperties(transport, options);
UdpTransport tcpTransport = (UdpTransport) transport;
-
- /*
+
if (tcpTransport.isTrace()) {
transport = new TransportLogger(transport);
}
@@ -76,7 +65,6 @@
}
transport = new ResponseCorrelator(transport);
- */
return transport;
}
@@ -94,25 +82,6 @@
}
protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
- /*
- URI localLocation = null;
- String path = location.getPath();
- // see if the path is a local URI location
- if (path != null && path.length() > 0) {
- int localPortIndex = path.indexOf(':');
- try {
- Integer.parseInt(path.substring((localPortIndex + 1), path.length()));
- String localString = location.getScheme() + ":/" + path;
- localLocation = new URI(localString);
- }
- catch (Exception e) {
- log.warn("path isn't a valid local location for TcpTransport to use", e);
- }
- }
- if (localLocation != null) {
- return new UdpTransport(wf, location, localLocation);
- }
- */
OpenWireFormat wireFormat = (OpenWireFormat) wf;
wireFormat.setPrefixPacketSize(false);
return new UdpTransport(wireFormat, location);
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java?rev=384225&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java Wed Mar 8 06:44:35 2006
@@ -0,0 +1,119 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.InactivityMonitor;
+import org.apache.activemq.transport.ResponseCorrelator;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.TransportServerSupport;
+import org.apache.activemq.transport.TransportSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A UDP based implementation of {@link TransportServer}. Commands read by the
+ * single shared {@link UdpTransport} are routed to one logical transport per
+ * sender {@link SocketAddress}; a transport is created the first time an
+ * address is seen.
+ *
+ * @version $Revision$
+ */
+public class UdpTransportServer extends TransportServerSupport {
+    private static final Log log = LogFactory.getLog(UdpTransportServer.class);
+
+    private UdpTransport serverTransport;
+    // SocketAddress -> Transport; only accessed while synchronized on the map itself
+    private Map transports = new HashMap();
+
+    public UdpTransportServer(UdpTransport serverTransport) {
+        this.serverTransport = serverTransport;
+    }
+
+    public String toString() {
+        return "UdpTransportServer@" + serverTransport;
+    }
+
+    public void run() {
+    }
+
+    public UdpTransport getServerTransport() {
+        return serverTransport;
+    }
+
+    public void setBrokerInfo(BrokerInfo brokerInfo) {
+    }
+
+    /**
+     * Installs the per-address routing processor on the shared transport and
+     * then starts that transport.
+     */
+    protected void doStart() throws Exception {
+        // The processor must be in place before start(): until it is replaced the
+        // transport uses its default processor, which consumes commands itself, so
+        // anything read in between would never reach onInboundCommand().
+        // NOTE(review): assumes start() is what begins reading - confirm.
+        serverTransport.setCommandProcessor(new CommandProcessor() {
+            public void process(Command command, SocketAddress address) {
+                onInboundCommand(command, address);
+            }
+        });
+        serverTransport.start();
+    }
+
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        serverTransport.stop();
+    }
+
+    /**
+     * Looks up the transport registered for the address, creating, configuring
+     * and registering one if there is none yet, and passes the command to it.
+     */
+    protected void onInboundCommand(Command command, SocketAddress address) {
+        Transport transport = null;
+        synchronized (transports) {
+            transport = (Transport) transports.get(address);
+            if (transport == null) {
+                transport = createTransport(address);
+                transport = configureTransport(transport);
+                transports.put(address, transport);
+            }
+        }
+        // dispatch outside the lock so the map is not held while the listener runs
+        processInboundCommand(command, transport);
+    }
+
+    /**
+     * Intended to send the command to the given address. Not implemented yet:
+     * the body is empty, so the command is currently discarded.
+     */
+    public void sendOutboundCommand(Command command, SocketAddress address) {
+        // TODO we should use an inbound buffer to make this async
+
+    }
+
+    /**
+     * Passes the command to the transport's listener, or logs an error when the
+     * transport has no listener.
+     */
+    protected void processInboundCommand(Command command, Transport transport) {
+        // TODO - consider making this asynchronous
+        TransportListener listener = transport.getTransportListener();
+        if (listener != null) {
+            listener.onCommand(command);
+        }
+        else {
+            log.error("No transportListener available for transport: " + transport + " to process inbound command: " + command);
+        }
+    }
+
+    /**
+     * Wraps the transport in a {@link ResponseCorrelator} and an
+     * {@link InactivityMonitor} (using the server transport's max inactivity
+     * duration), hands the result to the accept listener and returns it.
+     */
+    protected Transport configureTransport(Transport transport) {
+        transport = new ResponseCorrelator(transport);
+        transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration());
+        getAcceptListener().onAccept(transport);
+        return transport;
+    }
+
+    /** Creates the logical per-client transport for the given address. */
+    protected TransportSupport createTransport(SocketAddress address) {
+        return new UdpTransportServerClient(this, address);
+    }
+
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
------------------------------------------------------------------------------
svn:executable = *
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java?rev=384225&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java Wed Mar 8 06:44:35 2006
@@ -0,0 +1,71 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.TransportSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * A logical server side transport instance for a remote client which works with
+ * the {@link UdpTransportServer}
+ *
+ * @version $Revision$
+ */
+public class UdpTransportServerClient extends TransportSupport {
+    private static final Log log = LogFactory.getLog(UdpTransportServerClient.class);
+
+    private UdpTransportServer server;
+    private SocketAddress address;
+    // Commands waiting to be consumed on start. Nothing in this class adds to it
+    // yet; doStart() drains it and doStop() clears it.
+    private List queue = Collections.synchronizedList(new LinkedList());
+
+    public UdpTransportServerClient(UdpTransportServer server, SocketAddress address) {
+        this.server = server;
+        this.address = address;
+    }
+
+    public String toString() {
+        return "UdpClient@" + address;
+    }
+
+    /**
+     * Checks that this transport is started and then asks the server to send
+     * the command to this client's address.
+     */
+    public void oneway(Command command) throws IOException {
+        checkStarted(command);
+        server.sendOutboundCommand(command, address);
+    }
+
+    /**
+     * Consumes every queued command in order, removing each from the queue
+     * before it is consumed.
+     */
+    protected void doStart() throws Exception {
+        while (true) {
+            Command command;
+            // A synchronized list only guards single calls; the check-then-remove
+            // pair (like iteration) needs the list's own monitor held explicitly.
+            synchronized (queue) {
+                if (queue.isEmpty()) {
+                    break;
+                }
+                command = (Command) queue.remove(0);
+            }
+            // consume outside the lock so the listener never runs while it is held
+            doConsume(command);
+        }
+    }
+
+    /** Discards any commands that are still queued. */
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        queue.clear();
+    }
+
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java
------------------------------------------------------------------------------
svn:executable = *
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java
------------------------------------------------------------------------------
svn:mime-type = text/plain