You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2008/08/31 05:23:04 UTC

svn commit: r690629 - in /openejb/trunk/openejb3/server: ./ openejb-client/src/main/java/org/apache/openejb/client/ openejb-client/src/test/java/org/apache/openejb/client/ openejb-discovery/ openejb-discovery/src/ openejb-discovery/src/main/java/org/ap...

Author: dblevins
Date: Sat Aug 30 20:23:03 2008
New Revision: 690629

URL: http://svn.apache.org/viewvc?rev=690629&view=rev
Log:
OPENEJB-904: Pluggable Client/Server connection strategies and factories
OPENEJB-903: Multicast discovery and grouping

903 is only partially complete

Added:
    openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionStrategy.java
      - copied, changed from r689515, openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactoryStrategy.java
    openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastSearch.java
    openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Registry.java
    openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickyConnectionStrategy.java
      - copied, changed from r689515, openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategy.java
    openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickyConnectionStrategyTest.java
      - copied, changed from r689515, openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategyTest.java
    openejb/trunk/openejb3/server/openejb-discovery/
      - copied from r689515, openejb/trunk/openejb3/server/openejb-admin/
    openejb/trunk/openejb3/server/openejb-discovery/pom.xml
      - copied, changed from r690620, openejb/trunk/openejb3/server/openejb-admin/pom.xml
    openejb/trunk/openejb3/server/openejb-discovery/src/
      - copied from r690620, openejb/trunk/openejb3/server/openejb-admin/src/
    openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/
    openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/
    openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryAgent.java
    openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryListener.java
    openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java
    openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastSearch.java
    openejb/trunk/openejb3/server/openejb-discovery/src/main/resources/
    openejb/trunk/openejb3/server/openejb-discovery/src/main/resources/META-INF/
    openejb/trunk/openejb3/server/openejb-discovery/src/main/resources/META-INF/org.apache.openejb.server.ServersService/
    openejb/trunk/openejb3/server/openejb-discovery/src/main/resources/META-INF/org.apache.openejb.server.ServersService/multicast
    openejb/trunk/openejb3/server/openejb-discovery/src/test/java/org/apache/openejb/server/
    openejb/trunk/openejb3/server/openejb-discovery/src/test/java/org/apache/openejb/server/discovery/
    openejb/trunk/openejb3/server/openejb-discovery/src/test/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgentTest.java
Removed:
    openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactoryStrategy.java
    openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategy.java
    openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategyTest.java
Modified:
    openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Client.java
    openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactory.java
    openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionManager.java
    openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/HttpConnectionFactory.java
    openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ServerMetaData.java
    openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java
    openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/ConnectionManagerTest.java
    openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java
    openejb/trunk/openejb3/server/pom.xml

Modified: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Client.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Client.java?rev=690629&r1=690628&r2=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Client.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Client.java Sat Aug 30 20:23:03 2008
@@ -51,7 +51,7 @@
             /*----------------------------*/
             /* Get a connection to server */
             /*----------------------------*/
-            conn = server.connect(req);
+            conn = ConnectionManager.getConnection(server);
 
             /*----------------------------------*/
             /* Get output streams */

Modified: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactory.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactory.java?rev=690629&r1=690628&r2=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactory.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactory.java Sat Aug 30 20:23:03 2008
@@ -21,8 +21,6 @@
 
 public interface ConnectionFactory {
 
-    public void init(Properties props);
-
     public Connection getConnection(URI uri) throws java.io.IOException;
 
 }

Modified: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionManager.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionManager.java?rev=690629&r1=690628&r2=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionManager.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionManager.java Sat Aug 30 20:23:03 2008
@@ -17,88 +17,72 @@
 package org.apache.openejb.client;
 
 import java.io.IOException;
