You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by an...@apache.org on 2013/12/10 17:54:48 UTC

svn commit: r1549891 - in /tomee/tomee/trunk/server: openejb-client/src/main/java/org/apache/openejb/client/ openejb-common-cli/ openejb-common-cli/src/main/java/org/apache/openejb/server/cli/ openejb-http/src/main/java/org/apache/openejb/server/httpd/...

Author: andygumbrecht
Date: Tue Dec 10 16:54:47 2013
New Revision: 1549891

URL: http://svn.apache.org/r1549891
Log:
Improve MulticastPulseAgent/MulticastPulseClient by allowing client to notify server of unreachable hosts - Eventually a server will stop sending them out.
Pull some more <version> tags up to parent pom dep-management where they should all be.
Allow server project to build from its own directory.
Add missing 'final' modifiers to parameters, fields and local variables.

Modified:
    tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
    tomee/tomee/trunk/server/openejb-common-cli/pom.xml
    tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java
    tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java
    tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java
    tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
    tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
    tomee/tomee/trunk/server/openejb-server/pom.xml
    tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java
    tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java

Modified: tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java (original)
+++ tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java Tue Dec 10 16:54:47 2013
@@ -64,6 +64,7 @@ public class MulticastPulseClient extend
     private static final Logger log = Logger.getLogger("OpenEJB.client");
     private static final String SERVER = "OpenEJB.MCP.Server:";
     private static final String CLIENT = "OpenEJB.MCP.Client:";
+    private static final String BADURI = ":BadUri:";
     private static final String EMPTY = "NoService";
     private static final Charset UTF8 = Charset.forName("UTF-8");
     private static final int TTL = Integer.parseInt(System.getProperty(ORG_APACHE_OPENEJB_MULTIPULSE_TTL, "32"));
@@ -118,12 +119,7 @@ public class MulticastPulseClient extend
 
         if (null == uriSet || uriSet.isEmpty()) {
 
-            final Map<String, String> params;
-            try {
-                params = URIs.parseParamters(uri);
-            } catch (URISyntaxException e) {
-                throw new IllegalArgumentException("Invalid MultiPulse uri " + uri.toString(), e);
-            }
+            final Map<String, String> params = getUriParameters(uri);
 
             final Set<String> schemes = getSet(params, "schemes", this.getDefaultSchemes());
             final String group = getString(params, "group", "default");
@@ -140,17 +136,39 @@ public class MulticastPulseClient extend
 
         for (final URI serviceURI : uriSet) {
 
+            //Strip serverhost and group and try to connect
+            final URI tryUri = URI.create(URI.create(serviceURI.getSchemeSpecificPart()).getSchemeSpecificPart());
+
             try {
-                //Strip serverhost and group and try to connect
-                return ConnectionManager.getConnection(URI.create(URI.create(serviceURI.getSchemeSpecificPart()).getSchemeSpecificPart()));
+                return ConnectionManager.getConnection(tryUri);
             } catch (Exception e) {
+
                 uriSet.remove(serviceURI);
+
+                if (java.net.SocketTimeoutException.class.isInstance(e) || SocketException.class.isInstance(e)) {
+                    //Notify server that this URI is not reachable
+                    MulticastPulseClient.broadcastBadUri(getString(getUriParameters(uri), "group", "default"), tryUri, uri.getHost(), uri.getPort());
+                }
+
+                if (log.isLoggable(Level.FINE)) {
+                    log.fine("Failed connection to: " + serviceURI);
+                }
             }
         }
 
         throw new IOException("Unable to connect an ejb server via the MultiPulse URI: " + uri);
     }
 
+    private static Map<String, String> getUriParameters(final URI uri) {
+        final Map<String, String> params;
+        try {
+            params = URIs.parseParamters(uri);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Invalid MultiPulse uri " + uri.toString(), e);
+        }
+        return params;
+    }
+
     /**
      * Get a list of URIs discovered for the provided request.
      * <p/>
@@ -187,17 +205,7 @@ public class MulticastPulseClient extend
             throw new Exception("Specify a valid port between 1 and 65535");
         }
 
-        final InetAddress ia;
-
-        try {
-            ia = InetAddress.getByName(host);
-        } catch (UnknownHostException e) {
-            throw new Exception(host + " is not a valid address", e);
-        }
-
-        if (null == ia || !ia.isMulticastAddress()) {
-            throw new Exception(host + " is not a valid multicast address");
-        }
+        final InetAddress ia = getAddress(host);
 
         final byte[] bytes = (MulticastPulseClient.CLIENT + forGroup).getBytes(UTF8);
         final DatagramPacket request = new DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port));
@@ -492,6 +500,20 @@ public class MulticastPulseClient extend
         }
     }
 
+    private static InetAddress getAddress(final String host) throws Exception {
+        final InetAddress ia;
+        try {
+            ia = InetAddress.getByName(host);
+        } catch (UnknownHostException e) {
+            throw new Exception(host + " is not a valid address", e);
+        }
+
+        if (null == ia || !ia.isMulticastAddress()) {
+            throw new Exception(host + " is not a valid multicast address");
+        }
+        return ia;
+    }
+
     /**
      * Is the provided host a local host
      *
@@ -681,7 +703,10 @@ public class MulticastPulseClient extend
                                 s.connect(new InetSocketAddress(host, port), st);
                                 b = true;
                             } catch (Exception e) {
-                                //Ignore
+                                if (java.net.SocketTimeoutException.class.isInstance(e) || SocketException.class.isInstance(e)) {
+                                    MulticastPulseClient.broadcastBadUri(group, uriSub, mchost, mcport);
+                                    System.out.print("" + e + " : ");
+                                }
                             } finally {
                                 try {
                                     s.close();
@@ -716,4 +741,38 @@ public class MulticastPulseClient extend
         running.set(false);
         t.interrupt();
     }
+
+    /**
+     * Asynchronous attempt to broadcast a bad URI on our channel.
+     * Hopefully the culprit server will hear this and stop sending it.
+     *
+     * @param uri Bad URI to broadcast
+     */
+    private static void broadcastBadUri(final String group, final URI uri, final String host, final int port) {
+
+        getExecutorService().submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    final InetAddress ia = getAddress(host);
+
+                    final byte[] bytes = (MulticastPulseClient.CLIENT + group + MulticastPulseClient.BADURI + uri.getHost()).getBytes(UTF8);
+                    final DatagramPacket request = new DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port));
+
+                    final MulticastSocket[] multicastSockets = MulticastPulseClient.getSockets(ia, port);
+
+                    for (final MulticastSocket socket : multicastSockets) {
+
+                        try {
+                            socket.send(request);
+                        } catch (Exception e) {
+                            log.log(Level.WARNING, "Failed to broadcast bad URI: " + uri + " on: " + socket.getInterface().getHostAddress(), e);
+                        }
+                    }
+                } catch (Exception e) {
+                    log.log(Level.WARNING, "Failed to broadcast bad URI: " + uri, e);
+                }
+            }
+        });
+    }
 }

Modified: tomee/tomee/trunk/server/openejb-common-cli/pom.xml
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-common-cli/pom.xml?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-common-cli/pom.xml (original)
+++ tomee/tomee/trunk/server/openejb-common-cli/pom.xml Tue Dec 10 16:54:47 2013
@@ -36,20 +36,38 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.openejb</groupId>
+      <artifactId>openejb-loader</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>jline</groupId>
       <artifactId>jline</artifactId>
-      <version>0.9.94</version>
     </dependency>
     <dependency>
       <groupId>com.google.code.gson</groupId>
       <artifactId>gson</artifactId>
-      <version>2.1</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.xbean</groupId>
+      <artifactId>xbean-reflect</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.xbean</groupId>
+      <artifactId>xbean-finder-shaded</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+
+    <!-- Test scope -->
+    <dependency>
       <groupId>org.codehaus.groovy</groupId>
       <artifactId>groovy-all</artifactId>
-      <version>2.1.0</version>
-      <scope>test</scope> <!-- don't deliver it -->
+      <scope>test</scope>
     </dependency>
+
+
   </dependencies>
 </project>

