You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/08 15:44:36 UTC

svn commit: r384225 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp: CommandChannel.java CommandProcessor.java UdpTransport.java UdpTransportFactory.java UdpTransportServer.java UdpTransportServerClient.java

Author: jstrachan
Date: Wed Mar  8 06:44:35 2006
New Revision: 384225

URL: http://svn.apache.org/viewcvs?rev=384225&view=rev
Log:
added spike of a UDP server together with applying the transport refactorings to the UDP transport

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=384225&r1=384224&r2=384225&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java Wed Mar  8 06:44:35 2006
@@ -29,6 +29,7 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.DatagramChannel;
@@ -87,10 +88,12 @@
         bufferPool.stop();
     }
 
-    public Command read() throws IOException {
+    public void read(CommandProcessor processor) throws IOException {
+        Command answer = null;
+        SocketAddress address = null;
         synchronized (readLock) {
             readBuffer.clear();
-            SocketAddress address = channel.receive(readBuffer);
+            address = channel.receive(readBuffer);
             readBuffer.flip();
 
             if (log.isDebugEnabled()) {
@@ -122,11 +125,18 @@
                 header.setCommand(command);
             }
 
-            return readStack.read(header);
+            answer = readStack.read(header);
+        }
+        if (answer != null) {
+            processor.process(answer, address);
         }
     }
 
     public void write(Command command) throws IOException {
+        write(command, targetAddress);
+    }
+        
+    public void write(Command command, SocketAddress address) throws IOException {
         synchronized (writeLock) {
             header.incrementCounter();
             int size = wireFormat.tightMarshal1(command, bs);
@@ -146,7 +156,7 @@
                 byte[] data = buffer.toByteArray();
                 writeBuffer.put(data);
 
-                sendWriteBuffer();
+                sendWriteBuffer(address);
             }
             else {
                 header.setPartial(true);
@@ -171,15 +181,15 @@
                     // now the data
                     writeBuffer.put(data, offset, chunkSize);
                     offset += chunkSize;
-                    sendWriteBuffer();
+                    sendWriteBuffer(address);
                 }
             }
         }
     }
 
-    protected void sendWriteBuffer() throws IOException {
+    protected void sendWriteBuffer(SocketAddress address) throws IOException {
         writeBuffer.flip();
-        channel.send(writeBuffer, targetAddress);
+        channel.send(writeBuffer, address);
     }
 
     // Properties
@@ -214,5 +224,6 @@
     public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
         this.headerMarshaller = headerMarshaller;
     }
+
 
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java?rev=384225&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java Wed Mar  8 06:44:35 2006
@@ -0,0 +1,32 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.apache.activemq.command.Command;
+
+import java.net.SocketAddress;
+
+/**
+ * A callback used to process inbound commands
+ * 
+ * @version $Revision$
+ */
+public interface CommandProcessor {
+
+    void process(Command command, SocketAddress address);
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=384225&r1=384224&r2=384225&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java Wed Mar  8 06:44:35 2006
@@ -28,12 +28,10 @@
 import org.apache.commons.logging.LogFactory;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.channels.AsynchronousCloseException;
@@ -58,6 +56,10 @@
     private boolean trace = false;
     private boolean useLocalHost = true;
     private int port;
+    private CommandProcessor commandProcessor = new CommandProcessor() {
+        public void process(Command command, SocketAddress address) {
+            doConsume(command);
+        }};
 
     protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
         this.wireFormat = wireFormat;
@@ -72,16 +74,23 @@
         this(wireFormat);
         this.targetAddress = socketAddress;
     }
