You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by an...@apache.org on 2013/12/10 17:54:48 UTC
svn commit: r1549891 - in /tomee/tomee/trunk/server:
openejb-client/src/main/java/org/apache/openejb/client/ openejb-common-cli/
openejb-common-cli/src/main/java/org/apache/openejb/server/cli/
openejb-http/src/main/java/org/apache/openejb/server/httpd/...
Author: andygumbrecht
Date: Tue Dec 10 16:54:47 2013
New Revision: 1549891
URL: http://svn.apache.org/r1549891
Log:
Improve MulticastPulseAgent/MulticastPulseClient by allowing client to notify server of unreachable hosts - Eventually a server will stop sending them out.
Pull some more <version> tags up to parent pom dep-management where they should all be.
Allow server project to build from it's own directory.
Finals.
Modified:
tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
tomee/tomee/trunk/server/openejb-common-cli/pom.xml
tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java
tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java
tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java
tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
tomee/tomee/trunk/server/openejb-server/pom.xml
tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java
tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java
Modified: tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java (original)
+++ tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java Tue Dec 10 16:54:47 2013
@@ -64,6 +64,7 @@ public class MulticastPulseClient extend
private static final Logger log = Logger.getLogger("OpenEJB.client");
private static final String SERVER = "OpenEJB.MCP.Server:";
private static final String CLIENT = "OpenEJB.MCP.Client:";
+ private static final String BADURI = ":BadUri:";
private static final String EMPTY = "NoService";
private static final Charset UTF8 = Charset.forName("UTF-8");
private static final int TTL = Integer.parseInt(System.getProperty(ORG_APACHE_OPENEJB_MULTIPULSE_TTL, "32"));
@@ -118,12 +119,7 @@ public class MulticastPulseClient extend
if (null == uriSet || uriSet.isEmpty()) {
- final Map<String, String> params;
- try {
- params = URIs.parseParamters(uri);
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException("Invalid MultiPulse uri " + uri.toString(), e);
- }
+ final Map<String, String> params = getUriParameters(uri);
final Set<String> schemes = getSet(params, "schemes", this.getDefaultSchemes());
final String group = getString(params, "group", "default");
@@ -140,17 +136,39 @@ public class MulticastPulseClient extend
for (final URI serviceURI : uriSet) {
+ //Strip serverhost and group and try to connect
+ final URI tryUri = URI.create(URI.create(serviceURI.getSchemeSpecificPart()).getSchemeSpecificPart());
+
try {
- //Strip serverhost and group and try to connect
- return ConnectionManager.getConnection(URI.create(URI.create(serviceURI.getSchemeSpecificPart()).getSchemeSpecificPart()));
+ return ConnectionManager.getConnection(tryUri);
} catch (Exception e) {
+
uriSet.remove(serviceURI);
+
+ if (java.net.SocketTimeoutException.class.isInstance(e) || SocketException.class.isInstance(e)) {
+ //Notify server that this URI is not reachable
+ MulticastPulseClient.broadcastBadUri(getString(getUriParameters(uri), "group", "default"), tryUri, uri.getHost(), uri.getPort());
+ }
+
+ if (log.isLoggable(Level.FINE)) {
+ log.fine("Failed connection to: " + serviceURI);
+ }
}
}
throw new IOException("Unable to connect an ejb server via the MultiPulse URI: " + uri);
}
+ private static Map<String, String> getUriParameters(final URI uri) {
+ final Map<String, String> params;
+ try {
+ params = URIs.parseParamters(uri);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("Invalid MultiPulse uri " + uri.toString(), e);
+ }
+ return params;
+ }
+
/**
* Get a list of URIs discovered for the provided request.
* <p/>
@@ -187,17 +205,7 @@ public class MulticastPulseClient extend
throw new Exception("Specify a valid port between 1 and 65535");
}
- final InetAddress ia;
-
- try {
- ia = InetAddress.getByName(host);
- } catch (UnknownHostException e) {
- throw new Exception(host + " is not a valid address", e);
- }
-
- if (null == ia || !ia.isMulticastAddress()) {
- throw new Exception(host + " is not a valid multicast address");
- }
+ final InetAddress ia = getAddress(host);
final byte[] bytes = (MulticastPulseClient.CLIENT + forGroup).getBytes(UTF8);
final DatagramPacket request = new DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port));
@@ -492,6 +500,20 @@ public class MulticastPulseClient extend
}
}
+ private static InetAddress getAddress(final String host) throws Exception {
+ final InetAddress ia;
+ try {
+ ia = InetAddress.getByName(host);
+ } catch (UnknownHostException e) {
+ throw new Exception(host + " is not a valid address", e);
+ }
+
+ if (null == ia || !ia.isMulticastAddress()) {
+ throw new Exception(host + " is not a valid multicast address");
+ }
+ return ia;
+ }
+
/**
* Is the provided host a local host
*
@@ -681,7 +703,10 @@ public class MulticastPulseClient extend
s.connect(new InetSocketAddress(host, port), st);
b = true;
} catch (Exception e) {
- //Ignore
+ if (java.net.SocketTimeoutException.class.isInstance(e) || SocketException.class.isInstance(e)) {
+ MulticastPulseClient.broadcastBadUri(group, uriSub, mchost, mcport);
+ System.out.print("" + e + " : ");
+ }
} finally {
try {
s.close();
@@ -716,4 +741,38 @@ public class MulticastPulseClient extend
running.set(false);
t.interrupt();
}
+
+ /**
+ * Asynchronous attempt to broadcast a bad URI on our channel.
+ * Hopefully the culprit server will hear this and stop sending it.
+ *
+ * @param uri Bad URI to broadcast
+ */
+ private static void broadcastBadUri(final String group, final URI uri, final String host, final int port) {
+
+ getExecutorService().submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ final InetAddress ia = getAddress(host);
+
+ final byte[] bytes = (MulticastPulseClient.CLIENT + group + MulticastPulseClient.BADURI + uri.getHost()).getBytes(UTF8);
+ final DatagramPacket request = new DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port));
+
+ final MulticastSocket[] multicastSockets = MulticastPulseClient.getSockets(ia, port);
+
+ for (final MulticastSocket socket : multicastSockets) {
+
+ try {
+ socket.send(request);
+ } catch (Exception e) {
+ log.log(Level.WARNING, "Failed to broadcast bad URI: " + uri + " on: " + socket.getInterface().getHostAddress(), e);
+ }
+ }
+ } catch (Exception e) {
+ log.log(Level.WARNING, "Failed to broadcast bad URI: " + uri, e);
+ }
+ }
+ });
+ }
}
Modified: tomee/tomee/trunk/server/openejb-common-cli/pom.xml
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-common-cli/pom.xml?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-common-cli/pom.xml (original)
+++ tomee/tomee/trunk/server/openejb-common-cli/pom.xml Tue Dec 10 16:54:47 2013
@@ -36,20 +36,38 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.openejb</groupId>
+ <artifactId>openejb-loader</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
- <version>0.9.94</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
- <version>2.1</version>
</dependency>
<dependency>
+ <groupId>org.apache.xbean</groupId>
+ <artifactId>xbean-reflect</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.xbean</groupId>
+ <artifactId>xbean-finder-shaded</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+
+ <!-- Test scope -->
+ <dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
- <version>2.1.0</version>
- <scope>test</scope> <!-- don't deliver it -->
+ <scope>test</scope>
</dependency>
+
+
</dependencies>
</project>
Modified: tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java (original)
+++ tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java Tue Dec 10 16:54:47 2013
@@ -42,6 +42,7 @@ import java.util.Properties;
import java.util.TreeMap;
public class CliRunnable implements Runnable {
+
private static final Logger LOGGER = Logger.getInstance(LogCategory.OPENEJB_SERVER, CliRunnable.class);
private static final String BRANDING_FILE = "branding.properties";
@@ -87,8 +88,11 @@ public class CliRunnable implements Runn
UrlSet urlSet = new UrlSet(loader).excludeJvm();
urlSet = urlSet.exclude(loader.getParent());
- final IAnnotationFinder finder = new AnnotationFinder(new ConfigurableClasspathArchive(new ConfigurableClasspathArchive.FakeModule(loader, Collections.EMPTY_MAP), true, urlSet.getUrls()));
- for (Annotated<Class<?>> cmd : finder.findMetaAnnotatedClasses(Command.class)) {
+ //noinspection unchecked
+ final IAnnotationFinder finder = new AnnotationFinder(new ConfigurableClasspathArchive(new ConfigurableClasspathArchive.FakeModule(loader, Collections.EMPTY_MAP),
+ true,
+ urlSet.getUrls()));
+ for (final Annotated<Class<?>> cmd : finder.findMetaAnnotatedClasses(Command.class)) {
try {
final Command annotation = cmd.getAnnotation(Command.class);
final String key = annotation.name();
@@ -113,15 +117,15 @@ public class CliRunnable implements Runn
private OutputStream err;
private OutputStream out;
private InputStream sin;
- private String username;
- private String bind;
- private int port;
+ private final String username;
+ private final String bind;
+ private final int port;
- public CliRunnable(String bind, int port) {
+ public CliRunnable(final String bind, final int port) {
this(bind, port, PROMPT, null);
}
- public CliRunnable(String bind, int port, String username, String sep) {
+ public CliRunnable(final String bind, final int port, final String username, final String sep) {
this.bind = bind;
this.port = port;
this.username = username;
@@ -144,15 +148,15 @@ public class CliRunnable implements Runn
}
}
- public void setInputStream(InputStream in) {
+ public void setInputStream(final InputStream in) {
sin = in;
}
- public void setOutputStream(OutputStream out) {
+ public void setOutputStream(final OutputStream out) {
this.out = out;
}
- public void setErrorStream(OutputStream err) {
+ public void setErrorStream(final OutputStream err) {
this.err = err;
}
@@ -165,9 +169,10 @@ public class CliRunnable implements Runn
}
public void clean() {
- scripter.clearEngines();
+ OpenEJBScripter.clearEngines();
}
+ @Override
public void run() {
clean();
@@ -180,13 +185,13 @@ public class CliRunnable implements Runn
// TODO : add completers
String line;
- StringBuilder builtWelcome = new StringBuilder("Apache OpenEJB ")
- .append(OpenEjbVersion.get().getVersion())
- .append(" build: ")
- .append(OpenEjbVersion.get().getDate())
- .append("-")
- .append(OpenEjbVersion.get().getTime())
- .append(lineSep);
+ final StringBuilder builtWelcome = new StringBuilder("Apache OpenEJB ")
+ .append(OpenEjbVersion.get().getVersion())
+ .append(" build: ")
+ .append(OpenEjbVersion.get().getDate())
+ .append("-")
+ .append(OpenEjbVersion.get().getTime())
+ .append(lineSep);
if (tomee) {
builtWelcome.append(OS_LINE_SEP).append(PROPERTIES.getProperty(WELCOME_TOMEE_KEY));
} else {
@@ -196,10 +201,10 @@ public class CliRunnable implements Runn
streamManager.writeOut(OpenEjbVersion.get().getUrl());
streamManager.writeOut(builtWelcome.toString()
- .replace("$bind", bind)
- .replace("$port", Integer.toString(port))
- .replace("$name", NAME)
- .replace(OS_LINE_SEP, lineSep));
+ .replace("$bind", bind)
+ .replace("$port", Integer.toString(port))
+ .replace("$name", NAME)
+ .replace(OS_LINE_SEP, lineSep));
while ((line = reader.readLine(prompt())) != null) {
// exit simply let us go out of the loop
@@ -210,7 +215,7 @@ public class CliRunnable implements Runn
Class<?> cmdClass = null;
String key = null;
- for (Map.Entry<String, Class<?>> cmd : COMMANDS.entrySet()) {
+ for (final Map.Entry<String, Class<?>> cmd : COMMANDS.entrySet()) {
if (line.startsWith(cmd.getKey())) {
cmdClass = cmd.getValue();
key = cmd.getKey();
@@ -270,8 +275,8 @@ public class CliRunnable implements Runn
prompt.append(PROMPT);
}
prompt.append(" @ ")
- .append(bind).append(":").append(port)
- .append(PROMPT_SUFFIX);
+ .append(bind).append(":").append(port)
+ .append(PROMPT_SUFFIX);
return prompt.toString();
}
}
Modified: tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java (original)
+++ tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java Tue Dec 10 16:54:47 2013
@@ -26,6 +26,7 @@ import java.io.OutputStreamWriter;
import java.util.Collection;
public class StreamManager {
+
private static final String OS_LINE_SEP = System.getProperty("line.separator");
private String lineSep;
@@ -34,16 +35,16 @@ public class StreamManager {
private OutputStream out;
private OutputStream err;
- public StreamManager(OutputStream out, OutputStream err, String lineSep) {
+ public StreamManager(final OutputStream out, final OutputStream err, final String lineSep) {
this.lineSep = lineSep;
this.out = out;
this.err = err;
this.sout = new OutputStreamWriter(out);
- this.serr= new OutputStreamWriter(err);
+ this.serr = new OutputStreamWriter(err);
}
private void write(final OutputStreamWriter writer, final String s) {
- for (String l : s.split(lineSep)) {
+ for (final String l : s.split(lineSep)) {
try {
writer.write(l);
writer.write(lineSep);
@@ -64,7 +65,7 @@ public class StreamManager {
} else {
final StringBuilder error = new StringBuilder();
error.append(e.getMessage()).append(lineSep);
- for (StackTraceElement elt : e.getStackTrace()) {
+ for (final StackTraceElement elt : e.getStackTrace()) {
error.append(" ").append(elt.toString()).append(lineSep);
}
write(serr, error.toString());
@@ -77,7 +78,7 @@ public class StreamManager {
}
if (out instanceof Collection) {
final StringBuilder builder = new StringBuilder();
- for (Object o : (Collection) out) {
+ for (final Object o : (Collection) out) {
builder.append(string(o, lineSep)).append(lineSep);
}
return builder.toString();
@@ -89,10 +90,10 @@ public class StreamManager {
if (!out.getClass().getName().startsWith("java")) {
try {
return new GsonBuilder().setPrettyPrinting().create().toJson(out)
- .replace(OS_LINE_SEP, lineSep);
+ .replace(OS_LINE_SEP, lineSep);
} catch (RuntimeException re) {
return ToStringBuilder.reflectionToString(out, ToStringStyle.SHORT_PREFIX_STYLE)
- .replace(OS_LINE_SEP, lineSep);
+ .replace(OS_LINE_SEP, lineSep);
}
}
return out.toString();
@@ -123,7 +124,7 @@ public class StreamManager {
}
public void writeOut(final String text, final String sep) {
- for (String line : text.split(sep)) {
+ for (final String line : text.split(sep)) {
writeOut(line);
}
}
Modified: tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java (original)
+++ tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java Tue Dec 10 16:54:47 2013
@@ -61,7 +61,7 @@ public class BeginWebBeansListener imple
*
* @param webBeansContext the OWB context
*/
- public BeginWebBeansListener(WebBeansContext webBeansContext) {
+ public BeginWebBeansListener(final WebBeansContext webBeansContext) {
this.webBeansContext = webBeansContext;
this.failoverService = this.webBeansContext.getService(FailOverService.class);
this.contextKey = "org.apache.tomee.catalina.WebBeansListener@" + webBeansContext.hashCode();
@@ -71,7 +71,7 @@ public class BeginWebBeansListener imple
* {@inheritDoc}
*/
@Override
- public void requestDestroyed(ServletRequestEvent event) {
+ public void requestDestroyed(final ServletRequestEvent event) {
// no-op
}
@@ -127,7 +127,7 @@ public class BeginWebBeansListener imple
* {@inheritDoc}
*/
@Override
- public void sessionDestroyed(HttpSessionEvent event) {
+ public void sessionDestroyed(final HttpSessionEvent event) {
ensureRequestScope();
}
@@ -139,21 +139,20 @@ public class BeginWebBeansListener imple
}
}
-
@Override
- public void sessionWillPassivate(HttpSessionEvent event) {
+ public void sessionWillPassivate(final HttpSessionEvent event) {
ensureRequestScope();
}
@Override
- public void sessionDidActivate(HttpSessionEvent event) {
+ public void sessionDidActivate(final HttpSessionEvent event) {
if (failoverService.isSupportFailOver() || failoverService.isSupportPassivation()) {
failoverService.sessionDidActivate(event.getSession());
}
}
@Override
- public void contextInitialized(ServletContextEvent servletContextEvent) {
+ public void contextInitialized(final ServletContextEvent servletContextEvent) {
try {
OpenEJBLifecycle.initializeServletContext(servletContextEvent.getServletContext(), webBeansContext);
} catch (final Exception e) {
@@ -163,7 +162,7 @@ public class BeginWebBeansListener imple
}
@Override
- public void contextDestroyed(ServletContextEvent servletContextEvent) {
+ public void contextDestroyed(final ServletContextEvent servletContextEvent) {
ensureRequestScope();
}
}
Modified: tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java (original)
+++ tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java Tue Dec 10 16:54:47 2013
@@ -26,6 +26,7 @@ import java.net.URI;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.Enumeration;
import java.util.HashSet;
@@ -39,6 +40,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -66,9 +68,11 @@ public class MulticastPulseAgent impleme
public static final String SERVER = "OpenEJB.MCP.Server:";
public static final String CLIENT = "OpenEJB.MCP.Client:";
+ public static final String BADURI = ":BadUri:";
public static final String EMPTY = "NoService";
- private final Set<String> ignore = new HashSet<String>();
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Set<String> ignore = Collections.synchronizedSet(new HashSet<String>());
private final Set<URI> uriSet = new HashSet<URI>();
private AtomicBoolean running = new AtomicBoolean(false);
final ArrayList<Future> futures = new ArrayList<Future>();
@@ -84,12 +88,19 @@ public class MulticastPulseAgent impleme
private boolean loopbackOnly = true;
/**
+ * @author Andy Gumbrecht
* This agent listens for client pulses on a defined multicast channel.
* On receipt of a valid pulse the agent responds with its own pulse for
* a defined amount of time and rate. A client can deliver a pulse as often as
* required until it is happy of the server response.
* <p/>
* Both server and client deliver crafted information payloads.
+ * <p/>
+ * The client pulse contains OpenEJB.MCP.Client:(group or *)[:BadUri:URI]
+ * The server will only respond to a request for it's own group or *
+ * The optional :BadUri: is used by clients to notify a server that it is sending out unreachable URI's
+ * <p/>
+ * The server response pulse contains OpenEJB.MCP.Server:(Service|Service)|(Comma separated host list)
*/
public MulticastPulseAgent() {
}
@@ -111,7 +122,7 @@ public class MulticastPulseAgent impleme
length = 1;
}
- executor = Executors.newFixedThreadPool(length * 2);
+ executor = Executors.newFixedThreadPool(length * 3);
}
return executor;
@@ -134,7 +145,7 @@ public class MulticastPulseAgent impleme
}
}
} catch (Exception e) {
- log.warning("Invalid ignore parameter. Should be a lowercase single or comma seperated list like: ignore=host1,host2");
+ log.warning("Invalid ignore parameter. Should be a lowercase single host or comma seperated list of hosts to ignore like: ignore=host1,host2,ipv4,ipv6");
}
this.multicast = o.get("bind", this.multicast);
@@ -148,41 +159,59 @@ public class MulticastPulseAgent impleme
private void buildPacket() throws SocketException {
- this.loopbackOnly = true;
- for (final URI uri : this.uriSet) {
- if (!isLoopback(uri.getHost())) {
- this.loopbackOnly = false;
- break;
+ final ReentrantLock l = this.lock;
+ l.lock();
+
+ try {
+ this.loopbackOnly = true;
+ for (final URI uri : this.uriSet) {
+ if (!isLoopback(uri.getHost())) {
+ this.loopbackOnly = false;
+ break;
+ }
}
- }
- final String hosts = getHosts(this.ignore);
- final StringBuilder sb = new StringBuilder(SERVER);
- sb.append(this.group);
- sb.append(':');
+ final String hosts = getHosts(this.ignore);
+ final StringBuilder sb = new StringBuilder(SERVER);
+ sb.append(this.group);
+ sb.append(':');
- if (this.uriSet.size() > 0) {
- for (final URI uri : this.uriSet) {
- sb.append(uri.toASCIIString());
+ if (this.uriSet.size() > 0) {
+ for (final URI uri : this.uriSet) {
+ sb.append(uri.toASCIIString());
+ sb.append('|');
+ }
+ } else {
+ sb.append(EMPTY);
sb.append('|');
}
- } else {
- sb.append(EMPTY);
- sb.append('|');
- }
- sb.append(hosts);
+ sb.append(hosts);
+
+ final byte[] bytes = (sb.toString()).getBytes(UTF8);
+ this.response = new DatagramPacket(bytes, bytes.length, this.address);
- final byte[] bytes = (sb.toString()).getBytes(UTF8);
- this.response = new DatagramPacket(bytes, bytes.length, this.address);
+ if (log.isDebugEnabled()) {
+ log.debug("MultiPulse packet is: " + sb);
+ }
- if (log.isDebugEnabled()) {
- log.debug("MultiPulse packet is: " + sb);
+ if (bytes.length > 2048) {
+ log.warning("MultiPulse packet is larger than 2048 bytes, clients will not be able to read the packet" +
+ "\n - You should define the 'ignore' property to filter out unreachable addresses: " + sb);
+ }
+ } finally {
+ l.unlock();
}
+ }
- if (bytes.length > 2048) {
- log.warning("MultiPulse packet is larger than 2048 bytes, clients will not be able to read the packet" +
- "\n - You should define the 'ignore' property to filter out unreachable addresses: " + sb);
+ public DatagramPacket getResponsePacket() {
+ final ReentrantLock l = this.lock;
+ l.lock();
+
+ try {
+ return this.response;
+ } finally {
+ l.unlock();
}
}
@@ -191,6 +220,10 @@ public class MulticastPulseAgent impleme
this.listener = listener;
}
+ public DiscoveryListener getDiscoveryListener() {
+ return listener;
+ }
+
@Override
public void registerService(URI uri) throws IOException {
@@ -267,7 +300,7 @@ public class MulticastPulseAgent impleme
continue;
}
- final Sender sender = new Sender(this, socketKey, socket, this.response);
+ final Sender sender = new Sender(this, socketKey, socket);
this.futures.add(executorService.submit(sender));
this.futures.add(executorService.submit(new Runnable() {
@@ -289,34 +322,64 @@ public class MulticastPulseAgent impleme
if (req.startsWith(CLIENT)) {
- req = (req.replace(CLIENT, ""));
+ final int ix = req.indexOf(BADURI);
+ String badUri = null;
+
+ if (ix > 0) {
+ //The client is notifying of a bad uri
+ badUri = req.substring(ix).replace(BADURI, "");
+ req = req.substring(0, ix).replace(CLIENT, "");
+ } else {
+ req = (req.replace(CLIENT, ""));
+ }
+ //Is this a group or global pulse request
if (mpg.equals(req) || "*".equals(req)) {
- final String client = ((InetSocketAddress) sa).getAddress().getHostAddress();
+ //Is there a bad url and is it this agent broadcasting the bad URI?
+ if (null != badUri && getHosts(MulticastPulseAgent.this.ignore).contains(badUri)) {
+ final ReentrantLock l = MulticastPulseAgent.this.lock;
+ l.lock();
+
+ try {
+ //Remove it and rebuild our broadcast packet
+ if (MulticastPulseAgent.this.ignore.add(badUri)) {
+ MulticastPulseAgent.this.buildPacket();
+
+ MulticastPulseAgent.this.fireEvent(URI.create("OpenEJB" + BADURI + badUri), false);
+
+ log.warning("This server has removed the unreachable host '" + badUri + "' from discovery, you should consider adding" +
+ " this to the 'ignore' property in the multipulse.properties file");
+ }
+
+ } finally {
+ l.unlock();
+ }
+ } else {
- if (isLoopBackOnly) {
- //We only have local services, so make sure the request is from a local source else ignore it
- if (!MulticastPulseAgent.isLocalAddress(client, false)) {
+ //Normal client multicast pulse request
+ final String client = ((InetSocketAddress) sa).getAddress().getHostAddress();
+
+ if (isLoopBackOnly && !MulticastPulseAgent.isLocalAddress(client, false)) {
+ //We only have local services, so make sure the request is from a local source else ignore it
if (log.isDebugEnabled()) {
log.debug(String.format("Ignoring remote client %1$s pulse request for group: %2$s - No remote services available",
client,
req));
}
- return;
- }
- }
+ } else {
- //We have received a valid pulse request
- if (log.isDebugEnabled()) {
- log.debug(String.format("Answering client '%1$s' pulse request for group: '%2$s' on '%3$s'", client, req, socketKey));
- }
+ //We have received a valid pulse request
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Answering client '%1$s' pulse request for group: '%2$s' on '%3$s'", client, req, socketKey));
+ }
- //Renew response pulse
- sender.pulseResponse();
+ //Renew response pulse
+ sender.pulseResponse();
+ }
+ }
}
}
-
}
} catch (Exception e) {
@@ -412,6 +475,27 @@ public class MulticastPulseAgent impleme
}
/**
+ * Lists current broadcast hosts as a comma separated list.
+ * Used principally for testing.
+ *
+ * @return String
+ */
+ public String getHosts() {
+ return getHosts(this.ignore);
+ }
+
+ /**
+ * Remove a host from the ignore list.
+ * Used principally for testing.
+ *
+ * @param host String
+ * @return True if removed, else false
+ */
+ public boolean removeFromIgnore(final String host) {
+ return this.ignore.remove(host);
+ }
+
+ /**
* Attempts to return at least one socket per valid network interface.
* If no valid interface is found then the array will be empty.
*
@@ -612,13 +696,11 @@ public class MulticastPulseAgent impleme
private final MulticastPulseAgent agent;
private final String socketKey;
private final MulticastSocket socket;
- private final DatagramPacket mpr;
- private Sender(final MulticastPulseAgent agent, final String socketKey, final MulticastSocket socket, final DatagramPacket mpr) {
+ private Sender(final MulticastPulseAgent agent, final String socketKey, final MulticastSocket socket) {
this.agent = agent;
this.socketKey = socketKey;
this.socket = socket;
- this.mpr = mpr;
}
@Override
@@ -640,7 +722,7 @@ public class MulticastPulseAgent impleme
while (this.counter.decrementAndGet() > 0) {
try {
- this.socket.send(this.mpr);
+ this.socket.send(this.agent.getResponsePacket());
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("MulticastPulseAgent client error: " + e.getMessage(), e);
Modified: tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java (original)
+++ tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java Tue Dec 10 16:54:47 2013
@@ -48,11 +48,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
-/**
- * Copyright (c) ORPRO Vision GmbH.
- * Author: Andy Gumbrecht
- * Date: 11.06.12
- */
@SuppressWarnings("UseOfSystemOutOrSystemErr")
public class MulticastPulseAgentTest {
@@ -229,7 +224,10 @@ public class MulticastPulseAgentTest {
final String[] serviceList = services.split("\\|");
final String[] hosts = s.split(",");
- System.out.println(String.format("\n" + name + " received Server pulse:\n\tGroup: %1$s\n\tServices: %2$s\n\tServer: %3$s\n", group, services, s));
+ System.out.println(String.format("\n" + name + " received Server pulse:\n\tGroup: %1$s\n\tServices: %2$s\n\tServer: %3$s\n",
+ group,
+ services,
+ s));
for (final String svc : serviceList) {
@@ -392,6 +390,69 @@ public class MulticastPulseAgentTest {
org.junit.Assert.assertTrue(timeout == 1 || set.size() > 0);
}
+ @Test
+ public void testBroadcastBadUri() throws Exception {
+
+ final DiscoveryListener original = agent.getDiscoveryListener();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final DiscoveryListener listener = new DiscoveryListener() {
+ @Override
+ public void serviceAdded(final URI service) {
+ latch.countDown();
+ System.out.println("added = " + service);
+ }
+
+ @Override
+ public void serviceRemoved(final URI service) {
+ latch.countDown();
+ System.out.println("removed = " + service);
+ }
+ };
+
+ agent.setDiscoveryListener(listener);
+
+ final String[] hosts = agent.getHosts().split(",");
+ final String host = hosts[hosts.length - 1];
+
+ final Future<?> future = executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ final InetAddress ia = getAddress(MulticastPulseAgentTest.host);
+
+ final byte[] bytes = (MulticastPulseAgent.CLIENT + forGroup + MulticastPulseAgent.BADURI + host).getBytes(Charset.forName("UTF-8"));
+ final DatagramPacket request = new DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port));
+
+ final MulticastSocket[] multicastSockets = MulticastPulseAgent.getSockets(MulticastPulseAgentTest.host, port);
+
+ for (final MulticastSocket socket : multicastSockets) {
+
+ try {
+ socket.send(request);
+ } catch (Exception e) {
+ System.out.println("Failed to broadcast bad URI on: " + socket.getInterface().getHostAddress());
+ e.printStackTrace();
+ }
+ }
+ } catch (Exception e) {
+ System.out.println("Failed to broadcast bad URI");
+ e.printStackTrace();
+ }
+ }
+ });
+
+ final Object o = future.get(10, TimeUnit.SECONDS);
+
+ final boolean await = latch.await(20, TimeUnit.SECONDS);
+ final boolean removed = agent.removeFromIgnore(host);
+
+ agent.setDiscoveryListener(original);
+
+ org.junit.Assert.assertTrue("Failed to remove host", removed && await);
+ }
+
private String ipFormat(final String h) throws UnknownHostException {
final InetAddress ia = InetAddress.getByName(h);
@@ -402,6 +463,20 @@ public class MulticastPulseAgentTest {
}
}
+ private static InetAddress getAddress(final String host) throws Exception {
+ final InetAddress ia;
+ try {
+ ia = InetAddress.getByName(host);
+ } catch (UnknownHostException e) {
+ throw new Exception(host + " is not a valid address", e);
+ }
+
+ if (null == ia || !ia.isMulticastAddress()) {
+ throw new Exception(host + " is not a valid multicast address");
+ }
+ return ia;
+ }
+
private static class MyDiscoveryListener implements DiscoveryListener {
private final String id;
Modified: tomee/tomee/trunk/server/openejb-server/pom.xml
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-server/pom.xml?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-server/pom.xml (original)
+++ tomee/tomee/trunk/server/openejb-server/pom.xml Tue Dec 10 16:54:47 2013
@@ -65,6 +65,11 @@
</dependency>
<dependency>
<groupId>org.apache.openejb</groupId>
+ <artifactId>openejb-jee</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.openejb</groupId>
<artifactId>openejb-loader</artifactId>
<version>${project.version}</version>
</dependency>
@@ -84,6 +89,10 @@
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
+ <artifactId>xbean-finder-shaded</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.xbean</groupId>
<artifactId>xbean-reflect</artifactId>
</dependency>
<dependency>
Modified: tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java (original)
+++ tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java Tue Dec 10 16:54:47 2013
@@ -155,7 +155,7 @@ public class SimpleServiceManager extend
@Override
public synchronized void start(final boolean block) throws ServiceException {
- if(stopped){
+ if (stopped) {
throw new ServiceException("Stop has already been called on ServiceManager");
}
Modified: tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java (original)
+++ tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java Tue Dec 10 16:54:47 2013
@@ -40,12 +40,12 @@ public class FilteredServiceManagerWithA
}
@Test
- public void numberOfServices () {
+ public void numberOfServices() {
// when using @EnableServices with the application composer
// the return value should be a FilteredServiceManager
assertEquals(FilteredServiceManager.class, ServiceManager.get().getClass());
- FilteredServiceManager manager = (FilteredServiceManager) ServiceManager.get();
+ final FilteredServiceManager manager = (FilteredServiceManager) ServiceManager.get();
assertEquals(1, manager.getDaemons().length);
assertEquals("admin", manager.getDaemons()[0].getName());
}
Re: svn commit: r1549891 - in /tomee/tomee/trunk/server:
openejb-client/src/main/java/org/apache/openejb/client/ openejb-common-cli/
openejb-common-cli/src/main/java/org/apache/openejb/server/cli/ openejb-http/src/main/java/org/apache/openejb/server/httpd/...
Posted by Thiago Veronezi <th...@veronezi.org>.
Hi Andy,
Did you miss a commit? The build is broken. It looks like the versions are
missing.
[]s,
Thiago.
On Tue, Dec 10, 2013 at 11:54 AM, <an...@apache.org> wrote:
> Author: andygumbrecht
> Date: Tue Dec 10 16:54:47 2013
> New Revision: 1549891
>
> URL: http://svn.apache.org/r1549891
> Log:
> Improve MulticastPulseAgent/MulticastPulseClient by allowing client to
> notify server of unreachable hosts - Eventually a server will stop sending
> them out.
> Pull some more <version> tags up to parent pom dep-management where they
> should all be.
> Allow server project to build from it's own directory.
> Finals.
>
> Modified:
>
> tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
> tomee/tomee/trunk/server/openejb-common-cli/pom.xml
>
> tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java
>
> tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java
>
> tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java
>
> tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
>
> tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
> tomee/tomee/trunk/server/openejb-server/pom.xml
>
> tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java
>
> tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java
>
> Modified:
> tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> ---
> tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
> (original)
> +++
> tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
> Tue Dec 10 16:54:47 2013
> @@ -64,6 +64,7 @@ public class MulticastPulseClient extend
> private static final Logger log = Logger.getLogger("OpenEJB.client");
> private static final String SERVER = "OpenEJB.MCP.Server:";
> private static final String CLIENT = "OpenEJB.MCP.Client:";
> + private static final String BADURI = ":BadUri:";
> private static final String EMPTY = "NoService";
> private static final Charset UTF8 = Charset.forName("UTF-8");
> private static final int TTL =
> Integer.parseInt(System.getProperty(ORG_APACHE_OPENEJB_MULTIPULSE_TTL,
> "32"));
> @@ -118,12 +119,7 @@ public class MulticastPulseClient extend
>
> if (null == uriSet || uriSet.isEmpty()) {
>
> - final Map<String, String> params;
> - try {
> - params = URIs.parseParamters(uri);
> - } catch (URISyntaxException e) {
> - throw new IllegalArgumentException("Invalid MultiPulse
> uri " + uri.toString(), e);
> - }
> + final Map<String, String> params = getUriParameters(uri);
>
> final Set<String> schemes = getSet(params, "schemes",
> this.getDefaultSchemes());
> final String group = getString(params, "group", "default");
> @@ -140,17 +136,39 @@ public class MulticastPulseClient extend
>
> for (final URI serviceURI : uriSet) {
>
> + //Strip serverhost and group and try to connect
> + final URI tryUri =
> URI.create(URI.create(serviceURI.getSchemeSpecificPart()).getSchemeSpecificPart());
> +
> try {
> - //Strip serverhost and group and try to connect
> - return
> ConnectionManager.getConnection(URI.create(URI.create(serviceURI.getSchemeSpecificPart()).getSchemeSpecificPart()));
> + return ConnectionManager.getConnection(tryUri);
> } catch (Exception e) {
> +
> uriSet.remove(serviceURI);
> +
> + if (java.net.SocketTimeoutException.class.isInstance(e)
> || SocketException.class.isInstance(e)) {
> + //Notify server that this URI is not reachable
> +
> MulticastPulseClient.broadcastBadUri(getString(getUriParameters(uri),
> "group", "default"), tryUri, uri.getHost(), uri.getPort());
> + }
> +
> + if (log.isLoggable(Level.FINE)) {
> + log.fine("Failed connection to: " + serviceURI);
> + }
> }
> }
>
> throw new IOException("Unable to connect an ejb server via the
> MultiPulse URI: " + uri);
> }
>
> + private static Map<String, String> getUriParameters(final URI uri) {
> + final Map<String, String> params;
> + try {
> + params = URIs.parseParamters(uri);
> + } catch (URISyntaxException e) {
> + throw new IllegalArgumentException("Invalid MultiPulse uri "
> + uri.toString(), e);
> + }
> + return params;
> + }
> +
> /**
> * Get a list of URIs discovered for the provided request.
> * <p/>
> @@ -187,17 +205,7 @@ public class MulticastPulseClient extend
> throw new Exception("Specify a valid port between 1 and
> 65535");
> }
>
> - final InetAddress ia;
> -
> - try {
> - ia = InetAddress.getByName(host);
> - } catch (UnknownHostException e) {
> - throw new Exception(host + " is not a valid address", e);
> - }
> -
> - if (null == ia || !ia.isMulticastAddress()) {
> - throw new Exception(host + " is not a valid multicast
> address");
> - }
> + final InetAddress ia = getAddress(host);
>
> final byte[] bytes = (MulticastPulseClient.CLIENT +
> forGroup).getBytes(UTF8);
> final DatagramPacket request = new DatagramPacket(bytes,
> bytes.length, new InetSocketAddress(ia, port));
> @@ -492,6 +500,20 @@ public class MulticastPulseClient extend
> }
> }
>
> + private static InetAddress getAddress(final String host) throws
> Exception {
> + final InetAddress ia;
> + try {
> + ia = InetAddress.getByName(host);
> + } catch (UnknownHostException e) {
> + throw new Exception(host + " is not a valid address", e);
> + }
> +
> + if (null == ia || !ia.isMulticastAddress()) {
> + throw new Exception(host + " is not a valid multicast
> address");
> + }
> + return ia;
> + }
> +
> /**
> * Is the provided host a local host
> *
> @@ -681,7 +703,10 @@ public class MulticastPulseClient extend
> s.connect(new InetSocketAddress(host,
> port), st);
> b = true;
> } catch (Exception e) {
> - //Ignore
> + if
> (java.net.SocketTimeoutException.class.isInstance(e) ||
> SocketException.class.isInstance(e)) {
> +
> MulticastPulseClient.broadcastBadUri(group, uriSub, mchost, mcport);
> + System.out.print("" + e + " : ");
> + }
> } finally {
> try {
> s.close();
> @@ -716,4 +741,38 @@ public class MulticastPulseClient extend
> running.set(false);
> t.interrupt();
> }
> +
> + /**
> + * Asynchronous attempt to broadcast a bad URI on our channel.
> + * Hopefully the culprit server will hear this and stop sending it.
> + *
> + * @param uri Bad URI to broadcast
> + */
> + private static void broadcastBadUri(final String group, final URI
> uri, final String host, final int port) {
> +
> + getExecutorService().submit(new Runnable() {
> + @Override
> + public void run() {
> + try {
> + final InetAddress ia = getAddress(host);
> +
> + final byte[] bytes = (MulticastPulseClient.CLIENT +
> group + MulticastPulseClient.BADURI + uri.getHost()).getBytes(UTF8);
> + final DatagramPacket request = new
> DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port));
> +
> + final MulticastSocket[] multicastSockets =
> MulticastPulseClient.getSockets(ia, port);
> +
> + for (final MulticastSocket socket : multicastSockets)
> {
> +
> + try {
> + socket.send(request);
> + } catch (Exception e) {
> + log.log(Level.WARNING, "Failed to broadcast
> bad URI: " + uri + " on: " + socket.getInterface().getHostAddress(), e);
> + }
> + }
> + } catch (Exception e) {
> + log.log(Level.WARNING, "Failed to broadcast bad URI:
> " + uri, e);
> + }
> + }
> + });
> + }
> }
>
> Modified: tomee/tomee/trunk/server/openejb-common-cli/pom.xml
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-common-cli/pom.xml?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> --- tomee/tomee/trunk/server/openejb-common-cli/pom.xml (original)
> +++ tomee/tomee/trunk/server/openejb-common-cli/pom.xml Tue Dec 10
> 16:54:47 2013
> @@ -36,20 +36,38 @@
> <version>${project.version}</version>
> </dependency>
> <dependency>
> + <groupId>org.apache.openejb</groupId>
> + <artifactId>openejb-loader</artifactId>
> + <version>${project.version}</version>
> + </dependency>
> + <dependency>
> <groupId>jline</groupId>
> <artifactId>jline</artifactId>
> - <version>0.9.94</version>
> </dependency>
> <dependency>
> <groupId>com.google.code.gson</groupId>
> <artifactId>gson</artifactId>
> - <version>2.1</version>
> </dependency>
> <dependency>
> + <groupId>org.apache.xbean</groupId>
> + <artifactId>xbean-reflect</artifactId>
> + </dependency>
> + <dependency>
> + <groupId>org.apache.xbean</groupId>
> + <artifactId>xbean-finder-shaded</artifactId>
> + </dependency>
> + <dependency>
> + <groupId>org.apache.commons</groupId>
> + <artifactId>commons-lang3</artifactId>
> + </dependency>
> +
> + <!-- Test scope -->
> + <dependency>
> <groupId>org.codehaus.groovy</groupId>
> <artifactId>groovy-all</artifactId>
> - <version>2.1.0</version>
> - <scope>test</scope> <!-- don't deliver it -->
> + <scope>test</scope>
> </dependency>
> +
> +
> </dependencies>
> </project>
>
> Modified:
> tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> ---
> tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java
> (original)
> +++
> tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java
> Tue Dec 10 16:54:47 2013
> @@ -42,6 +42,7 @@ import java.util.Properties;
> import java.util.TreeMap;
>
> public class CliRunnable implements Runnable {
> +
> private static final Logger LOGGER =
> Logger.getInstance(LogCategory.OPENEJB_SERVER, CliRunnable.class);
>
> private static final String BRANDING_FILE = "branding.properties";
> @@ -87,8 +88,11 @@ public class CliRunnable implements Runn
> UrlSet urlSet = new UrlSet(loader).excludeJvm();
> urlSet = urlSet.exclude(loader.getParent());
>
> - final IAnnotationFinder finder = new AnnotationFinder(new
> ConfigurableClasspathArchive(new
> ConfigurableClasspathArchive.FakeModule(loader, Collections.EMPTY_MAP),
> true, urlSet.getUrls()));
> - for (Annotated<Class<?>> cmd :
> finder.findMetaAnnotatedClasses(Command.class)) {
> + //noinspection unchecked
> + final IAnnotationFinder finder = new AnnotationFinder(new
> ConfigurableClasspathArchive(new
> ConfigurableClasspathArchive.FakeModule(loader, Collections.EMPTY_MAP),
> +
> true,
> +
> urlSet.getUrls()));
> + for (final Annotated<Class<?>> cmd :
> finder.findMetaAnnotatedClasses(Command.class)) {
> try {
> final Command annotation =
> cmd.getAnnotation(Command.class);
> final String key = annotation.name();
> @@ -113,15 +117,15 @@ public class CliRunnable implements Runn
> private OutputStream err;
> private OutputStream out;
> private InputStream sin;
> - private String username;
> - private String bind;
> - private int port;
> + private final String username;
> + private final String bind;
> + private final int port;
>
> - public CliRunnable(String bind, int port) {
> + public CliRunnable(final String bind, final int port) {
> this(bind, port, PROMPT, null);
> }
>
> - public CliRunnable(String bind, int port, String username, String
> sep) {
> + public CliRunnable(final String bind, final int port, final String
> username, final String sep) {
> this.bind = bind;
> this.port = port;
> this.username = username;
> @@ -144,15 +148,15 @@ public class CliRunnable implements Runn
> }
> }
>
> - public void setInputStream(InputStream in) {
> + public void setInputStream(final InputStream in) {
> sin = in;
> }
>
> - public void setOutputStream(OutputStream out) {
> + public void setOutputStream(final OutputStream out) {
> this.out = out;
> }
>
> - public void setErrorStream(OutputStream err) {
> + public void setErrorStream(final OutputStream err) {
> this.err = err;
> }
>
> @@ -165,9 +169,10 @@ public class CliRunnable implements Runn
> }
>
> public void clean() {
> - scripter.clearEngines();
> + OpenEJBScripter.clearEngines();
> }
>
> + @Override
> public void run() {
> clean();
>
> @@ -180,13 +185,13 @@ public class CliRunnable implements Runn
> // TODO : add completers
>
> String line;
> - StringBuilder builtWelcome = new StringBuilder("Apache
> OpenEJB ")
> - .append(OpenEjbVersion.get().getVersion())
> - .append(" build: ")
> - .append(OpenEjbVersion.get().getDate())
> - .append("-")
> - .append(OpenEjbVersion.get().getTime())
> - .append(lineSep);
> + final StringBuilder builtWelcome = new StringBuilder("Apache
> OpenEJB ")
> +
> .append(OpenEjbVersion.get().getVersion())
> + .append(" build: ")
> +
> .append(OpenEjbVersion.get().getDate())
> + .append("-")
> +
> .append(OpenEjbVersion.get().getTime())
> + .append(lineSep);
> if (tomee) {
>
> builtWelcome.append(OS_LINE_SEP).append(PROPERTIES.getProperty(WELCOME_TOMEE_KEY));
> } else {
> @@ -196,10 +201,10 @@ public class CliRunnable implements Runn
>
> streamManager.writeOut(OpenEjbVersion.get().getUrl());
> streamManager.writeOut(builtWelcome.toString()
> - .replace("$bind", bind)
> - .replace("$port", Integer.toString(port))
> - .replace("$name", NAME)
> - .replace(OS_LINE_SEP, lineSep));
> + .replace("$bind", bind)
> + .replace("$port",
> Integer.toString(port))
> + .replace("$name", NAME)
> + .replace(OS_LINE_SEP,
> lineSep));
>
> while ((line = reader.readLine(prompt())) != null) {
> // exit simply let us go out of the loop
> @@ -210,7 +215,7 @@ public class CliRunnable implements Runn
>
> Class<?> cmdClass = null;
> String key = null;
> - for (Map.Entry<String, Class<?>> cmd :
> COMMANDS.entrySet()) {
> + for (final Map.Entry<String, Class<?>> cmd :
> COMMANDS.entrySet()) {
> if (line.startsWith(cmd.getKey())) {
> cmdClass = cmd.getValue();
> key = cmd.getKey();
> @@ -270,8 +275,8 @@ public class CliRunnable implements Runn
> prompt.append(PROMPT);
> }
> prompt.append(" @ ")
> - .append(bind).append(":").append(port)
> - .append(PROMPT_SUFFIX);
> + .append(bind).append(":").append(port)
> + .append(PROMPT_SUFFIX);
> return prompt.toString();
> }
> }
>
> Modified:
> tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> ---
> tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java
> (original)
> +++
> tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java
> Tue Dec 10 16:54:47 2013
> @@ -26,6 +26,7 @@ import java.io.OutputStreamWriter;
> import java.util.Collection;
>
> public class StreamManager {
> +
> private static final String OS_LINE_SEP =
> System.getProperty("line.separator");
>
> private String lineSep;
> @@ -34,16 +35,16 @@ public class StreamManager {
> private OutputStream out;
> private OutputStream err;
>
> - public StreamManager(OutputStream out, OutputStream err, String
> lineSep) {
> + public StreamManager(final OutputStream out, final OutputStream err,
> final String lineSep) {
> this.lineSep = lineSep;
> this.out = out;
> this.err = err;
> this.sout = new OutputStreamWriter(out);
> - this.serr= new OutputStreamWriter(err);
> + this.serr = new OutputStreamWriter(err);
> }
>
> private void write(final OutputStreamWriter writer, final String s) {
> - for (String l : s.split(lineSep)) {
> + for (final String l : s.split(lineSep)) {
> try {
> writer.write(l);
> writer.write(lineSep);
> @@ -64,7 +65,7 @@ public class StreamManager {
> } else {
> final StringBuilder error = new StringBuilder();
> error.append(e.getMessage()).append(lineSep);
> - for (StackTraceElement elt : e.getStackTrace()) {
> + for (final StackTraceElement elt : e.getStackTrace()) {
> error.append("
> ").append(elt.toString()).append(lineSep);
> }
> write(serr, error.toString());
> @@ -77,7 +78,7 @@ public class StreamManager {
> }
> if (out instanceof Collection) {
> final StringBuilder builder = new StringBuilder();
> - for (Object o : (Collection) out) {
> + for (final Object o : (Collection) out) {
> builder.append(string(o, lineSep)).append(lineSep);
> }
> return builder.toString();
> @@ -89,10 +90,10 @@ public class StreamManager {
> if (!out.getClass().getName().startsWith("java")) {
> try {
> return new
> GsonBuilder().setPrettyPrinting().create().toJson(out)
> - .replace(OS_LINE_SEP, lineSep);
> + .replace(OS_LINE_SEP, lineSep);
> } catch (RuntimeException re) {
> return ToStringBuilder.reflectionToString(out,
> ToStringStyle.SHORT_PREFIX_STYLE)
> - .replace(OS_LINE_SEP, lineSep);
> + .replace(OS_LINE_SEP, lineSep);
> }
> }
> return out.toString();
> @@ -123,7 +124,7 @@ public class StreamManager {
> }
>
> public void writeOut(final String text, final String sep) {
> - for (String line : text.split(sep)) {
> + for (final String line : text.split(sep)) {
> writeOut(line);
> }
> }
>
> Modified:
> tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> ---
> tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java
> (original)
> +++
> tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java
> Tue Dec 10 16:54:47 2013
> @@ -61,7 +61,7 @@ public class BeginWebBeansListener imple
> *
> * @param webBeansContext the OWB context
> */
> - public BeginWebBeansListener(WebBeansContext webBeansContext) {
> + public BeginWebBeansListener(final WebBeansContext webBeansContext) {
> this.webBeansContext = webBeansContext;
> this.failoverService =
> this.webBeansContext.getService(FailOverService.class);
> this.contextKey = "org.apache.tomee.catalina.WebBeansListener@"
> + webBeansContext.hashCode();
> @@ -71,7 +71,7 @@ public class BeginWebBeansListener imple
> * {@inheritDoc}
> */
> @Override
> - public void requestDestroyed(ServletRequestEvent event) {
> + public void requestDestroyed(final ServletRequestEvent event) {
> // no-op
> }
>
> @@ -127,7 +127,7 @@ public class BeginWebBeansListener imple
> * {@inheritDoc}
> */
> @Override
> - public void sessionDestroyed(HttpSessionEvent event) {
> + public void sessionDestroyed(final HttpSessionEvent event) {
> ensureRequestScope();
> }
>
> @@ -139,21 +139,20 @@ public class BeginWebBeansListener imple
> }
> }
>
> -
> @Override
> - public void sessionWillPassivate(HttpSessionEvent event) {
> + public void sessionWillPassivate(final HttpSessionEvent event) {
> ensureRequestScope();
> }
>
> @Override
> - public void sessionDidActivate(HttpSessionEvent event) {
> + public void sessionDidActivate(final HttpSessionEvent event) {
> if (failoverService.isSupportFailOver() ||
> failoverService.isSupportPassivation()) {
> failoverService.sessionDidActivate(event.getSession());
> }
> }
>
> @Override
> - public void contextInitialized(ServletContextEvent
> servletContextEvent) {
> + public void contextInitialized(final ServletContextEvent
> servletContextEvent) {
> try {
>
> OpenEJBLifecycle.initializeServletContext(servletContextEvent.getServletContext(),
> webBeansContext);
> } catch (final Exception e) {
> @@ -163,7 +162,7 @@ public class BeginWebBeansListener imple
> }
>
> @Override
> - public void contextDestroyed(ServletContextEvent servletContextEvent)
> {
> + public void contextDestroyed(final ServletContextEvent
> servletContextEvent) {
> ensureRequestScope();
> }
> }
>
> Modified:
> tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> ---
> tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
> (original)
> +++
> tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
> Tue Dec 10 16:54:47 2013
> @@ -26,6 +26,7 @@ import java.net.URI;
> import java.net.UnknownHostException;
> import java.nio.charset.Charset;
> import java.util.ArrayList;
> +import java.util.Collections;
> import java.util.Comparator;
> import java.util.Enumeration;
> import java.util.HashSet;
> @@ -39,6 +40,7 @@ import java.util.concurrent.Future;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.atomic.AtomicBoolean;
> import java.util.concurrent.atomic.AtomicInteger;
> +import java.util.concurrent.locks.ReentrantLock;
>
> /**
> * Licensed to the Apache Software Foundation (ASF) under one or more
> @@ -66,9 +68,11 @@ public class MulticastPulseAgent impleme
>
> public static final String SERVER = "OpenEJB.MCP.Server:";
> public static final String CLIENT = "OpenEJB.MCP.Client:";
> + public static final String BADURI = ":BadUri:";
> public static final String EMPTY = "NoService";
>
> - private final Set<String> ignore = new HashSet<String>();
> + private final ReentrantLock lock = new ReentrantLock();
> + private final Set<String> ignore = Collections.synchronizedSet(new
> HashSet<String>());
> private final Set<URI> uriSet = new HashSet<URI>();
> private AtomicBoolean running = new AtomicBoolean(false);
> final ArrayList<Future> futures = new ArrayList<Future>();
> @@ -84,12 +88,19 @@ public class MulticastPulseAgent impleme
> private boolean loopbackOnly = true;
>
> /**
> + * @author Andy Gumbrecht
> * This agent listens for client pulses on a defined multicast
> channel.
> * On receipt of a valid pulse the agent responds with its own pulse
> for
> * a defined amount of time and rate. A client can deliver a pulse as
> often as
> * required until it is happy of the server response.
> * <p/>
> * Both server and client deliver crafted information payloads.
> + * <p/>
> + * The client pulse contains OpenEJB.MCP.Client:(group or
> *)[:BadUri:URI]
> + * The server will only respond to a request for it's own group or *
> + * The optional :BadUri: is used by clients to notify a server that
> it is sending out unreachable URI's
> + * <p/>
> + * The server response pulse contains
> OpenEJB.MCP.Server:(Service|Service)|(Comma separated host list)
> */
> public MulticastPulseAgent() {
> }
> @@ -111,7 +122,7 @@ public class MulticastPulseAgent impleme
> length = 1;
> }
>
> - executor = Executors.newFixedThreadPool(length * 2);
> + executor = Executors.newFixedThreadPool(length * 3);
> }
>
> return executor;
> @@ -134,7 +145,7 @@ public class MulticastPulseAgent impleme
> }
> }
> } catch (Exception e) {
> - log.warning("Invalid ignore parameter. Should be a lowercase
> single or comma seperated list like: ignore=host1,host2");
> + log.warning("Invalid ignore parameter. Should be a lowercase
> single host or comma seperated list of hosts to ignore like:
> ignore=host1,host2,ipv4,ipv6");
> }
>
> this.multicast = o.get("bind", this.multicast);
> @@ -148,41 +159,59 @@ public class MulticastPulseAgent impleme
>
> private void buildPacket() throws SocketException {
>
> - this.loopbackOnly = true;
> - for (final URI uri : this.uriSet) {
> - if (!isLoopback(uri.getHost())) {
> - this.loopbackOnly = false;
> - break;
> + final ReentrantLock l = this.lock;
> + l.lock();
> +
> + try {
> + this.loopbackOnly = true;
> + for (final URI uri : this.uriSet) {
> + if (!isLoopback(uri.getHost())) {
> + this.loopbackOnly = false;
> + break;
> + }
> }
> - }
>
> - final String hosts = getHosts(this.ignore);
> - final StringBuilder sb = new StringBuilder(SERVER);
> - sb.append(this.group);
> - sb.append(':');
> + final String hosts = getHosts(this.ignore);
> + final StringBuilder sb = new StringBuilder(SERVER);
> + sb.append(this.group);
> + sb.append(':');
>
> - if (this.uriSet.size() > 0) {
> - for (final URI uri : this.uriSet) {
> - sb.append(uri.toASCIIString());
> + if (this.uriSet.size() > 0) {
> + for (final URI uri : this.uriSet) {
> + sb.append(uri.toASCIIString());
> + sb.append('|');
> + }
> + } else {
> + sb.append(EMPTY);
> sb.append('|');
> }
> - } else {
> - sb.append(EMPTY);
> - sb.append('|');
> - }
>
> - sb.append(hosts);
> + sb.append(hosts);
> +
> + final byte[] bytes = (sb.toString()).getBytes(UTF8);
> + this.response = new DatagramPacket(bytes, bytes.length,
> this.address);
>
> - final byte[] bytes = (sb.toString()).getBytes(UTF8);
> - this.response = new DatagramPacket(bytes, bytes.length,
> this.address);
> + if (log.isDebugEnabled()) {
> + log.debug("MultiPulse packet is: " + sb);
> + }
>
> - if (log.isDebugEnabled()) {
> - log.debug("MultiPulse packet is: " + sb);
> + if (bytes.length > 2048) {
> + log.warning("MultiPulse packet is larger than 2048 bytes,
> clients will not be able to read the packet" +
> + "\n - You should define the 'ignore' property
> to filter out unreachable addresses: " + sb);
> + }
> + } finally {
> + l.unlock();
> }
> + }
>
> - if (bytes.length > 2048) {
> - log.warning("MultiPulse packet is larger than 2048 bytes,
> clients will not be able to read the packet" +
> - "\n - You should define the 'ignore' property to
> filter out unreachable addresses: " + sb);
> + public DatagramPacket getResponsePacket() {
> + final ReentrantLock l = this.lock;
> + l.lock();
> +
> + try {
> + return this.response;
> + } finally {
> + l.unlock();
> }
> }
>
> @@ -191,6 +220,10 @@ public class MulticastPulseAgent impleme
> this.listener = listener;
> }
>
> + public DiscoveryListener getDiscoveryListener() {
> + return listener;
> + }
> +
> @Override
> public void registerService(URI uri) throws IOException {
>
> @@ -267,7 +300,7 @@ public class MulticastPulseAgent impleme
> continue;
> }
>
> - final Sender sender = new Sender(this, socketKey, socket,
> this.response);
> + final Sender sender = new Sender(this, socketKey, socket);
> this.futures.add(executorService.submit(sender));
>
> this.futures.add(executorService.submit(new Runnable() {
> @@ -289,34 +322,64 @@ public class MulticastPulseAgent impleme
>
> if (req.startsWith(CLIENT)) {
>
> - req = (req.replace(CLIENT, ""));
> + final int ix =
> req.indexOf(BADURI);
> + String badUri = null;
> +
> + if (ix > 0) {
> + //The client is notifying of
> a bad uri
> + badUri =
> req.substring(ix).replace(BADURI, "");
> + req = req.substring(0,
> ix).replace(CLIENT, "");
> + } else {
> + req = (req.replace(CLIENT,
> ""));
> + }
>
> + //Is this a group or global pulse
> request
> if (mpg.equals(req) ||
> "*".equals(req)) {
>
> - final String client =
> ((InetSocketAddress) sa).getAddress().getHostAddress();
> + //Is there a bad url and is
> it this agent broadcasting the bad URI?
> + if (null != badUri &&
> getHosts(MulticastPulseAgent.this.ignore).contains(badUri)) {
> + final ReentrantLock l =
> MulticastPulseAgent.this.lock;
> + l.lock();
> +
> + try {
> + //Remove it and
> rebuild our broadcast packet
> + if
> (MulticastPulseAgent.this.ignore.add(badUri)) {
> +
> MulticastPulseAgent.this.buildPacket();
> +
> +
> MulticastPulseAgent.this.fireEvent(URI.create("OpenEJB" + BADURI +
> badUri), false);
> +
> + log.warning("This
> server has removed the unreachable host '" + badUri + "' from discovery,
> you should consider adding" +
> + "
> this to the 'ignore' property in the multipulse.properties file");
> + }
> +
> + } finally {
> + l.unlock();
> + }
> + } else {
>
> - if (isLoopBackOnly) {
> - //We only have local
> services, so make sure the request is from a local source else ignore it
> - if
> (!MulticastPulseAgent.isLocalAddress(client, false)) {
> + //Normal client multicast
> pulse request
> + final String client =
> ((InetSocketAddress) sa).getAddress().getHostAddress();
> +
> + if (isLoopBackOnly &&
> !MulticastPulseAgent.isLocalAddress(client, false)) {
> + //We only have local
> services, so make sure the request is from a local source else ignore it
> if
> (log.isDebugEnabled()) {
>
> log.debug(String.format("Ignoring remote client %1$s pulse request for
> group: %2$s - No remote services available",
>
> client,
>
> req));
> }
> - return;
> - }
> - }
> + } else {
>
> - //We have received a valid
> pulse request
> - if (log.isDebugEnabled()) {
> -
> log.debug(String.format("Answering client '%1$s' pulse request for group:
> '%2$s' on '%3$s'", client, req, socketKey));
> - }
> + //We have received a
> valid pulse request
> + if
> (log.isDebugEnabled()) {
> +
> log.debug(String.format("Answering client '%1$s' pulse request for group:
> '%2$s' on '%3$s'", client, req, socketKey));
> + }
>
> - //Renew response pulse
> - sender.pulseResponse();
> + //Renew response pulse
> +
> sender.pulseResponse();
> + }
> + }
> }
> }
> -
> }
>
> } catch (Exception e) {
> @@ -412,6 +475,27 @@ public class MulticastPulseAgent impleme
> }
>
> /**
> + * Lists current broadcast hosts as a comma separated list.
> + * Used principally for testing.
> + *
> + * @return String
> + */
> + public String getHosts() {
> + return getHosts(this.ignore);
> + }
> +
> + /**
> + * Remove a host from the ignore list.
> + * Used principally for testing.
> + *
> + * @param host String
> + * @return True if removed, else false
> + */
> + public boolean removeFromIgnore(final String host) {
> + return this.ignore.remove(host);
> + }
> +
> + /**
> * Attempts to return at least one socket per valid network interface.
> * If no valid interface is found then the array will be empty.
> *
> @@ -612,13 +696,11 @@ public class MulticastPulseAgent impleme
> private final MulticastPulseAgent agent;
> private final String socketKey;
> private final MulticastSocket socket;
> - private final DatagramPacket mpr;
>
> - private Sender(final MulticastPulseAgent agent, final String
> socketKey, final MulticastSocket socket, final DatagramPacket mpr) {
> + private Sender(final MulticastPulseAgent agent, final String
> socketKey, final MulticastSocket socket) {
> this.agent = agent;
> this.socketKey = socketKey;
> this.socket = socket;
> - this.mpr = mpr;
> }
>
> @Override
> @@ -640,7 +722,7 @@ public class MulticastPulseAgent impleme
> while (this.counter.decrementAndGet() > 0) {
>
> try {
> - this.socket.send(this.mpr);
> + this.socket.send(this.agent.getResponsePacket());
> } catch (Exception e) {
> if (log.isDebugEnabled()) {
> log.debug("MulticastPulseAgent client error:
> " + e.getMessage(), e);
>
> Modified:
> tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> ---
> tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
> (original)
> +++
> tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
> Tue Dec 10 16:54:47 2013
> @@ -48,11 +48,6 @@ import java.util.concurrent.TimeUnit;
> import java.util.concurrent.atomic.AtomicBoolean;
> import java.util.concurrent.locks.ReentrantLock;
>
> -/**
> - * Copyright (c) ORPRO Vision GmbH.
> - * Author: Andy Gumbrecht
> - * Date: 11.06.12
> - */
> @SuppressWarnings("UseOfSystemOutOrSystemErr")
> public class MulticastPulseAgentTest {
>
> @@ -229,7 +224,10 @@ public class MulticastPulseAgentTest {
> final String[] serviceList =
> services.split("\\|");
> final String[] hosts = s.split(",");
>
> - System.out.println(String.format("\n"
> + name + " received Server pulse:\n\tGroup: %1$s\n\tServices:
> %2$s\n\tServer: %3$s\n", group, services, s));
> + System.out.println(String.format("\n"
> + name + " received Server pulse:\n\tGroup: %1$s\n\tServices:
> %2$s\n\tServer: %3$s\n",
> +
> group,
> +
> services,
> + s));
>
> for (final String svc : serviceList) {
>
> @@ -392,6 +390,69 @@ public class MulticastPulseAgentTest {
> org.junit.Assert.assertTrue(timeout == 1 || set.size() > 0);
> }
>
> + @Test
> + public void testBroadcastBadUri() throws Exception {
> +
> + final DiscoveryListener original = agent.getDiscoveryListener();
> +
> + final CountDownLatch latch = new CountDownLatch(1);
> +
> + final DiscoveryListener listener = new DiscoveryListener() {
> + @Override
> + public void serviceAdded(final URI service) {
> + latch.countDown();
> + System.out.println("added = " + service);
> + }
> +
> + @Override
> + public void serviceRemoved(final URI service) {
> + latch.countDown();
> + System.out.println("removed = " + service);
> + }
> + };
> +
> + agent.setDiscoveryListener(listener);
> +
> + final String[] hosts = agent.getHosts().split(",");
> + final String host = hosts[hosts.length - 1];
> +
> + final Future<?> future = executor.submit(new Runnable() {
> + @Override
> + public void run() {
> + try {
> + final InetAddress ia =
> getAddress(MulticastPulseAgentTest.host);
> +
> + final byte[] bytes = (MulticastPulseAgent.CLIENT +
> forGroup + MulticastPulseAgent.BADURI +
> host).getBytes(Charset.forName("UTF-8"));
> + final DatagramPacket request = new
> DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port));
> +
> + final MulticastSocket[] multicastSockets =
> MulticastPulseAgent.getSockets(MulticastPulseAgentTest.host, port);
> +
> + for (final MulticastSocket socket : multicastSockets)
> {
> +
> + try {
> + socket.send(request);
> + } catch (Exception e) {
> + System.out.println("Failed to broadcast bad
> URI on: " + socket.getInterface().getHostAddress());
> + e.printStackTrace();
> + }
> + }
> + } catch (Exception e) {
> + System.out.println("Failed to broadcast bad URI");
> + e.printStackTrace();
> + }
> + }
> + });
> +
> + final Object o = future.get(10, TimeUnit.SECONDS);
> +
> + final boolean await = latch.await(20, TimeUnit.SECONDS);
> + final boolean removed = agent.removeFromIgnore(host);
> +
> + agent.setDiscoveryListener(original);
> +
> + org.junit.Assert.assertTrue("Failed to remove host", removed &&
> await);
> + }
> +
> private String ipFormat(final String h) throws UnknownHostException {
>
> final InetAddress ia = InetAddress.getByName(h);
> @@ -402,6 +463,20 @@ public class MulticastPulseAgentTest {
> }
> }
>
> + private static InetAddress getAddress(final String host) throws
> Exception {
> + final InetAddress ia;
> + try {
> + ia = InetAddress.getByName(host);
> + } catch (UnknownHostException e) {
> + throw new Exception(host + " is not a valid address", e);
> + }
> +
> + if (null == ia || !ia.isMulticastAddress()) {
> + throw new Exception(host + " is not a valid multicast
> address");
> + }
> + return ia;
> + }
> +
> private static class MyDiscoveryListener implements DiscoveryListener
> {
>
> private final String id;
>
> Modified: tomee/tomee/trunk/server/openejb-server/pom.xml
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-server/pom.xml?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> --- tomee/tomee/trunk/server/openejb-server/pom.xml (original)
> +++ tomee/tomee/trunk/server/openejb-server/pom.xml Tue Dec 10 16:54:47
> 2013
> @@ -65,6 +65,11 @@
> </dependency>
> <dependency>
> <groupId>org.apache.openejb</groupId>
> + <artifactId>openejb-jee</artifactId>
> + <version>${project.version}</version>
> + </dependency>
> + <dependency>
> + <groupId>org.apache.openejb</groupId>
> <artifactId>openejb-loader</artifactId>
> <version>${project.version}</version>
> </dependency>
> @@ -84,6 +89,10 @@
> </dependency>
> <dependency>
> <groupId>org.apache.xbean</groupId>
> + <artifactId>xbean-finder-shaded</artifactId>
> + </dependency>
> + <dependency>
> + <groupId>org.apache.xbean</groupId>
> <artifactId>xbean-reflect</artifactId>
> </dependency>
> <dependency>
>
> Modified:
> tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> ---
> tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java
> (original)
> +++
> tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java
> Tue Dec 10 16:54:47 2013
> @@ -155,7 +155,7 @@ public class SimpleServiceManager extend
> @Override
> public synchronized void start(final boolean block) throws
> ServiceException {
>
> - if(stopped){
> + if (stopped) {
> throw new ServiceException("Stop has already been called on
> ServiceManager");
> }
>
>
> Modified:
> tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> ---
> tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java
> (original)
> +++
> tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java
> Tue Dec 10 16:54:47 2013
> @@ -40,12 +40,12 @@ public class FilteredServiceManagerWithA
> }
>
> @Test
> - public void numberOfServices () {
> + public void numberOfServices() {
> // when using @EnableServices with the application composer
> // the return value should be a FilteredServiceManager
> assertEquals(FilteredServiceManager.class,
> ServiceManager.get().getClass());
>
> - FilteredServiceManager manager = (FilteredServiceManager)
> ServiceManager.get();
> + final FilteredServiceManager manager = (FilteredServiceManager)
> ServiceManager.get();
> assertEquals(1, manager.getDaemons().length);
> assertEquals("admin", manager.getDaemons()[0].getName());
> }
>
>
>