Modified: tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java (original)
+++ tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java Tue Dec 10 16:54:47 2013
@@ -42,6 +42,7 @@ import java.util.Properties;
 import java.util.TreeMap;
 
 public class CliRunnable implements Runnable {
+
     private static final Logger LOGGER = Logger.getInstance(LogCategory.OPENEJB_SERVER, CliRunnable.class);
 
     private static final String BRANDING_FILE = "branding.properties";
@@ -87,8 +88,11 @@ public class CliRunnable implements Runn
             UrlSet urlSet = new UrlSet(loader).excludeJvm();
             urlSet = urlSet.exclude(loader.getParent());
 
-            final IAnnotationFinder finder = new AnnotationFinder(new ConfigurableClasspathArchive(new ConfigurableClasspathArchive.FakeModule(loader, Collections.EMPTY_MAP), true, urlSet.getUrls()));
-            for (Annotated<Class<?>> cmd : finder.findMetaAnnotatedClasses(Command.class)) {
+            //noinspection unchecked
+            final IAnnotationFinder finder = new AnnotationFinder(new ConfigurableClasspathArchive(new ConfigurableClasspathArchive.FakeModule(loader, Collections.EMPTY_MAP),
+                                                                                                   true,
+                                                                                                   urlSet.getUrls()));
+            for (final Annotated<Class<?>> cmd : finder.findMetaAnnotatedClasses(Command.class)) {
                 try {
                     final Command annotation = cmd.getAnnotation(Command.class);
                     final String key = annotation.name();
@@ -113,15 +117,15 @@ public class CliRunnable implements Runn
     private OutputStream err;
     private OutputStream out;
     private InputStream sin;
-    private String username;
-    private String bind;
-    private int port;
+    private final String username;
+    private final String bind;
+    private final int port;
 
-    public CliRunnable(String bind, int port) {
+    public CliRunnable(final String bind, final int port) {
         this(bind, port, PROMPT, null);
     }
 
-    public CliRunnable(String bind, int port, String username, String sep) {
+    public CliRunnable(final String bind, final int port, final String username, final String sep) {
         this.bind = bind;
         this.port = port;
         this.username = username;
@@ -144,15 +148,15 @@ public class CliRunnable implements Runn
         }
     }
 
-    public void setInputStream(InputStream in) {
+    public void setInputStream(final InputStream in) {
         sin = in;
     }
 
-    public void setOutputStream(OutputStream out) {
+    public void setOutputStream(final OutputStream out) {
         this.out = out;
     }
 
-    public void setErrorStream(OutputStream err) {
+    public void setErrorStream(final OutputStream err) {
         this.err = err;
     }
 
@@ -165,9 +169,10 @@ public class CliRunnable implements Runn
     }
 
     public void clean() {
-        scripter.clearEngines();
+        OpenEJBScripter.clearEngines();
     }
 
+    @Override
     public void run() {
         clean();
 
@@ -180,13 +185,13 @@ public class CliRunnable implements Runn
             // TODO : add completers
 
             String line;
-            StringBuilder builtWelcome = new StringBuilder("Apache OpenEJB ")
-                    .append(OpenEjbVersion.get().getVersion())
-                    .append("    build: ")
-                    .append(OpenEjbVersion.get().getDate())
-                    .append("-")
-                    .append(OpenEjbVersion.get().getTime())
-                    .append(lineSep);
+            final StringBuilder builtWelcome = new StringBuilder("Apache OpenEJB ")
+                                                   .append(OpenEjbVersion.get().getVersion())
+                                                   .append("    build: ")
+                                                   .append(OpenEjbVersion.get().getDate())
+                                                   .append("-")
+                                                   .append(OpenEjbVersion.get().getTime())
+                                                   .append(lineSep);
             if (tomee) {
                 builtWelcome.append(OS_LINE_SEP).append(PROPERTIES.getProperty(WELCOME_TOMEE_KEY));
             } else {
@@ -196,10 +201,10 @@ public class CliRunnable implements Runn
 
             streamManager.writeOut(OpenEjbVersion.get().getUrl());
             streamManager.writeOut(builtWelcome.toString()
-                    .replace("$bind", bind)
-                    .replace("$port", Integer.toString(port))
-                    .replace("$name", NAME)
-                    .replace(OS_LINE_SEP, lineSep));
+                                               .replace("$bind", bind)
+                                               .replace("$port", Integer.toString(port))
+                                               .replace("$name", NAME)
+                                               .replace(OS_LINE_SEP, lineSep));
 
             while ((line = reader.readLine(prompt())) != null) {
                 // exit simply let us go out of the loop
@@ -210,7 +215,7 @@ public class CliRunnable implements Runn
 
                 Class<?> cmdClass = null;
                 String key = null;
-                for (Map.Entry<String, Class<?>> cmd : COMMANDS.entrySet()) {
+                for (final Map.Entry<String, Class<?>> cmd : COMMANDS.entrySet()) {
                     if (line.startsWith(cmd.getKey())) {
                         cmdClass = cmd.getValue();
                         key = cmd.getKey();
@@ -270,8 +275,8 @@ public class CliRunnable implements Runn
             prompt.append(PROMPT);
         }
         prompt.append(" @ ")
-            .append(bind).append(":").append(port)
-            .append(PROMPT_SUFFIX);
+              .append(bind).append(":").append(port)
+              .append(PROMPT_SUFFIX);
         return prompt.toString();
     }
 }

Modified: tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java (original)
+++ tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java Tue Dec 10 16:54:47 2013
@@ -26,6 +26,7 @@ import java.io.OutputStreamWriter;
 import java.util.Collection;
 
 public class StreamManager {
+
     private static final String OS_LINE_SEP = System.getProperty("line.separator");
 
     private String lineSep;
@@ -34,16 +35,16 @@ public class StreamManager {
     private OutputStream out;
     private OutputStream err;
 
-    public StreamManager(OutputStream out, OutputStream err, String lineSep) {
+    public StreamManager(final OutputStream out, final OutputStream err, final String lineSep) {
         this.lineSep = lineSep;
         this.out = out;
         this.err = err;
         this.sout = new OutputStreamWriter(out);
-        this.serr= new OutputStreamWriter(err);
+        this.serr = new OutputStreamWriter(err);
     }
 
     private void write(final OutputStreamWriter writer, final String s) {
-        for (String l : s.split(lineSep)) {
+        for (final String l : s.split(lineSep)) {
             try {
                 writer.write(l);
                 writer.write(lineSep);
@@ -64,7 +65,7 @@ public class StreamManager {
         } else {
             final StringBuilder error = new StringBuilder();
             error.append(e.getMessage()).append(lineSep);
-            for (StackTraceElement elt : e.getStackTrace()) {
+            for (final StackTraceElement elt : e.getStackTrace()) {
                 error.append("    ").append(elt.toString()).append(lineSep);
             }
             write(serr, error.toString());
@@ -77,7 +78,7 @@ public class StreamManager {
         }
         if (out instanceof Collection) {
             final StringBuilder builder = new StringBuilder();
-            for (Object o : (Collection) out) {
+            for (final Object o : (Collection) out) {
                 builder.append(string(o, lineSep)).append(lineSep);
             }
             return builder.toString();
@@ -89,10 +90,10 @@ public class StreamManager {
         if (!out.getClass().getName().startsWith("java")) {
             try {
                 return new GsonBuilder().setPrettyPrinting().create().toJson(out)
-                            .replace(OS_LINE_SEP, lineSep);
+                                        .replace(OS_LINE_SEP, lineSep);
             } catch (RuntimeException re) {
                 return ToStringBuilder.reflectionToString(out, ToStringStyle.SHORT_PREFIX_STYLE)
-                            .replace(OS_LINE_SEP, lineSep);
+                                      .replace(OS_LINE_SEP, lineSep);
             }
         }
         return out.toString();
@@ -123,7 +124,7 @@ public class StreamManager {
     }
 
     public void writeOut(final String text, final String sep) {
-        for (String line : text.split(sep)) {
+        for (final String line : text.split(sep)) {
             writeOut(line);
         }
     }

Modified: tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java (original)
+++ tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java Tue Dec 10 16:54:47 2013
@@ -61,7 +61,7 @@ public class BeginWebBeansListener imple
      *
      * @param webBeansContext the OWB context
      */
-    public BeginWebBeansListener(WebBeansContext webBeansContext) {
+    public BeginWebBeansListener(final WebBeansContext webBeansContext) {
         this.webBeansContext = webBeansContext;
         this.failoverService = this.webBeansContext.getService(FailOverService.class);
         this.contextKey = "org.apache.tomee.catalina.WebBeansListener@" + webBeansContext.hashCode();
@@ -71,7 +71,7 @@ public class BeginWebBeansListener imple
      * {@inheritDoc}
      */
     @Override
-    public void requestDestroyed(ServletRequestEvent event) {
+    public void requestDestroyed(final ServletRequestEvent event) {
         // no-op
     }
 
@@ -127,7 +127,7 @@ public class BeginWebBeansListener imple
      * {@inheritDoc}
      */
     @Override
-    public void sessionDestroyed(HttpSessionEvent event) {
+    public void sessionDestroyed(final HttpSessionEvent event) {
         ensureRequestScope();
     }
 
@@ -139,21 +139,20 @@ public class BeginWebBeansListener imple
         }
     }
 
-
     @Override
-    public void sessionWillPassivate(HttpSessionEvent event) {
+    public void sessionWillPassivate(final HttpSessionEvent event) {
         ensureRequestScope();
     }
 
     @Override
-    public void sessionDidActivate(HttpSessionEvent event) {
+    public void sessionDidActivate(final HttpSessionEvent event) {
         if (failoverService.isSupportFailOver() || failoverService.isSupportPassivation()) {
             failoverService.sessionDidActivate(event.getSession());
         }
     }
 
     @Override
-    public void contextInitialized(ServletContextEvent servletContextEvent) {
+    public void contextInitialized(final ServletContextEvent servletContextEvent) {
         try {
             OpenEJBLifecycle.initializeServletContext(servletContextEvent.getServletContext(), webBeansContext);
         } catch (final Exception e) {
@@ -163,7 +162,7 @@ public class BeginWebBeansListener imple
     }
 
     @Override
-    public void contextDestroyed(ServletContextEvent servletContextEvent) {
+    public void contextDestroyed(final ServletContextEvent servletContextEvent) {
         ensureRequestScope();
     }
 }

Modified: tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java (original)
+++ tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java Tue Dec 10 16:54:47 2013
@@ -26,6 +26,7 @@ import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Enumeration;
 import java.util.HashSet;
@@ -39,6 +40,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -66,9 +68,11 @@ public class MulticastPulseAgent impleme
 
     public static final String SERVER = "OpenEJB.MCP.Server:";
     public static final String CLIENT = "OpenEJB.MCP.Client:";
+    public static final String BADURI = ":BadUri:";
     public static final String EMPTY = "NoService";
 
-    private final Set<String> ignore = new HashSet<String>();
+    private final ReentrantLock lock = new ReentrantLock();
+    private final Set<String> ignore = Collections.synchronizedSet(new HashSet<String>());
     private final Set<URI> uriSet = new HashSet<URI>();
     private AtomicBoolean running = new AtomicBoolean(false);
     final ArrayList<Future> futures = new ArrayList<Future>();
@@ -84,12 +88,19 @@ public class MulticastPulseAgent impleme
     private boolean loopbackOnly = true;
 
     /**
+     * @author Andy Gumbrecht
      * This agent listens for client pulses on a defined multicast channel.
      * On receipt of a valid pulse the agent responds with its own pulse for
      * a defined amount of time and rate. A client can deliver a pulse as often as
      * required until it is happy of the server response.
      * <p/>
      * Both server and client deliver crafted information payloads.
+     * <p/>
+     * The client pulse contains OpenEJB.MCP.Client:(group or *)[:BadUri:URI]
+     * The server will only respond to a request for its own group or *
+     * The optional :BadUri: is used by clients to notify a server that it is sending out unreachable URIs
+     * <p/>
+     * The server response pulse contains OpenEJB.MCP.Server:(Service|Service)|(Comma separated host list)
      */
     public MulticastPulseAgent() {
     }
@@ -111,7 +122,7 @@ public class MulticastPulseAgent impleme
                 length = 1;
             }
 
-            executor = Executors.newFixedThreadPool(length * 2);
+            executor = Executors.newFixedThreadPool(length * 3);
         }
 
         return executor;
@@ -134,7 +145,7 @@ public class MulticastPulseAgent impleme
                 }
             }
         } catch (Exception e) {
-            log.warning("Invalid ignore parameter. Should be a lowercase single or comma seperated list like: ignore=host1,host2");
+            log.warning("Invalid ignore parameter. Should be a lowercase single host or comma seperated list of hosts to ignore like: ignore=host1,host2,ipv4,ipv6");
         }
 
         this.multicast = o.get("bind", this.multicast);
@@ -148,41 +159,59 @@ public class MulticastPulseAgent impleme
 
     private void buildPacket() throws SocketException {
 
-        this.loopbackOnly = true;
-        for (final URI uri : this.uriSet) {
-            if (!isLoopback(uri.getHost())) {
-                this.loopbackOnly = false;
-                break;
+        final ReentrantLock l = this.lock;
+        l.lock();
+
+        try {
+            this.loopbackOnly = true;
+            for (final URI uri : this.uriSet) {
+                if (!isLoopback(uri.getHost())) {
+                    this.loopbackOnly = false;
+                    break;
+                }
             }
-        }
 
-        final String hosts = getHosts(this.ignore);
-        final StringBuilder sb = new StringBuilder(SERVER);
-        sb.append(this.group);
-        sb.append(':');
+            final String hosts = getHosts(this.ignore);
+            final StringBuilder sb = new StringBuilder(SERVER);
+            sb.append(this.group);
+            sb.append(':');
 
-        if (this.uriSet.size() > 0) {
-            for (final URI uri : this.uriSet) {
-                sb.append(uri.toASCIIString());
+            if (this.uriSet.size() > 0) {
+                for (final URI uri : this.uriSet) {
+                    sb.append(uri.toASCIIString());
+                    sb.append('|');
+                }
+            } else {
+                sb.append(EMPTY);
                 sb.append('|');
             }
-        } else {
-            sb.append(EMPTY);
-            sb.append('|');
-        }
 
-        sb.append(hosts);
+            sb.append(hosts);
+
+            final byte[] bytes = (sb.toString()).getBytes(UTF8);
+            this.response = new DatagramPacket(bytes, bytes.length, this.address);
 
-        final byte[] bytes = (sb.toString()).getBytes(UTF8);
-        this.response = new DatagramPacket(bytes, bytes.length, this.address);
+            if (log.isDebugEnabled()) {
+                log.debug("MultiPulse packet is: " + sb);
+            }
 
-        if (log.isDebugEnabled()) {
-            log.debug("MultiPulse packet is: " + sb);
+            if (bytes.length > 2048) {
+                log.warning("MultiPulse packet is larger than 2048 bytes, clients will not be able to read the packet" +
+                            "\n - You should define the 'ignore' property to filter out unreachable addresses: " + sb);
+            }
+        } finally {
+            l.unlock();
         }
+    }
 
-        if (bytes.length > 2048) {
-            log.warning("MultiPulse packet is larger than 2048 bytes, clients will not be able to read the packet" +
-                        "\n - You should define the 'ignore' property to filter out unreachable addresses: " + sb);
+    public DatagramPacket getResponsePacket() {
+        final ReentrantLock l = this.lock;
+        l.lock();
+
+        try {
+            return this.response;
+        } finally {
+            l.unlock();
         }
     }
 
@@ -191,6 +220,10 @@ public class MulticastPulseAgent impleme
         this.listener = listener;
     }
 
+    public DiscoveryListener getDiscoveryListener() {
+        return listener;
+    }
+
     @Override
     public void registerService(URI uri) throws IOException {
 
@@ -267,7 +300,7 @@ public class MulticastPulseAgent impleme
                     continue;
                 }
 
-                final Sender sender = new Sender(this, socketKey, socket, this.response);
+                final Sender sender = new Sender(this, socketKey, socket);
                 this.futures.add(executorService.submit(sender));
 
                 this.futures.add(executorService.submit(new Runnable() {
@@ -289,34 +322,64 @@ public class MulticastPulseAgent impleme
 
                                     if (req.startsWith(CLIENT)) {
 
-                                        req = (req.replace(CLIENT, ""));
+                                        final int ix = req.indexOf(BADURI);
+                                        String badUri = null;
+
+                                        if (ix > 0) {
+                                            //The client is notifying of a bad uri
+                                            badUri = req.substring(ix).replace(BADURI, "");
+                                            req = req.substring(0, ix).replace(CLIENT, "");
+                                        } else {
+                                            req = (req.replace(CLIENT, ""));
+                                        }
 
+                                        //Is this a group or global pulse request
                                         if (mpg.equals(req) || "*".equals(req)) {
 
-                                            final String client = ((InetSocketAddress) sa).getAddress().getHostAddress();
+                                            //Is there a bad URI, and is this the agent broadcasting it?
+                                            if (null != badUri && getHosts(MulticastPulseAgent.this.ignore).contains(badUri)) {
+                                                final ReentrantLock l = MulticastPulseAgent.this.lock;
+                                                l.lock();
+
+                                                try {
+                                                    //Remove it and rebuild our broadcast packet
+                                                    if (MulticastPulseAgent.this.ignore.add(badUri)) {
+                                                        MulticastPulseAgent.this.buildPacket();
+
+                                                        MulticastPulseAgent.this.fireEvent(URI.create("OpenEJB" + BADURI + badUri), false);
+
+                                                        log.warning("This server has removed the unreachable host '" + badUri + "' from discovery, you should consider adding" +
+                                                                    " this to the 'ignore' property in the multipulse.properties file");
+                                                    }
+
+                                                } finally {
+                                                    l.unlock();
+                                                }
+                                            } else {
 
-                                            if (isLoopBackOnly) {
-                                                //We only have local services, so make sure the request is from a local source else ignore it
-                                                if (!MulticastPulseAgent.isLocalAddress(client, false)) {
+                                                //Normal client multicast pulse request
+                                                final String client = ((InetSocketAddress) sa).getAddress().getHostAddress();
+
+                                                if (isLoopBackOnly && !MulticastPulseAgent.isLocalAddress(client, false)) {
+                                                    //We only have local services, so make sure the request is from a local source else ignore it
                                                     if (log.isDebugEnabled()) {
                                                         log.debug(String.format("Ignoring remote client %1$s pulse request for group: %2$s - No remote services available",
                                                                                 client,
                                                                                 req));
                                                     }
-                                                    return;
-                                                }
-                                            }
+                                                } else {
 
-                                            //We have received a valid pulse request
-                                            if (log.isDebugEnabled()) {
-                                                log.debug(String.format("Answering client '%1$s' pulse request for group: '%2$s' on '%3$s'", client, req, socketKey));
-                                            }
+                                                    //We have received a valid pulse request
+                                                    if (log.isDebugEnabled()) {
+                                                        log.debug(String.format("Answering client '%1$s' pulse request for group: '%2$s' on '%3$s'", client, req, socketKey));
+                                                    }
 
-                                            //Renew response pulse
-                                            sender.pulseResponse();
+                                                    //Renew response pulse
+                                                    sender.pulseResponse();
+                                                }
+                                            }
                                         }
                                     }
-
                                 }
 
                             } catch (Exception e) {
@@ -412,6 +475,27 @@ public class MulticastPulseAgent impleme
     }
 
     /**
+     * Lists current broadcast hosts as a comma separated list.
+     * Used principally for testing.
+     *
+     * @return String
+     */
+    public String getHosts() {
+        return getHosts(this.ignore);
+    }
+
+    /**
+     * Remove a host from the ignore list.
+     * Used principally for testing.
+     *
+     * @param host String
+     * @return True if removed, else false
+     */
+    public boolean removeFromIgnore(final String host) {
+        return this.ignore.remove(host);
+    }
+
+    /**
      * Attempts to return at least one socket per valid network interface.
      * If no valid interface is found then the array will be empty.
      *
@@ -612,13 +696,11 @@ public class MulticastPulseAgent impleme
         private final MulticastPulseAgent agent;
         private final String socketKey;
         private final MulticastSocket socket;
-        private final DatagramPacket mpr;
 
-        private Sender(final MulticastPulseAgent agent, final String socketKey, final MulticastSocket socket, final DatagramPacket mpr) {
+        private Sender(final MulticastPulseAgent agent, final String socketKey, final MulticastSocket socket) {
             this.agent = agent;
             this.socketKey = socketKey;
             this.socket = socket;
-            this.mpr = mpr;
         }
 
         @Override
@@ -640,7 +722,7 @@ public class MulticastPulseAgent impleme
                 while (this.counter.decrementAndGet() > 0) {
 
                     try {
-                        this.socket.send(this.mpr);
+                        this.socket.send(this.agent.getResponsePacket());
                     } catch (Exception e) {
                         if (log.isDebugEnabled()) {
                             log.debug("MulticastPulseAgent client error: " + e.getMessage(), e);

Modified: tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java (original)
+++ tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java Tue Dec 10 16:54:47 2013
@@ -48,11 +48,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 
-/**
- * Copyright (c) ORPRO Vision GmbH.
- * Author: Andy Gumbrecht
- * Date: 11.06.12
- */
 @SuppressWarnings("UseOfSystemOutOrSystemErr")
 public class MulticastPulseAgentTest {
 
@@ -229,7 +224,10 @@ public class MulticastPulseAgentTest {
                                     final String[] serviceList = services.split("\\|");
                                     final String[] hosts = s.split(",");
 
-                                    System.out.println(String.format("\n" + name + " received Server pulse:\n\tGroup: %1$s\n\tServices: %2$s\n\tServer: %3$s\n", group, services, s));
+                                    System.out.println(String.format("\n" + name + " received Server pulse:\n\tGroup: %1$s\n\tServices: %2$s\n\tServer: %3$s\n",
+                                                                     group,
+                                                                     services,
+                                                                     s));
 
                                     for (final String svc : serviceList) {
 
@@ -392,6 +390,69 @@ public class MulticastPulseAgentTest {
         org.junit.Assert.assertTrue(timeout == 1 || set.size() > 0);
     }
 
+    @Test
+    public void testBroadcastBadUri() throws Exception {
+
+        final DiscoveryListener original = agent.getDiscoveryListener();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        final DiscoveryListener listener = new DiscoveryListener() {
+            @Override
+            public void serviceAdded(final URI service) {
+                latch.countDown();
+                System.out.println("added = " + service);
+            }
+
+            @Override
+            public void serviceRemoved(final URI service) {
+                latch.countDown();
+                System.out.println("removed = " + service);
+            }
+        };
+
+        agent.setDiscoveryListener(listener);
+
+        final String[] hosts = agent.getHosts().split(",");
+        final String host = hosts[hosts.length - 1];
+
+        final Future<?> future = executor.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    final InetAddress ia = getAddress(MulticastPulseAgentTest.host);
+
+                    final byte[] bytes = (MulticastPulseAgent.CLIENT + forGroup + MulticastPulseAgent.BADURI + host).getBytes(Charset.forName("UTF-8"));
+                    final DatagramPacket request = new DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port));
+
+                    final MulticastSocket[] multicastSockets = MulticastPulseAgent.getSockets(MulticastPulseAgentTest.host, port);
+
+                    for (final MulticastSocket socket : multicastSockets) {
+
+                        try {
+                            socket.send(request);
+                        } catch (Exception e) {
+                            System.out.println("Failed to broadcast bad URI on: " + socket.getInterface().getHostAddress());
+                            e.printStackTrace();
+                        }
+                    }
+                } catch (Exception e) {
+                    System.out.println("Failed to broadcast bad URI");
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        final Object o = future.get(10, TimeUnit.SECONDS);
+
+        final boolean await = latch.await(20, TimeUnit.SECONDS);
+        final boolean removed = agent.removeFromIgnore(host);
+
+        agent.setDiscoveryListener(original);
+
+        org.junit.Assert.assertTrue("Failed to remove host", removed && await);
+    }
+
     private String ipFormat(final String h) throws UnknownHostException {
 
         final InetAddress ia = InetAddress.getByName(h);
@@ -402,6 +463,20 @@ public class MulticastPulseAgentTest {
         }
     }
 
+    private static InetAddress getAddress(final String host) throws Exception {
+        final InetAddress ia;
+        try {
+            ia = InetAddress.getByName(host);
+        } catch (UnknownHostException e) {
+            throw new Exception(host + " is not a valid address", e);
+        }
+
+        if (null == ia || !ia.isMulticastAddress()) {
+            throw new Exception(host + " is not a valid multicast address");
+        }
+        return ia;
+    }
+
     private static class MyDiscoveryListener implements DiscoveryListener {
 
         private final String id;

Modified: tomee/tomee/trunk/server/openejb-server/pom.xml
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-server/pom.xml?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-server/pom.xml (original)
+++ tomee/tomee/trunk/server/openejb-server/pom.xml Tue Dec 10 16:54:47 2013
@@ -65,6 +65,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.openejb</groupId>
+      <artifactId>openejb-jee</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.openejb</groupId>
       <artifactId>openejb-loader</artifactId>
       <version>${project.version}</version>
     </dependency>
@@ -84,6 +89,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.xbean</groupId>
+      <artifactId>xbean-finder-shaded</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.xbean</groupId>
       <artifactId>xbean-reflect</artifactId>
     </dependency>
     <dependency>

Modified: tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java (original)
+++ tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java Tue Dec 10 16:54:47 2013
@@ -155,7 +155,7 @@ public class SimpleServiceManager extend
     @Override
     public synchronized void start(final boolean block) throws ServiceException {
 
-        if(stopped){
+        if (stopped) {
             throw new ServiceException("Stop has already been called on ServiceManager");
         }
 

Modified: tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java?rev=1549891&r1=1549890&r2=1549891&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java (original)
+++ tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java Tue Dec 10 16:54:47 2013
@@ -40,12 +40,12 @@ public class FilteredServiceManagerWithA
     }
 
     @Test
-    public void numberOfServices () {
+    public void numberOfServices() {
         // when using @EnableServices with the application composer
         // the return value should be a FilteredServiceManager
         assertEquals(FilteredServiceManager.class, ServiceManager.get().getClass());
 
-        FilteredServiceManager manager = (FilteredServiceManager) ServiceManager.get();
+        final FilteredServiceManager manager = (FilteredServiceManager) ServiceManager.get();
         assertEquals(1, manager.getDaemons().length);
         assertEquals("admin", manager.getDaemons()[0].getName());
     }



Re: svn commit: r1549891 - in /tomee/tomee/trunk/server: openejb-client/src/main/java/org/apache/openejb/client/ openejb-common-cli/ openejb-common-cli/src/main/java/org/apache/openejb/server/cli/ openejb-http/src/main/java/org/apache/openejb/server/httpd/...

Posted by Thiago Veronezi <th...@veronezi.org>.
Hi Andy,
Did you miss a commit? The build is broken. It looks like the versions are
missing.

[]s,
Thiago.


On Tue, Dec 10, 2013 at 11:54 AM, <an...@apache.org> wrote:

> Author: andygumbrecht
> Date: Tue Dec 10 16:54:47 2013
> New Revision: 1549891
>
> URL: http://svn.apache.org/r1549891
> Log:
> Improve MulticastPulseAgent/MulticastPulseClient by allowing client to
> notify server of unreachable hosts - Eventually a server will stop sending
> them out.
> Pull some more <version> tags up to parent pom dep-management where they
> should all be.
> Allow server project to build from its own directory.
> Finals.
>
> Modified:
>
> tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
>     tomee/tomee/trunk/server/openejb-common-cli/pom.xml
>
> tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java
>
> tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java
>
> tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java
>
> tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
>
> tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
>     tomee/tomee/trunk/server/openejb-server/pom.xml
>
> tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java
>
> tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java
>
> Modified:
> tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> ---
> tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
> (original)
> +++
> tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
> Tue Dec 10 16:54:47 2013
> @@ -64,6 +64,7 @@ public class MulticastPulseClient extend
>      private static final Logger log = Logger.getLogger("OpenEJB.client");
>      private static final String SERVER = "OpenEJB.MCP.Server:";
>      private static final String CLIENT = "OpenEJB.MCP.Client:";
> +    private static final String BADURI = ":BadUri:";
>      private static final String EMPTY = "NoService";
>      private static final Charset UTF8 = Charset.forName("UTF-8");
>      private static final int TTL =
> Integer.parseInt(System.getProperty(ORG_APACHE_OPENEJB_MULTIPULSE_TTL,
> "32"));
> @@ -118,12 +119,7 @@ public class MulticastPulseClient extend
>
>          if (null == uriSet || uriSet.isEmpty()) {
>
> -            final Map<String, String> params;
> -            try {
> -                params = URIs.parseParamters(uri);
> -            } catch (URISyntaxException e) {
> -                throw new IllegalArgumentException("Invalid MultiPulse
> uri " + uri.toString(), e);
> -            }
> +            final Map<String, String> params = getUriParameters(uri);
>
>              final Set<String> schemes = getSet(params, "schemes",
> this.getDefaultSchemes());
>              final String group = getString(params, "group", "default");
> @@ -140,17 +136,39 @@ public class MulticastPulseClient extend
>
>          for (final URI serviceURI : uriSet) {
>
> +            //Strip serverhost and group and try to connect
> +            final URI tryUri =
> URI.create(URI.create(serviceURI.getSchemeSpecificPart()).getSchemeSpecificPart());
> +
>              try {
> -                //Strip serverhost and group and try to connect
> -                return
> ConnectionManager.getConnection(URI.create(URI.create(serviceURI.getSchemeSpecificPart()).getSchemeSpecificPart()));
> +                return ConnectionManager.getConnection(tryUri);
>              } catch (Exception e) {
> +
>                  uriSet.remove(serviceURI);
> +
> +                if (java.net.SocketTimeoutException.class.isInstance(e)
> || SocketException.class.isInstance(e)) {
> +                    //Notify server that this URI is not reachable
> +
>  MulticastPulseClient.broadcastBadUri(getString(getUriParameters(uri),
> "group", "default"), tryUri, uri.getHost(), uri.getPort());
> +                }
> +
> +                if (log.isLoggable(Level.FINE)) {
> +                    log.fine("Failed connection to: " + serviceURI);
> +                }
>              }
>          }
>
>          throw new IOException("Unable to connect an ejb server via the
> MultiPulse URI: " + uri);
>      }
>
> +    private static Map<String, String> getUriParameters(final URI uri) {
> +        final Map<String, String> params;
> +        try {
> +            params = URIs.parseParamters(uri);
> +        } catch (URISyntaxException e) {
> +            throw new IllegalArgumentException("Invalid MultiPulse uri "
> + uri.toString(), e);
> +        }
> +        return params;
> +    }
> +
>      /**
>       * Get a list of URIs discovered for the provided request.
>       * <p/>
> @@ -187,17 +205,7 @@ public class MulticastPulseClient extend
>              throw new Exception("Specify a valid port between 1 and
> 65535");
>          }
>
> -        final InetAddress ia;
> -
> -        try {
> -            ia = InetAddress.getByName(host);
> -        } catch (UnknownHostException e) {
> -            throw new Exception(host + " is not a valid address", e);
> -        }
> -
> -        if (null == ia || !ia.isMulticastAddress()) {
> -            throw new Exception(host + " is not a valid multicast
> address");
> -        }
> +        final InetAddress ia = getAddress(host);
>
>          final byte[] bytes = (MulticastPulseClient.CLIENT +
> forGroup).getBytes(UTF8);
>          final DatagramPacket request = new DatagramPacket(bytes,
> bytes.length, new InetSocketAddress(ia, port));
> @@ -492,6 +500,20 @@ public class MulticastPulseClient extend
>          }
>      }
>
> +    private static InetAddress getAddress(final String host) throws
> Exception {
> +        final InetAddress ia;
> +        try {
> +            ia = InetAddress.getByName(host);
> +        } catch (UnknownHostException e) {
> +            throw new Exception(host + " is not a valid address", e);
> +        }
> +
> +        if (null == ia || !ia.isMulticastAddress()) {
> +            throw new Exception(host + " is not a valid multicast
> address");
> +        }
> +        return ia;
> +    }
> +
>      /**
>       * Is the provided host a local host
>       *
> @@ -681,7 +703,10 @@ public class MulticastPulseClient extend
>                                  s.connect(new InetSocketAddress(host,
> port), st);
>                                  b = true;
>                              } catch (Exception e) {
> -                                //Ignore
> +                                if
> (java.net.SocketTimeoutException.class.isInstance(e) ||
> SocketException.class.isInstance(e)) {
> +
>  MulticastPulseClient.broadcastBadUri(group, uriSub, mchost, mcport);
> +                                    System.out.print("" + e + " : ");
> +                                }
>                              } finally {
>                                  try {
>                                      s.close();
> @@ -716,4 +741,38 @@ public class MulticastPulseClient extend
>          running.set(false);
>          t.interrupt();
>      }
> +
> +    /**
> +     * Asynchronous attempt to broadcast a bad URI on our channel.
> +     * Hopefully the culprit server will hear this and stop sending it.
> +     *
> +     * @param uri Bad URI to broadcast
> +     */
> +    private static void broadcastBadUri(final String group, final URI
> uri, final String host, final int port) {
> +
> +        getExecutorService().submit(new Runnable() {
> +            @Override
> +            public void run() {
> +                try {
> +                    final InetAddress ia = getAddress(host);
> +
> +                    final byte[] bytes = (MulticastPulseClient.CLIENT +
> group + MulticastPulseClient.BADURI + uri.getHost()).getBytes(UTF8);
> +                    final DatagramPacket request = new
> DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port));
> +
> +                    final MulticastSocket[] multicastSockets =
> MulticastPulseClient.getSockets(ia, port);
> +
> +                    for (final MulticastSocket socket : multicastSockets)
> {
> +
> +                        try {
> +                            socket.send(request);
> +                        } catch (Exception e) {
> +                            log.log(Level.WARNING, "Failed to broadcast
> bad URI: " + uri + " on: " + socket.getInterface().getHostAddress(), e);
> +                        }
> +                    }
> +                } catch (Exception e) {
> +                    log.log(Level.WARNING, "Failed to broadcast bad URI:
> " + uri, e);
> +                }
> +            }
> +        });
> +    }
>  }
>
> Modified: tomee/tomee/trunk/server/openejb-common-cli/pom.xml
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-common-cli/pom.xml?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> --- tomee/tomee/trunk/server/openejb-common-cli/pom.xml (original)
> +++ tomee/tomee/trunk/server/openejb-common-cli/pom.xml Tue Dec 10
> 16:54:47 2013
> @@ -36,20 +36,38 @@
>        <version>${project.version}</version>
>      </dependency>
>      <dependency>
> +      <groupId>org.apache.openejb</groupId>
> +      <artifactId>openejb-loader</artifactId>
> +      <version>${project.version}</version>
> +    </dependency>
> +    <dependency>
>        <groupId>jline</groupId>
>        <artifactId>jline</artifactId>
> -      <version>0.9.94</version>
>      </dependency>
>      <dependency>
>        <groupId>com.google.code.gson</groupId>
>        <artifactId>gson</artifactId>
> -      <version>2.1</version>
>      </dependency>
>      <dependency>
> +      <groupId>org.apache.xbean</groupId>
> +      <artifactId>xbean-reflect</artifactId>
> +    </dependency>
> +    <dependency>
> +      <groupId>org.apache.xbean</groupId>
> +      <artifactId>xbean-finder-shaded</artifactId>
> +    </dependency>
> +    <dependency>
> +      <groupId>org.apache.commons</groupId>
> +      <artifactId>commons-lang3</artifactId>
> +    </dependency>
> +
> +    <!-- Test scope -->
> +    <dependency>
>        <groupId>org.codehaus.groovy</groupId>
>        <artifactId>groovy-all</artifactId>
> -      <version>2.1.0</version>
> -      <scope>test</scope> <!-- don't deliver it -->
> +      <scope>test</scope>
>      </dependency>
> +
> +
>    </dependencies>
>  </project>
>
> Modified:
> tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> ---
> tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java
> (original)
> +++
> tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java
> Tue Dec 10 16:54:47 2013
> @@ -42,6 +42,7 @@ import java.util.Properties;
>  import java.util.TreeMap;
>
>  public class CliRunnable implements Runnable {
> +
>      private static final Logger LOGGER =
> Logger.getInstance(LogCategory.OPENEJB_SERVER, CliRunnable.class);
>
>      private static final String BRANDING_FILE = "branding.properties";
> @@ -87,8 +88,11 @@ public class CliRunnable implements Runn
>              UrlSet urlSet = new UrlSet(loader).excludeJvm();
>              urlSet = urlSet.exclude(loader.getParent());
>
> -            final IAnnotationFinder finder = new AnnotationFinder(new
> ConfigurableClasspathArchive(new
> ConfigurableClasspathArchive.FakeModule(loader, Collections.EMPTY_MAP),
> true, urlSet.getUrls()));
> -            for (Annotated<Class<?>> cmd :
> finder.findMetaAnnotatedClasses(Command.class)) {
> +            //noinspection unchecked
> +            final IAnnotationFinder finder = new AnnotationFinder(new
> ConfigurableClasspathArchive(new
> ConfigurableClasspathArchive.FakeModule(loader, Collections.EMPTY_MAP),
> +
>                           true,
> +
>                           urlSet.getUrls()));
> +            for (final Annotated<Class<?>> cmd :
> finder.findMetaAnnotatedClasses(Command.class)) {
>                  try {
>                      final Command annotation =
> cmd.getAnnotation(Command.class);
>                      final String key = annotation.name();
> @@ -113,15 +117,15 @@ public class CliRunnable implements Runn
>      private OutputStream err;
>      private OutputStream out;
>      private InputStream sin;
> -    private String username;
> -    private String bind;
> -    private int port;
> +    private final String username;
> +    private final String bind;
> +    private final int port;
>
> -    public CliRunnable(String bind, int port) {
> +    public CliRunnable(final String bind, final int port) {
>          this(bind, port, PROMPT, null);
>      }
>
> -    public CliRunnable(String bind, int port, String username, String
> sep) {
> +    public CliRunnable(final String bind, final int port, final String
> username, final String sep) {
>          this.bind = bind;
>          this.port = port;
>          this.username = username;
> @@ -144,15 +148,15 @@ public class CliRunnable implements Runn
>          }
>      }
>
> -    public void setInputStream(InputStream in) {
> +    public void setInputStream(final InputStream in) {
>          sin = in;
>      }
>
> -    public void setOutputStream(OutputStream out) {
> +    public void setOutputStream(final OutputStream out) {
>          this.out = out;
>      }
>
> -    public void setErrorStream(OutputStream err) {
> +    public void setErrorStream(final OutputStream err) {
>          this.err = err;
>      }
>
> @@ -165,9 +169,10 @@ public class CliRunnable implements Runn
>      }
>
>      public void clean() {
> -        scripter.clearEngines();
> +        OpenEJBScripter.clearEngines();
>      }
>
> +    @Override
>      public void run() {
>          clean();
>
> @@ -180,13 +185,13 @@ public class CliRunnable implements Runn
>              // TODO : add completers
>
>              String line;
> -            StringBuilder builtWelcome = new StringBuilder("Apache
> OpenEJB ")
> -                    .append(OpenEjbVersion.get().getVersion())
> -                    .append("    build: ")
> -                    .append(OpenEjbVersion.get().getDate())
> -                    .append("-")
> -                    .append(OpenEjbVersion.get().getTime())
> -                    .append(lineSep);
> +            final StringBuilder builtWelcome = new StringBuilder("Apache
> OpenEJB ")
> +
> .append(OpenEjbVersion.get().getVersion())
> +                                                   .append("    build: ")
> +
> .append(OpenEjbVersion.get().getDate())
> +                                                   .append("-")
> +
> .append(OpenEjbVersion.get().getTime())
> +                                                   .append(lineSep);
>              if (tomee) {
>
>  builtWelcome.append(OS_LINE_SEP).append(PROPERTIES.getProperty(WELCOME_TOMEE_KEY));
>              } else {
> @@ -196,10 +201,10 @@ public class CliRunnable implements Runn
>
>              streamManager.writeOut(OpenEjbVersion.get().getUrl());
>              streamManager.writeOut(builtWelcome.toString()
> -                    .replace("$bind", bind)
> -                    .replace("$port", Integer.toString(port))
> -                    .replace("$name", NAME)
> -                    .replace(OS_LINE_SEP, lineSep));
> +                                               .replace("$bind", bind)
> +                                               .replace("$port",
> Integer.toString(port))
> +                                               .replace("$name", NAME)
> +                                               .replace(OS_LINE_SEP,
> lineSep));
>
>              while ((line = reader.readLine(prompt())) != null) {
>                  // exit simply let us go out of the loop
> @@ -210,7 +215,7 @@ public class CliRunnable implements Runn
>
>                  Class<?> cmdClass = null;
>                  String key = null;
> -                for (Map.Entry<String, Class<?>> cmd :
> COMMANDS.entrySet()) {
> +                for (final Map.Entry<String, Class<?>> cmd :
> COMMANDS.entrySet()) {
>                      if (line.startsWith(cmd.getKey())) {
>                          cmdClass = cmd.getValue();
>                          key = cmd.getKey();
> @@ -270,8 +275,8 @@ public class CliRunnable implements Runn
>              prompt.append(PROMPT);
>          }
>          prompt.append(" @ ")
> -            .append(bind).append(":").append(port)
> -            .append(PROMPT_SUFFIX);
> +              .append(bind).append(":").append(port)
> +              .append(PROMPT_SUFFIX);
>          return prompt.toString();
>      }
>  }
>
> Modified:
> tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> ---
> tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java
> (original)
> +++
> tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java
> Tue Dec 10 16:54:47 2013
> @@ -26,6 +26,7 @@ import java.io.OutputStreamWriter;
>  import java.util.Collection;
>
>  public class StreamManager {
> +
>      private static final String OS_LINE_SEP = System.getProperty("line.separator");
>
>      private String lineSep;
> @@ -34,16 +35,16 @@ public class StreamManager {
>      private OutputStream out;
>      private OutputStream err;
>
> -    public StreamManager(OutputStream out, OutputStream err, String lineSep) {
> +    public StreamManager(final OutputStream out, final OutputStream err, final String lineSep) {
>          this.lineSep = lineSep;
>          this.out = out;
>          this.err = err;
>          this.sout = new OutputStreamWriter(out);
> -        this.serr= new OutputStreamWriter(err);
> +        this.serr = new OutputStreamWriter(err);
>      }
>
>      private void write(final OutputStreamWriter writer, final String s) {
> -        for (String l : s.split(lineSep)) {
> +        for (final String l : s.split(lineSep)) {
>              try {
>                  writer.write(l);
>                  writer.write(lineSep);
> @@ -64,7 +65,7 @@ public class StreamManager {
>          } else {
>              final StringBuilder error = new StringBuilder();
>              error.append(e.getMessage()).append(lineSep);
> -            for (StackTraceElement elt : e.getStackTrace()) {
> +            for (final StackTraceElement elt : e.getStackTrace()) {
>                  error.append("
>  ").append(elt.toString()).append(lineSep);
>              }
>              write(serr, error.toString());
> @@ -77,7 +78,7 @@ public class StreamManager {
>          }
>          if (out instanceof Collection) {
>              final StringBuilder builder = new StringBuilder();
> -            for (Object o : (Collection) out) {
> +            for (final Object o : (Collection) out) {
>                  builder.append(string(o, lineSep)).append(lineSep);
>              }
>              return builder.toString();
> @@ -89,10 +90,10 @@ public class StreamManager {
>          if (!out.getClass().getName().startsWith("java")) {
>              try {
>                  return new
> GsonBuilder().setPrettyPrinting().create().toJson(out)
> -                            .replace(OS_LINE_SEP, lineSep);
> +                                        .replace(OS_LINE_SEP, lineSep);
>              } catch (RuntimeException re) {
>                  return ToStringBuilder.reflectionToString(out,
> ToStringStyle.SHORT_PREFIX_STYLE)
> -                            .replace(OS_LINE_SEP, lineSep);
> +                                      .replace(OS_LINE_SEP, lineSep);
>              }
>          }
>          return out.toString();
> @@ -123,7 +124,7 @@ public class StreamManager {
>      }
>
>      public void writeOut(final String text, final String sep) {
> -        for (String line : text.split(sep)) {
> +        for (final String line : text.split(sep)) {
>              writeOut(line);
>          }
>      }
>
> Modified:
> tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> ---
> tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java
> (original)
> +++
> tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java
> Tue Dec 10 16:54:47 2013
> @@ -61,7 +61,7 @@ public class BeginWebBeansListener imple
>       *
>       * @param webBeansContext the OWB context
>       */
> -    public BeginWebBeansListener(WebBeansContext webBeansContext) {
> +    public BeginWebBeansListener(final WebBeansContext webBeansContext) {
>          this.webBeansContext = webBeansContext;
>          this.failoverService =
> this.webBeansContext.getService(FailOverService.class);
>          this.contextKey = "org.apache.tomee.catalina.WebBeansListener@"
> + webBeansContext.hashCode();
> @@ -71,7 +71,7 @@ public class BeginWebBeansListener imple
>       * {@inheritDoc}
>       */
>      @Override
> -    public void requestDestroyed(ServletRequestEvent event) {
> +    public void requestDestroyed(final ServletRequestEvent event) {
>          // no-op
>      }
>
> @@ -127,7 +127,7 @@ public class BeginWebBeansListener imple
>       * {@inheritDoc}
>       */
>      @Override
> -    public void sessionDestroyed(HttpSessionEvent event) {
> +    public void sessionDestroyed(final HttpSessionEvent event) {
>          ensureRequestScope();
>      }
>
> @@ -139,21 +139,20 @@ public class BeginWebBeansListener imple
>          }
>      }
>
> -
>      @Override
> -    public void sessionWillPassivate(HttpSessionEvent event) {
> +    public void sessionWillPassivate(final HttpSessionEvent event) {
>          ensureRequestScope();
>      }
>
>      @Override
> -    public void sessionDidActivate(HttpSessionEvent event) {
> +    public void sessionDidActivate(final HttpSessionEvent event) {
>          if (failoverService.isSupportFailOver() ||
> failoverService.isSupportPassivation()) {
>              failoverService.sessionDidActivate(event.getSession());
>          }
>      }
>
>      @Override
> -    public void contextInitialized(ServletContextEvent
> servletContextEvent) {
> +    public void contextInitialized(final ServletContextEvent
> servletContextEvent) {
>          try {
>
>  OpenEJBLifecycle.initializeServletContext(servletContextEvent.getServletContext(),
> webBeansContext);
>          } catch (final Exception e) {
> @@ -163,7 +162,7 @@ public class BeginWebBeansListener imple
>      }
>
>      @Override
> -    public void contextDestroyed(ServletContextEvent servletContextEvent)
> {
> +    public void contextDestroyed(final ServletContextEvent
> servletContextEvent) {
>          ensureRequestScope();
>      }
>  }
>
> Modified:
> tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> ---
> tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
> (original)
> +++
> tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
> Tue Dec 10 16:54:47 2013
> @@ -26,6 +26,7 @@ import java.net.URI;
>  import java.net.UnknownHostException;
>  import java.nio.charset.Charset;
>  import java.util.ArrayList;
> +import java.util.Collections;
>  import java.util.Comparator;
>  import java.util.Enumeration;
>  import java.util.HashSet;
> @@ -39,6 +40,7 @@ import java.util.concurrent.Future;
>  import java.util.concurrent.TimeUnit;
>  import java.util.concurrent.atomic.AtomicBoolean;
>  import java.util.concurrent.atomic.AtomicInteger;
> +import java.util.concurrent.locks.ReentrantLock;
>
>  /**
>   * Licensed to the Apache Software Foundation (ASF) under one or more
> @@ -66,9 +68,11 @@ public class MulticastPulseAgent impleme
>
>      public static final String SERVER = "OpenEJB.MCP.Server:";
>      public static final String CLIENT = "OpenEJB.MCP.Client:";
> +    public static final String BADURI = ":BadUri:";
>      public static final String EMPTY = "NoService";
>
> -    private final Set<String> ignore = new HashSet<String>();
> +    private final ReentrantLock lock = new ReentrantLock();
> +    private final Set<String> ignore = Collections.synchronizedSet(new
> HashSet<String>());
>      private final Set<URI> uriSet = new HashSet<URI>();
>      private AtomicBoolean running = new AtomicBoolean(false);
>      final ArrayList<Future> futures = new ArrayList<Future>();
> @@ -84,12 +88,19 @@ public class MulticastPulseAgent impleme
>      private boolean loopbackOnly = true;
>
>      /**
> +     * @author Andy Gumbrecht
>       * This agent listens for client pulses on a defined multicast channel.
>       * On receipt of a valid pulse the agent responds with its own pulse for
>       * a defined amount of time and rate. A client can deliver a pulse as often as
>       * required until it is happy of the server response.
>       * <p/>
>       * Both server and client deliver crafted information payloads.
> +     * <p/>
> +     * The client pulse contains OpenEJB.MCP.Client:(group or *)[:BadUri:URI]
> +     * The server will only respond to a request for it's own group or *
> +     * The optional :BadUri: is used by clients to notify a server that it is sending out unreachable URI's
> +     * <p/>
> +     * The server response pulse contains OpenEJB.MCP.Server:(Service|Service)|(Comma separated host list)
>       */
>      public MulticastPulseAgent() {
>      }
> @@ -111,7 +122,7 @@ public class MulticastPulseAgent impleme
>                  length = 1;
>              }
>
> -            executor = Executors.newFixedThreadPool(length * 2);
> +            executor = Executors.newFixedThreadPool(length * 3);
>          }
>
>          return executor;
> @@ -134,7 +145,7 @@ public class MulticastPulseAgent impleme
>                  }
>              }
>          } catch (Exception e) {
> -            log.warning("Invalid ignore parameter. Should be a lowercase single or comma seperated list like: ignore=host1,host2");
> +            log.warning("Invalid ignore parameter. Should be a lowercase single host or comma seperated list of hosts to ignore like: ignore=host1,host2,ipv4,ipv6");
>          }
>
>          this.multicast = o.get("bind", this.multicast);
> @@ -148,41 +159,59 @@ public class MulticastPulseAgent impleme
>
>      private void buildPacket() throws SocketException {
>
> -        this.loopbackOnly = true;
> -        for (final URI uri : this.uriSet) {
> -            if (!isLoopback(uri.getHost())) {
> -                this.loopbackOnly = false;
> -                break;
> +        final ReentrantLock l = this.lock;
> +        l.lock();
> +
> +        try {
> +            this.loopbackOnly = true;
> +            for (final URI uri : this.uriSet) {
> +                if (!isLoopback(uri.getHost())) {
> +                    this.loopbackOnly = false;
> +                    break;
> +                }
>              }
> -        }
>
> -        final String hosts = getHosts(this.ignore);
> -        final StringBuilder sb = new StringBuilder(SERVER);
> -        sb.append(this.group);
> -        sb.append(':');
> +            final String hosts = getHosts(this.ignore);
> +            final StringBuilder sb = new StringBuilder(SERVER);
> +            sb.append(this.group);
> +            sb.append(':');
>
> -        if (this.uriSet.size() > 0) {
> -            for (final URI uri : this.uriSet) {
> -                sb.append(uri.toASCIIString());
> +            if (this.uriSet.size() > 0) {
> +                for (final URI uri : this.uriSet) {
> +                    sb.append(uri.toASCIIString());
> +                    sb.append('|');
> +                }
> +            } else {
> +                sb.append(EMPTY);
>                  sb.append('|');
>              }
> -        } else {
> -            sb.append(EMPTY);
> -            sb.append('|');
> -        }
>
> -        sb.append(hosts);
> +            sb.append(hosts);
> +
> +            final byte[] bytes = (sb.toString()).getBytes(UTF8);
> +            this.response = new DatagramPacket(bytes, bytes.length,
> this.address);
>
> -        final byte[] bytes = (sb.toString()).getBytes(UTF8);
> -        this.response = new DatagramPacket(bytes, bytes.length,
> this.address);
> +            if (log.isDebugEnabled()) {
> +                log.debug("MultiPulse packet is: " + sb);
> +            }
>
> -        if (log.isDebugEnabled()) {
> -            log.debug("MultiPulse packet is: " + sb);
> +            if (bytes.length > 2048) {
> +                log.warning("MultiPulse packet is larger than 2048 bytes, clients will not be able to read the packet" +
> +                            "\n - You should define the 'ignore' property to filter out unreachable addresses: " + sb);
> +            }
> +        } finally {
> +            l.unlock();
>          }
> +    }
>
> -        if (bytes.length > 2048) {
> -            log.warning("MultiPulse packet is larger than 2048 bytes,
> clients will not be able to read the packet" +
> -                        "\n - You should define the 'ignore' property to
> filter out unreachable addresses: " + sb);
> +    public DatagramPacket getResponsePacket() {
> +        final ReentrantLock l = this.lock;
> +        l.lock();
> +
> +        try {
> +            return this.response;
> +        } finally {
> +            l.unlock();
>          }
>      }
>
> @@ -191,6 +220,10 @@ public class MulticastPulseAgent impleme
>          this.listener = listener;
>      }
>
> +    public DiscoveryListener getDiscoveryListener() {
> +        return listener;
> +    }
> +
>      @Override
>      public void registerService(URI uri) throws IOException {
>
> @@ -267,7 +300,7 @@ public class MulticastPulseAgent impleme
>                      continue;
>                  }
>
> -                final Sender sender = new Sender(this, socketKey, socket,
> this.response);
> +                final Sender sender = new Sender(this, socketKey, socket);
>                  this.futures.add(executorService.submit(sender));
>
>                  this.futures.add(executorService.submit(new Runnable() {
> @@ -289,34 +322,64 @@ public class MulticastPulseAgent impleme
>
>                                      if (req.startsWith(CLIENT)) {
>
> -                                        req = (req.replace(CLIENT, ""));
> +                                        final int ix =
> req.indexOf(BADURI);
> +                                        String badUri = null;
> +
> +                                        if (ix > 0) {
> +                                            //The client is notifying of
> a bad uri
> +                                            badUri =
> req.substring(ix).replace(BADURI, "");
> +                                            req = req.substring(0,
> ix).replace(CLIENT, "");
> +                                        } else {
> +                                            req = (req.replace(CLIENT,
> ""));
> +                                        }
>
> +                                        //Is this a group or global pulse
> request
>                                          if (mpg.equals(req) ||
> "*".equals(req)) {
>
> -                                            final String client =
> ((InetSocketAddress) sa).getAddress().getHostAddress();
> +                                            //Is there a bad url and is
> it this agent broadcasting the bad URI?
> +                                            if (null != badUri &&
> getHosts(MulticastPulseAgent.this.ignore).contains(badUri)) {
> +                                                final ReentrantLock l =
> MulticastPulseAgent.this.lock;
> +                                                l.lock();
> +
> +                                                try {
> +                                                    //Remove it and
> rebuild our broadcast packet
> +                                                    if
> (MulticastPulseAgent.this.ignore.add(badUri)) {
> +
>  MulticastPulseAgent.this.buildPacket();
> +
> +
>  MulticastPulseAgent.this.fireEvent(URI.create("OpenEJB" + BADURI +
> badUri), false);
> +
> +                                                        log.warning("This
> server has removed the unreachable host '" + badUri + "' from discovery,
> you should consider adding" +
> +                                                                    "
> this to the 'ignore' property in the multipulse.properties file");
> +                                                    }
> +
> +                                                } finally {
> +                                                    l.unlock();
> +                                                }
> +                                            } else {
>
> -                                            if (isLoopBackOnly) {
> -                                                //We only have local
> services, so make sure the request is from a local source else ignore it
> -                                                if
> (!MulticastPulseAgent.isLocalAddress(client, false)) {
> +                                                //Normal client multicast
> pulse request
> +                                                final String client =
> ((InetSocketAddress) sa).getAddress().getHostAddress();
> +
> +                                                if (isLoopBackOnly &&
> !MulticastPulseAgent.isLocalAddress(client, false)) {
> +                                                    //We only have local
> services, so make sure the request is from a local source else ignore it
>                                                      if
> (log.isDebugEnabled()) {
>
>  log.debug(String.format("Ignoring remote client %1$s pulse request for
> group: %2$s - No remote services available",
>
>        client,
>
>        req));
>                                                      }
> -                                                    return;
> -                                                }
> -                                            }
> +                                                } else {
>
> -                                            //We have received a valid
> pulse request
> -                                            if (log.isDebugEnabled()) {
> -
>  log.debug(String.format("Answering client '%1$s' pulse request for group:
> '%2$s' on '%3$s'", client, req, socketKey));
> -                                            }
> +                                                    //We have received a
> valid pulse request
> +                                                    if
> (log.isDebugEnabled()) {
> +
>  log.debug(String.format("Answering client '%1$s' pulse request for group:
> '%2$s' on '%3$s'", client, req, socketKey));
> +                                                    }
>
> -                                            //Renew response pulse
> -                                            sender.pulseResponse();
> +                                                    //Renew response pulse
> +
>  sender.pulseResponse();
> +                                                }
> +                                            }
>                                          }
>                                      }
> -
>                                  }
>
>                              } catch (Exception e) {
> @@ -412,6 +475,27 @@ public class MulticastPulseAgent impleme
>      }
>
>      /**
> +     * Lists current broadcast hosts as a comma separated list.
> +     * Used principally for testing.
> +     *
> +     * @return String
> +     */
> +    public String getHosts() {
> +        return getHosts(this.ignore);
> +    }
> +
> +    /**
> +     * Remove a host from the ignore list.
> +     * Used principally for testing.
> +     *
> +     * @param host String
> +     * @return True if removed, else false
> +     */
> +    public boolean removeFromIgnore(final String host) {
> +        return this.ignore.remove(host);
> +    }
> +
> +    /**
>       * Attempts to return at least one socket per valid network interface.
>       * If no valid interface is found then the array will be empty.
>       *
> @@ -612,13 +696,11 @@ public class MulticastPulseAgent impleme
>          private final MulticastPulseAgent agent;
>          private final String socketKey;
>          private final MulticastSocket socket;
> -        private final DatagramPacket mpr;
>
> -        private Sender(final MulticastPulseAgent agent, final String
> socketKey, final MulticastSocket socket, final DatagramPacket mpr) {
> +        private Sender(final MulticastPulseAgent agent, final String
> socketKey, final MulticastSocket socket) {
>              this.agent = agent;
>              this.socketKey = socketKey;
>              this.socket = socket;
> -            this.mpr = mpr;
>          }
>
>          @Override
> @@ -640,7 +722,7 @@ public class MulticastPulseAgent impleme
>                  while (this.counter.decrementAndGet() > 0) {
>
>                      try {
> -                        this.socket.send(this.mpr);
> +                        this.socket.send(this.agent.getResponsePacket());
>                      } catch (Exception e) {
>                          if (log.isDebugEnabled()) {
>                              log.debug("MulticastPulseAgent client error:
> " + e.getMessage(), e);
>
> Modified:
> tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> ---
> tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
> (original)
> +++
> tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
> Tue Dec 10 16:54:47 2013
> @@ -48,11 +48,6 @@ import java.util.concurrent.TimeUnit;
>  import java.util.concurrent.atomic.AtomicBoolean;
>  import java.util.concurrent.locks.ReentrantLock;
>
> -/**
> - * Copyright (c) ORPRO Vision GmbH.
> - * Author: Andy Gumbrecht
> - * Date: 11.06.12
> - */
>  @SuppressWarnings("UseOfSystemOutOrSystemErr")
>  public class MulticastPulseAgentTest {
>
> @@ -229,7 +224,10 @@ public class MulticastPulseAgentTest {
>                                      final String[] serviceList =
> services.split("\\|");
>                                      final String[] hosts = s.split(",");
>
> -                                    System.out.println(String.format("\n"
> + name + " received Server pulse:\n\tGroup: %1$s\n\tServices:
> %2$s\n\tServer: %3$s\n", group, services, s));
> +                                    System.out.println(String.format("\n"
> + name + " received Server pulse:\n\tGroup: %1$s\n\tServices:
> %2$s\n\tServer: %3$s\n",
> +
> group,
> +
> services,
> +                                                                     s));
>
>                                      for (final String svc : serviceList) {
>
> @@ -392,6 +390,69 @@ public class MulticastPulseAgentTest {
>          org.junit.Assert.assertTrue(timeout == 1 || set.size() > 0);
>      }
>
> +    @Test
> +    public void testBroadcastBadUri() throws Exception {
> +
> +        final DiscoveryListener original = agent.getDiscoveryListener();
> +
> +        final CountDownLatch latch = new CountDownLatch(1);
> +
> +        final DiscoveryListener listener = new DiscoveryListener() {
> +            @Override
> +            public void serviceAdded(final URI service) {
> +                latch.countDown();
> +                System.out.println("added = " + service);
> +            }
> +
> +            @Override
> +            public void serviceRemoved(final URI service) {
> +                latch.countDown();
> +                System.out.println("removed = " + service);
> +            }
> +        };
> +
> +        agent.setDiscoveryListener(listener);
> +
> +        final String[] hosts = agent.getHosts().split(",");
> +        final String host = hosts[hosts.length - 1];
> +
> +        final Future<?> future = executor.submit(new Runnable() {
> +            @Override
> +            public void run() {
> +                try {
> +                    final InetAddress ia =
> getAddress(MulticastPulseAgentTest.host);
> +
> +                    final byte[] bytes = (MulticastPulseAgent.CLIENT +
> forGroup + MulticastPulseAgent.BADURI +
> host).getBytes(Charset.forName("UTF-8"));
> +                    final DatagramPacket request = new
> DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port));
> +
> +                    final MulticastSocket[] multicastSockets =
> MulticastPulseAgent.getSockets(MulticastPulseAgentTest.host, port);
> +
> +                    for (final MulticastSocket socket : multicastSockets)
> {
> +
> +                        try {
> +                            socket.send(request);
> +                        } catch (Exception e) {
> +                            System.out.println("Failed to broadcast bad
> URI on: " + socket.getInterface().getHostAddress());
> +                            e.printStackTrace();
> +                        }
> +                    }
> +                } catch (Exception e) {
> +                    System.out.println("Failed to broadcast bad URI");
> +                    e.printStackTrace();
> +                }
> +            }
> +        });
> +
> +        final Object o = future.get(10, TimeUnit.SECONDS);
> +
> +        final boolean await = latch.await(20, TimeUnit.SECONDS);
> +        final boolean removed = agent.removeFromIgnore(host);
> +
> +        agent.setDiscoveryListener(original);
> +
> +        org.junit.Assert.assertTrue("Failed to remove host", removed &&
> await);
> +    }
> +
>      private String ipFormat(final String h) throws UnknownHostException {
>
>          final InetAddress ia = InetAddress.getByName(h);
> @@ -402,6 +463,20 @@ public class MulticastPulseAgentTest {
>          }
>      }
>
> +    private static InetAddress getAddress(final String host) throws Exception {
> +        final InetAddress ia;
> +        try {
> +            ia = InetAddress.getByName(host);
> +        } catch (UnknownHostException e) {
> +            throw new Exception(host + " is not a valid address", e);
> +        }
> +
> +        if (null == ia || !ia.isMulticastAddress()) {
> +            throw new Exception(host + " is not a valid multicast address");
> +        }
> +        return ia;
> +    }
> +
>      private static class MyDiscoveryListener implements DiscoveryListener
> {
>
>          private final String id;
>
> Modified: tomee/tomee/trunk/server/openejb-server/pom.xml
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-server/pom.xml?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> --- tomee/tomee/trunk/server/openejb-server/pom.xml (original)
> +++ tomee/tomee/trunk/server/openejb-server/pom.xml Tue Dec 10 16:54:47
> 2013
> @@ -65,6 +65,11 @@
>      </dependency>
>      <dependency>
>        <groupId>org.apache.openejb</groupId>
> +      <artifactId>openejb-jee</artifactId>
> +      <version>${project.version}</version>
> +    </dependency>
> +    <dependency>
> +      <groupId>org.apache.openejb</groupId>
>        <artifactId>openejb-loader</artifactId>
>        <version>${project.version}</version>
>      </dependency>
> @@ -84,6 +89,10 @@
>      </dependency>
>      <dependency>
>        <groupId>org.apache.xbean</groupId>
> +      <artifactId>xbean-finder-shaded</artifactId>
> +    </dependency>
> +    <dependency>
> +      <groupId>org.apache.xbean</groupId>
>        <artifactId>xbean-reflect</artifactId>
>      </dependency>
>      <dependency>
>
> Modified:
> tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> ---
> tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java
> (original)
> +++
> tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java
> Tue Dec 10 16:54:47 2013
> @@ -155,7 +155,7 @@ public class SimpleServiceManager extend
>      @Override
>      public synchronized void start(final boolean block) throws ServiceException {
>
> -        if(stopped){
> +        if (stopped) {
>              throw new ServiceException("Stop has already been called on
> ServiceManager");
>          }
>
>
> Modified:
> tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java
> URL:
> http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java?rev=1549891&r1=1549890&r2=1549891&view=diff
>
> ==============================================================================
> ---
> tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java
> (original)
> +++
> tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java
> Tue Dec 10 16:54:47 2013
> @@ -40,12 +40,12 @@ public class FilteredServiceManagerWithA
>      }
>
>      @Test
> -    public void numberOfServices () {
> +    public void numberOfServices() {
>          // when using @EnableServices with the application composer
>          // the return value should be a FilteredServiceManager
>          assertEquals(FilteredServiceManager.class,
> ServiceManager.get().getClass());
>
> -        FilteredServiceManager manager = (FilteredServiceManager)
> ServiceManager.get();
> +        final FilteredServiceManager manager = (FilteredServiceManager)
> ServiceManager.get();
>          assertEquals(1, manager.getDaemons().length);
>          assertEquals("admin", manager.getDaemons()[0].getName());
>      }
>
>
>