You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/02 23:29:35 UTC

svn commit: r781177 [2/11] - in /activemq/sandbox/activemq-flow: activemq-bio/ activemq-bio/src/main/java/org/ activemq-bio/src/main/java/org/apache/ activemq-bio/src/main/java/org/apache/activemq/ activemq-bio/src/main/java/org/apache/activemq/transpo...

Added: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (added)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,485 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ServerSocketFactory;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.ThreadPriorities;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+import org.apache.activemq.transport.Transport;
+//import org.apache.activemq.transport.TransportLoggerFactory;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.TransportServerThreadSupport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.ServiceListener;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A TCP based implementation of {@link TransportServer}
+ * 
+ * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
+ * @version $Revision: 1.1 $
+ */
+
+public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{
+
+    private static final Log LOG = LogFactory.getLog(TcpTransportServer.class);
+    protected ServerSocket serverSocket;
+    protected int backlog = 5000;
+    protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
+    protected final TcpTransportFactory transportFactory;
+    protected long maxInactivityDuration = 30000;
+    protected long maxInactivityDurationInitalDelay = 10000;
+    protected int minmumWireFormatVersion;
+    protected boolean useQueueForAccept=true;
+       
+    /**
+     * trace=true -> the Transport stack where this TcpTransport
+     * object will be, will have a TransportLogger layer
+     * trace=false -> the Transport stack where this TcpTransport
+     * object will be, will NOT have a TransportLogger layer, and therefore
+     * will never be able to print logging messages.
+     * This parameter is most probably set in Connection or TransportConnector URIs.
+     */
+    protected boolean trace = false;
+
+    protected int soTimeout = 0;
+    protected int socketBufferSize = 64 * 1024;
+    protected int connectionTimeout =  30000;
+
+//    /**
+//     * Name of the LogWriter implementation to use.
+//     * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
+//     * This parameter is most probably set in Connection or TransportConnector URIs.
+//     */
+//    protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
+    /**
+     * Specifies if the TransportLogger will be manageable by JMX or not.
+     * Also, as long as there is at least 1 TransportLogger which is manageable,
+     * a TransportLoggerControl MBean will me created.
+     */
+    protected boolean dynamicManagement = false;
+    /**
+     * startLogging=true -> the TransportLogger object of the Transport stack
+     * will initially write messages to the log.
+     * startLogging=false -> the TransportLogger object of the Transport stack
+     * will initially NOT write messages to the log.
+     * This parameter only has an effect if trace == true.
+     * This parameter is most probably set in Connection or TransportConnector URIs.
+     */
+    protected boolean startLogging = true;
+    protected Map<String, Object> transportOptions;
+    protected final ServerSocketFactory serverSocketFactory;
+    protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
+    protected Thread socketHandlerThread;
+    /**
+     * The maximum number of sockets allowed for this server
+     */
+    protected int maximumConnections = Integer.MAX_VALUE;
+    protected int currentTransportCount=0;
+  
+    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+        super(location);
+        this.transportFactory = transportFactory;
+        this.serverSocketFactory = serverSocketFactory;
+        
+    }
+
+    public void bind() throws IOException {
+        URI bind = getBindLocation();
+
+        String host = bind.getHost();
+        host = (host == null || host.length() == 0) ? "localhost" : host;
+        InetAddress addr = InetAddress.getByName(host);
+
+        try {
+
+            this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
+            configureServerSocket(this.serverSocket);
+            
+        } catch (IOException e) {
+            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
+        }
+        try {
+            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
+                .getFragment()));
+        } catch (URISyntaxException e) {
+
+            // it could be that the host name contains invalid characters such
+            // as _ on unix platforms
+            // so lets try use the IP address instead
+            try {
+                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
+            } catch (URISyntaxException e2) {
+                throw IOExceptionSupport.create(e2);
+            }
+        }
+    }
+
+    private void configureServerSocket(ServerSocket socket) throws SocketException {
+        socket.setSoTimeout(2000);
+        if (transportOptions != null) {
+            IntrospectionSupport.setProperties(socket, transportOptions);
+        }
+    }
+
+    /**
+     * @return Returns the wireFormatFactory.
+     */
+    public WireFormatFactory getWireFormatFactory() {
+        return wireFormatFactory;
+    }
+
+    /**
+     * @param wireFormatFactory The wireFormatFactory to set.
+     */
+    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
+        this.wireFormatFactory = wireFormatFactory;
+    }
+
+    /**
+     * Associates a broker info with the transport server so that the transport
+     * can do discovery advertisements of the broker.
+     * 
+     * @param brokerInfo
+     */
+    public void setBrokerInfo(BrokerInfo brokerInfo) {
+    }
+
+    public long getMaxInactivityDuration() {
+        return maxInactivityDuration;
+    }
+
+    public void setMaxInactivityDuration(long maxInactivityDuration) {
+        this.maxInactivityDuration = maxInactivityDuration;
+    }
+    
+    public long getMaxInactivityDurationInitalDelay() {
+        return this.maxInactivityDurationInitalDelay;
+    }
+
+    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
+        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
+    }
+
+    public int getMinmumWireFormatVersion() {
+        return minmumWireFormatVersion;
+    }
+
+    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
+        this.minmumWireFormatVersion = minmumWireFormatVersion;
+    }
+
+    public boolean isTrace() {
+        return trace;
+    }
+
+    public void setTrace(boolean trace) {
+        this.trace = trace;
+    }
+    
+//    public String getLogWriterName() {
+//        return logWriterName;
+//    }
+//
+//    public void setLogWriterName(String logFormat) {
+//        this.logWriterName = logFormat;
+//    }        
+
+    public boolean isDynamicManagement() {
+        return dynamicManagement;
+    }
+
+    public void setDynamicManagement(boolean useJmx) {
+        this.dynamicManagement = useJmx;
+    }
+
+    public boolean isStartLogging() {
+        return startLogging;
+    }
+
+
+    public void setStartLogging(boolean startLogging) {
+        this.startLogging = startLogging;
+    }
+    
+    /**
+     * @return the backlog
+     */
+    public int getBacklog() {
+        return backlog;
+    }
+
+    /**
+     * @param backlog the backlog to set
+     */
+    public void setBacklog(int backlog) {
+        this.backlog = backlog;
+    }
+
+    /**
+     * @return the useQueueForAccept
+     */
+    public boolean isUseQueueForAccept() {
+        return useQueueForAccept;
+    }
+
+    /**
+     * @param useQueueForAccept the useQueueForAccept to set
+     */
+    public void setUseQueueForAccept(boolean useQueueForAccept) {
+        this.useQueueForAccept = useQueueForAccept;
+    }
+    
+
+    /**
+     * pull Sockets from the ServerSocket
+     */
+    public void run() {
+        while (!isStopped()) {
+            Socket socket = null;
+            try {
+                socket = serverSocket.accept();
+                if (socket != null) {
+                    if (isStopped() || getAcceptListener() == null) {
+                        socket.close();
+                    } else {
+                        if (useQueueForAccept) {
+                            socketQueue.put(socket);
+                        }else {
+                            handleSocket(socket);
+                        }
+                    }
+                }
+            } catch (SocketTimeoutException ste) {
+                // expect this to happen
+            } catch (Exception e) {
+                if (!isStopping()) {
+                    onAcceptError(e);
+                } else if (!isStopped()) {
+                    LOG.warn("run()", e);
+                    onAcceptError(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Allow derived classes to override the Transport implementation that this
+     * transport server creates.
+     * 
+     * @param socket
+     * @param format
+     * @return
+     * @throws IOException
+     */
+    protected  Transport createTransport(Socket socket, WireFormat format) throws IOException {
+        return new TcpTransport(format, socket);
+    }
+
+    /**
+     * @return pretty print of this
+     */
+    public String toString() {
+        return "" + getBindLocation();
+    }
+
+    /**
+     * @param socket 
+     * @param inetAddress
+     * @return real hostName
+     * @throws UnknownHostException
+     */
+    protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
+        String result = null;
+        if (socket.isBound()) {
+            if (socket.getInetAddress().isAnyLocalAddress()) {
+                // make it more human readable and useful, an alternative to 0.0.0.0
+                result = InetAddress.getLocalHost().getHostName();
+            } else {
+                result = socket.getInetAddress().getCanonicalHostName();
+            }
+        } else {
+            result = bindAddress.getCanonicalHostName();
+        }
+        return result;
+    }
+    
+    protected void doStart() throws Exception {
+        if(useQueueForAccept) {
+            Runnable run = new Runnable() {
+                public void run() {
+                    try {
+                        while (!isStopped() && !isStopping()) {
+                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
+                            if (sock != null) {
+                                handleSocket(sock);
+                            }
+                        }
+    
+                    } catch (InterruptedException e) {
+                        LOG.info("socketQueue interuppted - stopping");
+                        if (!isStopping()) {
+                            onAcceptError(e);
+                        }
+                    }
+    
+                }
+    
+            };
+            socketHandlerThread = new Thread(null, run,
+                    "ActiveMQ Transport Server Thread Handler: " + toString(),
+                    getStackSize());
+            socketHandlerThread.setDaemon(true);
+            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
+            socketHandlerThread.start();
+        }
+        super.doStart();
+        
+    }
+
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        super.doStop(stopper);
+        if (serverSocket != null) {
+            serverSocket.close();
+        }
+    }
+
+    public InetSocketAddress getSocketAddress() {
+        return (InetSocketAddress)serverSocket.getLocalSocketAddress();
+    }
+
+    public void setTransportOption(Map<String, Object> transportOptions) {
+        this.transportOptions = transportOptions;
+    }
+    
+    protected final void handleSocket(Socket socket) {
+        try {
+            if (this.currentTransportCount >= this.maximumConnections) {
+                
+            }else {
+            HashMap<String, Object> options = new HashMap<String, Object>();
+            options.put("maxInactivityDuration", Long
+                    .valueOf(maxInactivityDuration));
+            options.put("maxInactivityDurationInitalDelay", Long
+                    .valueOf(maxInactivityDurationInitalDelay));            
+            options.put("minmumWireFormatVersion", Integer
+                    .valueOf(minmumWireFormatVersion));
+            options.put("trace", Boolean.valueOf(trace));
+            options.put("soTimeout", Integer.valueOf(soTimeout));
+            options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
+            options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
+//            options.put("logWriterName", logWriterName);
+            options
+                    .put("dynamicManagement", Boolean
+                            .valueOf(dynamicManagement));
+            options.put("startLogging", Boolean.valueOf(startLogging));
+
+            options.putAll(transportOptions);
+            WireFormat format = wireFormatFactory.createWireFormat();
+            Transport transport = createTransport(socket, format);
+            if (transport instanceof ServiceSupport) {
+                ((ServiceSupport) transport).addServiceListener(this);
+            }
+            Transport configuredTransport = transportFactory.serverConfigure(
+                    transport, format, options);
+            getAcceptListener().onAccept(configuredTransport);
+            }
+        } catch (SocketTimeoutException ste) {
+            // expect this to happen
+        } catch (Exception e) {
+            if (!isStopping()) {
+                onAcceptError(e);
+            } else if (!isStopped()) {
+                LOG.warn("run()", e);
+                onAcceptError(e);
+            }
+        }
+        
+    }    
+
+	public int getSoTimeout() {
+		return soTimeout;
+	}
+
+	public void setSoTimeout(int soTimeout) {
+		this.soTimeout = soTimeout;
+	}
+
+	public int getSocketBufferSize() {
+		return socketBufferSize;
+	}
+
+	public void setSocketBufferSize(int socketBufferSize) {
+		this.socketBufferSize = socketBufferSize;
+	}
+
+	public int getConnectionTimeout() {
+		return connectionTimeout;
+	}
+
+	public void setConnectionTimeout(int connectionTimeout) {
+		this.connectionTimeout = connectionTimeout;
+	}
+
+    /**
+     * @return the maximumConnections
+     */
+    public int getMaximumConnections() {
+        return maximumConnections;
+    }
+
+    /**
+     * @param maximumConnections the maximumConnections to set
+     */
+    public void setMaximumConnections(int maximumConnections) {
+        this.maximumConnections = maximumConnections;
+    }
+
+    
+    public void started(Service service) {
+       this.currentTransportCount++;
+    }
+
+    public void stopped(Service service) {
+        this.currentTransportCount--;
+    }
+}
\ No newline at end of file

Propchange: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/package.html?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/package.html (added)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/package.html Tue Jun  2 21:29:30 2009
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+TCP/IP based Transport implementation.
+
+</body>
+</html>

Propchange: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/package.html
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/sandbox/activemq-flow/activemq-client/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/pom.xml?rev=781177&r1=781176&r2=781177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-client/pom.xml Tue Jun  2 21:29:30 2009
@@ -41,11 +41,13 @@
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-openwire</artifactId>
     </dependency>
-
     <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-core</artifactId>
-      <version>5.3-SNAPSHOT</version>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-jms_1.1_spec</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-j2ee-management_1.0_spec</artifactId>
     </dependency>
 
     <!-- Testing Dependencies -->