You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2008/08/31 05:23:04 UTC
svn commit: r690629 - in /openejb/trunk/openejb3/server: ./
openejb-client/src/main/java/org/apache/openejb/client/
openejb-client/src/test/java/org/apache/openejb/client/ openejb-discovery/
openejb-discovery/src/ openejb-discovery/src/main/java/org/ap...
Author: dblevins
Date: Sat Aug 30 20:23:03 2008
New Revision: 690629
URL: http://svn.apache.org/viewvc?rev=690629&view=rev
Log:
OPENEJB-904: Pluggable Client/Server connection strategies and factories
OPENEJB-903: Multicast discovery and grouping
903 is only partially complete
Added:
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionStrategy.java
- copied, changed from r689515, openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactoryStrategy.java
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastSearch.java
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Registry.java
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickyConnectionStrategy.java
- copied, changed from r689515, openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategy.java
openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickyConnectionStrategyTest.java
- copied, changed from r689515, openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategyTest.java
openejb/trunk/openejb3/server/openejb-discovery/
- copied from r689515, openejb/trunk/openejb3/server/openejb-admin/
openejb/trunk/openejb3/server/openejb-discovery/pom.xml
- copied, changed from r690620, openejb/trunk/openejb3/server/openejb-admin/pom.xml
openejb/trunk/openejb3/server/openejb-discovery/src/
- copied from r690620, openejb/trunk/openejb3/server/openejb-admin/src/
openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/
openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/
openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryAgent.java
openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryListener.java
openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java
openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastSearch.java
openejb/trunk/openejb3/server/openejb-discovery/src/main/resources/
openejb/trunk/openejb3/server/openejb-discovery/src/main/resources/META-INF/
openejb/trunk/openejb3/server/openejb-discovery/src/main/resources/META-INF/org.apache.openejb.server.ServersService/
openejb/trunk/openejb3/server/openejb-discovery/src/main/resources/META-INF/org.apache.openejb.server.ServersService/multicast
openejb/trunk/openejb3/server/openejb-discovery/src/test/java/org/apache/openejb/server/
openejb/trunk/openejb3/server/openejb-discovery/src/test/java/org/apache/openejb/server/discovery/
openejb/trunk/openejb3/server/openejb-discovery/src/test/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgentTest.java
Removed:
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactoryStrategy.java
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategy.java
openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategyTest.java
Modified:
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Client.java
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactory.java
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionManager.java
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/HttpConnectionFactory.java
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ServerMetaData.java
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java
openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/ConnectionManagerTest.java
openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java
openejb/trunk/openejb3/server/pom.xml
Modified: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Client.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Client.java?rev=690629&r1=690628&r2=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Client.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Client.java Sat Aug 30 20:23:03 2008
@@ -51,7 +51,7 @@
/*----------------------------*/
/* Get a connection to server */
/*----------------------------*/
- conn = server.connect(req);
+ conn = ConnectionManager.getConnection(server);
/*----------------------------------*/
/* Get output streams */
Modified: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactory.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactory.java?rev=690629&r1=690628&r2=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactory.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactory.java Sat Aug 30 20:23:03 2008
@@ -21,8 +21,6 @@
public interface ConnectionFactory {
- public void init(Properties props);
-
public Connection getConnection(URI uri) throws java.io.IOException;
}
Modified: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionManager.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionManager.java?rev=690629&r1=690628&r2=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionManager.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionManager.java Sat Aug 30 20:23:03 2008
@@ -17,88 +17,72 @@
package org.apache.openejb.client;
import java.io.IOException;
-import java.util.Properties;
import java.net.URI;
public class ConnectionManager {
- private static ConnectionFactory factory;
+ private static Registry<ConnectionFactory> factories = Registry.create(ConnectionFactory.class);
+ private static Registry<ConnectionStrategy> strategies = Registry.create(ConnectionStrategy.class);
- private static Class defaultFactoryClass = SocketConnectionFactory.class;
+ static {
+ SocketConnectionFactory ejbdFactory = new SocketConnectionFactory();
- private static String factoryName;
+ factories.register("default", ejbdFactory);
+ factories.register("ejbd", ejbdFactory);
- static {
- try {
- installFactory(defaultFactoryClass.getName());
- } catch (Exception e) {
+ HttpConnectionFactory httpFactory = new HttpConnectionFactory();
+ factories.register("http", httpFactory);
+ factories.register("https", httpFactory);
- }
+ strategies.register("default", new StickyConnectionStrategy());
}
- public static Connection getConnection(URI uri) throws IOException {
- if (uri.getScheme().equals("http") || uri.getScheme().equals("https")) {
- return new HttpConnectionFactory().getConnection(uri);
- } else {
- return factory.getConnection(uri);
- }
- }
- public static void setFactory(String factoryName) throws IOException {
- installFactory(factoryName);
- }
+ public static Connection getConnection(ServerMetaData serverMetaData) throws IOException {
+ String name = serverMetaData.getConnectionStrategy();
- public static void setFactory(ConnectionFactory factory) throws IOException {
- if (null == factory) {
- throw new IllegalArgumentException("factory is required");
- }
- ConnectionManager.factory = factory;
- factoryName = factory.getClass().getName();
- }
-
- public static ConnectionFactory getFactory() {
- return factory;
- }
+ if (name == null) name = "default";
+
+ ConnectionStrategy strategy = strategies.get(name);
- public static String getFactoryName() {
- return factoryName;
+ if (strategy == null) throw new IOException("Unsupported ConnectionStrategy \"" + name + "\"");
+
+
+ return strategy.connect(serverMetaData);
}
- private static void installFactory(String factoryName) throws IOException {
+ public static Connection getConnection(URI uri) throws IOException {
+ String scheme = uri.getScheme();
- Class factoryClass = null;
- ConnectionFactory factory = null;
+ ConnectionFactory factory = factories.get(scheme);
- try {
- ClassLoader cl = getContextClassLoader();
- factoryClass = Class.forName(factoryName, true, cl);
- } catch (Exception e) {
- throw new IOException("No ConnectionFactory Can be installed. Unable to load the class " + factoryName);
- }
+ if (factory == null) throw new IOException("Unsupported ConnectionFactory URI scheme \"" + scheme + "\"");
- try {
- factory = (ConnectionFactory) factoryClass.newInstance();
- } catch (Exception e) {
- throw new IOException("No ConnectionFactory Can be installed. Unable to instantiate the class " + factoryName);
- }
+ return factory.getConnection(uri);
+ }
- try {
+ public static void registerFactory(String scheme, ConnectionFactory factory) {
+ factories.register(scheme, factory);
+ }
- factory.init(new Properties());
- } catch (Exception e) {
- throw new IOException("No ConnectionFactory Can be installed. Unable to initialize the class " + factoryName);
- }
+ public static ConnectionFactory unregisterFactory(String scheme) {
+ return factories.unregister(scheme);
+ }
- ConnectionManager.factory = factory;
- ConnectionManager.factoryName = factoryName;
+ public static void registerStrategy(String scheme, ConnectionStrategy factory) {
+ strategies.register(scheme, factory);
}
- public static ClassLoader getContextClassLoader() {
- return (ClassLoader) java.security.AccessController.doPrivileged(new java.security.PrivilegedAction() {
- public Object run() {
- return Thread.currentThread().getContextClassLoader();
- }
- });
+ public static ConnectionStrategy unregisterStrategy(String scheme) {
+ return strategies.unregister(scheme);
}
+ /**
+ * @param factory
+ * @throws IOException
+ * @deprecated use registerFactory("default", factory);
+ */
+ public static void setFactory(ConnectionFactory factory) throws IOException {
+ registerFactory("default", factory);
+ }
}
Copied: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionStrategy.java (from r689515, openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactoryStrategy.java)
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionStrategy.java?p2=openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionStrategy.java&p1=openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactoryStrategy.java&r1=689515&r2=690629&rev=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionFactoryStrategy.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionStrategy.java Sat Aug 30 20:23:03 2008
@@ -16,10 +16,8 @@
*/
package org.apache.openejb.client;
-import java.io.Serializable;
-import java.net.URI;
-import java.rmi.RemoteException;
+import java.io.IOException;
-public interface ConnectionFactoryStrategy extends Serializable {
- Connection connect(URI[] locations, Request request) throws RemoteException;
+public interface ConnectionStrategy {
+ public Connection connect(ServerMetaData server) throws IOException;
}
Modified: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/HttpConnectionFactory.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/HttpConnectionFactory.java?rev=690629&r1=690628&r2=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/HttpConnectionFactory.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/HttpConnectionFactory.java Sat Aug 30 20:23:03 2008
@@ -31,9 +31,6 @@
*/
public class HttpConnectionFactory implements ConnectionFactory {
- public void init(Properties props) {
- }
-
public Connection getConnection(URI uri) throws IOException {
return new HttpConnection(uri);
}
Added: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastSearch.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastSearch.java?rev=690629&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastSearch.java (added)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastSearch.java Sat Aug 30 20:23:03 2008
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.client;
+
+import java.net.URI;
+import java.net.MulticastSocket;
+import java.net.InetAddress;
+import java.net.DatagramPacket;
+import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class MulticastSearch {
+
+ private static final int BUFF_SIZE = 8192;
+
+ private final MulticastSocket multicast;
+
+ public MulticastSearch() throws IOException {
+ this("239.255.2.3", 6142);
+ }
+
+ public MulticastSearch(String host, int port) throws IOException {
+ InetAddress inetAddress = InetAddress.getByName(host);
+
+ multicast = new MulticastSocket(port);
+ multicast.joinGroup(inetAddress);
+ multicast.setSoTimeout(500);
+ }
+
+ public URI search(int timeout, TimeUnit milliseconds) throws IOException {
+ return search(new DefaultFilter(), timeout, milliseconds);
+ }
+
+ public URI search() throws IOException {
+ return search(new DefaultFilter(), 0, TimeUnit.MILLISECONDS);
+ }
+
+ public URI search(Filter filter) throws IOException {
+ return search(filter, 0, TimeUnit.MILLISECONDS);
+ }
+
+ public URI search(Filter filter, long timeout, TimeUnit unit) throws IOException {
+ timeout = unit.convert(timeout, TimeUnit.MILLISECONDS);
+ long waited = 0;
+
+ byte[] buf = new byte[BUFF_SIZE];
+ DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
+
+ while (timeout == 0 || waited < timeout){
+ long start = System.currentTimeMillis();
+ try {
+ multicast.receive(packet);
+ if (packet.getLength() > 0) {
+ String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
+ URI service = URI.create(str);
+ if (service != null && filter.accept(service)) {
+ return service;
+ }
+ }
+ } finally {
+ long stop = System.currentTimeMillis();
+ waited += stop - start;
+ }
+ }
+
+ return null;
+ }
+
+ public interface Filter {
+ boolean accept(URI service);
+ }
+
+ public static class DefaultFilter implements Filter {
+ public boolean accept(URI service) {
+ return true;
+ }
+ }
+}
Added: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Registry.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Registry.java?rev=690629&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Registry.java (added)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Registry.java Sat Aug 30 20:23:03 2008
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.client;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.io.IOException;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class Registry<T> {
+
+ private Map<String, T> components = new ConcurrentHashMap<String, T>();
+
+ private Map<String, Class> available;
+
+ private String componentType;
+
+
+ public static <T> Registry<T> create(Class<T> type){
+ return new Registry(type);
+ }
+
+ private Registry(Class<T> type) {
+ componentType = type.getSimpleName();
+
+ try {
+ ResourceFinder resourceFinder = new ResourceFinder("META-INF/");
+ available = resourceFinder.mapAvailableImplementations(type);
+ } catch (IOException e) {
+ available = new HashMap();
+ }
+ }
+
+ public void register(String scheme, T factory) {
+ components.put(scheme, factory);
+ }
+
+ public T unregister(String scheme) {
+ if ("default".equals(scheme)) {
+ throw new IllegalArgumentException("Cannot uninstall the default " + componentType);
+ }
+ return components.remove(scheme);
+ }
+
+ public T get(String scheme) {
+ T factory = components.get(scheme);
+
+ if (factory == null) {
+ factory = load(scheme);
+ }
+
+ return factory;
+ }
+
+ private T load(String scheme) {
+
+ Class clazz = available.get(scheme);
+
+ if (clazz == null) return null;
+
+ try {
+ T factory = (T) clazz.newInstance();
+
+ components.put(scheme, factory);
+
+ return factory;
+ } catch (Exception e) {
+ throw new IllegalStateException(componentType + " cannot be installed. Unable to instantiate the class " + clazz.getName() + " for scheme " + scheme);
+ }
+ }
+}
Modified: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ServerMetaData.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ServerMetaData.java?rev=690629&r1=690628&r2=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ServerMetaData.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ServerMetaData.java Sat Aug 30 20:23:03 2008
@@ -21,13 +21,12 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.net.URI;
-import java.rmi.RemoteException;
import java.util.Arrays;
public class ServerMetaData implements Externalizable {
private transient URI[] locations;
- private transient ConnectionFactoryStrategy connectionFactoryStrategy;
+ private transient String connectionStrategy;
public ServerMetaData() {
}
@@ -36,10 +35,19 @@
this.locations = locations;
}
+ public ServerMetaData(String connectionStrategy, URI ... locations) {
+ this.connectionStrategy = connectionStrategy;
+ this.locations = locations;
+ }
+
public void merge(ServerMetaData toMerge) {
locations = toMerge.locations;
}
-
+
+ public String getConnectionStrategy() {
+ return connectionStrategy;
+ }
+
public URI[] getLocations() {
return locations;
}
@@ -52,28 +60,21 @@
return locationsHash;
}
- public Connection connect(Request request) throws RemoteException {
- ConnectionFactoryStrategy factoryStrategy = getConnectionFactoryStrategy();
- return factoryStrategy.connect(locations, request);
- }
-
- protected ConnectionFactoryStrategy getConnectionFactoryStrategy() {
- if (null == connectionFactoryStrategy) {
- connectionFactoryStrategy = new StickToLastServerConnectionFactoryStrategy();
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ byte version = in.readByte();
+
+ if (version > 1){
+ connectionStrategy = (String) in.readObject();
}
- return connectionFactoryStrategy;
- }
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- byte version = in.readByte(); // future use
-
locations = (URI[]) in.readObject();
}
public void writeExternal(ObjectOutput out) throws IOException {
// write out the version of the serialized data for future use
- out.writeByte(1);
+ out.writeByte(2);
+ out.writeObject(connectionStrategy);
out.writeObject(locations);
}
Modified: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java?rev=690629&r1=690628&r2=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java Sat Aug 30 20:23:03 2008
@@ -38,9 +38,6 @@
private static Map<URI, SocketConnection> connections = new ConcurrentHashMap<URI, SocketConnection>();
- public void init(Properties props) {
- }
-
public Connection getConnection(URI uri) throws java.io.IOException {
SocketConnection conn = connections.get(uri);
Copied: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickyConnectionStrategy.java (from r689515, openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategy.java)
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickyConnectionStrategy.java?p2=openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickyConnectionStrategy.java&p1=openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategy.java&r1=689515&r2=690629&rev=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategy.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/StickyConnectionStrategy.java Sat Aug 30 20:23:03 2008
@@ -22,12 +22,13 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-public class StickToLastServerConnectionFactoryStrategy implements ConnectionFactoryStrategy {
+public class StickyConnectionStrategy implements ConnectionStrategy {
private static final Logger LOGGER = Logger.getLogger("OpenEJB.client");
private URI lastLocation;
- public Connection connect(URI[] locations, Request request) throws RemoteException {
+ public Connection connect(ServerMetaData server) throws IOException {
+ URI[] locations = server.getLocations();
if (null != lastLocation) {
for (int i = 0; i < locations.length; i++) {
if (locations[i].equals(lastLocation)) {
@@ -39,7 +40,7 @@
}
}
}
-
+
Connection connection = null;
for (int i = 0; i < locations.length; i++) {
URI uri = locations[i];
@@ -53,10 +54,11 @@
throw new RemoteException("Cannot connect to server: " + uri.getHost() + ":" + uri.getPort() + " due to an unkown exception in the OpenEJB client: ", e);
}
}
+
if (null != connection) {
return connection;
}
-
+
// If no servers responded, throw an error
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < locations.length; i++) {
Modified: openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/ConnectionManagerTest.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/ConnectionManagerTest.java?rev=690629&r1=690628&r2=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/ConnectionManagerTest.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/ConnectionManagerTest.java Sat Aug 30 20:23:03 2008
@@ -17,6 +17,8 @@
package org.apache.openejb.client;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.net.URI;
import java.util.Properties;
@@ -25,24 +27,34 @@
public class ConnectionManagerTest extends TestCase{
- public void testSetConnectionFactory() throws Exception {
+ public void testRegisterFactory() throws Exception {
MockConnectionFactory connectionFactory = new MockConnectionFactory();
- ConnectionManager.setFactory(connectionFactory);
-
- assertSame(connectionFactory, ConnectionManager.getFactory());
- assertEquals(connectionFactory.getClass().getName(), ConnectionManager.getFactoryName());
+ ConnectionManager.registerFactory("mock", connectionFactory);
+
+ Connection connection = ConnectionManager.getConnection(new URI("mock://foo"));
+
+ assertTrue(connection instanceof MockConnection);
}
public static class MockConnectionFactory implements ConnectionFactory {
public Connection getConnection(URI uri) throws IOException {
+ return new MockConnection();
+ }
+
+ }
+
+ private static class MockConnection implements Connection {
+ public void close() throws IOException {
throw new UnsupportedOperationException();
}
- public void init(Properties props) {
+ public InputStream getInputStream() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ public OutputStream getOuputStream() throws IOException {
throw new UnsupportedOperationException();
}
-
}
-
}
Copied: openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickyConnectionStrategyTest.java (from r689515, openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategyTest.java)
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickyConnectionStrategyTest.java?p2=openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickyConnectionStrategyTest.java&p1=openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategyTest.java&r1=689515&r2=690629&rev=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickToLastServerConnectionFactoryStrategyTest.java (original)
+++ openejb/trunk/openejb3/server/openejb-client/src/test/java/org/apache/openejb/client/StickyConnectionStrategyTest.java Sat Aug 30 20:23:03 2008
@@ -23,18 +23,18 @@
import com.agical.rmock.extension.junit.RMockTestCase;
-public class StickToLastServerConnectionFactoryStrategyTest extends RMockTestCase {
+public class StickyConnectionStrategyTest extends RMockTestCase {
- private StickToLastServerConnectionFactoryStrategy factoryStrategy;
+ private StickyConnectionStrategy factoryStrategy;
private ConnectionFactory connectionFactory;
private URI[] locations;
@Override
protected void setUp() throws Exception {
connectionFactory = (ConnectionFactory) mock(ConnectionFactory.class);
- ConnectionManager.setFactory(connectionFactory);
+ ConnectionManager.registerFactory("ejbd", connectionFactory);
- factoryStrategy = new StickToLastServerConnectionFactoryStrategy();
+ factoryStrategy = new StickyConnectionStrategy();
URI uri1 = new URI("ejbd://localhost:4201");
URI uri2 = new URI("ejbd://localhost:4202");
@@ -51,9 +51,10 @@
startVerification();
try {
- factoryStrategy.connect(locations, null);
+ ServerMetaData server = new ServerMetaData(locations);
+ factoryStrategy.connect(server);
fail();
- } catch (RemoteException e) {
+ } catch (IOException e) {
}
}
@@ -65,7 +66,8 @@
startVerification();
- Connection actualConnection = factoryStrategy.connect(locations, null);
+ ServerMetaData server = new ServerMetaData(locations);
+ Connection actualConnection = factoryStrategy.connect(server);
assertSame(expectedConnection, actualConnection);
}
@@ -78,8 +80,9 @@
startVerification();
- factoryStrategy.connect(locations, null);
- factoryStrategy.connect(locations, null);
+ ServerMetaData server = new ServerMetaData(locations);
+ factoryStrategy.connect(server);
+ factoryStrategy.connect(server);
}
}
Copied: openejb/trunk/openejb3/server/openejb-discovery/pom.xml (from r690620, openejb/trunk/openejb3/server/openejb-admin/pom.xml)
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-discovery/pom.xml?p2=openejb/trunk/openejb3/server/openejb-discovery/pom.xml&p1=openejb/trunk/openejb3/server/openejb-admin/pom.xml&r1=690620&r2=690629&rev=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-admin/pom.xml (original)
+++ openejb/trunk/openejb3/server/openejb-discovery/pom.xml Sat Aug 30 20:23:03 2008
@@ -26,9 +26,9 @@
<version>3.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>openejb-admin</artifactId>
+ <artifactId>openejb-discovery</artifactId>
<packaging>jar</packaging>
- <name>OpenEJB :: Server :: Admin</name>
+ <name>OpenEJB :: Server :: Discovery</name>
<dependencies>
<dependency>
<groupId>org.apache.openejb</groupId>
Added: openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryAgent.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryAgent.java?rev=690629&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryAgent.java (added)
+++ openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryAgent.java Sat Aug 30 20:23:03 2008
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.server.discovery;
+
+import java.net.URI;
+import java.io.IOException;
+
+/**
+ * Contract for a component through which service URIs are registered and
+ * unregistered, failures are reported, and a {@link DiscoveryListener} is attached.
+ *
+ * @version $Rev$ $Date$
+ */
+public interface DiscoveryAgent {
+ /**
+ * Sets the discovery listener
+ * @param listener the listener to attach to this agent
+ */
+ void setDiscoveryListener(DiscoveryListener listener);
+
+ /**
+ * Registers a service with this agent.
+ * @param serviceUri URI identifying the service to register
+ * @throws IOException if the implementation fails while registering the service
+ */
+ void registerService(URI serviceUri) throws IOException;
+
+ /**
+ * Unregisters a service from this agent.
+ * @param serviceUri URI identifying the service to unregister
+ * @throws IOException if the implementation fails while unregistering the service
+ */
+ void unregisterService(URI serviceUri) throws IOException;
+
+ /**
+ * A process actively using a service may see it go down before the DiscoveryAgent notices the
+ * service's failure. That process can use this method to notify the DiscoveryAgent of the failure
+ * so that other listeners of this DiscoveryAgent can also be made aware of the failure.
+ * @param serviceUri URI of the service observed to have failed
+ * @throws IOException if the implementation fails while handling the report
+ */
+ void reportFailed(URI serviceUri) throws IOException;
+
+}
Added: openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryListener.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryListener.java?rev=690629&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryListener.java (added)
+++ openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryListener.java Sat Aug 30 20:23:03 2008
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.server.discovery;
+
+
+import java.net.URI;
+import java.io.IOException;
+
+/**
+ * Callback interface through which a discovery agent reports services that
+ * were added or removed.
+ *
+ * @version $Rev$ $Date$
+ */
+public interface DiscoveryListener {
+
+    /**
+     * Called when a service has been added.
+     *
+     * @param service URI of the added service
+     */
+    void serviceAdded(URI service);
+
+    /**
+     * Called when a service has been removed.
+     *
+     * @param service URI of the removed service
+     */
+    void serviceRemoved(URI service);
+
+}
Added: openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java?rev=690629&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java (added)
+++ openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java Sat Aug 30 20:23:03 2008
@@ -0,0 +1,471 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.server.discovery;
+
+import static org.apache.openejb.server.ServiceDaemon.getBoolean;
+import static org.apache.openejb.server.ServiceDaemon.getLong;
+import static org.apache.openejb.server.ServiceDaemon.getInt;
+import org.apache.openejb.server.SelfManaging;
+import org.apache.openejb.server.ServerService;
+import org.apache.openejb.server.ServiceException;
+import org.apache.openejb.server.ServiceDaemon;
+import org.apache.openejb.util.LogCategory;
+import org.apache.openejb.util.Logger;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class MulticastDiscoveryAgent implements DiscoveryAgent, ServerService, SelfManaging {
+
+    private static final Logger log = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery"), MulticastDiscoveryAgent.class);
+
+    // Size in bytes of the buffer used to receive a single datagram.
+    private static final int BUFF_SIZE = 8192;
+
+
+    // Flipped by start()/stop(); also polled by the listener and broadcaster loops.
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private MulticastSocket multicast;
+
+    // Default multicast group. Kept identical to the "bind" value of the bundled
+    // multicast service descriptor and to the MulticastSearch default so that an
+    // agent and a search created with defaults actually share a group.
+    private String host = "239.255.2.3";
+    private int port = 6142;
+
+    private int timeToLive = 1;
+    private boolean loopbackMode = false;
+    private SocketAddress address;
+
+    // Services advertised by this agent, keyed by their URI string.
+    private final Map<String, Service> registeredServices = new ConcurrentHashMap<String, Service>();
+
+    // A remote service expires after heartRate * maxMissedHeartbeats ms of silence.
+    private int maxMissedHeartbeats = 10;
+    private long heartRate = 500;
+
+    private Listener listener;
+
+    /**
+     * Creates an agent with its internal {@link Listener} already in place, so a
+     * {@link DiscoveryListener} can be attached before {@link #start()} is called.
+     */
+    public MulticastDiscoveryAgent() {
+        this.listener = new Listener();
+    }
+
+ // ---------------------------------
+ // Listening specific settings
+ private long initialReconnectDelay = 1000 * 5;
+ private long maxReconnectDelay = 1000 * 30;
+ private long backOffMultiplier = 0;
+ private boolean useExponentialBackOff;
+ private int maxReconnectAttempts = 10; // todo: check this out
+ // ---------------------------------
+
+
+    /**
+     * Configures this agent from the supplied properties. Every setting is
+     * looked up with the field's current value passed as the default.
+     *
+     * Recognized keys: {@code bind}, {@code port}, {@code heart_rate},
+     * {@code max_missed_heartbeats}, {@code loopback_mode},
+     * {@code reconnect_delay}, {@code max_reconnect_delay},
+     * {@code max_reconnect_attempts} and {@code exponential_backoff}.
+     *
+     * @param props configuration properties for this service
+     * @throws Exception declared by the service contract; nothing here throws it explicitly
+     */
+    public void init(Properties props) throws Exception {
+
+        host = props.getProperty("bind", host);
+
+        port = getInt(props, "port", port);
+
+        heartRate = getLong(props, "heart_rate", heartRate);
+        maxMissedHeartbeats = getInt(props, "max_missed_heartbeats", maxMissedHeartbeats);
+        // Loopback mode has its own key; it used to be read from
+        // "max_missed_heartbeats", so it could never be configured independently.
+        loopbackMode = getBoolean(props, "loopback_mode", loopbackMode);
+
+        initialReconnectDelay = getLong(props, "reconnect_delay", initialReconnectDelay);
+        // Default to the existing maximum, not to the initial delay, so that an
+        // absent property does not shrink the cap.
+        maxReconnectDelay = getLong(props, "max_reconnect_delay", maxReconnectDelay);
+        maxReconnectAttempts = getInt(props, "max_reconnect_attempts", maxReconnectAttempts);
+        backOffMultiplier = getLong(props, "exponential_backoff", backOffMultiplier);
+
+        // A multiplier of 0 or 1 would never grow the delay, so treat it as "off".
+        useExponentialBackOff = (backOffMultiplier > 1);
+    }
+
+    /**
+     * @return the configured multicast group address
+     */
+    public String getIP() {
+        return this.host;
+    }
+
+    /**
+     * @return the fixed service name, {@code "multicast"}
+     */
+    public String getName() {
+        return "multicast";
+    }
+
+    /**
+     * @return the configured multicast port
+     */
+    public int getPort() {
+        return this.port;
+    }
+
+    /**
+     * Hands the given listener to the internal {@link Listener}, which relays
+     * add/remove events to it.
+     *
+     * @param listener the listener to notify
+     */
+    public void setDiscoveryListener(DiscoveryListener listener) {
+        this.listener.setDiscoveryListener(listener);
+    }
+
+    /**
+     * Adds the service to the set this agent advertises, keyed by its URI string.
+     *
+     * @param serviceUri URI of the service to advertise
+     */
+    public void registerService(URI serviceUri) throws IOException {
+        final Service registration = new Service(serviceUri);
+        registeredServices.put(registration.uriString, registration);
+    }
+
+    /**
+     * Removes the service from the set this agent advertises.
+     *
+     * @param serviceUri URI of the service to stop advertising
+     */
+    public void unregisterService(URI serviceUri) throws IOException {
+        final String key = serviceUri.toString();
+        registeredServices.remove(key);
+    }
+
+    /**
+     * Forwards a failure report for the given service to the internal {@link Listener}.
+     *
+     * @param serviceUri URI of the service observed to have failed
+     */
+    public void reportFailed(URI serviceUri) throws IOException {
+        this.listener.reportFailed(serviceUri);
+    }
+
+
+    /**
+     * @return true if the given service is one this agent itself registered
+     */
+    private boolean isSelf(Service service) {
+        return registeredServices.containsKey(service.uriString);
+    }
+
+    /**
+     * @return true if the given URI string is a key of the locally registered services
+     */
+    private boolean isSelf(String service) {
+        return registeredServices.containsKey(service);
+    }
+
+ public static void main(String[] args) throws Exception { // empty entry point: the body performs no work
+ }
+
+    /**
+     * Starts the discovery agent: opens the multicast socket, joins the group,
+     * launches the listener thread and schedules the heartbeat broadcaster.
+     * Calling it while already started is a no-op.
+     *
+     * @throws ServiceException wrapping any failure during startup; in that case
+     *                          the agent is returned to the stopped state so a
+     *                          later call can retry
+     */
+    public void start() throws ServiceException {
+        if (!started.compareAndSet(false, true)) {
+            return;
+        }
+
+        try {
+            InetAddress inetAddress = InetAddress.getByName(host);
+
+            this.address = new InetSocketAddress(inetAddress, port);
+
+            multicast = new MulticastSocket(port);
+            multicast.setLoopbackMode(loopbackMode);
+            multicast.setTimeToLive(timeToLive);
+            multicast.joinGroup(inetAddress);
+            // The receive timeout doubles as the interval at which the listener
+            // thread wakes up to expire silent services.
+            multicast.setSoTimeout((int) heartRate);
+
+            Thread listenerThread = new Thread(listener);
+            listenerThread.setName("MulticastDiscovery: Listener");
+            listenerThread.setDaemon(true);
+            listenerThread.start();
+
+            Broadcaster broadcaster = new Broadcaster();
+
+            // NOTE(review): this Timer is never cancelled by stop(); after a
+            // stop()/start() cycle the earlier timer appears to keep firing too.
+            Timer timer = new Timer("MulticastDiscovery: Broadcaster", true);
+            timer.scheduleAtFixedRate(broadcaster, 0, heartRate);
+        } catch (Exception e) {
+            // Roll back: without this the flag stays true, every later start()
+            // silently does nothing, and a half-configured socket is leaked.
+            started.set(false);
+            if (multicast != null) {
+                multicast.close();
+            }
+            throw new ServiceException(e);
+        }
+    }
+
+    /**
+     * Stops the agent by closing the multicast socket. Does nothing unless the
+     * agent is currently started.
+     *
+     * @throws ServiceException declared by the service contract
+     */
+    public void stop() throws ServiceException {
+        final boolean wasRunning = started.compareAndSet(true, false);
+        if (!wasRunning) {
+            return;
+        }
+        multicast.close();
+    }
+
+ public void service(InputStream in, OutputStream out) throws ServiceException, IOException { // no-op: nothing is read from or written to the streams
+ }
+
+ public void service(Socket socket) throws ServiceException, IOException { // no-op: the socket is left untouched
+ }
+
+    /**
+     * A service identified by its URI. The string form of the URI is cached
+     * because it is used as the map key for registered and discovered services.
+     */
+    class Service {
+        private final URI uri;
+        private final String uriString;
+
+        /**
+         * @param uri URI identifying the service; must not be null
+         */
+        public Service(URI uri) {
+            this.uri = uri;
+            this.uriString = uri.toString();
+        }
+
+        /**
+         * @param uriString textual form of the service URI
+         * @throws URISyntaxException if the string is not a valid URI
+         */
+        public Service(String uriString) throws URISyntaxException {
+            this(new URI(uriString));
+        }
+
+        /**
+         * Returns the URI string so that log messages which concatenate a
+         * Service show the service address rather than an identity hash.
+         */
+        @Override
+        public String toString() {
+            return uriString;
+        }
+    }
+
+    /**
+     * Liveness bookkeeping for one discovered service: when it was last heard
+     * from, how often it has been reported as failed, and whether its events
+     * are currently suppressed because it was pronounced dead.
+     *
+     * All state is guarded by this object's monitor.
+     */
+    private class ServiceVitals {
+
+        private final Service service;
+
+        private long lastHeartBeat;
+        private long recoveryTime;
+        private int failureCount;
+        private boolean dead;
+
+        public ServiceVitals(Service service) {
+            this.service = service;
+            this.lastHeartBeat = System.currentTimeMillis();
+        }
+
+        /**
+         * Records that an advertisement from the service was just received.
+         */
+        public synchronized void heartbeat() {
+            lastHeartBeat = System.currentTimeMillis();
+
+            // Consider that the service recovery has succeeded if it has not
+            // failed in 60 seconds.
+            if (!dead && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
+                if (log.isDebugEnabled()) {
+                    log.debug("I now think that the " + service + " service has recovered.");
+                }
+                failureCount = 0;
+                recoveryTime = 0;
+            }
+        }
+
+        /**
+         * @return the time in milliseconds of the most recent heartbeat
+         */
+        public synchronized long getLastHeartbeat() {
+            return lastHeartBeat;
+        }
+
+        /**
+         * Marks the service as dead and computes when recovery may be attempted.
+         *
+         * @return true if this call changed the state to dead, false if the
+         *         service was already marked dead
+         */
+        public synchronized boolean pronounceDead() {
+            if (dead) {
+                return false;
+            }
+
+            dead = true;
+            failureCount++;
+
+            long reconnectDelay;
+            if (useExponentialBackOff) {
+                reconnectDelay = (long) Math.pow(backOffMultiplier, failureCount);
+                if (reconnectDelay > maxReconnectDelay) {
+                    reconnectDelay = maxReconnectDelay;
+                }
+            } else {
+                reconnectDelay = initialReconnectDelay;
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("Remote failure of " + service + " while still receiving multicast advertisements.  " +
+                        "Advertising events will be suppressed for " + reconnectDelay
+                        + " ms, the current failure count is: " + failureCount);
+            }
+
+            recoveryTime = System.currentTimeMillis() + reconnectDelay;
+            return true;
+        }
+
+        /**
+         * @return true if this broker is marked failed and it is now the right
+         *         time to start recovery.
+         */
+        public synchronized boolean doRecovery() {
+            if (!dead) {
+                return false;
+            }
+
+            // Are we done trying to recover this guy?
+            if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Max reconnect attempts of the " + service + " service has been reached.");
+                }
+                return false;
+            }
+
+            // Is it not yet time?
+            if (System.currentTimeMillis() < recoveryTime) {
+                return false;
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("Resuming event advertisement of the " + service + " service.");
+            }
+            dead = false;
+            return true;
+        }
+
+        /**
+         * Synchronized like every other accessor: {@code dead} is written under
+         * the monitor by threads other than the one that reads it here.
+         *
+         * @return true while the service is marked dead
+         */
+        public synchronized boolean isDead() {
+            return dead;
+        }
+    }
+
+
+ class Listener implements Runnable {
+ private Map<String, ServiceVitals> discoveredServices = new ConcurrentHashMap<String, ServiceVitals>();
+ private DiscoveryListener discoveryListener;
+
+ public void setDiscoveryListener(DiscoveryListener discoveryListener) {
+ this.discoveryListener = discoveryListener;
+ }
+
+ public void run() {
+ byte[] buf = new byte[BUFF_SIZE];
+ DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
+ while (started.get()) {
+ checkServices();
+ try {
+ multicast.receive(packet);
+ if (packet.getLength() > 0) {
+ String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
+// System.out.println("read = " + str);
+ processData(str);
+ }
+ } catch (SocketTimeoutException se) {
+ // ignore
+ } catch (IOException e) {
+ if (started.get()) {
+ log.error("failed to process packet: " + e);
+ }
+ }
+ }
+ }
+
+ private void processData(String uriString) {
+ if (discoveryListener == null) {
+ return;
+ }
+ if (isSelf(uriString)) {
+ return;
+ }
+
+ ServiceVitals vitals = discoveredServices.get(uriString);
+
+ if (vitals == null) {
+ try {
+ vitals = new ServiceVitals(new Service(uriString));
+
+ discoveredServices.put(uriString, vitals);
+
+ fireServiceAddEvent(vitals.service.uri);
+ } catch (URISyntaxException e) {
+ // don't continuously log this
+ }
+
+ } else {
+ vitals.heartbeat();
+
+ if (vitals.doRecovery()) {
+ fireServiceAddEvent(vitals.service.uri);
+ }
+ }
+ }
+
+ private void checkServices() {
+ long expireTime = System.currentTimeMillis() - (heartRate * maxMissedHeartbeats);
+ for (ServiceVitals serviceVitals : discoveredServices.values()) {
+ if (serviceVitals.getLastHeartbeat() < expireTime && !isSelf(serviceVitals.service)) {
+
+ ServiceVitals vitals = discoveredServices.remove(serviceVitals.service.uriString);
+ if (vitals != null && !vitals.isDead()) {
+ fireServiceRemovedEvent(vitals.service.uri);
+ }
+ }
+ }
+ }
+
+ private final Executor executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+ public Thread newThread(Runnable runable) {
+ Thread t = new Thread(runable, "Multicast Discovery Agent Notifier");
+ t.setDaemon(true);
+ return t;
+ }
+ });
+
+ private void fireServiceRemovedEvent(final URI uri) {
+ if (discoveryListener != null) {
+ final DiscoveryListener discoveryListener = this.discoveryListener;
+
+ // Have the listener process the event async so that
+ // he does not block this thread since we are doing time sensitive
+ // processing of events.
+ executor.execute(new Runnable() {
+ public void run() {
+ if (discoveryListener != null) {
+ discoveryListener.serviceRemoved(uri);
+ }
+ }
+ });
+ }
+ }
+
+ private void fireServiceAddEvent(final URI uri) {
+ if (discoveryListener != null) {
+ final DiscoveryListener discoveryListener = this.discoveryListener;
+
+ // Have the listener process the event async so that
+ // he does not block this thread since we are doing time sensitive
+ // processing of events.
+ executor.execute(new Runnable() {
+ public void run() {
+ if (discoveryListener != null) {
+ discoveryListener.serviceAdded(uri);
+ }
+ }
+ });
+ }
+ }
+
+ public void reportFailed(URI serviceUri) {
+ final Service service = new Service(serviceUri);
+ ServiceVitals serviceVitals = discoveredServices.get(service.uriString);
+ if (serviceVitals != null && serviceVitals.pronounceDead()) {
+ fireServiceRemovedEvent(service.uri);
+ }
+ }
+ }
+
+    /**
+     * Timer task that sends one datagram per registered service URI each time
+     * it fires while the agent is started.
+     */
+    class Broadcaster extends TimerTask {
+        // First send failure seen; once set, later failures are not logged again.
+        private IOException failed;
+
+        public void run() {
+            if (!started.get()) {
+                return;
+            }
+            heartbeat();
+        }
+
+        /** Advertises every registered service, reporting only the first failure. */
+        private void heartbeat() {
+            for (String uri : registeredServices.keySet()) {
+                try {
+                    announce(uri);
+                } catch (IOException e) {
+                    reportOnce(uri, e);
+                }
+            }
+        }
+
+        /** Sends the URI string as the payload of a single datagram. */
+        private void announce(String uri) throws IOException {
+            final byte[] payload = uri.getBytes();
+            multicast.send(new DatagramPacket(payload, 0, payload.length, address));
+        }
+
+        /**
+         * If a send fails, chances are all subsequent sends will fail too, so
+         * only the first error is logged.
+         */
+        private void reportOnce(String uri, IOException e) {
+            if (failed != null) {
+                return;
+            }
+            failed = e;
+
+            log.error("Failed to advertise our service: " + uri, e);
+            if ("Operation not permitted".equals(e.getMessage())) {
+                log.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup.  "
+                        + "Please make sure that the OS is properly configured to allow multicast traffic over: " + multicast.getLocalAddress());
+            }
+        }
+    }
+
+}
Added: openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastSearch.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastSearch.java?rev=690629&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastSearch.java (added)
+++ openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/MulticastSearch.java Sat Aug 30 20:23:03 2008
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.server.discovery;
+
+import java.net.URI;
+import java.net.MulticastSocket;
+import java.net.InetAddress;
+import java.net.DatagramPacket;
+import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+
+/**
+ * Blocking helper that joins a multicast group and returns the first
+ * advertised service URI accepted by a {@link Filter}.
+ *
+ * @version $Rev$ $Date$
+ */
+public class MulticastSearch {
+
+    private static final int BUFF_SIZE = 8192;
+
+    private final MulticastSocket multicast;
+
+    /**
+     * Joins the default group {@code 239.255.2.3} on port {@code 6142}.
+     *
+     * @throws IOException if the socket cannot be opened or the group joined
+     */
+    public MulticastSearch() throws IOException {
+        this("239.255.2.3", 6142);
+    }
+
+    /**
+     * @param host multicast group address to join
+     * @param port port to listen on
+     * @throws IOException if the socket cannot be opened or the group joined
+     */
+    public MulticastSearch(String host, int port) throws IOException {
+        InetAddress inetAddress = InetAddress.getByName(host);
+
+        MulticastSocket socket = new MulticastSocket(port);
+        try {
+            socket.joinGroup(inetAddress);
+            // Short receive timeout so search() can re-check its deadline.
+            socket.setSoTimeout(500);
+        } catch (IOException e) {
+            // Do not leak the bound socket when setup fails half way.
+            socket.close();
+            throw e;
+        }
+        multicast = socket;
+    }
+
+    /**
+     * Searches with a filter that accepts every service.
+     *
+     * @param timeout maximum time to wait, 0 to wait indefinitely
+     * @param milliseconds unit of {@code timeout}
+     * @return the first service found, or null if the timeout elapsed
+     */
+    public URI search(int timeout, TimeUnit milliseconds) throws IOException {
+        return search(new DefaultFilter(), timeout, milliseconds);
+    }
+
+    /**
+     * Waits indefinitely for the first advertised service.
+     */
+    public URI search() throws IOException {
+        return search(new DefaultFilter(), 0, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Waits indefinitely for the first advertised service accepted by the filter.
+     */
+    public URI search(Filter filter) throws IOException {
+        return search(filter, 0, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Listens for advertisements until one is accepted by the filter or the
+     * timeout elapses.
+     *
+     * @param filter decides which advertised services are acceptable
+     * @param timeout maximum time to wait, 0 to wait indefinitely
+     * @param unit unit of {@code timeout}
+     * @return the first accepted service, or null if the timeout elapsed
+     * @throws IOException if receiving fails for a reason other than the
+     *                     socket's own receive timeout
+     */
+    public URI search(Filter filter, long timeout, TimeUnit unit) throws IOException {
+        final boolean waitForever = (timeout == 0);
+        // Convert the caller's value INTO milliseconds. The previous
+        // unit.convert(timeout, MILLISECONDS) went the other way and turned
+        // e.g. 5 SECONDS into 0, i.e. "wait forever".
+        final long timeoutMillis = unit.toMillis(timeout);
+        long waited = 0;
+
+        byte[] buf = new byte[BUFF_SIZE];
+        DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
+
+        while (waitForever || waited < timeoutMillis) {
+            long start = System.currentTimeMillis();
+            try {
+                multicast.receive(packet);
+                if (packet.getLength() > 0) {
+                    String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
+                    URI service = toUri(str);
+                    if (service != null && filter.accept(service)) {
+                        return service;
+                    }
+                }
+            } catch (java.net.SocketTimeoutException e) {
+                // Nothing arrived within the socket's receive timeout; this is
+                // the normal idle case, so loop and re-check the deadline.
+            } finally {
+                long stop = System.currentTimeMillis();
+                waited += stop - start;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Leaves the socket closed; any search in progress fails with an IOException.
+     */
+    public void close() {
+        multicast.close();
+    }
+
+    /**
+     * Parses an advertised payload, returning null for anything that is not a
+     * valid URI so stray datagrams on the group cannot abort a search.
+     */
+    private static URI toUri(String text) {
+        try {
+            return URI.create(text);
+        } catch (IllegalArgumentException e) {
+            return null;
+        }
+    }
+
+    /**
+     * Decides whether a discovered service should be returned by a search.
+     */
+    public interface Filter {
+        boolean accept(URI service);
+    }
+
+    /**
+     * Filter that accepts every service.
+     */
+    public static class DefaultFilter implements Filter {
+        public boolean accept(URI service) {
+            return true;
+        }
+    }
+}
Added: openejb/trunk/openejb3/server/openejb-discovery/src/main/resources/META-INF/org.apache.openejb.server.ServersService/multicast
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-discovery/src/main/resources/META-INF/org.apache.openejb.server.ServersService/multicast?rev=690629&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-discovery/src/main/resources/META-INF/org.apache.openejb.server.ServersService/multicast (added)
+++ openejb/trunk/openejb3/server/openejb-discovery/src/main/resources/META-INF/org.apache.openejb.server.ServersService/multicast Sat Aug 30 20:23:03 2008
@@ -0,0 +1,4 @@
+server = org.apache.openejb.server.discovery.MulticastDiscoveryAgent
+bind = 239.255.2.3
+port = 6142
+disabled = true
Added: openejb/trunk/openejb3/server/openejb-discovery/src/test/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgentTest.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-discovery/src/test/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgentTest.java?rev=690629&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-discovery/src/test/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgentTest.java (added)
+++ openejb/trunk/openejb3/server/openejb-discovery/src/test/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgentTest.java Sat Aug 30 20:23:03 2008
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.server.discovery;
+
+import junit.framework.TestCase;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.openejb.server.ServiceException;
+
+/**
+ * Manual smoke test that starts several {@link MulticastDiscoveryAgent}s,
+ * searches for them with {@link MulticastSearch} and prints the discovery
+ * events while the agents are stopped and restarted one by one.
+ *
+ * @version $Rev$ $Date$
+ */
+public class MulticastDiscoveryAgentTest extends TestCase {
+
+    /**
+     * Placeholder test; presumably here so the class still has one runnable
+     * test while {@link #_test()} is disabled.
+     */
+    public void testNothing(){}
+
+    /**
+     * Disabled by its name (no "test" prefix). It needs working multicast
+     * networking and sleeps for well over a minute.
+     */
+    public void _test() throws Exception {
+        MulticastDiscoveryAgent[] agents = {agent("red"),agent("green"),agent("yellow"),agent("blue")};
+
+        MulticastSearch multicast = new MulticastSearch();
+        Filter filter = new Filter();
+
+        // One search per agent; the filter rejects URIs it has already returned.
+        System.out.println("uri = " + multicast.search(filter));
+        System.out.println("uri = " + multicast.search(filter));
+        System.out.println("uri = " + multicast.search(filter));
+        System.out.println("uri = " + multicast.search(filter));
+
+        Thread.sleep(2000);
+        System.out.println("--");
+
+
+        for (MulticastDiscoveryAgent agent : agents) {
+            Thread.sleep(10000);
+            System.out.println("--");
+            agent.stop();
+        }
+
+        for (MulticastDiscoveryAgent agent : agents) {
+            Thread.sleep(10000);
+            System.out.println("--");
+            agent.start();
+        }
+
+        Thread.sleep(10000);
+
+
+    }
+
+    /** Accepts each service URI only the first time it is offered. */
+    private static class Filter implements MulticastSearch.Filter {
+        private final Set<URI> seen = new HashSet<URI>();
+        public boolean accept(URI service) {
+            if (seen.contains(service)) return false;
+            seen.add(service);
+            return true;
+        }
+    }
+
+    /** Creates and starts an agent that advertises {@code ejbd://<id>:4201}. */
+    private MulticastDiscoveryAgent agent(String id) throws IOException, URISyntaxException, ServiceException {
+        MulticastDiscoveryAgent agent = new MulticastDiscoveryAgent();
+        agent.setDiscoveryListener(new MyDiscoveryListener(id));
+        agent.registerService(new URI("ejbd://"+id+":4201"));
+        agent.start();
+        return agent;
+    }
+
+    /** Prints every event prefixed with the owning agent's id in a fixed-width column. */
+    private static class MyDiscoveryListener implements DiscoveryListener {
+        private static final int ID_WIDTH = 8;
+        private static final String PADDING = "        ";
+
+        private final String id;
+
+        public MyDiscoveryListener(String id) {
+            // Pad with ID_WIDTH spaces before cutting; padding with a single
+            // space made substring(0, 8) throw for ids shorter than 7 chars.
+            id += PADDING;
+            id = id.substring(0, ID_WIDTH);
+            this.id = id;
+        }
+
+        public void serviceAdded(URI service) {
+            System.out.println(id + "add " + service.toString());
+        }
+
+        public void serviceRemoved(URI service) {
+            System.out.println(id + "remove " + service.toString());
+        }
+    }
+
+}
Modified: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java?rev=690629&r1=690628&r2=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java (original)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java Sat Aug 30 20:23:03 2008
@@ -70,7 +70,7 @@
this.next = next;
}
- private static InetAddress getAddress(String host){
+ public static InetAddress getAddress(String host){
try {
return InetAddress.getByName(host);
} catch (UnknownHostException e) {
@@ -88,6 +88,16 @@
}
}
+ public static long getLong(Properties p, String property, long defaultValue){
+ String value = p.getProperty(property);
+ try {
+ if (value != null) return Long.parseLong(value);
+ else return defaultValue;
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
+ }
+
public static boolean getBoolean(Properties p, String property, boolean defaultValue){
String value = p.getProperty(property);
try {
Modified: openejb/trunk/openejb3/server/pom.xml
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/pom.xml?rev=690629&r1=690628&r2=690629&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/pom.xml (original)
+++ openejb/trunk/openejb3/server/pom.xml Sat Aug 30 20:23:03 2008
@@ -42,6 +42,7 @@
<module>openejb-axis</module>
<module>openejb-axis2</module>
<module>openejb-cxf</module>
+ <module>openejb-discovery</module>
</modules>
</project>