You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2008/09/09 02:09:27 UTC

svn commit: r693321 [2/2] - in /openejb/trunk/openejb3: container/openejb-core/src/main/java/org/apache/openejb/util/ server/openejb-client/src/main/java/org/apache/openejb/client/ server/openejb-client/src/test/java/org/apache/openejb/client/ server/o...

Modified: openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java?rev=693321&r1=693320&r2=693321&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java (original)
+++ openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java Mon Sep  8 17:09:25 2008
@@ -16,36 +16,48 @@
  */
 package org.apache.openejb.server.ejbd;
 
+import org.apache.openejb.loader.SystemInstance;
 import org.apache.openejb.server.ServerService;
 import org.apache.openejb.server.ServiceException;
 import org.apache.openejb.server.ServicePool;
-import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.util.LogCategory;
+import org.apache.openejb.util.Logger;
+import org.apache.openejb.util.Exceptions;
+import org.apache.openejb.client.KeepAliveStyle;
 
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
 import java.net.Socket;
-import java.util.Properties;
+import java.net.SocketException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.Map;
-import java.util.TimerTask;
+import java.util.Properties;
 import java.util.Timer;
-import java.util.Date;
-import java.util.Collection;
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.BlockingQueue;
-import java.text.SimpleDateFormat;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * @version $Rev$ $Date$
  */
 public class KeepAliveServer implements ServerService {
+
+    private static final Logger logger = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("keepalive"), KeepAliveServer.class);
     private final ServerService service;
     private final long timeout = (1000 * 3);
-    private final KeepAliveTimer keepAliveTimer = new KeepAliveTimer(timeout);
+
+    private final AtomicBoolean stop = new AtomicBoolean();
+    private final KeepAliveTimer keepAliveTimer;
+    private Timer timer;
 
     public KeepAliveServer() {
         this(new EjbServer());
@@ -54,26 +66,27 @@
     public KeepAliveServer(ServerService service) {
         this.service = service;
 
+        keepAliveTimer = new KeepAliveTimer();
 
-        Timer timer = new Timer("KeepAliveTimer", true);
+        timer = new Timer("KeepAliveTimer", true);
         timer.scheduleAtFixedRate(keepAliveTimer, timeout, timeout / 2);
-
     }
 
 
+    public class KeepAliveTimer extends TimerTask {
 
-    public static class KeepAliveTimer extends TimerTask {
+        // Doesn't need to be a map.  Could be a set if Session.equals/hashCode only referenced the Thread.
+        private final Map<Thread, Session> sessions = new ConcurrentHashMap<Thread, Session>();
 
-        private final Map<Thread, Status> statusMap = new ConcurrentHashMap<Thread, Status>();
-
-        private final long timeout;
         private BlockingQueue<Runnable> queue;
 
-        public KeepAliveTimer(long timeout) {
-            this.timeout = timeout;
+        public void run() {
+            if (!stop.get()) {
+                closeInactiveSessions();
+            }
         }
 
-        public void run() {
+        private void closeInactiveSessions() {
             BlockingQueue<Runnable> queue = getQueue();
             if (queue == null) return;
 
@@ -82,28 +95,50 @@
 
             long now = System.currentTimeMillis();
 
-            Collection<Status> statuses = statusMap.values();
-            for (Status status : statuses) {
+            for (Session session : sessions.values()) {
 
-//                System.out.println(""+status);
-
-                if (status.isReading() && now - status.getTime() > timeout){
-//                    System.out.println("Thread Interrupt");
+                if (session.usage.tryLock()) {
                     try {
-                        backlog--;
-                        status.in.close();
-                    } catch (IOException e) {
-                        e.printStackTrace();
+                        if (now - session.lastRequest > timeout) {
+                            try {
+                                backlog--;
+                                session.socket.close();
+                            } catch (IOException e) {
+                                logger.info("Error closing socket.", e);
+                            } finally {
+                                removeSession(session);
+                            }
+                        }
+                    } finally {
+                        session.usage.unlock();
                     }
                 }
 
                 if (backlog <= 0) return;
             }
-//            System.out.println("exit");
+        }
+
+        public void closeSessions() {
+
+            // Close the ones we can
+            for (Session session : sessions.values()) {
+                if (session.usage.tryLock()) {
+                    try {
+                        session.socket.close();
+                    } catch (IOException e) {
+                        logger.info("Error closing socket.", e);
+                    } finally {
+                        removeSession(session);
+                        session.usage.unlock();
+                    }
+                } else {
+                    logger.info("Allowing graceful shutdown of " + session.socket.getInetAddress());
+                }
+            }
         }
 
         private BlockingQueue<Runnable> getQueue() {
-            if (queue == null){
+            if (queue == null) {
                 // this can be null if timer fires before service is fully initialized
                 ServicePool incoming = SystemInstance.get().getComponent(ServicePool.class);
                 if (incoming == null) return null;
@@ -113,79 +148,87 @@
             return queue;
         }
 
-        public Status setStatus(Status status) {
-//            System.out.println("status = " + status);
-            return statusMap.put(status.getThread(), status);
+        public Session addSession(Session session) {
+            return sessions.put(session.thread, session);
+        }
+
+        public Session removeSession(Session session) {
+            return sessions.remove(session.thread);
         }
     }
 
-    public static class Status {
-        private final long time;
-        private final boolean reading;
-        private final Thread thread;
-        private static final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss.SSS");
-        private final InputStream in;
+    private class Session {
 
-        public boolean isReading() {
-            return reading;
-        }
+        private final Thread thread;
+        private final Lock usage = new ReentrantLock();
 
-        public Thread getThread() {
-            return thread;
-        }
+        // only used inside the Lock
+        private long lastRequest;
 
-        public long getTime() {
-            return time;
-        }
+        // only used inside the Lock
+        private final Socket socket;
 
-        public Status(boolean reading, InputStream in) {
-            this.reading = reading;
+        public Session(Socket socket) {
+            this.socket = socket;
+            this.lastRequest = System.currentTimeMillis();
             this.thread = Thread.currentThread();
-            this.time = System.currentTimeMillis();
-            this.in = in;
         }
 
-        public String toString() {
-            String msg = "";
-            if (reading)
-            msg += "READING";
-            else msg += "WORKING";
-            msg += " "+thread.getName();
+        public void service(Socket socket) throws ServiceException, IOException {
+            keepAliveTimer.addSession(this);
 
-            msg += " since "+ format.format(new Date(time));
-            return msg;
-        }
-    }
+            int i = -1;
 
+            try {
+                InputStream in = new BufferedInputStream(socket.getInputStream());
+                OutputStream out = new BufferedOutputStream(socket.getOutputStream());
 
-    public void service(Socket socket) throws ServiceException, IOException {
-        InputStream in = new BufferedInputStream(socket.getInputStream());
-        OutputStream out = new BufferedOutputStream(socket.getOutputStream());
+                while (!stop.get()) {
+                    try {
+                        i = in.read();
+                    } catch (SocketException e) {
+                        // Socket closed.
+                        break;
+                    }
+                    KeepAliveStyle style = KeepAliveStyle.values()[i];
 
-        try {
-            while (true) {
-                keepAliveTimer.setStatus(new Status(true, in));
-                int i = in.read();
-                char c = (char) i;
-                if (i == 30){
-                    keepAliveTimer.setStatus(new Status(false, null));
-                    service.service(new Input(in), new Output(out));
-                    out.flush();
-                } else {
-                    keepAliveTimer.setStatus(new Status(false, null));
-                    break;
+                    try {
+                        usage.lock();
+
+                        switch(style){
+                            case PING_PING: {
+                                in.read();
+                            }
+                            break;
+                            case PING_PONG: {
+                                out.write(style.ordinal());
+                                out.flush();
+                            }
+                        }
+
+                        service.service(new Input(in), new Output(out));
+                        out.flush();
+                    } finally {
+                        this.lastRequest = System.currentTimeMillis();
+                        usage.unlock();
+                    }
                 }
+            } catch (ArrayIndexOutOfBoundsException e){
+                throw new IOException("Unexpected byte " + i);
+            } catch (InterruptedIOException e) {
+                Thread.interrupted();
+            } finally {
+                keepAliveTimer.removeSession(this);
             }
-        } catch (InterruptedIOException e) {
-            Thread.interrupted();
-        } catch (IOException e) {
-        } finally{
-            keepAliveTimer.setStatus(new Status(false, null));
-//            System.out.println("close socket");
-            socket.close();
         }
     }
 
+
+    public void service(Socket socket) throws ServiceException, IOException {
+        Session session = new Session(socket);
+        session.service(socket);
+    }
+
     public void service(InputStream in, OutputStream out) throws ServiceException, IOException {
     }
 
@@ -202,11 +245,16 @@
     }
 
     public void start() throws ServiceException {
-        service.start();
+        stop.set(false);
+
+//        service.start();
     }
 
+
     public void stop() throws ServiceException {
-        service.stop();
+        stop.set(true);
+        keepAliveTimer.closeSessions();
+//        service.stop();
     }
 
     public void init(Properties props) throws Exception {

Added: openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/FailoverTest.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/FailoverTest.java?rev=693321&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/FailoverTest.java (added)
+++ openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/FailoverTest.java Mon Sep  8 17:09:25 2008
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.server.ejbd;
+
+import junit.framework.TestCase;
+import org.apache.openejb.OpenEJB;
+import org.apache.openejb.OpenEJBException;
+import org.apache.openejb.assembler.classic.Assembler;
+import org.apache.openejb.config.ConfigurationFactory;
+import org.apache.openejb.core.ServerFederation;
+import org.apache.openejb.jee.EjbJar;
+import org.apache.openejb.jee.StatelessBean;
+import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.server.DiscoveryAgent;
+import org.apache.openejb.server.DiscoveryListener;
+import org.apache.openejb.server.ServerService;
+import org.apache.openejb.server.ServerServiceFilter;
+import org.apache.openejb.server.ServiceDaemon;
+import org.apache.openejb.server.ServiceException;
+
+import javax.ejb.Remote;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class FailoverTest extends TestCase {
+
+    private static DiscoveryAgent agent = new TestAgent();
+
+    public void test() throws Exception {
+
+        Properties initProps = new Properties();
+        initProps.setProperty("openejb.deployments.classpath.include", "");
+        initProps.setProperty("openejb.deployments.classpath.filter.descriptors", "true");
+        OpenEJB.init(initProps, new ServerFederation());
+
+        SystemInstance.get().setComponent(DiscoveryAgent.class, agent);
+
+        ServerService red = server(Host.RED);
+        ServerService blue = server(Host.BLUE);
+        ServerService green = server(Host.GREEN);
+
+        red.start();
+        blue.start();
+        green.start();
+
+        TargetRemote target = getBean(red);
+
+        assertEquals(Host.RED, target.getHost());
+
+        red.stop();
+
+        assertEquals(Host.BLUE, target.getHost());
+
+        blue.stop();
+
+        assertEquals(Host.GREEN, target.getHost());
+    }
+
+    private TargetRemote getBean(ServerService server) throws NamingException, IOException, OpenEJBException {
+        int port = server.getPort();
+
+        Assembler assembler = SystemInstance.get().getComponent(Assembler.class);
+        ConfigurationFactory config = new ConfigurationFactory();
+
+        EjbJar ejbJar = new EjbJar();
+        ejbJar.addEnterpriseBean(new StatelessBean(Target.class));
+        assembler.createApplication(config.configureApplication(ejbJar));
+
+        // good creds
+        Properties props = new Properties();
+        props.put("java.naming.factory.initial", "org.apache.openejb.client.RemoteInitialContextFactory");
+        props.put("java.naming.provider.url", "ejbd://localhost:" + port + "/RED");
+        System.setProperty("openejb.client.keepalive", "ping_pong");
+        Context context = new InitialContext(props);
+        TargetRemote target = (TargetRemote) context.lookup("TargetRemote");
+        return target;
+    }
+
+    private ServerService server(Host host) throws Exception {
+        ServerService server = new EjbServer();
+
+        server = new HostFilter(server, host);
+
+        server = new ServiceDaemon(server, 0, "localhost");
+
+        server = new AgentFilter(server, agent, host);
+
+        server.init(new Properties());
+
+        return server;
+    }
+
+
+    // Simple single-threaded version, way easier on testing
+    public static class TestAgent implements DiscoveryAgent {
+
+        private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>();
+
+        public void registerService(URI serviceUri) throws IOException {
+            for (DiscoveryListener listener : listeners) {
+                listener.serviceAdded(serviceUri);
+            }
+        }
+
+        public void reportFailed(URI serviceUri) throws IOException {
+        }
+
+        public void setDiscoveryListener(DiscoveryListener listener) {
+            listeners.add(listener);
+        }
+
+        public void unregisterService(URI serviceUri) throws IOException {
+            for (DiscoveryListener listener : listeners) {
+                listener.serviceRemoved(serviceUri);
+            }
+        }
+
+    }
+
+    public static enum Host {
+        RED, BLUE, GREEN;
+    }
+
+    public static ThreadLocal<Host> host = new ThreadLocal<Host>();
+
+
+    public static class AgentFilter extends ServerServiceFilter {
+        private final Host host;
+        private final DiscoveryAgent agent;
+        private URI uri;
+
+        public AgentFilter(ServerService service, DiscoveryAgent agent, Host host) {
+            super(service);
+            this.agent = agent;
+            this.host = host;
+        }
+
+        public void start() throws ServiceException {
+            super.start();
+            try {
+                uri = new URI("ejb:ejbd://localhost:" + getPort() + "/" + host);
+                agent.registerService(uri);
+            } catch (Exception e) {
+                throw new ServiceException(e);
+            }
+        }
+
+        public void stop() throws ServiceException {
+            super.stop();
+            try {
+                agent.unregisterService(uri);
+            } catch (Exception e) {
+                throw new ServiceException(e);
+            }
+        }
+    }
+
+    public static class HostFilter extends ServerServiceFilter {
+        private final Host me;
+
+        public HostFilter(ServerService service, Host me) {
+            super(service);
+            this.me = me;
+        }
+
+        public void service(InputStream in, OutputStream out) throws ServiceException, IOException {
+            try {
+                host.set(me);
+                super.service(in, out);
+            } finally {
+                host.remove();
+            }
+        }
+
+        public void service(Socket socket) throws ServiceException, IOException {
+            try {
+                host.set(me);
+                super.service(socket);
+            } finally {
+                host.remove();
+            }
+        }
+    }
+
+    public static class Target implements TargetRemote {
+        public Host getHost() {
+            return host.get();
+        }
+    }
+
+    @Remote
+    public static interface TargetRemote {
+        Host getHost();
+    }
+}

Copied: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryAgent.java (from r691472, openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryAgent.java)
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryAgent.java?p2=openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryAgent.java&p1=openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryAgent.java&r1=691472&r2=693321&rev=693321&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryAgent.java (original)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryAgent.java Mon Sep  8 17:09:25 2008
@@ -14,7 +14,7 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.openejb.server.discovery;
+package org.apache.openejb.server;
 
 import java.net.URI;
 import java.io.IOException;

Copied: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryListener.java (from r691472, openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryListener.java)
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryListener.java?p2=openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryListener.java&p1=openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryListener.java&r1=691472&r2=693321&rev=693321&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryListener.java (original)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryListener.java Mon Sep  8 17:09:25 2008
@@ -14,7 +14,7 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.openejb.server.discovery;
+package org.apache.openejb.server;
 
 
 import java.net.URI;
@@ -23,7 +23,7 @@
 /**
  * @version $Rev$ $Date$
  */
-public interface DiscoveryListener {
+public interface  DiscoveryListener {
     public void serviceAdded(URI service);
     public void serviceRemoved(URI service);
 

Added: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java?rev=693321&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java (added)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java Mon Sep  8 17:09:25 2008
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.server;
+
+import org.apache.openejb.loader.SystemInstance;
+
+import java.net.URI;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ConcurrentHashMap;
+import java.io.IOException;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class DiscoveryRegistry implements DiscoveryListener, DiscoveryAgent {
+
+    private final DiscoveryAgent agent;
+    private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>();
+    private final Map<String, URI> services = new ConcurrentHashMap<String, URI>();
+
+    private final Executor executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+        public Thread newThread(Runnable runable) {
+            Thread t = new Thread(runable, DiscoveryRegistry.class.getSimpleName());
+            t.setDaemon(true);
+            return t;
+        }
+    });
+
+    public DiscoveryRegistry(DiscoveryAgent agent) {
+        this.agent = agent;
+        agent.setDiscoveryListener(this);
+        SystemInstance.get().setComponent(DiscoveryRegistry.class, this);
+        SystemInstance.get().setComponent(DiscoveryAgent.class, this);
+    }
+
+    public Set<URI> getServices() {
+        return new HashSet<URI>(services.values());
+    }
+
+    public void registerService(URI serviceUri) throws IOException {
+        agent.registerService(serviceUri);
+    }
+
+    public void reportFailed(URI serviceUri) throws IOException {
+        agent.reportFailed(serviceUri);
+    }
+
+    public void unregisterService(URI serviceUri) throws IOException {
+        agent.unregisterService(serviceUri);
+    }
+
+    public void setDiscoveryListener(DiscoveryListener listener) {
+        addDiscoveryListener(listener);
+    }
+
+    public void addDiscoveryListener(DiscoveryListener listener){
+        // get the listener caught up
+        for (URI service : services.values()) {
+            executor.execute(new ServiceAddedTask(listener, service));
+        }
+
+        listeners.add(listener);
+    }
+
+    public void removeDiscoveryListener(DiscoveryListener listener){
+        listeners.remove(listener);
+    }
+
+
+    public void serviceAdded(URI service) {
+        services.put(service.toString(), service);
+        for (final DiscoveryListener discoveryListener : getListeners()) {
+            executor.execute(new ServiceAddedTask(discoveryListener, service));
+        }
+    }
+
+    public void serviceRemoved(URI service) {
+
+        for (final DiscoveryListener discoveryListener : getListeners()) {
+            executor.execute(new ServiceRemovedTask(discoveryListener, service));
+        }
+    }
+
+    List<DiscoveryListener> getListeners(){
+        return Collections.unmodifiableList(listeners);
+    }
+
+    private abstract static class Task implements Runnable {
+        protected final DiscoveryListener discoveryListener;
+        protected final URI service;
+
+        protected Task(DiscoveryListener discoveryListener, URI service) {
+            this.discoveryListener = discoveryListener;
+            this.service = service;
+        }
+    }
+
+    private static class ServiceRemovedTask extends Task {
+        public ServiceRemovedTask(DiscoveryListener discoveryListener, URI service) {
+            super(discoveryListener, service);
+        }
+
+        public void run() {
+            if (discoveryListener != null) {
+                discoveryListener.serviceRemoved(service);
+            }
+        }
+    }
+
+    private static class ServiceAddedTask extends Task {
+        public ServiceAddedTask(DiscoveryListener discoveryListener, URI service) {
+            super(discoveryListener, service);
+        }
+
+        public void run() {
+            if (discoveryListener != null) {
+                discoveryListener.serviceAdded(service);
+            }
+        }
+    }
+}

Added: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java?rev=693321&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java (added)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java Mon Sep  8 17:09:25 2008
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.server;
+
+import org.apache.openejb.server.ServerService;
+import org.apache.openejb.server.ServiceException;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Properties;
+
+/**
+ * TODO: Make this the superclass of the appropriate ServerService implementations
+ * @version $Rev$ $Date$
+ */
+public class ServerServiceFilter implements ServerService {
+    private final ServerService service;
+
+    public ServerServiceFilter(ServerService service) {
+        this.service = service;
+    }
+
+    public String getIP() {
+        return service.getIP();
+    }
+
+    public String getName() {
+        return service.getName();
+    }
+
+    public int getPort() {
+        return service.getPort();
+    }
+
+    public void service(InputStream in, OutputStream out) throws ServiceException, IOException {
+        service.service(in, out);
+    }
+
+    public void service(Socket socket) throws ServiceException, IOException {
+        service.service(socket);
+    }
+
+    public void start() throws ServiceException {
+        service.start();
+    }
+
+    public void stop() throws ServiceException {
+        service.stop();
+    }
+
+    public void init(Properties props) throws Exception {
+        service.init(props);
+    }
+}

Modified: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java?rev=693321&r1=693320&r2=693321&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java (original)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java Mon Sep  8 17:09:25 2008
@@ -32,6 +32,12 @@
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.TimeUnit;
 
 /**
  */
@@ -143,7 +149,7 @@
         secure = getBoolean(props, "secure", false);
 
         timeout = 1000;
-        
+
         next.init(props);
     }
 
@@ -185,11 +191,11 @@
     public void stop() throws ServiceException {
 
         synchronized (this) {
+            next.stop();
             if (socketListener != null) {
                 socketListener.stop();
                 socketListener = null;
             }
-            next.stop();
         }
     }
 
@@ -216,35 +222,49 @@
     }
 
     private static class SocketListener implements Runnable {
-        private ServerService serverService;
-        private ServerSocket serverSocket;
-        private boolean stopped;
+        private final ServerService serverService;
+        private final ServerSocket serverSocket;
+        private AtomicBoolean stop = new AtomicBoolean();
+        private Lock lock = new ReentrantLock();
 
         public SocketListener(ServerService serverService, ServerSocket serverSocket) {
             this.serverService = serverService;
             this.serverSocket = serverSocket;
-            stopped = false;
         }
 
-        public synchronized void stop() {
-            stopped = true;
-        }
-
-        private synchronized boolean shouldStop() {
-            return stopped;
+        public void stop() {
+            stop.set(true);
+            try {
+                if (lock.tryLock(10, TimeUnit.SECONDS)){
+                    serverSocket.close();
+                }
+            } catch (InterruptedException e) {
+                Thread.interrupted();
+            } catch (IOException e) {
+            }
         }
 
         public void run() {
-            while (!shouldStop()) {
+            while (!stop.get()) {
                 Socket socket = null;
                 try {
                     socket = serverSocket.accept();
                     socket.setTcpNoDelay(true);
-                    if (!shouldStop()) {
+                    if (!stop.get()) {
                         // the server service is responsible
                         // for closing the socket.
-                        serverService.service(socket);
+                        try {
+                            lock.lock();
+                            serverService.service(socket);
+                        } finally {
+                            lock.unlock();
+                        }
                     }
+
+                    // Sockets are consumed in other threads
+                    // and should never be closed here
+                    // It's up to the consumer of the socket
+                    // to close it.
                 } catch (SocketTimeoutException e) {
                     // we don't really care
                     // log.debug("Socket timed-out",e);
@@ -253,15 +273,11 @@
                 }
             }
 
-            if (serverSocket != null) {
-                try {
-                    serverSocket.close();
-                } catch (IOException ioException) {
-                    log.debug("Error cleaning up socked", ioException);
-                }
-                serverSocket = null;
+            try {
+                serverSocket.close();
+            } catch (IOException ioException) {
+                log.debug("Error cleaning up socked", ioException);
             }
-            serverService = null;
         }
 
         public void setSoTimeout(int timeout) throws SocketException {

Modified: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceManager.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceManager.java?rev=693321&r1=693320&r2=693321&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceManager.java (original)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceManager.java Mon Sep  8 17:09:25 2008
@@ -44,6 +44,8 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.LinkedHashMap;
+import java.util.Collections;
 
 /**
  * @version $Rev$ $Date$
@@ -144,7 +146,8 @@
 
         ServiceFinder serviceFinder = new ServiceFinder("META-INF/");
 
-        Map availableServices = serviceFinder.mapAvailableServices(ServerService.class);
+        Map<String, Properties> availableServices = serviceFinder.mapAvailableServices(ServerService.class);
+
         List enabledServers = new ArrayList();
 
         OpenEjbConfiguration conf = SystemInstance.get().getComponent(OpenEjbConfiguration.class);
@@ -198,6 +201,12 @@
 
                     service = (ServerService) recipe.create(serviceClass.getClassLoader());
 
+                    if (service instanceof DiscoveryAgent){
+                        DiscoveryAgent agent = (DiscoveryAgent) service;
+                        DiscoveryRegistry registry = new DiscoveryRegistry(agent);
+                        SystemInstance.get().setComponent(DiscoveryRegistry.class, registry);
+                    }
+
                     if (!(service instanceof SelfManaging)) {
                         service = new ServicePool(service, serviceName, serviceProperties);
                         service = new ServiceLogger(service);

Modified: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java?rev=693321&r1=693320&r2=693321&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java (original)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java Mon Sep  8 17:09:25 2008
@@ -23,6 +23,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.openejb.util.LogCategory;
 import org.apache.openejb.util.Logger;
@@ -40,6 +41,7 @@
     private final ServerService next;
     private final Executor executor;
     private final ThreadPoolExecutor threadPool;
+    private final AtomicBoolean stop = new AtomicBoolean();
 
     public ServicePool(ServerService next, String name, Properties properties) {
         this(next, name, getInt(properties, "threads", 100));
@@ -86,6 +88,7 @@
         final Runnable service = new Runnable() {
             public void run() {
                 try {
+                    if (stop.get()) return;
                     next.service(socket);
                 } catch (SecurityException e) {
                     log.error("Security error: " + e.getMessage(), e);
@@ -93,6 +96,12 @@
                     log.error("Unexpected error", e);
                 } finally {
                     try {
+                        // Once the thread is done with the socket, clean it up
+                        // The ServiceDaemon does not close the sockets as it is
+                        // single threaded and only accepts sockets and then
+                        // hands them off to be proceeceed.  As the thread doing
+                        // that processing it is our job to close the socket
+                        // when we are finished with it.
                         if (socket != null) {
                             socket.close();
                         }