You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2008/09/09 02:09:27 UTC
svn commit: r693321 [2/2] - in /openejb/trunk/openejb3:
container/openejb-core/src/main/java/org/apache/openejb/util/
server/openejb-client/src/main/java/org/apache/openejb/client/
server/openejb-client/src/test/java/org/apache/openejb/client/ server/o...
Modified: openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java?rev=693321&r1=693320&r2=693321&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java (original)
+++ openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java Mon Sep 8 17:09:25 2008
@@ -16,36 +16,48 @@
*/
package org.apache.openejb.server.ejbd;
+import org.apache.openejb.loader.SystemInstance;
import org.apache.openejb.server.ServerService;
import org.apache.openejb.server.ServiceException;
import org.apache.openejb.server.ServicePool;
-import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.util.LogCategory;
+import org.apache.openejb.util.Logger;
+import org.apache.openejb.util.Exceptions;
+import org.apache.openejb.client.KeepAliveStyle;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.IOException;
-import java.io.InterruptedIOException;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
import java.net.Socket;
-import java.util.Properties;
+import java.net.SocketException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.Map;
-import java.util.TimerTask;
+import java.util.Properties;
import java.util.Timer;
-import java.util.Date;
-import java.util.Collection;
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.BlockingQueue;
-import java.text.SimpleDateFormat;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
* @version $Rev$ $Date$
*/
public class KeepAliveServer implements ServerService {
+
+ private static final Logger logger = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("keepalive"), KeepAliveServer.class);
private final ServerService service;
private final long timeout = (1000 * 3);
- private final KeepAliveTimer keepAliveTimer = new KeepAliveTimer(timeout);
+
+ private final AtomicBoolean stop = new AtomicBoolean();
+ private final KeepAliveTimer keepAliveTimer;
+ private Timer timer;
public KeepAliveServer() {
this(new EjbServer());
@@ -54,26 +66,27 @@
public KeepAliveServer(ServerService service) {
this.service = service;
+ keepAliveTimer = new KeepAliveTimer();
- Timer timer = new Timer("KeepAliveTimer", true);
+ timer = new Timer("KeepAliveTimer", true);
timer.scheduleAtFixedRate(keepAliveTimer, timeout, timeout / 2);
-
}
+ public class KeepAliveTimer extends TimerTask {
- public static class KeepAliveTimer extends TimerTask {
+ // Doesn't need to be a map. Could be a set if Session.equals/hashCode only referenced the Thread.
+ private final Map<Thread, Session> sessions = new ConcurrentHashMap<Thread, Session>();
- private final Map<Thread, Status> statusMap = new ConcurrentHashMap<Thread, Status>();
-
- private final long timeout;
private BlockingQueue<Runnable> queue;
- public KeepAliveTimer(long timeout) {
- this.timeout = timeout;
+ public void run() {
+ if (!stop.get()) {
+ closeInactiveSessions();
+ }
}
- public void run() {
+ private void closeInactiveSessions() {
BlockingQueue<Runnable> queue = getQueue();
if (queue == null) return;
@@ -82,28 +95,50 @@
long now = System.currentTimeMillis();
- Collection<Status> statuses = statusMap.values();
- for (Status status : statuses) {
+ for (Session session : sessions.values()) {
-// System.out.println(""+status);
-
- if (status.isReading() && now - status.getTime() > timeout){
-// System.out.println("Thread Interrupt");
+ if (session.usage.tryLock()) {
try {
- backlog--;
- status.in.close();
- } catch (IOException e) {
- e.printStackTrace();
+ if (now - session.lastRequest > timeout) {
+ try {
+ backlog--;
+ session.socket.close();
+ } catch (IOException e) {
+ logger.info("Error closing socket.", e);
+ } finally {
+ removeSession(session);
+ }
+ }
+ } finally {
+ session.usage.unlock();
}
}
if (backlog <= 0) return;
}
-// System.out.println("exit");
+ }
+
+ public void closeSessions() {
+
+ // Close the ones we can
+ for (Session session : sessions.values()) {
+ if (session.usage.tryLock()) {
+ try {
+ session.socket.close();
+ } catch (IOException e) {
+ logger.info("Error closing socket.", e);
+ } finally {
+ removeSession(session);
+ session.usage.unlock();
+ }
+ } else {
+ logger.info("Allowing graceful shutdown of " + session.socket.getInetAddress());
+ }
+ }
}
private BlockingQueue<Runnable> getQueue() {
- if (queue == null){
+ if (queue == null) {
// this can be null if timer fires before service is fully initialized
ServicePool incoming = SystemInstance.get().getComponent(ServicePool.class);
if (incoming == null) return null;
@@ -113,79 +148,87 @@
return queue;
}
- public Status setStatus(Status status) {
-// System.out.println("status = " + status);
- return statusMap.put(status.getThread(), status);
+ public Session addSession(Session session) {
+ return sessions.put(session.thread, session);
+ }
+
+ public Session removeSession(Session session) {
+ return sessions.remove(session.thread);
}
}
- public static class Status {
- private final long time;
- private final boolean reading;
- private final Thread thread;
- private static final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss.SSS");
- private final InputStream in;
+ private class Session {
- public boolean isReading() {
- return reading;
- }
+ private final Thread thread;
+ private final Lock usage = new ReentrantLock();
- public Thread getThread() {
- return thread;
- }
+ // only used inside the Lock
+ private long lastRequest;
- public long getTime() {
- return time;
- }
+ // only used inside the Lock
+ private final Socket socket;
- public Status(boolean reading, InputStream in) {
- this.reading = reading;
+ public Session(Socket socket) {
+ this.socket = socket;
+ this.lastRequest = System.currentTimeMillis();
this.thread = Thread.currentThread();
- this.time = System.currentTimeMillis();
- this.in = in;
}
- public String toString() {
- String msg = "";
- if (reading)
- msg += "READING";
- else msg += "WORKING";
- msg += " "+thread.getName();
+ public void service(Socket socket) throws ServiceException, IOException {
+ keepAliveTimer.addSession(this);
- msg += " since "+ format.format(new Date(time));
- return msg;
- }
- }
+ int i = -1;
+ try {
+ InputStream in = new BufferedInputStream(socket.getInputStream());
+ OutputStream out = new BufferedOutputStream(socket.getOutputStream());
- public void service(Socket socket) throws ServiceException, IOException {
- InputStream in = new BufferedInputStream(socket.getInputStream());
- OutputStream out = new BufferedOutputStream(socket.getOutputStream());
+ while (!stop.get()) {
+ try {
+ i = in.read();
+ } catch (SocketException e) {
+ // Socket closed.
+ break;
+ }
+ KeepAliveStyle style = KeepAliveStyle.values()[i];
- try {
- while (true) {
- keepAliveTimer.setStatus(new Status(true, in));
- int i = in.read();
- char c = (char) i;
- if (i == 30){
- keepAliveTimer.setStatus(new Status(false, null));
- service.service(new Input(in), new Output(out));
- out.flush();
- } else {
- keepAliveTimer.setStatus(new Status(false, null));
- break;
+ try {
+ usage.lock();
+
+ switch(style){
+ case PING_PING: {
+ in.read();
+ }
+ break;
+ case PING_PONG: {
+ out.write(style.ordinal());
+ out.flush();
+ }
+ }
+
+ service.service(new Input(in), new Output(out));
+ out.flush();
+ } finally {
+ this.lastRequest = System.currentTimeMillis();
+ usage.unlock();
+ }
}
+ } catch (ArrayIndexOutOfBoundsException e){
+ throw new IOException("Unexpected byte " + i);
+ } catch (InterruptedIOException e) {
+ Thread.interrupted();
+ } finally {
+ keepAliveTimer.removeSession(this);
}
- } catch (InterruptedIOException e) {
- Thread.interrupted();
- } catch (IOException e) {
- } finally{
- keepAliveTimer.setStatus(new Status(false, null));
-// System.out.println("close socket");
- socket.close();
}
}
+
+ public void service(Socket socket) throws ServiceException, IOException {
+ Session session = new Session(socket);
+ session.service(socket);
+ }
+
public void service(InputStream in, OutputStream out) throws ServiceException, IOException {
}
@@ -202,11 +245,16 @@
}
public void start() throws ServiceException {
- service.start();
+ stop.set(false);
+
+// service.start();
}
+
public void stop() throws ServiceException {
- service.stop();
+ stop.set(true);
+ keepAliveTimer.closeSessions();
+// service.stop();
}
public void init(Properties props) throws Exception {
Added: openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/FailoverTest.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/FailoverTest.java?rev=693321&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/FailoverTest.java (added)
+++ openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/FailoverTest.java Mon Sep 8 17:09:25 2008
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.server.ejbd;
+
+import junit.framework.TestCase;
+import org.apache.openejb.OpenEJB;
+import org.apache.openejb.OpenEJBException;
+import org.apache.openejb.assembler.classic.Assembler;
+import org.apache.openejb.config.ConfigurationFactory;
+import org.apache.openejb.core.ServerFederation;
+import org.apache.openejb.jee.EjbJar;
+import org.apache.openejb.jee.StatelessBean;
+import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.server.DiscoveryAgent;
+import org.apache.openejb.server.DiscoveryListener;
+import org.apache.openejb.server.ServerService;
+import org.apache.openejb.server.ServerServiceFilter;
+import org.apache.openejb.server.ServiceDaemon;
+import org.apache.openejb.server.ServiceException;
+
+import javax.ejb.Remote;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class FailoverTest extends TestCase {
+
+ private static DiscoveryAgent agent = new TestAgent();
+
+ public void test() throws Exception {
+
+ Properties initProps = new Properties();
+ initProps.setProperty("openejb.deployments.classpath.include", "");
+ initProps.setProperty("openejb.deployments.classpath.filter.descriptors", "true");
+ OpenEJB.init(initProps, new ServerFederation());
+
+ SystemInstance.get().setComponent(DiscoveryAgent.class, agent);
+
+ ServerService red = server(Host.RED);
+ ServerService blue = server(Host.BLUE);
+ ServerService green = server(Host.GREEN);
+
+ red.start();
+ blue.start();
+ green.start();
+
+ TargetRemote target = getBean(red);
+
+ assertEquals(Host.RED, target.getHost());
+
+ red.stop();
+
+ assertEquals(Host.BLUE, target.getHost());
+
+ blue.stop();
+
+ assertEquals(Host.GREEN, target.getHost());
+ }
+
+ private TargetRemote getBean(ServerService server) throws NamingException, IOException, OpenEJBException {
+ int port = server.getPort();
+
+ Assembler assembler = SystemInstance.get().getComponent(Assembler.class);
+ ConfigurationFactory config = new ConfigurationFactory();
+
+ EjbJar ejbJar = new EjbJar();
+ ejbJar.addEnterpriseBean(new StatelessBean(Target.class));
+ assembler.createApplication(config.configureApplication(ejbJar));
+
+ // good creds
+ Properties props = new Properties();
+ props.put("java.naming.factory.initial", "org.apache.openejb.client.RemoteInitialContextFactory");
+ props.put("java.naming.provider.url", "ejbd://localhost:" + port + "/RED");
+ System.setProperty("openejb.client.keepalive", "ping_pong");
+ Context context = new InitialContext(props);
+ TargetRemote target = (TargetRemote) context.lookup("TargetRemote");
+ return target;
+ }
+
+ private ServerService server(Host host) throws Exception {
+ ServerService server = new EjbServer();
+
+ server = new HostFilter(server, host);
+
+ server = new ServiceDaemon(server, 0, "localhost");
+
+ server = new AgentFilter(server, agent, host);
+
+ server.init(new Properties());
+
+ return server;
+ }
+
+
+ // Simple single-threaded version, way easier on testing
+ public static class TestAgent implements DiscoveryAgent {
+
+ private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>();
+
+ public void registerService(URI serviceUri) throws IOException {
+ for (DiscoveryListener listener : listeners) {
+ listener.serviceAdded(serviceUri);
+ }
+ }
+
+ public void reportFailed(URI serviceUri) throws IOException {
+ }
+
+ public void setDiscoveryListener(DiscoveryListener listener) {
+ listeners.add(listener);
+ }
+
+ public void unregisterService(URI serviceUri) throws IOException {
+ for (DiscoveryListener listener : listeners) {
+ listener.serviceRemoved(serviceUri);
+ }
+ }
+
+ }
+
+ public static enum Host {
+ RED, BLUE, GREEN;
+ }
+
+ public static ThreadLocal<Host> host = new ThreadLocal<Host>();
+
+
+ public static class AgentFilter extends ServerServiceFilter {
+ private final Host host;
+ private final DiscoveryAgent agent;
+ private URI uri;
+
+ public AgentFilter(ServerService service, DiscoveryAgent agent, Host host) {
+ super(service);
+ this.agent = agent;
+ this.host = host;
+ }
+
+ public void start() throws ServiceException {
+ super.start();
+ try {
+ uri = new URI("ejb:ejbd://localhost:" + getPort() + "/" + host);
+ agent.registerService(uri);
+ } catch (Exception e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ public void stop() throws ServiceException {
+ super.stop();
+ try {
+ agent.unregisterService(uri);
+ } catch (Exception e) {
+ throw new ServiceException(e);
+ }
+ }
+ }
+
+ public static class HostFilter extends ServerServiceFilter {
+ private final Host me;
+
+ public HostFilter(ServerService service, Host me) {
+ super(service);
+ this.me = me;
+ }
+
+ public void service(InputStream in, OutputStream out) throws ServiceException, IOException {
+ try {
+ host.set(me);
+ super.service(in, out);
+ } finally {
+ host.remove();
+ }
+ }
+
+ public void service(Socket socket) throws ServiceException, IOException {
+ try {
+ host.set(me);
+ super.service(socket);
+ } finally {
+ host.remove();
+ }
+ }
+ }
+
+ public static class Target implements TargetRemote {
+ public Host getHost() {
+ return host.get();
+ }
+ }
+
+ @Remote
+ public static interface TargetRemote {
+ Host getHost();
+ }
+}
Copied: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryAgent.java (from r691472, openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryAgent.java)
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryAgent.java?p2=openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryAgent.java&p1=openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryAgent.java&r1=691472&r2=693321&rev=693321&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryAgent.java (original)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryAgent.java Mon Sep 8 17:09:25 2008
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.openejb.server.discovery;
+package org.apache.openejb.server;
import java.net.URI;
import java.io.IOException;
Copied: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryListener.java (from r691472, openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryListener.java)
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryListener.java?p2=openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryListener.java&p1=openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryListener.java&r1=691472&r2=693321&rev=693321&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryListener.java (original)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryListener.java Mon Sep 8 17:09:25 2008
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.openejb.server.discovery;
+package org.apache.openejb.server;
import java.net.URI;
@@ -23,7 +23,7 @@
/**
* @version $Rev$ $Date$
*/
-public interface DiscoveryListener {
+public interface DiscoveryListener {
public void serviceAdded(URI service);
public void serviceRemoved(URI service);
Added: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java?rev=693321&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java (added)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java Mon Sep 8 17:09:25 2008
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.server;
+
+import org.apache.openejb.loader.SystemInstance;
+
+import java.net.URI;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ConcurrentHashMap;
+import java.io.IOException;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class DiscoveryRegistry implements DiscoveryListener, DiscoveryAgent {
+
+ private final DiscoveryAgent agent;
+ private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>();
+ private final Map<String, URI> services = new ConcurrentHashMap<String, URI>();
+
+ private final Executor executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+ public Thread newThread(Runnable runable) {
+ Thread t = new Thread(runable, DiscoveryRegistry.class.getSimpleName());
+ t.setDaemon(true);
+ return t;
+ }
+ });
+
+ public DiscoveryRegistry(DiscoveryAgent agent) {
+ this.agent = agent;
+ agent.setDiscoveryListener(this);
+ SystemInstance.get().setComponent(DiscoveryRegistry.class, this);
+ SystemInstance.get().setComponent(DiscoveryAgent.class, this);
+ }
+
+ public Set<URI> getServices() {
+ return new HashSet<URI>(services.values());
+ }
+
+ public void registerService(URI serviceUri) throws IOException {
+ agent.registerService(serviceUri);
+ }
+
+ public void reportFailed(URI serviceUri) throws IOException {
+ agent.reportFailed(serviceUri);
+ }
+
+ public void unregisterService(URI serviceUri) throws IOException {
+ agent.unregisterService(serviceUri);
+ }
+
+ public void setDiscoveryListener(DiscoveryListener listener) {
+ addDiscoveryListener(listener);
+ }
+
+ public void addDiscoveryListener(DiscoveryListener listener){
+ // get the listener caught up
+ for (URI service : services.values()) {
+ executor.execute(new ServiceAddedTask(listener, service));
+ }
+
+ listeners.add(listener);
+ }
+
+ public void removeDiscoveryListener(DiscoveryListener listener){
+ listeners.remove(listener);
+ }
+
+
+ public void serviceAdded(URI service) {
+ services.put(service.toString(), service);
+ for (final DiscoveryListener discoveryListener : getListeners()) {
+ executor.execute(new ServiceAddedTask(discoveryListener, service));
+ }
+ }
+
+ public void serviceRemoved(URI service) {
+
+ for (final DiscoveryListener discoveryListener : getListeners()) {
+ executor.execute(new ServiceRemovedTask(discoveryListener, service));
+ }
+ }
+
+ List<DiscoveryListener> getListeners(){
+ return Collections.unmodifiableList(listeners);
+ }
+
+ private abstract static class Task implements Runnable {
+ protected final DiscoveryListener discoveryListener;
+ protected final URI service;
+
+ protected Task(DiscoveryListener discoveryListener, URI service) {
+ this.discoveryListener = discoveryListener;
+ this.service = service;
+ }
+ }
+
+ private static class ServiceRemovedTask extends Task {
+ public ServiceRemovedTask(DiscoveryListener discoveryListener, URI service) {
+ super(discoveryListener, service);
+ }
+
+ public void run() {
+ if (discoveryListener != null) {
+ discoveryListener.serviceRemoved(service);
+ }
+ }
+ }
+
+ private static class ServiceAddedTask extends Task {
+ public ServiceAddedTask(DiscoveryListener discoveryListener, URI service) {
+ super(discoveryListener, service);
+ }
+
+ public void run() {
+ if (discoveryListener != null) {
+ discoveryListener.serviceAdded(service);
+ }
+ }
+ }
+}
Added: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java?rev=693321&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java (added)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java Mon Sep 8 17:09:25 2008
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.server;
+
+import org.apache.openejb.server.ServerService;
+import org.apache.openejb.server.ServiceException;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Properties;
+
+/**
+ * TODO: Make this the superclass of the appropriate ServerService implementations
+ * @version $Rev$ $Date$
+ */
+public class ServerServiceFilter implements ServerService {
+ private final ServerService service;
+
+ public ServerServiceFilter(ServerService service) {
+ this.service = service;
+ }
+
+ public String getIP() {
+ return service.getIP();
+ }
+
+ public String getName() {
+ return service.getName();
+ }
+
+ public int getPort() {
+ return service.getPort();
+ }
+
+ public void service(InputStream in, OutputStream out) throws ServiceException, IOException {
+ service.service(in, out);
+ }
+
+ public void service(Socket socket) throws ServiceException, IOException {
+ service.service(socket);
+ }
+
+ public void start() throws ServiceException {
+ service.start();
+ }
+
+ public void stop() throws ServiceException {
+ service.stop();
+ }
+
+ public void init(Properties props) throws Exception {
+ service.init(props);
+ }
+}
Modified: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java?rev=693321&r1=693320&r2=693321&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java (original)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java Mon Sep 8 17:09:25 2008
@@ -32,6 +32,12 @@
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.TimeUnit;
/**
*/
@@ -143,7 +149,7 @@
secure = getBoolean(props, "secure", false);
timeout = 1000;
-
+
next.init(props);
}
@@ -185,11 +191,11 @@
public void stop() throws ServiceException {
synchronized (this) {
+ next.stop();
if (socketListener != null) {
socketListener.stop();
socketListener = null;
}
- next.stop();
}
}
@@ -216,35 +222,49 @@
}
private static class SocketListener implements Runnable {
- private ServerService serverService;
- private ServerSocket serverSocket;
- private boolean stopped;
+ private final ServerService serverService;
+ private final ServerSocket serverSocket;
+ private AtomicBoolean stop = new AtomicBoolean();
+ private Lock lock = new ReentrantLock();
public SocketListener(ServerService serverService, ServerSocket serverSocket) {
this.serverService = serverService;
this.serverSocket = serverSocket;
- stopped = false;
}
- public synchronized void stop() {
- stopped = true;
- }
-
- private synchronized boolean shouldStop() {
- return stopped;
+ public void stop() {
+ stop.set(true);
+ try {
+ if (lock.tryLock(10, TimeUnit.SECONDS)){
+ serverSocket.close();
+ }
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ } catch (IOException e) {
+ }
}
public void run() {
- while (!shouldStop()) {
+ while (!stop.get()) {
Socket socket = null;
try {
socket = serverSocket.accept();
socket.setTcpNoDelay(true);
- if (!shouldStop()) {
+ if (!stop.get()) {
// the server service is responsible
// for closing the socket.
- serverService.service(socket);
+ try {
+ lock.lock();
+ serverService.service(socket);
+ } finally {
+ lock.unlock();
+ }
}
+
+ // Sockets are consumed in other threads
+ // and should never be closed here
+ // It's up to the consumer of the socket
+ // to close it.
} catch (SocketTimeoutException e) {
// we don't really care
// log.debug("Socket timed-out",e);
@@ -253,15 +273,11 @@
}
}
- if (serverSocket != null) {
- try {
- serverSocket.close();
- } catch (IOException ioException) {
- log.debug("Error cleaning up socked", ioException);
- }
- serverSocket = null;
+ try {
+ serverSocket.close();
+ } catch (IOException ioException) {
+ log.debug("Error cleaning up socked", ioException);
}
- serverService = null;
}
public void setSoTimeout(int timeout) throws SocketException {
Modified: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceManager.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceManager.java?rev=693321&r1=693320&r2=693321&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceManager.java (original)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceManager.java Mon Sep 8 17:09:25 2008
@@ -44,6 +44,8 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.LinkedHashMap;
+import java.util.Collections;
/**
* @version $Rev$ $Date$
@@ -144,7 +146,8 @@
ServiceFinder serviceFinder = new ServiceFinder("META-INF/");
- Map availableServices = serviceFinder.mapAvailableServices(ServerService.class);
+ Map<String, Properties> availableServices = serviceFinder.mapAvailableServices(ServerService.class);
+
List enabledServers = new ArrayList();
OpenEjbConfiguration conf = SystemInstance.get().getComponent(OpenEjbConfiguration.class);
@@ -198,6 +201,12 @@
service = (ServerService) recipe.create(serviceClass.getClassLoader());
+ if (service instanceof DiscoveryAgent){
+ DiscoveryAgent agent = (DiscoveryAgent) service;
+ DiscoveryRegistry registry = new DiscoveryRegistry(agent);
+ SystemInstance.get().setComponent(DiscoveryRegistry.class, registry);
+ }
+
if (!(service instanceof SelfManaging)) {
service = new ServicePool(service, serviceName, serviceProperties);
service = new ServiceLogger(service);
Modified: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java?rev=693321&r1=693320&r2=693321&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java (original)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java Mon Sep 8 17:09:25 2008
@@ -23,6 +23,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.openejb.util.LogCategory;
import org.apache.openejb.util.Logger;
@@ -40,6 +41,7 @@
private final ServerService next;
private final Executor executor;
private final ThreadPoolExecutor threadPool;
+ private final AtomicBoolean stop = new AtomicBoolean();
public ServicePool(ServerService next, String name, Properties properties) {
this(next, name, getInt(properties, "threads", 100));
@@ -86,6 +88,7 @@
final Runnable service = new Runnable() {
public void run() {
try {
+ if (stop.get()) return;
next.service(socket);
} catch (SecurityException e) {
log.error("Security error: " + e.getMessage(), e);
@@ -93,6 +96,12 @@
log.error("Unexpected error", e);
} finally {
try {
+ // Once the thread is done with the socket, clean it up
+ // The ServiceDaemon does not close the sockets as it is
+ // single threaded and only accepts sockets and then
+ // hands them off to be processed. As the thread doing
+ // that processing it is our job to close the socket
+ // when we are finished with it.
if (socket != null) {
socket.close();
}