You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by an...@apache.org on 2013/11/19 15:36:26 UTC

svn commit: r1543440 - in /tomee/tomee/trunk/server: openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java

Author: andygumbrecht
Date: Tue Nov 19 14:36:26 2013
New Revision: 1543440

URL: http://svn.apache.org/r1543440
Log:
Improve multipulse discovery - Server agent now pulses response more efficiently.

Modified:
    tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
    tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java

Modified: tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1543440&r1=1543439&r2=1543440&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java (original)
+++ tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java Tue Nov 19 14:36:26 2013
@@ -18,7 +18,9 @@ import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.ConcurrentModificationException;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -79,8 +81,15 @@ public class MulticastPulseClient extend
     }
 
     private static synchronized ExecutorService getExecutorService() {
+
         if (null == executor) {
-            executor = Executors.newFixedThreadPool(getInterfaces().length + 2);
+
+            int length = getInterfaces().length;
+            if (length < 1) {
+                length = 1;
+            }
+
+            executor = Executors.newFixedThreadPool(length * 2);
         }
 
         return executor;
@@ -194,6 +203,7 @@ public class MulticastPulseClient extend
         final DatagramPacket request = new DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port));
 
         final AtomicBoolean running = new AtomicBoolean(true);
+        final List<Future> futures = Collections.synchronizedList(new ArrayList<Future>());
 
         MulticastSocket[] clientSockets = null;
 
@@ -252,7 +262,6 @@ public class MulticastPulseClient extend
 
             //Start threads that listen for multicast packets on our channel.
             //These need to start 'before' we pulse a request.