-
+    
     /**
      * A one way asynchronous send
      */
     public void oneway(Command command) throws IOException {
+        oneway(command, targetAddress);
+    }
+
+    /**
+     * A one way asynchronous send to a given address
+     */
+    public void oneway(Command command, InetSocketAddress address) throws IOException {
         if (log.isDebugEnabled()) {
             log.debug("Sending oneway from port: " + port + " to target: " + targetAddress);
         }
         checkStarted(command);
-        commandChannel.write(command);
+        commandChannel.write(command, address);
     }
 
     /**
@@ -96,10 +105,9 @@
      */
     public void run() {
         log.trace("Consumer thread starting for: " + toString());
-        while (!isClosed()) {
+        while (!isStopped()) {
             try {
-                Command command = commandChannel.read();
-                doConsume(command);
+                commandChannel.read(commandProcessor);
             }
             /*
              * catch (SocketTimeoutException e) { } catch
@@ -209,7 +217,14 @@
     
     // Implementation methods
     // -------------------------------------------------------------------------
+    protected CommandProcessor getCommandProcessor() {
+        return commandProcessor;
+    }
 
+    protected void setCommandProcessor(CommandProcessor commandProcessor) {
+        this.commandProcessor = commandProcessor;
+    }
+    
     /**
      * Creates an address from the given URI
      */
@@ -257,4 +272,5 @@
             channel.close();
         }
     }
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java?rev=384225&r1=384224&r2=384225&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java Wed Mar  8 06:44:35 2006
@@ -16,57 +16,46 @@
  */
 package org.apache.activemq.transport.udp;
 
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.Map;
-import javax.net.ServerSocketFactory;
-import javax.net.SocketFactory;
 import org.activeio.command.WireFormat;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.transport.InactivityMonitor;
-import org.apache.activemq.transport.MutexTransport;
 import org.apache.activemq.transport.ResponseCorrelator;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportLogger;
 import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.WireFormatNegotiator;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.URISupport;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.Map;
 
 public class UdpTransportFactory extends TransportFactory {
-    private static final Log log = LogFactory.getLog(UdpTransportFactory.class);
 
     public TransportServer doBind(String brokerId, final URI location) throws IOException {
-        throw new IOException("TransportServer not supported for UDP");
-        /*
         try {
-            Map options = new HashMap(URISupport.parseParamters(location));
-
-            return null;
-            UdpTransportServer server = new UdpTransportServer(location);
-            server.setWireFormatFactory(createWireFormatFactory(options));
-            IntrospectionSupport.setProperties(server, options);
-
+            UdpTransport transport = (UdpTransport) doConnect(location);
+            UdpTransportServer server = new UdpTransportServer(transport);
             return server;
         }
         catch (URISyntaxException e) {
             throw IOExceptionSupport.create(e);
         }
-        */
+        catch (Exception e) {
+            throw IOExceptionSupport.create(e);
+        }
     }
 
     public Transport configure(Transport transport, WireFormat format, Map options) {
         IntrospectionSupport.setProperties(transport, options);
         UdpTransport tcpTransport = (UdpTransport) transport;
-        
-        /*
+
         if (tcpTransport.isTrace()) {
             transport = new TransportLogger(transport);
         }
@@ -76,7 +65,6 @@
         }
 
         transport = new ResponseCorrelator(transport);
-        */
         return transport;
     }
 
@@ -94,25 +82,6 @@
     }
 
     protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
-        /*
-        URI localLocation = null;
-        String path = location.getPath();
-        // see if the path is a local URI location
-        if (path != null && path.length() > 0) {
-            int localPortIndex = path.indexOf(':');
-            try {
-                Integer.parseInt(path.substring((localPortIndex + 1), path.length()));
-                String localString = location.getScheme() + ":/" + path;
-                localLocation = new URI(localString);
-            }
-            catch (Exception e) {
-                log.warn("path isn't a valid local location for TcpTransport to use", e);
-            }
-        }
-        if (localLocation != null) {
-            return new UdpTransport(wf, location, localLocation);
-        }
-        */
         OpenWireFormat wireFormat = (OpenWireFormat) wf;
         wireFormat.setPrefixPacketSize(false);
         return new UdpTransport(wireFormat, location);

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java?rev=384225&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java Wed Mar  8 06:44:35 2006
@@ -0,0 +1,119 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.InactivityMonitor;
+import org.apache.activemq.transport.ResponseCorrelator;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.TransportServerSupport;
+import org.apache.activemq.transport.TransportSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A UDP based implementation of {@link TransportServer}
+ * 
+ * @version $Revision$
+ */
+
+public class UdpTransportServer extends TransportServerSupport {
+    private static final Log log = LogFactory.getLog(UdpTransportServer.class);
+
+    private UdpTransport serverTransport;
+    private Map transports = new HashMap();
+
+    public UdpTransportServer(UdpTransport serverTransport) {
+        this.serverTransport = serverTransport;
+    }
+
+    public String toString() {
+        return "UdpTransportServer@" + serverTransport;
+    }
+
+    public void run() {
+    }
+
+    public UdpTransport getServerTransport() {
+        return serverTransport;
+    }
+
+    public void setBrokerInfo(BrokerInfo brokerInfo) {
+    }
+
+    protected void doStart() throws Exception {
+        serverTransport.start();
+        serverTransport.setCommandProcessor(new CommandProcessor() {
+            public void process(Command command, SocketAddress address) {
+                onInboundCommand(command, address);
+            }
+        });
+    }
+
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        serverTransport.stop();
+    }
+
+    protected void onInboundCommand(Command command, SocketAddress address) {
+        Transport transport = null;
+        synchronized (transports) {
+            transport = (Transport) transports.get(address);
+            if (transport == null) {
+                transport = createTransport(address);
+                transport = configureTransport(transport);
+                transports.put(address, transport);
+            }
+        }
+        processInboundCommand(command, transport);
+    }
+
+    public void sendOutboundCommand(Command command, SocketAddress address) {
+        // TODO we should use an inbound buffer to make this async
+        
+    }
+
+    protected void processInboundCommand(Command command, Transport transport) {
+        // TODO - consider making this asynchronous
+        TransportListener listener = transport.getTransportListener();
+        if (listener != null) {
+            listener.onCommand(command);
+        }
+        else {
+            log.error("No transportListener available for transport: " + transport + " to process inbound command: " + command);
+        }
+    }
+
+    protected Transport configureTransport(Transport transport) {
+        transport = new ResponseCorrelator(transport);
+        transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration());
+        getAcceptListener().onAccept(transport);
+        return transport;
+    }
+
+    protected TransportSupport createTransport(SocketAddress address) {
+        return new UdpTransportServerClient(this, address);
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java?rev=384225&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java Wed Mar  8 06:44:35 2006
@@ -0,0 +1,71 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.TransportSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * A logical server side transport instance for a remote client which works with
+ * the {@link UdpTransportServer}
+ * 
+ * @version $Revision$
+ */
+public class UdpTransportServerClient extends TransportSupport {
+    private static final Log log = LogFactory.getLog(UdpTransportServerClient.class);
+
+    private UdpTransportServer server;
+    private SocketAddress address;
+    private List queue = Collections.synchronizedList(new LinkedList());
+
+    public UdpTransportServerClient(UdpTransportServer server, SocketAddress address) {
+        this.server = server;
+        this.address = address;
+    }
+
+    public String toString() {
+        return "UdpClient@" + address;
+    }
+
+    public void oneway(Command command) throws IOException {
+        checkStarted(command);
+        server.sendOutboundCommand(command, address);
+    }
+
+    protected void doStart() throws Exception {
+        for (Iterator iter = queue.iterator(); iter.hasNext();) {
+            Command command = (Command) iter.next();
+            doConsume(command);
+            iter.remove();
+        }
+    }
+
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        queue.clear();
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain