You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by an...@apache.org on 2013/10/09 14:35:05 UTC

svn commit: r1530583 - in /tomee/tomee/trunk/server: openejb-client/src/main/java/org/apache/openejb/client/ openejb-multicast/src/main/java/org/apache/openejb/server/discovery/ openejb-multicast/src/test/java/org/apache/openejb/server/discovery/

Author: andygumbrecht
Date: Wed Oct  9 12:35:05 2013
New Revision: 1530583

URL: http://svn.apache.org/r1530583
Log:
Fix https://issues.apache.org/jira/browse/OPENEJB-2043

Modified:
    tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
    tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
    tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java

Modified: tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1530583&r1=1530582&r2=1530583&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java (original)
+++ tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java Wed Oct  9 12:35:05 2013
@@ -35,6 +35,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 /**
@@ -66,8 +67,24 @@ public class MulticastPulseClient extend
     private static final int TTL = Integer.parseInt(System.getProperty(ORG_APACHE_OPENEJB_MULTIPULSE_TTL, "32"));
     private static final int LIMIT = Integer.parseInt(System.getProperty(ORG_APACHE_OPENEJB_MULTIPULSE_URI_LIMIT, "50000"));
     private static final Map<URI, Set<URI>> knownUris = new HashMap<URI, Set<URI>>();
-    private static final NetworkInterface[] interfaces = getNetworkInterfaces();
-    private static final ExecutorService executor = Executors.newFixedThreadPool(interfaces.length + 1);
+    private static NetworkInterface[] interfaces = getNetworkInterfaces();
+    private static ExecutorService executor = null;
+
+    /** Returns the static interface array, calling getNetworkInterfaces() under the class lock if it is still null. */
+    private static synchronized NetworkInterface[] getInterfaces() {
+        if (interfaces == null) {
+            interfaces = getNetworkInterfaces();
+        }
+        return interfaces;
+    }
+
+    /** Returns the shared executor, creating it under the class lock on first use with (interface count + 2) threads. */
+    private static synchronized ExecutorService getExecutorService() {
+        if (executor == null) {
+            executor = Executors.newFixedThreadPool(2 + getInterfaces().length);
+        }
+        return executor;
+    }
 
     /**
      * @param uri Connection URI
@@ -117,7 +134,7 @@ public class MulticastPulseClient extend
             try {
                 //Strip serverhost and group and try to connect
                 return ConnectionManager.getConnection(URI.create(URI.create(serviceURI.getSchemeSpecificPart()).getSchemeSpecificPart()));
-            } catch (Throwable e) {
+            } catch (Exception e) {
                 uriSet.remove(serviceURI);
             }
         }
@@ -200,7 +217,7 @@ public class MulticastPulseClient extend
 
                     //Compare URI hosts
                     int i = compare(u1.getHost(), u2.getHost());
-                    if (i == 0) {
+                    if (i != 0) {
                         i = uri1.compareTo(uri2);
                     }
 
@@ -223,7 +240,7 @@ public class MulticastPulseClient extend
                         } else if (0 != h1.compareTo(h2)) {
                             return -1;
                         }
-                    } catch (Throwable e) {
+                    } catch (Exception e) {
                         //Ignore
                     }
 
@@ -240,7 +257,7 @@ public class MulticastPulseClient extend
 
             for (final MulticastSocket socket : clientSocketsFinal) {
 
-                futures.add(executor.submit(new Runnable() {
+                futures.add(getExecutorService().submit(new Runnable() {
                     @Override
                     public void run() {
                         try {
@@ -258,6 +275,10 @@ public class MulticastPulseClient extend
 
                                         int len = response.getLength();
                                         if (len > 2048) {
+
+                                            if (log.isLoggable(Level.FINE)) {
+                                                log.log(Level.FINE, "Truncating multipulse length {0} to 2048", new Object[]{len});
+                                            }
                                             len = 2048;
                                         }
 
@@ -288,7 +309,7 @@ public class MulticastPulseClient extend
                                                 final URI serviceUri;
                                                 try {
                                                     serviceUri = URI.create(svc);
-                                                } catch (Throwable e) {
+                                                } catch (Exception e) {
                                                     continue;
                                                 }
 
@@ -327,7 +348,7 @@ public class MulticastPulseClient extend
                                                             //Just add as is
                                                             set.add(URI.create(svcfull));
                                                         }
-                                                    } catch (Throwable e) {
+                                                    } catch (Exception e) {
                                                         //Ignore
                                                     } finally {
                                                         setLock.unlock();
@@ -337,19 +358,19 @@ public class MulticastPulseClient extend
                                         }
                                     }
 
-                                } catch (Throwable e) {
+                                } catch (Exception e) {
                                     //Ignore
                                 }
                             }
                         } finally {
                             try {
                                 socket.leaveGroup(ia);
-                            } catch (Throwable e) {
+                            } catch (Exception e) {
                                 //Ignore
                             }
                             try {
                                 socket.close();
-                            } catch (Throwable e) {
+                            } catch (Exception e) {
                                 //Ignore
                             }
                         }
@@ -361,8 +382,9 @@ public class MulticastPulseClient extend
                 //Give listener threads a reasonable amount of time to start
                 if (latchListeners.await(5, TimeUnit.SECONDS)) {
 
-                    //Start pulsing request every 20ms - This will ensure we have at least 2 pulses within our minimum timeout
-                    futures.add(0, executor.submit(new Runnable() {
+                    //Start pulsing client request every 10ms - This will ensure we have at least 4 client pulses within our minimum timeout
+                    //This pulse is designed to tell a listening server to wake up and pulse back a response
+                    futures.add(0, getExecutorService().submit(new Runnable() {
                         @Override
                         public void run() {
                             while (running.get()) {
@@ -372,7 +394,7 @@ public class MulticastPulseClient extend
                                     if (running.get()) {
                                         try {
                                             socket.send(request);
-                                        } catch (Throwable e) {
+                                        } catch (Exception e) {
                                             //Ignore
                                         }
                                     } else {
@@ -382,7 +404,7 @@ public class MulticastPulseClient extend
 
                                 if (running.get()) {
                                     try {
-                                        Thread.sleep(20);
+                                        Thread.sleep(10);
                                     } catch (InterruptedException e) {
                                         break;
                                     }
@@ -414,12 +436,12 @@ public class MulticastPulseClient extend
 
                         try {
                             socket.leaveGroup(ia);
-                        } catch (Throwable e) {
+                        } catch (Exception e) {
                             //Ignore
                         }
                         try {
                             socket.close();
-                        } catch (Throwable e) {
+                        } catch (Exception e) {
                             //Ignore
                         }
                     }
@@ -430,7 +452,7 @@ public class MulticastPulseClient extend
             for (final Future future : futures) {
                 try {
                     future.get();
-                } catch (Throwable e) {
+                } catch (Exception e) {
                     //Ignore
                 }
             }
@@ -446,12 +468,12 @@ public class MulticastPulseClient extend
 
                 try {
                     socket.leaveGroup(ia);
-                } catch (Throwable e) {
+                } catch (Exception e) {
                     //Ignore
                 }
                 try {
                     socket.close();
-                } catch (Throwable e) {
+                } catch (Exception e) {
                     //Ignore
                 }
             }
@@ -501,7 +523,7 @@ public class MulticastPulseClient extend
 
         final ArrayList<MulticastSocket> list = new ArrayList<MulticastSocket>();
 
-        for (final NetworkInterface ni : interfaces) {
+        for (final NetworkInterface ni : getInterfaces()) {
 
             MulticastSocket ms = null;
 
@@ -518,12 +540,12 @@ public class MulticastPulseClient extend
 
                 list.add(ms);
 
-            } catch (Throwable e) {
+            } catch (Exception e) {
 
                 if (null != ms) {
                     try {
                         ms.close();
-                    } catch (Throwable t) {
+                    } catch (Exception t) {
                         //Ignore
                     }
                 }
@@ -614,11 +636,14 @@ public class MulticastPulseClient extend
                     Set<URI> uriSet = null;
                     try {
                         uriSet = MulticastPulseClient.discoverURIs(discover, new HashSet<String>(Arrays.asList("ejbd", "ejbds", "http", "https")), mchost, mcport, timeout);
-                    } catch (Throwable e) {
+                    } catch (Exception e) {
                         System.err.println(e.getMessage());
                     }
 
-                    if (uriSet != null && uriSet.size() > 0) {
+                    final int size = uriSet.size();
+                    if (uriSet != null && size > 0) {
+
+                        final int st = (timeout / size);
 
                         for (final URI uri : uriSet) {
 
@@ -636,22 +661,24 @@ public class MulticastPulseClient extend
                                 continue;
                             }
 
+                            System.out.print(server + ":" + group + " - " + uriSub.toASCIIString() + " is reachable: ");
+
                             boolean b = false;
                             final Socket s = new Socket();
                             try {
-                                s.connect(new InetSocketAddress(host, port), 500);
+                                s.connect(new InetSocketAddress(host, port), st);
                                 b = true;
-                            } catch (Throwable e) {
+                            } catch (Exception e) {
                                 //Ignore
                             } finally {
                                 try {
                                     s.close();
-                                } catch (Throwable e) {
+                                } catch (Exception e) {
                                     //Ignore
                                 }
                             }
 
-                            System.out.println(server + ":" + group + " - " + uriSub.toASCIIString() + " is reachable: " + b);
+                            System.out.println(b);
                         }
                     } else {
                         System.out.println("### Failed to discover server: " + discover);

Modified: tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java?rev=1530583&r1=1530582&r2=1530583&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java (original)
+++ tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java Wed Oct  9 12:35:05 2013
@@ -59,7 +59,7 @@ public class MulticastPulseAgent impleme
 
     private static final Logger log = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery").createChild("multipulse"), MulticastPulseAgent.class);
     private static final NetworkInterface[] interfaces = getNetworkInterfaces();
-    private static final ExecutorService executor = Executors.newFixedThreadPool(interfaces.length + 1);
+    private static final ExecutorService executor = Executors.newFixedThreadPool((interfaces.length + 2) * 2);
     private static final Charset UTF8 = Charset.forName("UTF-8");
     private static final int TTL = Integer.parseInt(System.getProperty("org.apache.openejb.multipulse.ttl", "32"));
 
@@ -82,9 +82,9 @@ public class MulticastPulseAgent impleme
     private boolean loopbackOnly = true;
 
     /**
-     * This agent listens for a client pulse on a defined multicast channel.
+     * This agent listens for client pulses on a defined multicast channel.
      * On receipt of a valid pulse the agent responds with its own pulse for
-     * a defined amount of time. A client can deliver a pulse as often as
+     * a defined amount of time and rate. A client can deliver a pulse as often as
      * required until it is happy of the server response.
      * <p/>
      * Both server and client deliver crafted information payloads.
@@ -112,7 +112,7 @@ public class MulticastPulseAgent impleme
             log.warning("Invalid ignore parameter. Should be a lowercase single or comma seperated list like: ignore=host1,host2");
         }
 
-        this.multicast = p.getProperty("bind", this.multicast);
+        this.multicast = o.get("bind", this.multicast);
         this.port = o.get("port", this.port);
         this.group = o.get("group", this.group);
 
@@ -225,6 +225,9 @@ public class MulticastPulseAgent impleme
             }
 
             final CountDownLatch latch = new CountDownLatch(this.sockets.length);
+            final String mpg = MulticastPulseAgent.this.group;
+            final boolean isLoopBackOnly = MulticastPulseAgent.this.loopbackOnly;
+            final DatagramPacket mpr = MulticastPulseAgent.this.response;
 
             for (final MulticastSocket socket : this.sockets) {
 
@@ -243,34 +246,67 @@ public class MulticastPulseAgent impleme
 
                                 if (null != sa) {
 
-                                    String s = new String(request.getData(), 0, request.getLength());
+                                    final String req = new String(request.getData(), 0, request.getLength());
 
-                                    if (s.startsWith(CLIENT)) {
-
-                                        s = (s.replace(CLIENT, ""));
-
-                                        final String client = ((InetSocketAddress) sa).getAddress().getHostAddress();
-
-                                        if (MulticastPulseAgent.this.group.equals(s) || "*".equals(s)) {
-
-                                            if (MulticastPulseAgent.this.loopbackOnly) {
-                                                //We only have local services, so make sure the request is from a local source else ignore it
-                                                if (!MulticastPulseAgent.isLocalAddress(client, false)) {
-                                                    log.debug(String.format("Ignoring remote client %1$s pulse request for group: %2$s - No remote services available", client, s));
-                                                    continue;
+                                    executor.execute(new Runnable() {
+                                        @Override
+                                        public void run() {
+
+                                            String s = req;
+
+                                            if (s.startsWith(CLIENT)) {
+
+                                                s = (s.replace(CLIENT, ""));
+
+                                                if (mpg.equals(s) || "*".equals(s)) {
+
+                                                    final String client = ((InetSocketAddress) sa).getAddress().getHostAddress();
+
+                                                    if (isLoopBackOnly) {
+                                                        //We only have local services, so make sure the request is from a local source else ignore it
+                                                        if (!MulticastPulseAgent.isLocalAddress(client, false)) {
+                                                            if (log.isDebugEnabled()) {
+                                                                log.debug(String.format("Ignoring remote client %1$s pulse request for group: %2$s - No remote services available",
+                                                                                        client,
+                                                                                        s));
+                                                            }
+                                                            return;
+                                                        }
+                                                    }
+
+                                                    if (log.isDebugEnabled()) {
+                                                        log.debug(String.format("Answering client %1$s pulse request for group: %2$s", client, s));
+                                                    }
+
+                                                    //This is a valid client request for the server to respond on the same channel.
+                                                    //Because multicast is not guaranteed we will send 3 responses per valid request at 10ms intervals.
+                                                    for (int i = 0; i < 3; i++) {
+
+                                                        try {
+                                                            socket.send(mpr);
+                                                        } catch (Exception e) {
+                                                            if (log.isDebugEnabled()) {
+                                                                log.debug("MulticastPulseAgent client error: " + e.getMessage(), e);
+                                                            }
+                                                        }
+
+                                                        try {
+                                                            Thread.sleep(10);
+                                                        } catch (InterruptedException e) {
+                                                            break;
+                                                        }
+                                                    }
                                                 }
                                             }
-
-                                            log.debug(String.format("Answering client %1$s pulse request for group: %2$s", client, s));
-                                            socket.send(MulticastPulseAgent.this.response);
-                                        } else {
-                                            log.debug(String.format("Ignoring client %1$s pulse request for group: %2$s", client, s));
                                         }
-                                    }
+                                    });
+
                                 }
 
-                            } catch (Throwable e) {
-                                //Ignore
+                            } catch (Exception e) {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("MulticastPulseAgent request error: " + e.getMessage(), e);
+                                }
                             }
                         }
 

Modified: tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java?rev=1530583&r1=1530582&r2=1530583&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java (original)
+++ tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java Wed Oct  9 12:35:05 2013
@@ -141,7 +141,7 @@ public class MulticastPulseAgentTest {
 
                 //Compare URI hosts
                 int i = compare(u1.getHost(), u2.getHost());
-                if (i == 0) {
+                if (i != 0) {
                     i = uri1.compareTo(uri2);
                 }