-import java.util.Properties;
 import java.net.URI;
 
 public class ConnectionManager {
 
-    private static ConnectionFactory factory;
+    private static Registry<ConnectionFactory> factories = Registry.create(ConnectionFactory.class);
+    private static Registry<ConnectionStrategy> strategies = Registry.create(ConnectionStrategy.class);
 
-    private static Class defaultFactoryClass = SocketConnectionFactory.class;
+    static {
+        SocketConnectionFactory ejbdFactory = new SocketConnectionFactory();
 
-    private static String factoryName;
+        factories.register("default", ejbdFactory);
+        factories.register("ejbd", ejbdFactory);
 
-    static {
-        try {
-            installFactory(defaultFactoryClass.getName());
-        } catch (Exception e) {
+        HttpConnectionFactory httpFactory = new HttpConnectionFactory();
+        factories.register("http", httpFactory);
+        factories.register("https", httpFactory);
 
-        }
+        strategies.register("default", new StickyConnectionStrategy());
     }
 
-    public static Connection getConnection(URI uri) throws IOException {
-        if (uri.getScheme().equals("http") || uri.getScheme().equals("https")) {
-            return new HttpConnectionFactory().getConnection(uri);
-        } else {
-            return factory.getConnection(uri);
-        }
-    }
 
-    public static void setFactory(String factoryName) throws IOException {
-        installFactory(factoryName);
-    }
+    public static Connection getConnection(ServerMetaData serverMetaData) throws IOException {
+        String name = serverMetaData.getConnectionStrategy();
 
-    public static void setFactory(ConnectionFactory factory) throws IOException {
-        if (null == factory) {
-            throw new IllegalArgumentException("factory is required");
-        }
-        ConnectionManager.factory = factory;
-        factoryName = factory.getClass().getName();
-    }
-    
-    public static ConnectionFactory getFactory() {
-        return factory;
-    }
+        if (name == null) name = "default";
+
+        ConnectionStrategy strategy = strategies.get(name);
 
-    public static String getFactoryName() {
-        return factoryName;
+        if (strategy == null) throw new IOException("Unsupported ConnectionStrategy  \"" + name + "\"");
+
+
+        return strategy.connect(serverMetaData);
     }
 
-    private static void installFactory(String factoryName) throws IOException {
+    public static Connection getConnection(URI uri) throws IOException {
+        String scheme = uri.getScheme();
 
-        Class factoryClass = null;
-        ConnectionFactory factory = null;
+        ConnectionFactory factory = factories.get(scheme);
 
-        try {
-            ClassLoader cl = getContextClassLoader();
-            factoryClass = Class.forName(factoryName, true, cl);
-        } catch (Exception e) {
-            throw new IOException("No ConnectionFactory Can be installed. Unable to load the class " + factoryName);
-        }
+        if (factory == null) throw new IOException("Unsupported ConnectionFactory URI scheme  \"" + scheme + "\"");
 
-        try {
-            factory = (ConnectionFactory) factoryClass.newInstance();
-        } catch (Exception e) {
-            throw new IOException("No ConnectionFactory Can be installed. Unable to instantiate the class " + factoryName);
-        }
+        return factory.getConnection(uri);
+    }
 
-        try {
+    public static void registerFactory(String scheme, ConnectionFactory factory) {
+        factories.register(scheme, factory);
+    }
 
-            factory.init(new Properties());
-        } catch (Exception e) {
-            throw new IOException("No ConnectionFactory Can be installed. Unable to initialize the class " + factoryName);
-        }
+    public static ConnectionFactory unregisterFactory(String scheme) {
+        return factories.unregister(scheme);
+    }
 
-        ConnectionManager.factory = factory;
-        ConnectionManager.factoryName = factoryName;
+    public static void registerStrategy(String scheme, ConnectionStrategy factory) {
+        strategies.register(scheme, factory);
     }
 
-    public static ClassLoader getContextClassLoader() {
-        return (ClassLoader) java.security.AccessController.doPrivileged(new java.security.PrivilegedAction() {
-            public Object run() {
-                return Thread.currentThread().getContextClassLoader();
-            }
-        });
+    public static ConnectionStrategy unregisterStrategy(String scheme) {
+        return strategies.unregister(scheme);
     }
 
+    /**
+     * @param factory
+     * @throws IOException
+     * @Depricated use register("default", factory);
+     */
+    public static void setFactory(ConnectionFactory factory) throws IOException {
+        registerFactory("default", factory);
+    }
 }

Copied: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionStrategy.java (from r689515, openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactoryStrategy.java)
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionStrategy.java?p2=openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionStrategy.java&p1=openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactoryStrategy.java&r1=689515&r2=690629&rev=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactoryStrategy.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionStrategy.java Sat Aug 30 20:23:03 2008
@@ -16,10 +16,8 @@
  */
 package org.apache.openejb.client;
 
-import java.io.Serializable;
-import java.net.URI;
-import java.rmi.RemoteException;
+import java.io.IOException;
 
-public interface ConnectionFactoryStrategy extends Serializable {
-    Connection connect(URI[] locations, Request request) throws RemoteException;
+public interface ConnectionStrategy {
+    public Connection connect(ServerMetaData server) throws IOException;
 }

Modified: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/HttpConnectionFactory.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/HttpConnectionFactory.java?rev=690629&r1=690628&r2=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/HttpConnectionFactory.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/HttpConnectionFactory.java Sat Aug 30 20:23:03 2008
@@ -31,9 +31,6 @@
  */
 public class HttpConnectionFactory implements ConnectionFactory {
 
-    public void init(Properties props) {
-    }
-
     public Connection getConnection(URI uri) throws IOException {
         return new HttpConnection(uri);
     }

Added: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastSearch.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastSearch.java?rev=690629&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastSearch.java (added)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastSearch.java Sat Aug 30 20:23:03 2008
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.client;
+
+import java.net.URI;
+import java.net.MulticastSocket;
+import java.net.InetAddress;
+import java.net.DatagramPacket;
+import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class MulticastSearch {
+
+    private static final int BUFF_SIZE = 8192;
+
+    private final MulticastSocket multicast;
+
+    public MulticastSearch() throws IOException {
+        this("239.255.2.3", 6142);
+    }
+
+    public MulticastSearch(String host, int port) throws IOException {
+        InetAddress inetAddress = InetAddress.getByName(host);
+
+        multicast = new MulticastSocket(port);
+        multicast.joinGroup(inetAddress);
+        multicast.setSoTimeout(500);
+    }
+
+    public URI search(int timeout, TimeUnit milliseconds) throws IOException {
+        return search(new DefaultFilter(), timeout, milliseconds);
+    }
+
+    public URI search() throws IOException {
+        return search(new DefaultFilter(), 0, TimeUnit.MILLISECONDS);
+    }
+
+    public URI search(Filter filter) throws IOException {
+        return search(filter, 0, TimeUnit.MILLISECONDS);
+    }
+
+    public URI search(Filter filter, long timeout, TimeUnit unit) throws IOException {
+        timeout = unit.convert(timeout, TimeUnit.MILLISECONDS);
+        long waited = 0;
+
+        byte[] buf = new byte[BUFF_SIZE];
+        DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
+
+        while (timeout == 0 || waited < timeout){
+            long start = System.currentTimeMillis();
+            try {
+                multicast.receive(packet);
+                if (packet.getLength() > 0) {
+                    String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
+                    URI service = URI.create(str);
+                    if (service != null && filter.accept(service)) {
+                        return service;
+                    }
+                }
+            } finally {
+                long stop = System.currentTimeMillis();
+                waited += stop - start;
+            }
+        }
+
+        return null;
+    }
+
+    public interface Filter {
+        boolean accept(URI service);
+    }
+
+    public static class DefaultFilter implements Filter {
+        public boolean accept(URI service) {
+            return true;
+        }
+    }
+}

Added: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Registry.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Registry.java?rev=690629&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Registry.java (added)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Registry.java Sat Aug 30 20:23:03 2008
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.client;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.io.IOException;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class Registry<T> {
+
+    private Map<String, T> components = new ConcurrentHashMap<String, T>();
+
+    private Map<String, Class> available;
+
+    private String componentType;
+
+
+    public static <T> Registry<T> create(Class<T> type){
+        return new Registry(type);
+    }
+
+    private Registry(Class<T> type) {
+        componentType = type.getSimpleName();
+
+        try {
+            ResourceFinder resourceFinder = new ResourceFinder("META-INF/");
+            available = resourceFinder.mapAvailableImplementations(type);
+        } catch (IOException e) {
+            available = new HashMap();
+        }
+    }
+
+    public void register(String scheme, T factory) {
+        components.put(scheme, factory);
+    }
+
+    public T unregister(String scheme) {
+        if ("default".equals(scheme)) {
+            throw new IllegalArgumentException("Cannot uninstall the default " + componentType);
+        }
+        return components.remove(scheme);
+    }
+
+    public T get(String scheme) {
+        T factory = components.get(scheme);
+
+        if (factory == null) {
+            factory = load(scheme);
+        }
+
+        return factory;
+    }
+
+    private T load(String scheme) {
+
+        Class clazz = available.get(scheme);
+
+        if (clazz == null) return null;
+
+        try {
+            T factory = (T) clazz.newInstance();
+
+            components.put(scheme, factory);
+
+            return factory;
+        } catch (Exception e) {
+            throw new IllegalStateException(componentType + " cannot be installed.  Unable to instantiate the class " + clazz.getName() + " for scheme " + scheme);
+        }
+    }
+}

Modified: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ServerMetaData.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ServerMetaData.java?rev=690629&r1=690628&r2=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ServerMetaData.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ServerMetaData.java Sat Aug 30 20:23:03 2008
@@ -21,13 +21,12 @@
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.net.URI;
-import java.rmi.RemoteException;
 import java.util.Arrays;
 
 public class ServerMetaData implements Externalizable {
 
     private transient URI[] locations;
-    private transient ConnectionFactoryStrategy connectionFactoryStrategy;
+    private transient String connectionStrategy;
 
     public ServerMetaData() {
     }
@@ -36,10 +35,19 @@
         this.locations = locations;
     }
 
+    public ServerMetaData(String connectionStrategy, URI ... locations)  {
+        this.connectionStrategy = connectionStrategy;
+        this.locations = locations;
+    }
+
     public void merge(ServerMetaData toMerge) {
         locations = toMerge.locations;
     }
-    
+
+    public String getConnectionStrategy() {
+        return connectionStrategy;
+    }
+
     public URI[] getLocations() {
         return locations;
     }
@@ -52,28 +60,21 @@
         return locationsHash;
     }
 
-    public Connection connect(Request request) throws RemoteException {
-        ConnectionFactoryStrategy factoryStrategy = getConnectionFactoryStrategy();
-        return factoryStrategy.connect(locations, request);
-    }
-    
-    protected ConnectionFactoryStrategy getConnectionFactoryStrategy() {
-        if (null == connectionFactoryStrategy) {
-            connectionFactoryStrategy = new StickToLastServerConnectionFactoryStrategy(); 
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        byte version = in.readByte();
+
+        if (version > 1){
+            connectionStrategy = (String) in.readObject();
         }
-        return connectionFactoryStrategy;
-    }
 
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        byte version = in.readByte(); // future use
-        
         locations = (URI[]) in.readObject();
     }
 
     public void writeExternal(ObjectOutput out) throws IOException {
         // write out the version of the serialized data for future use
-        out.writeByte(1);
+        out.writeByte(2);
 
+        out.writeObject(connectionStrategy);
         out.writeObject(locations);
     }
 

Modified: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java?rev=690629&r1=690628&r2=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java Sat Aug 30 20:23:03 2008
@@ -38,9 +38,6 @@
 
     private static Map<URI, SocketConnection> connections = new ConcurrentHashMap<URI, SocketConnection>();
 
-    public void init(Properties props) {
-    }
-
     public Connection getConnection(URI uri) throws java.io.IOException {
 
         SocketConnection conn = connections.get(uri);

Copied: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickyConnectionStrategy.java (from r689515, openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategy.java)
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickyConnectionStrategy.java?p2=openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickyConnectionStrategy.java&p1=openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategy.java&r1=689515&r2=690629&rev=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategy.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickyConnectionStrategy.java Sat Aug 30 20:23:03 2008
@@ -22,12 +22,13 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-public class StickToLastServerConnectionFactoryStrategy implements ConnectionFactoryStrategy {
+public class StickyConnectionStrategy implements ConnectionStrategy {
     private static final Logger LOGGER = Logger.getLogger("OpenEJB.client");
 
     private URI lastLocation;
     
-    public Connection connect(URI[] locations, Request request) throws RemoteException {
+    public Connection connect(ServerMetaData server) throws IOException {
+        URI[] locations = server.getLocations();
         if (null != lastLocation) {
             for (int i = 0; i < locations.length; i++) {
                 if (locations[i].equals(lastLocation)) {
@@ -39,7 +40,7 @@
                 }
             }
         }
-        
+
         Connection connection = null;
         for (int i = 0; i < locations.length; i++) {
             URI uri = locations[i];
@@ -53,10 +54,11 @@
                 throw new RemoteException("Cannot connect to server: " + uri.getHost() + ":" + uri.getPort() + " due to an unkown exception in the OpenEJB client: ", e);
             }
         }
+        
         if (null != connection) {
             return connection;
         }
-        
+
         // If no servers responded, throw an error
         StringBuffer buffer = new StringBuffer();
         for (int i = 0; i < locations.length; i++) {

Modified: openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/ConnectionManagerTest.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/ConnectionManagerTest.java?rev=690629&r1=690628&r2=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/ConnectionManagerTest.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/ConnectionManagerTest.java Sat Aug 30 20:23:03 2008
@@ -17,6 +17,8 @@
 package org.apache.openejb.client;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URI;
 import java.util.Properties;
 
@@ -25,24 +27,34 @@
 
 public class ConnectionManagerTest extends TestCase{
 
-    public void testSetConnectionFactory() throws Exception {
+    public void testRegisterFactory() throws Exception {
         MockConnectionFactory connectionFactory = new MockConnectionFactory();
-        ConnectionManager.setFactory(connectionFactory);
-        
-        assertSame(connectionFactory, ConnectionManager.getFactory());
-        assertEquals(connectionFactory.getClass().getName(), ConnectionManager.getFactoryName());
+        ConnectionManager.registerFactory("mock", connectionFactory);
+
+        Connection connection = ConnectionManager.getConnection(new URI("mock://foo"));
+
+        assertTrue(connection instanceof MockConnection);
     }
     
     public static class MockConnectionFactory implements ConnectionFactory {
 
         public Connection getConnection(URI uri) throws IOException {
+            return new MockConnection();
+        }
+
+    }
+
+    private static class MockConnection implements Connection {
+        public void close() throws IOException {
             throw new UnsupportedOperationException();
         }
 
-        public void init(Properties props) {
+        public InputStream getInputStream() throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        public OutputStream getOuputStream() throws IOException {
             throw new UnsupportedOperationException();
         }
-        
     }
-    
 }

Copied: openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickyConnectionStrategyTest.java (from r689515, openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategyTest.java)
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickyConnectionStrategyTest.java?p2=openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickyConnectionStrategyTest.java&p1=openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategyTest.java&r1=689515&r2=690629&rev=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategyTest.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickyConnectionStrategyTest.java Sat Aug 30 20:23:03 2008
@@ -23,18 +23,18 @@
 import com.agical.rmock.extension.junit.RMockTestCase;
 
 
-public class StickToLastServerConnectionFactoryStrategyTest extends RMockTestCase {
+public class StickyConnectionStrategyTest extends RMockTestCase {
 
-    private StickToLastServerConnectionFactoryStrategy factoryStrategy;
+    private StickyConnectionStrategy factoryStrategy;
     private ConnectionFactory connectionFactory;
     private URI[] locations;
 
     @Override
     protected void setUp() throws Exception {
         connectionFactory = (ConnectionFactory) mock(ConnectionFactory.class);
-        ConnectionManager.setFactory(connectionFactory);
+        ConnectionManager.registerFactory("ejbd", connectionFactory);
 
-        factoryStrategy = new StickToLastServerConnectionFactoryStrategy();
+        factoryStrategy = new StickyConnectionStrategy();
         
         URI uri1 = new URI("ejbd://localhost:4201");
         URI uri2 = new URI("ejbd://localhost:4202");
@@ -51,9 +51,10 @@
         startVerification();
 
         try {
-            factoryStrategy.connect(locations, null);
+            ServerMetaData server = new ServerMetaData(locations);
+            factoryStrategy.connect(server);
             fail();
-        } catch (RemoteException e) {
+        } catch (IOException e) {
         }
     }
     
@@ -65,7 +66,8 @@
         
         startVerification();
 
-        Connection actualConnection = factoryStrategy.connect(locations, null);
+        ServerMetaData server = new ServerMetaData(locations);
+        Connection actualConnection = factoryStrategy.connect(server);
         assertSame(expectedConnection, actualConnection);
     }
     
@@ -78,8 +80,9 @@
         
         startVerification();
 
-        factoryStrategy.connect(locations, null);
-        factoryStrategy.connect(locations, null);
+        ServerMetaData server = new ServerMetaData(locations);
+        factoryStrategy.connect(server);
+        factoryStrategy.connect(server);
     }
     
 }

Copied: openejb/trunk/openejb3/server/openejb-discovery/pom.xml (from r690620, openejb/trunk/openejb3/server/openejb-admin/pom.xml)
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-discovery/pom.xml?p2=openejb/trunk/openejb3/server/openejb-discovery/pom.xml&p1=openejb/trunk/openejb3/server/openejb-admin/pom.xml&r1=690620&r2=690629&rev=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-admin/pom.xml (original)
+++ openejb/trunk/openejb3/server/openejb-discovery/pom.xml Sat Aug 30 20:23:03 2008
@@ -26,9 +26,9 @@
     <version>3.1-SNAPSHOT</version>
     </parent>
   <modelVersion>4.0.0</modelVersion>
-  <artifactId>openejb-admin</artifactId>
+  <artifactId>openejb-discovery</artifactId>
   <packaging>jar</packaging>
-  <name>OpenEJB :: Server :: Admin</name>
+  <name>OpenEJB :: Server :: Discovery</name>
   <dependencies>
     <dependency>
       <groupId>org.apache.openejb</groupId>

Added: openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryAgent.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryAgent.java?rev=690629&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryAgent.java (added)
+++ openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryAgent.java Sat Aug 30 20:23:03 2008
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.server.discovery;
+
+import java.net.URI;
+import java.io.IOException;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public interface DiscoveryAgent {
+    /**
+     * Sets the discovery listener
+     * @param listener
+     */
+    void setDiscoveryListener(DiscoveryListener listener);
+
+    /**
+     * register a service
+     * @param serviceUri
+     * @param details
+     */
+    void registerService(URI serviceUri) throws IOException;
+
+    /**
+     * register a service
+     * @param serviceUri
+     * @param details
+     */
+    void unregisterService(URI serviceUri) throws IOException;
+
+    /**
+     * A process actively using a service may see it go down before the DiscoveryAgent notices the
+     * service's failure.  That process can use this method to notify the DiscoveryAgent of the failure
+     * so that other listeners of this DiscoveryAgent can also be made aware of the failure.
+     */
+    void reportFailed(URI serviceUri) throws IOException;
+
+}

Added: openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryListener.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryListener.java?rev=690629&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryListener.java (added)
+++ openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryListener.java Sat Aug 30 20:23:03 2008
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.server.discovery;
+
+
+import java.net.URI;
+import java.io.IOException;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public interface DiscoveryListener {
+    public void serviceAdded(URI service);
+    public void serviceRemoved(URI service);
+
+}

Added: openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java?rev=690629&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java (added)
+++ openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java Sat Aug 30 20:23:03 2008
@@ -0,0 +1,471 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.server.discovery;
+
+import static org.apache.openejb.server.ServiceDaemon.getBoolean;
+import static org.apache.openejb.server.ServiceDaemon.getLong;
+import static org.apache.openejb.server.ServiceDaemon.getInt;
+import org.apache.openejb.server.SelfManaging;
+import org.apache.openejb.server.ServerService;
+import org.apache.openejb.server.ServiceException;
+import org.apache.openejb.server.ServiceDaemon;
+import org.apache.openejb.util.LogCategory;
+import org.apache.openejb.util.Logger;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class MulticastDiscoveryAgent implements DiscoveryAgent, ServerService, SelfManaging {
+
+    private static final Logger log = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery"), MulticastDiscoveryAgent.class);
+
+    private static final int BUFF_SIZE = 8192;
+
+
+    private AtomicBoolean started = new AtomicBoolean(false);
+    private MulticastSocket multicast;
+
+    private String host = "239.255.3.2";
+    private int port = 6142;
+
+    private int timeToLive = 1;
+    private boolean loopbackMode = false;
+    private SocketAddress address;
+
+    private Map<String, Service> registeredServices = new ConcurrentHashMap<String, Service>();
+
+    private int maxMissedHeartbeats = 10;
+    private long heartRate = 500;
+
+    private Listener listener;
+
+    public MulticastDiscoveryAgent() {
+        listener = new Listener();
+    }
+
+    // ---------------------------------
+    // Listenting specific settings
+    private long initialReconnectDelay = 1000 * 5;
+    private long maxReconnectDelay = 1000 * 30;
+    private long backOffMultiplier = 0;
+    private boolean useExponentialBackOff;
+    private int maxReconnectAttempts = 10; // todo: check this out
+    // ---------------------------------
+
+
+    public void init(Properties props) throws Exception {
+
+        host = props.getProperty("bind", host);
+
+        port = getInt(props, "port", port);
+
+        heartRate = getLong(props, "heart_rate", heartRate);
+        maxMissedHeartbeats = getInt(props, "max_missed_heartbeats", maxMissedHeartbeats);
+        loopbackMode = getBoolean(props, "max_missed_heartbeats", loopbackMode);
+
+        initialReconnectDelay = getLong(props, "reconnect_delay", initialReconnectDelay);
+        maxReconnectDelay = getLong(props, "max_reconnect_delay", initialReconnectDelay);
+        maxReconnectAttempts = getInt(props, "max_reconnect_attempts", maxReconnectAttempts);
+        backOffMultiplier = getLong(props, "exponential_backoff", backOffMultiplier);
+
+        useExponentialBackOff = (backOffMultiplier > 1);
+    }
+
+    public String getIP() {
+        return host;
+    }
+
+    public String getName() {
+        return "multicast";
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setDiscoveryListener(DiscoveryListener listener) {
+        this.listener.setDiscoveryListener(listener);
+    }
+
+    public void registerService(URI serviceUri) throws IOException {
+        Service service = new Service(serviceUri);
+        this.registeredServices.put(service.uriString, service);
+    }
+
+    public void unregisterService(URI serviceUri) throws IOException {
+        Service service = new Service(serviceUri);
+        this.registeredServices.remove(service.uriString);
+    }
+
+    public void reportFailed(URI serviceUri) throws IOException {
+        listener.reportFailed(serviceUri);
+    }
+
+
+    private boolean isSelf(Service service) {
+        return isSelf(service.uriString);
+    }
+
+    private boolean isSelf(String service) {
+        return registeredServices.keySet().contains(service);
+    }
+
+    public static void main(String[] args) throws Exception {
+    }
+
+    /**
+     * start the discovery agent
+     *
+     * @throws Exception
+     */
+    public void start() throws ServiceException {
+        try {
+            if (started.compareAndSet(false, true)) {
+
+                InetAddress inetAddress = InetAddress.getByName(host);
+
+                this.address = new InetSocketAddress(inetAddress, port);
+
+                multicast = new MulticastSocket(port);
+                multicast.setLoopbackMode(loopbackMode);
+                multicast.setTimeToLive(timeToLive);
+                multicast.joinGroup(inetAddress);
+                multicast.setSoTimeout((int) heartRate);
+
+                Thread listenerThread = new Thread(listener);
+                listenerThread.setName("MulticastDiscovery: Listener");
+                listenerThread.setDaemon(true);
+                listenerThread.start();
+
+                Broadcaster broadcaster = new Broadcaster();
+
+                Timer timer = new Timer("MulticastDiscovery: Broadcaster", true);
+                timer.scheduleAtFixedRate(broadcaster, 0, heartRate);
+            }
+        } catch (Exception e) {
+            throw new ServiceException(e);
+        }
+    }
+
+    /**
+     * stop the channel
+     *
+     * @throws Exception
+     */
+    public void stop() throws ServiceException {
+        if (started.compareAndSet(true, false)) {
+            multicast.close();
+        }
+    }
+
+    public void service(InputStream in, OutputStream out) throws ServiceException, IOException {
+    }
+
+    public void service(Socket socket) throws ServiceException, IOException {
+    }
+
+    class Service {
+        private final URI uri;
+        private final String uriString;
+
+        public Service(URI uri) {
+            this.uri = uri;
+            this.uriString = uri.toString();
+        }
+
+        public Service(String uriString) throws URISyntaxException {
+            this(new URI(uriString));
+        }
+    }
+
+    private class ServiceVitals {
+
+        private final Service service;
+
+        private long lastHeartBeat;
+        private long recoveryTime;
+        private int failureCount;
+        private boolean dead;
+
+        public ServiceVitals(Service service) {
+            this.service = service;
+            this.lastHeartBeat = System.currentTimeMillis();
+        }
+
+        public synchronized void heartbeat() {
+            lastHeartBeat = System.currentTimeMillis();
+
+            // Consider that the service recovery has succeeded if it has not
+            // failed in 60 seconds.
+            if (!dead && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
+                if (log.isDebugEnabled()) {
+                    log.debug("I now think that the " + service + " service has recovered.");
+                }
+                failureCount = 0;
+                recoveryTime = 0;
+            }
+        }
+
+        public synchronized long getLastHeartbeat() {
+            return lastHeartBeat;
+        }
+
+        public synchronized boolean pronounceDead() {
+            if (!dead) {
+                dead = true;
+                failureCount++;
+
+                long reconnectDelay;
+                if (useExponentialBackOff) {
+                    reconnectDelay = (long) Math.pow(backOffMultiplier, failureCount);
+                    if (reconnectDelay > maxReconnectDelay) {
+                        reconnectDelay = maxReconnectDelay;
+                    }
+                } else {
+                    reconnectDelay = initialReconnectDelay;
+                }
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Remote failure of " + service + " while still receiving multicast advertisements.  " +
+                            "Advertising events will be suppressed for " + reconnectDelay
+                            + " ms, the current failure count is: " + failureCount);
+                }
+
+                recoveryTime = System.currentTimeMillis() + reconnectDelay;
+                return true;
+            }
+            return false;
+        }
+
+        /**
+         * @return true if this broker is marked failed and it is now the right
+         *         time to start recovery.
+         */
+        public synchronized boolean doRecovery() {
+            if (!dead) {
+                return false;
+            }
+
+            // Are we done trying to recover this guy?
+            if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Max reconnect attempts of the " + service + " service has been reached.");
+                }
+                return false;
+            }
+
+            // Is it not yet time?
+            if (System.currentTimeMillis() < recoveryTime) {
+                return false;
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("Resuming event advertisement of the " + service + " service.");
+            }
+            dead = false;
+            return true;
+        }
+
+        public boolean isDead() {
+            return dead;
+        }
+    }
+
+
+    class Listener implements Runnable {
+        private Map<String, ServiceVitals> discoveredServices = new ConcurrentHashMap<String, ServiceVitals>();
+        private DiscoveryListener discoveryListener;
+
+        public void setDiscoveryListener(DiscoveryListener discoveryListener) {
+            this.discoveryListener = discoveryListener;
+        }
+
+        public void run() {
+            byte[] buf = new byte[BUFF_SIZE];
+            DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
+            while (started.get()) {
+                checkServices();
+                try {
+                    multicast.receive(packet);
+                    if (packet.getLength() > 0) {
+                        String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
+//                        System.out.println("read = " + str);
+                        processData(str);
+                    }
+                } catch (SocketTimeoutException se) {
+                    // ignore
+                } catch (IOException e) {
+                    if (started.get()) {
+                        log.error("failed to process packet: " + e);
+                    }
+                }
+            }
+        }
+
+        private void processData(String uriString) {
+            if (discoveryListener == null) {
+                return;
+            }
+            if (isSelf(uriString)) {
+                return;
+            }
+
+            ServiceVitals vitals = discoveredServices.get(uriString);
+
+            if (vitals == null) {
+                try {
+                    vitals = new ServiceVitals(new Service(uriString));
+
+                    discoveredServices.put(uriString, vitals);
+
+                    fireServiceAddEvent(vitals.service.uri);
+                } catch (URISyntaxException e) {
+                    // don't continuously log this
+                }
+
+            } else {
+                vitals.heartbeat();
+
+                if (vitals.doRecovery()) {
+                    fireServiceAddEvent(vitals.service.uri);
+                }
+            }
+        }
+
+        private void checkServices() {
+            long expireTime = System.currentTimeMillis() - (heartRate * maxMissedHeartbeats);
+            for (ServiceVitals serviceVitals : discoveredServices.values()) {
+                if (serviceVitals.getLastHeartbeat() < expireTime && !isSelf(serviceVitals.service)) {
+
+                    ServiceVitals vitals = discoveredServices.remove(serviceVitals.service.uriString);
+                    if (vitals != null && !vitals.isDead()) {
+                        fireServiceRemovedEvent(vitals.service.uri);
+                    }
+                }
+            }
+        }
+
+        private final Executor executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+            public Thread newThread(Runnable runable) {
+                Thread t = new Thread(runable, "Multicast Discovery Agent Notifier");
+                t.setDaemon(true);
+                return t;
+            }
+        });
+
+        private void fireServiceRemovedEvent(final URI uri) {
+            if (discoveryListener != null) {
+                final DiscoveryListener discoveryListener = this.discoveryListener;
+
+                // Have the listener process the event async so that
+                // he does not block this thread since we are doing time sensitive
+                // processing of events.
+                executor.execute(new Runnable() {
+                    public void run() {
+                        if (discoveryListener != null) {
+                            discoveryListener.serviceRemoved(uri);
+                        }
+                    }
+                });
+            }
+        }
+
+        private void fireServiceAddEvent(final URI uri) {
+            if (discoveryListener != null) {
+                final DiscoveryListener discoveryListener = this.discoveryListener;
+
+                // Have the listener process the event async so that
+                // he does not block this thread since we are doing time sensitive
+                // processing of events.
+                executor.execute(new Runnable() {
+                    public void run() {
+                        if (discoveryListener != null) {
+                            discoveryListener.serviceAdded(uri);
+                        }
+                    }
+                });
+            }
+        }
+
+        public void reportFailed(URI serviceUri) {
+            final Service service = new Service(serviceUri);
+            ServiceVitals serviceVitals = discoveredServices.get(service.uriString);
+            if (serviceVitals != null && serviceVitals.pronounceDead()) {
+                fireServiceRemovedEvent(service.uri);
+            }
+        }
+    }
+
+    class Broadcaster extends TimerTask {
+        private IOException failed;
+
+        public void run() {
+            if (started.get()) {
+                heartbeat();
+            }
+        }
+
+        private void heartbeat() {
+            for (String uri : registeredServices.keySet()) {
+                try {
+                    byte[] data = uri.getBytes();
+                    DatagramPacket packet = new DatagramPacket(data, 0, data.length, address);
+//                    System.out.println("ann = " + uri);
+                    multicast.send(packet);
+                } catch (IOException e) {
+                    // If a send fails, chances are all subsequent sends will fail
+                    // too.. No need to keep reporting the
+                    // same error over and over.
+                    if (failed == null) {
+                        failed = e;
+
+                        log.error("Failed to advertise our service: " + uri, e);
+                        if ("Operation not permitted".equals(e.getMessage())) {
+                            log.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup.  "
+                                    + "Please make sure that the OS is properly configured to allow multicast traffic over: " + multicast.getLocalAddress());
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+}

Added: openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastSearch.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastSearch.java?rev=690629&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastSearch.java (added)
+++ openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastSearch.java Sat Aug 30 20:23:03 2008
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.server.discovery;
+
+import java.net.URI;
+import java.net.MulticastSocket;
+import java.net.InetAddress;
+import java.net.DatagramPacket;
+import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class MulticastSearch {
+
+    private static final int BUFF_SIZE = 8192;
+
+    private final MulticastSocket multicast;
+
+    public MulticastSearch() throws IOException {
+        this("239.255.2.3", 6142);
+    }
+
+    public MulticastSearch(String host, int port) throws IOException {
+        InetAddress inetAddress = InetAddress.getByName(host);
+
+        multicast = new MulticastSocket(port);
+        multicast.joinGroup(inetAddress);
+        multicast.setSoTimeout(500);
+    }
+
+    public URI search(int timeout, TimeUnit milliseconds) throws IOException {
+        return search(new DefaultFilter(), timeout, milliseconds);
+    }
+
+    public URI search() throws IOException {
+        return search(new DefaultFilter(), 0, TimeUnit.MILLISECONDS);
+    }
+
+    public URI search(Filter filter) throws IOException {
+        return search(filter, 0, TimeUnit.MILLISECONDS);
+    }
+
+    public URI search(Filter filter, long timeout, TimeUnit unit) throws IOException {
+        timeout = unit.convert(timeout, TimeUnit.MILLISECONDS);
+        long waited = 0;
+
+        byte[] buf = new byte[BUFF_SIZE];
+        DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
+
+        while (timeout == 0 || waited < timeout){
+            long start = System.currentTimeMillis();
+            try {
+                multicast.receive(packet);
+                if (packet.getLength() > 0) {
+                    String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
+                    URI service = URI.create(str);
+                    if (service != null && filter.accept(service)) {
+                        return service;
+                    }
+                }
+            } finally {
+                long stop = System.currentTimeMillis();
+                waited += stop - start;
+            }
+        }
+
+        return null;
+    }
+
+    public interface Filter {
+        boolean accept(URI service);
+    }
+
+    public static class DefaultFilter implements Filter {
+        public boolean accept(URI service) {
+            return true;
+        }
+    }
+}

Added: openejb/trunk/openejb3/server/openejb-discovery/src/main/resources/META-INF/org.apache.openejb.server.ServersService/multicast
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-discovery/src/main/resources/META-INF/org.apache.openejb.server.ServersService/multicast?rev=690629&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-discovery/src/main/resources/META-INF/org.apache.openejb.server.ServersService/multicast (added)
+++ openejb/trunk/openejb3/server/openejb-discovery/src/main/resources/META-INF/org.apache.openejb.server.ServersService/multicast Sat Aug 30 20:23:03 2008
@@ -0,0 +1,4 @@
+server      = org.apache.openejb.server.discovery.MulticastDiscoveryAgent
+bind        = 239.255.2.3
+port        = 6142
+disabled    = true

Added: openejb/trunk/openejb3/server/openejb-discovery/src/test/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgentTest.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-discovery/src/test/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgentTest.java?rev=690629&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-discovery/src/test/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgentTest.java (added)
+++ openejb/trunk/openejb3/server/openejb-discovery/src/test/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgentTest.java Sat Aug 30 20:23:03 2008
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.server.discovery;
+
+import junit.framework.TestCase;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.openejb.server.ServiceException;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class MulticastDiscoveryAgentTest extends TestCase {
+
+    public void testNothing(){}
+
+    public void _test() throws Exception {
+        MulticastDiscoveryAgent[] agents = {agent("red"),agent("green"),agent("yellow"),agent("blue")};
+
+        MulticastSearch multicast = new MulticastSearch();
+        Filter filter = new Filter();
+
+        System.out.println("uri = " + multicast.search(filter));
+        System.out.println("uri = " + multicast.search(filter));
+        System.out.println("uri = " + multicast.search(filter));
+        System.out.println("uri = " + multicast.search(filter));
+
+        Thread.sleep(2000);
+        System.out.println("--");
+
+
+        for (MulticastDiscoveryAgent agent : agents) {
+            Thread.sleep(10000);
+            System.out.println("--");
+            agent.stop();
+        }
+
+        for (MulticastDiscoveryAgent agent : agents) {
+            Thread.sleep(10000);
+            System.out.println("--");
+            agent.start();
+        }
+
+        Thread.sleep(10000);
+
+
+    }
+
+    private static class Filter implements MulticastSearch.Filter {
+        private final Set<URI> seen = new HashSet<URI>();
+        public boolean accept(URI service) {
+            if (seen.contains(service)) return false;
+            seen.add(service);
+            return true;
+        }
+    }
+
+    private MulticastDiscoveryAgent agent(String id) throws IOException, URISyntaxException, ServiceException {
+        MulticastDiscoveryAgent agent = new MulticastDiscoveryAgent();
+        agent.setDiscoveryListener(new MyDiscoveryListener(id));
+        agent.registerService(new URI("ejbd://"+id+":4201"));
+        agent.start();
+        return agent;
+    }
+
+    private static class MyDiscoveryListener implements DiscoveryListener {
+        private final String id;
+
+        public MyDiscoveryListener(String id) {
+            id += "        ";
+            id = id.substring(0,8);
+            this.id = id;
+        }
+
+        public void serviceAdded(URI service) {
+            System.out.println(id + "add " + service.toString());
+        }
+
+        public void serviceRemoved(URI service) {
+            System.out.println(id + "remove " + service.toString());
+        }
+    }
+
+}

Modified: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java?rev=690629&r1=690628&r2=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java (original)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java Sat Aug 30 20:23:03 2008
@@ -70,7 +70,7 @@
         this.next = next;
     }
 
-    private static InetAddress getAddress(String host){
+    public static InetAddress getAddress(String host){
         try {
             return InetAddress.getByName(host);
         } catch (UnknownHostException e) {
@@ -88,6 +88,16 @@
         }
     }
 
+    public static long getLong(Properties p, String property, long defaultValue){
+        String value = p.getProperty(property);
+        try {
+            if (value != null) return Long.parseLong(value);
+            else return defaultValue;
+        } catch (NumberFormatException e) {
+            return defaultValue;
+        }
+    }
+
     public static boolean getBoolean(Properties p, String property, boolean defaultValue){
         String value = p.getProperty(property);
         try {

Modified: openejb/trunk/openejb3/server/pom.xml
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/pom.xml?rev=690629&r1=690628&r2=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/pom.xml (original)
+++ openejb/trunk/openejb3/server/pom.xml Sat Aug 30 20:23:03 2008
@@ -42,6 +42,7 @@
     <module>openejb-axis</module>
     <module>openejb-axis2</module>
     <module>openejb-cxf</module>
+    <module>openejb-discovery</module>
   </modules>
 </project>