-            final ArrayList<Future> futures = new ArrayList<Future>();
             final CountDownLatch latchListeners = new CountDownLatch(clientSocketsFinal.length);
 
             for (final MulticastSocket socket : clientSocketsFinal) {
@@ -380,7 +389,7 @@ public class MulticastPulseClient extend
 
             try {
                 //Give listener threads a reasonable amount of time to start
-                if (latchListeners.await(5, TimeUnit.SECONDS)) {
+                if (latchListeners.await(clientSocketsFinal.length * 2, TimeUnit.SECONDS)) {
 
                     //Start pulsing client request every 10ms - This will ensure we have at least 4 client pulses within our minimum timeout
                     //This pulse is designed to tell a listening server to wake up and pulse back a response
@@ -428,23 +437,14 @@ public class MulticastPulseClient extend
 
                     running.set(false);
 
-                    for (final Future future : futures) {
-                        future.cancel(true);
-                    }
-
-                    for (final MulticastSocket socket : clientSocketsFinal) {
-
-                        try {
-                            socket.leaveGroup(ia);
-                        } catch (Exception e) {
-                            //Ignore
-                        }
-                        try {
-                            socket.close();
-                        } catch (Exception e) {
-                            //Ignore
+                    try {
+                        for (final Future future : futures) {
+                            future.cancel(true);
                         }
+                    } catch (ConcurrentModificationException e) {
+                        //Ignore
                     }
+
                 }
             }, timeout);
 
@@ -464,6 +464,18 @@ public class MulticastPulseClient extend
                 setLock.unlock();
             }
         } finally {
+
+            //Just to be sure we are clean
+            for (final Future future : futures) {
+                try {
+                    future.cancel(true);
+                } catch (Exception e) {
+                    //Ignore
+                }
+            }
+
+            futures.clear();
+
             for (final MulticastSocket socket : clientSockets) {
 
                 try {

Modified: tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java?rev=1543440&r1=1543439&r2=1543440&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java (original)
+++ tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java Tue Nov 19 14:36:26 2013
@@ -38,6 +38,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -58,8 +59,8 @@ import java.util.concurrent.atomic.Atomi
 public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfManaging {
 
     private static final Logger log = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery").createChild("multipulse"), MulticastPulseAgent.class);
-    private static final NetworkInterface[] interfaces = getNetworkInterfaces();
-    private static final ExecutorService executor = Executors.newFixedThreadPool((interfaces.length + 2) * 2);
+    private static NetworkInterface[] interfaces = null;
+    private static ExecutorService executor = null;
     private static final Charset UTF8 = Charset.forName("UTF-8");
     private static final int TTL = Integer.parseInt(System.getProperty("org.apache.openejb.multipulse.ttl", "32"));
 
@@ -71,6 +72,7 @@ public class MulticastPulseAgent impleme
     private final Set<URI> uriSet = new HashSet<URI>();
     private AtomicBoolean running = new AtomicBoolean(false);
     final ArrayList<Future> futures = new ArrayList<Future>();
+    final ArrayList<Future> senders = new ArrayList<Future>();
     private MulticastSocket[] sockets = null;
     private InetSocketAddress address = null;
 
@@ -92,6 +94,29 @@ public class MulticastPulseAgent impleme
     public MulticastPulseAgent() {
     }
 
+    private static synchronized NetworkInterface[] getInterfaces() {
+        if (null == interfaces) {
+            interfaces = getNetworkInterfaces();
+        }
+
+        return interfaces;
+    }
+
+    private static synchronized ExecutorService getExecutorService() {
+
+        if (null == executor) {
+
+            int length = getNetworkInterfaces().length;
+            if (length < 1) {
+                length = 1;
+            }
+
+            executor = Executors.newFixedThreadPool(length * 2);
+        }
+
+        return executor;
+    }
+
     @Override
     public void init(final Properties p) throws Exception {
         final Options o = new Options(p);
@@ -124,7 +149,7 @@ public class MulticastPulseAgent impleme
     private void buildPacket() throws SocketException {
 
         this.loopbackOnly = true;
-        for (final URI uri : uriSet) {
+        for (final URI uri : this.uriSet) {
             if (!isLoopback(uri.getHost())) {
                 this.loopbackOnly = false;
                 break;
@@ -151,10 +176,13 @@ public class MulticastPulseAgent impleme
         final byte[] bytes = (sb.toString()).getBytes(UTF8);
         this.response = new DatagramPacket(bytes, bytes.length, this.address);
 
-        log.debug("MultiPulse packet is: " + sb);
+        if (log.isDebugEnabled()) {
+            log.debug("MultiPulse packet is: " + sb);
+        }
 
         if (bytes.length > 2048) {
-            log.warning("MultiPulse packet is larger than 2048 bytes, clients will not be able to read the packet");
+            log.warning("MultiPulse packet is larger than 2048 bytes, clients will not be able to read the packet" +
+                        "\n - You should define the 'ignore' property to filter out unreachable addresses: " + sb);
         }
     }
 
@@ -201,7 +229,7 @@ public class MulticastPulseAgent impleme
 
     private void fireEvent(final URI uri, final boolean add) {
         if (null != this.listener) {
-            executor.execute(new Runnable() {
+            getExecutorService().execute(new Runnable() {
                 @Override
                 public void run() {
                     if (add) {
@@ -225,13 +253,24 @@ public class MulticastPulseAgent impleme
             }
 
             final CountDownLatch latch = new CountDownLatch(this.sockets.length);
-            final String mpg = MulticastPulseAgent.this.group;
-            final boolean isLoopBackOnly = MulticastPulseAgent.this.loopbackOnly;
-            final DatagramPacket mpr = MulticastPulseAgent.this.response;
+            final String mpg = this.group;
+            final boolean isLoopBackOnly = this.loopbackOnly;
+            final ExecutorService executorService = getExecutorService();
 
             for (final MulticastSocket socket : this.sockets) {
 
-                this.futures.add(executor.submit(new Runnable() {
+                final String socketKey;
+                try {
+                    socketKey = socket.getNetworkInterface().toString();
+                } catch (SocketException e) {
+                    log.error("Failed to get network interface name on: " + socket, e);
+                    continue;
+                }
+
+                final Sender sender = new Sender(this, socketKey, socket, this.response);
+                this.futures.add(executorService.submit(sender));
+
+                this.futures.add(executorService.submit(new Runnable() {
                     @Override
                     public void run() {
 
@@ -246,60 +285,37 @@ public class MulticastPulseAgent impleme
 
                                 if (null != sa) {
 
-                                    final String req = new String(request.getData(), 0, request.getLength());
+                                    String req = new String(request.getData(), 0, request.getLength());
 
-                                    executor.execute(new Runnable() {
-                                        @Override
-                                        public void run() {
+                                    if (req.startsWith(CLIENT)) {
 
-                                            String s = req;
+                                        req = (req.replace(CLIENT, ""));
 
-                                            if (s.startsWith(CLIENT)) {
+                                        if (mpg.equals(req) || "*".equals(req)) {
 
-                                                s = (s.replace(CLIENT, ""));
-
-                                                if (mpg.equals(s) || "*".equals(s)) {
-
-                                                    final String client = ((InetSocketAddress) sa).getAddress().getHostAddress();
-
-                                                    if (isLoopBackOnly) {
-                                                        //We only have local services, so make sure the request is from a local source else ignore it
-                                                        if (!MulticastPulseAgent.isLocalAddress(client, false)) {
-                                                            if (log.isDebugEnabled()) {
-                                                                log.debug(String.format("Ignoring remote client %1$s pulse request for group: %2$s - No remote services available",
-                                                                                        client,
-                                                                                        s));
-                                                            }
-                                                            return;
-                                                        }
-                                                    }
+                                            final String client = ((InetSocketAddress) sa).getAddress().getHostAddress();
 
+                                            if (isLoopBackOnly) {
+                                                //We only have local services, so make sure the request is from a local source else ignore it
+                                                if (!MulticastPulseAgent.isLocalAddress(client, false)) {
                                                     if (log.isDebugEnabled()) {
-                                                        log.debug(String.format("Answering client %1$s pulse request for group: %2$s", client, s));
-                                                    }
-
-                                                    //This is a valid client request for the server to respond on the same channel.
-                                                    //Because multicast is not guaranteed we will send 3 responses per valid request at 10ms intervals.
-                                                    for (int i = 0; i < 3; i++) {
-
-                                                        try {
-                                                            socket.send(mpr);
-                                                        } catch (Exception e) {
-                                                            if (log.isDebugEnabled()) {
-                                                                log.debug("MulticastPulseAgent client error: " + e.getMessage(), e);
-                                                            }
-                                                        }
-
-                                                        try {
-                                                            Thread.sleep(10);
-                                                        } catch (InterruptedException e) {
-                                                            break;
-                                                        }
+                                                        log.debug(String.format("Ignoring remote client %1$s pulse request for group: %2$s - No remote services available",
+                                                                                client,
+                                                                                req));
                                                     }
+                                                    return;
                                                 }
                                             }
+
+                                            //We have received a valid pulse request
+                                            if (log.isDebugEnabled()) {
+                                                log.debug(String.format("Answering client '%1$s' pulse request for group: '%2$s' on '%3$s'", client, req, socketKey));
+                                            }
+
+                                            //Renew response pulse
+                                            sender.pulseResponse();
                                         }
-                                    });
+                                    }
 
                                 }
 
@@ -356,7 +372,7 @@ public class MulticastPulseAgent impleme
 
             if (null != this.sockets) {
                 try {
-                    for (final MulticastSocket s : sockets) {
+                    for (final MulticastSocket s : this.sockets) {
                         try {
                             s.close();
                         } catch (Throwable e) {
@@ -425,7 +441,7 @@ public class MulticastPulseAgent impleme
 
         final ArrayList<MulticastSocket> list = new ArrayList<MulticastSocket>();
 
-        for (final NetworkInterface ni : interfaces) {
+        for (final NetworkInterface ni : getNetworkInterfaces()) {
 
             MulticastSocket ms = null;
 
@@ -453,7 +469,6 @@ public class MulticastPulseAgent impleme
                         //Ignore
                     }
                 }
-
             }
         }
 
@@ -590,4 +605,72 @@ public class MulticastPulseAgent impleme
 
         return sb.toString();
     }
+
+    private static class Sender implements Runnable {
+
+        private final AtomicInteger counter = new AtomicInteger(0);
+        private final MulticastPulseAgent agent;
+        private final String socketKey;
+        private final MulticastSocket socket;
+        private final DatagramPacket mpr;
+
+        private Sender(final MulticastPulseAgent agent, final String socketKey, final MulticastSocket socket, final DatagramPacket mpr) {
+            this.agent = agent;
+            this.socketKey = socketKey;
+            this.socket = socket;
+            this.mpr = mpr;
+        }
+
+        @Override
+        public void run() {
+            while (this.agent.running.get()) {
+
+                synchronized (this.counter) {
+                    try {
+                        //Wait indefinitely until we are interrupted or notified
+                        this.counter.wait();
+                    } catch (InterruptedException e) {
+                        if (!this.agent.running.get()) {
+                            break;
+                        }
+                    }
+                }
+
+                //Pulse a response every 10ms until our counter is 0 (at least 1 second)
+                while (this.counter.decrementAndGet() > 0) {
+
+                    try {
+                        this.socket.send(this.mpr);
+                    } catch (Exception e) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("MulticastPulseAgent client error: " + e.getMessage(), e);
+                        }
+                    }
+
+                    try {
+                        Thread.sleep(10);
+                    } catch (InterruptedException e) {
+                        break;
+                    }
+                }
+            }
+        }
+
+        /**
+         * Renew the counter and notify to pulse response
+         */
+        private void pulseResponse() {
+
+            synchronized (this.counter) {
+
+                this.counter.set(100);
+                this.counter.notifyAll();
+            }
+        }
+
+        @Override
+        public String toString() {
+            return this.socketKey;
+        }
+    }
 }