You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by an...@apache.org on 2013/04/10 15:27:41 UTC

svn commit: r1466480 - in /tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client: AbstractConnectionStrategy.java ClientDataSource.java EJBObjectHandler.java MulticastPulseClient.java proxy/Jdk13InvocationHandler.java

Author: andygumbrecht
Date: Wed Apr 10 13:27:40 2013
New Revision: 1466480

URL: http://svn.apache.org/r1466480
Log:
MulticastPulseClient should pulse at regular intervals until the lookup timeout is reached, rather than just once per lookup - missed that, even though I left a thread spare.
AbstractConnectionStrategy now synchronizes connection construction.
Removed duplicate line in ClientDataSource.
Added final modifiers and @Override annotations.

Modified:
    tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/AbstractConnectionStrategy.java
    tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/ClientDataSource.java
    tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/EJBObjectHandler.java
    tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
    tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/proxy/Jdk13InvocationHandler.java

Modified: tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/AbstractConnectionStrategy.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/AbstractConnectionStrategy.java?rev=1466480&r1=1466479&r2=1466480&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/AbstractConnectionStrategy.java (original)
+++ tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/AbstractConnectionStrategy.java Wed Apr 10 13:27:40 2013
@@ -20,59 +20,77 @@ import org.apache.openejb.client.event.B
 import org.apache.openejb.client.event.FailoverSelection;
 
 import java.io.IOException;
-import java.net.ConnectException;
 import java.net.URI;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * @version $Rev$ $Date$
  */
 public abstract class AbstractConnectionStrategy implements ConnectionStrategy {
 
+    private final ReentrantLock lock = new ReentrantLock(true);
+
     @Override
-    public Connection connect(ClusterMetaData cluster, ServerMetaData server) throws IOException {
-        final Set<URI> failed = Client.getFailed();
-        final Set<URI> remaining = new HashSet<URI>();
+    public Connection connect(final ClusterMetaData cluster, final ServerMetaData server) throws IOException {
 
-        boolean failover = false;
+        final ReentrantLock l = lock;
+        l.lock();
 
-        final Iterable<URI> iterable = getIterable(cluster);
-        for (URI uri : iterable) {
-            if (failed.contains(uri)) continue;
+        try {
+            final Set<URI> failed = Client.getFailed();
+            final Set<URI> remaining = new HashSet<URI>();
 
-            if (failover) Client.fireEvent(createFailureEvent(remaining, failed, uri));
+            boolean failover = false;
 
-            try {
-                return connect(cluster, uri);
-            } catch (IOException e) {
+            final Iterable<URI> iterable = getIterable(cluster);
+            for (final URI uri : iterable) {
+                if (failed.contains(uri)) {
+                    continue;
+                }
 
-                if (!failover) {
-                    Collections.addAll(remaining, cluster.getLocations());
-                    remaining.removeAll(failed);
+                if (failover) {
+                    Client.fireEvent(createFailureEvent(remaining, failed, uri));
                 }
 
-                failed.add(uri);
-                remaining.remove(uri);
-                failover = true;
+                try {
+                    return connect(cluster, uri);
+                } catch (IOException e) {
+
+                    if (!failover) {
+                        Collections.addAll(remaining, cluster.getLocations());
+                        remaining.removeAll(failed);
+                    }
+
+                    failed.add(uri);
+                    remaining.remove(uri);
+                    failover = true;
+                }
             }
-        }
 
-        final URI uri = server.getLocation();
+            final URI uri = server.getLocation();
 
-        if (uri == null) throw new RemoteFailoverException("Attempted to connect to " + failed.size() + " servers.");
+            if (uri == null) {
+                throw new RemoteFailoverException("Attempted to connect to " + failed.size() + " servers.");
+            }
 
-        Client.fireEvent(new BootstrappingConnection(uri));
+            Client.fireEvent(new BootstrappingConnection(uri));
 
-        return connect(cluster, uri);
+            return connect(cluster, uri);
+        } finally {
+            l.unlock();
+        }
     }
 
-    private Iterable<URI> getIterable(ClusterMetaData cluster) {
+    private Iterable<URI> getIterable(final ClusterMetaData cluster) {
         final Context context = cluster.getContext();
         final StrategyData data = context.getComponent(StrategyData.class);
 
-        if (data != null) return data.getIterable();
+        if (data != null) {
+            return data.getIterable();
+        }
 
         context.setComponent(StrategyData.class, new StrategyData(createIterable(cluster)));
 
@@ -83,8 +101,8 @@ public abstract class AbstractConnection
 
     protected abstract Iterable<URI> createIterable(ClusterMetaData cluster);
 
-    protected Connection connect(ClusterMetaData cluster, URI uri) throws IOException {
-        Connection connection = ConnectionManager.getConnection(uri);
+    protected Connection connect(final ClusterMetaData cluster, final URI uri) throws IOException {
+        final Connection connection = ConnectionManager.getConnection(uri);
 
         // Grabbing the URI from the associated connection allows the ConnectionFactory to
         // employ discovery to find and connect to a server.  We then attempt to connect
@@ -94,9 +112,10 @@ public abstract class AbstractConnection
     }
 
     private static class StrategyData {
+
         private final Iterable<URI> iterable;
 
-        private StrategyData(Iterable<URI> iterable) {
+        private StrategyData(final Iterable<URI> iterable) {
             this.iterable = iterable;
         }
 

Modified: tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/ClientDataSource.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/ClientDataSource.java?rev=1466480&r1=1466479&r2=1466480&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/ClientDataSource.java (original)
+++ tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/ClientDataSource.java Wed Apr 10 13:27:40 2013
@@ -29,17 +29,18 @@ import java.util.logging.Logger;
 /**
  * @version $Rev$ $Date$
  */
+@SuppressWarnings("UseOfSystemOutOrSystemErr")
 public class ClientDataSource implements DataSource {
+
     private final String jdbcUrl;
     private final String defaultPassword;
     private final String defaultUserName;
 
-    public static void main(String[] args) throws URISyntaxException {
+    public static void main(final String[] args) throws URISyntaxException {
         URI uri1;
-        uri1 = new URI("datasource", null, "/path",null, null);
-        uri1 = new URI("datasource", null, "/path",null, null);
+        uri1 = new URI("datasource", null, "/path", null, null);
         System.out.println("uri = " + uri1);
-        uri1 = new URI("datasource", "host", "/path",null, null);
+        uri1 = new URI("datasource", "host", "/path", null, null);
         System.out.println("uri = " + uri1);
         uri1 = new URI("datasource", "host", "/path", "query", "fragment");
         System.out.println("uri = " + uri1);
@@ -48,7 +49,7 @@ public class ClientDataSource implements
         print(new URI(uri1.getSchemeSpecificPart()));
     }
 
-    private static void print(URI uri1) {
+    private static void print(final URI uri1) {
         System.out.println("uri = " + uri1);
         System.out.println("  scheme = " + uri1.getScheme());
         System.out.println("  part   = " + uri1.getSchemeSpecificPart());
@@ -57,61 +58,73 @@ public class ClientDataSource implements
         System.out.println("  query  = " + uri1.getQuery());
     }
 
-    public ClientDataSource(DataSourceMetaData d) {
+    public ClientDataSource(final DataSourceMetaData d) {
         this(d.getJdbcDriver(), d.getJdbcUrl(), d.getDefaultUserName(), d.getDefaultPassword());
     }
 
-    public ClientDataSource(String jdbcDriver, String jdbcUrl, String defaultUserName, String defaultPassword) {
+    public ClientDataSource(final String jdbcDriver, final String jdbcUrl, final String defaultUserName, final String defaultPassword) {
         this.defaultPassword = defaultPassword;
         this.defaultUserName = defaultUserName;
         this.jdbcUrl = jdbcUrl;
-        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         try {
             Class.forName(jdbcDriver, true, classLoader);
         } catch (ClassNotFoundException e) {
-            throw new IllegalStateException("Cannot use DataSource in client VM without the JDBC Driver in classpath: "+jdbcDriver, e);
+            throw new IllegalStateException("Cannot use DataSource in client VM without the JDBC Driver in classpath: " + jdbcDriver, e);
         } catch (NoClassDefFoundError e) {
-            throw new IllegalStateException("Cannot use DataSource in client VM without the JDBC Driver in classpath: "+jdbcDriver, e);
+            throw new IllegalStateException("Cannot use DataSource in client VM without the JDBC Driver in classpath: " + jdbcDriver, e);
         }
     }
 
+    @Override
     public Connection getConnection() throws SQLException {
         return getConnection(defaultUserName, defaultPassword);
     }
 
-    public Connection getConnection(String username, String password) throws SQLException {
-        Connection connection = DriverManager.getConnection(jdbcUrl, username, password);
-        return connection;
+    @Override
+    public Connection getConnection(final String username, final String password) throws SQLException {
+        return DriverManager.getConnection(jdbcUrl, username, password);
     }
 
+    @Override
     public int getLoginTimeout() throws SQLException {
         return 0;
     }
 
+    @Override
     public PrintWriter getLogWriter() throws SQLException {
         return null;
     }
 
-    public void setLoginTimeout(int seconds) throws SQLException {
+    @Override
+    public void setLoginTimeout(final int seconds) throws SQLException {
     }
 
-    public void setLogWriter(PrintWriter out) throws SQLException {
+    @Override
+    public void setLogWriter(final PrintWriter out) throws SQLException {
     }
 
-    public boolean isWrapperFor(java.lang.Class<?> iface) {
-        if (iface == null) throw new NullPointerException("iface is null");
+    @Override
+    public boolean isWrapperFor(final java.lang.Class<?> iface) {
+        if (iface == null) {
+            throw new NullPointerException("iface is null");
+        }
         return iface.isInstance(this);
     }
 
+    @Override
     @SuppressWarnings({"unchecked"})
-    public <T> T unwrap(Class<T> iface) throws SQLException {
-        if (iface == null) throw new NullPointerException("iface is null");
+    public <T> T unwrap(final Class<T> iface) throws SQLException {
+        if (iface == null) {
+            throw new NullPointerException("iface is null");
+        }
         if (iface.isInstance(this)) {
             return (T) this;
         }
         throw new SQLException(getClass().getName() + " does not implement " + iface.getName());
     }
 
+    @SuppressWarnings("override")
     public Logger getParentLogger() throws SQLFeatureNotSupportedException {
         return null;
     }

Modified: tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/EJBObjectHandler.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/EJBObjectHandler.java?rev=1466480&r1=1466479&r2=1466480&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/EJBObjectHandler.java (original)
+++ tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/EJBObjectHandler.java Wed Apr 10 13:27:40 2013
@@ -212,51 +212,43 @@ public abstract class EJBObjectHandler e
 
             if (m.getDeclaringClass().equals(Object.class)) {
 
-                if (m.equals(TOSTRING))
+                if (m.equals(TOSTRING)) {
                     return "proxy=" + this;
-
-                else if (m.equals(EQUALS))
+                } else if (m.equals(EQUALS)) {
                     return equals(m, a, p);
-
-                else if (m.equals(HASHCODE))
+                } else if (m.equals(HASHCODE)) {
                     return this.hashCode();
-
-                else
+                } else {
                     throw new UnsupportedOperationException("Unkown method: " + m);
+                }
 
             } else if (m.getDeclaringClass() == EJBObjectProxy.class) {
 
-                if (m.equals(GETHANDLER))
+                if (m.equals(GETHANDLER)) {
                     return this;
-
-                else if (m.getName().equals("writeReplace"))
+                } else if (m.getName().equals("writeReplace")) {
                     return new EJBObjectProxyHandle(this);
-
-                else if (m.getName().equals("readResolve"))
+                } else if (m.getName().equals("readResolve")) {
                     return null;
-
-                else
+                } else {
                     throw new UnsupportedOperationException("Unkown method: " + m);
+                }
 
             } else if (m.getDeclaringClass() == javax.ejb.EJBObject.class) {
 
-                if (m.equals(GETHANDLE))
+                if (m.equals(GETHANDLE)) {
                     return getHandle(m, a, p);
-
-                else if (m.equals(GETPRIMARYKEY))
+                } else if (m.equals(GETPRIMARYKEY)) {
                     return getPrimaryKey(m, a, p);
-
-                else if (m.equals(ISIDENTICAL))
+                } else if (m.equals(ISIDENTICAL)) {
                     return isIdentical(m, a, p);
-
-                else if (m.equals(GETEJBHOME))
+                } else if (m.equals(GETEJBHOME)) {
                     return getEJBHome(m, a, p);
-
-                else if (m.equals(REMOVE))
+                } else if (m.equals(REMOVE)) {
                     return remove(m, a, p);
-
-                else
+                } else {
                     throw new UnsupportedOperationException("Unkown method: " + m);
+                }
 
             } else {
 
@@ -286,12 +278,14 @@ public abstract class EJBObjectHandler e
             }
         } catch (Throwable throwable) {
             if (remote) {
-                if (throwable instanceof RemoteException)
+                if (throwable instanceof RemoteException) {
                     throw throwable;
+                }
                 throw new RemoteException("Unknown Container Exception: " + throwable.getClass().getName() + ": " + throwable.getMessage(), getCause(throwable));
             } else {
-                if (throwable instanceof EJBException)
+                if (throwable instanceof EJBException) {
                     throw throwable;
+                }
                 throw new EJBException("Unknown Container Exception: " + throwable.getClass().getName() + ": " + throwable.getMessage()).initCause(getCause(throwable));
             }
         }
@@ -457,9 +451,11 @@ public abstract class EJBObjectHandler e
                         final EJBResponse res = request(req);
                         if (res.getResponseCode() != ResponseCodes.EJB_OK) {
                             //TODO how do we notify the user that we fail to configure the value ?
+                            Logger.getLogger(this.getClass().getName()).info("Unexpected response on cancel: " + res);
                         }
                     } catch (Exception e) {
                         //TODO how to handle
+                        Logger.getLogger(this.getClass().getName()).log(Level.INFO, "Unexpected error on cancel", e);
                         return false;
                     }
                 }

Modified: tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1466480&r1=1466479&r2=1466480&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java (original)
+++ tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java Wed Apr 10 13:27:40 2013
@@ -81,7 +81,11 @@ public class MulticastPulseClient extend
         if (knownUris.size() >= LIMIT) {
             //This is here just as a brake to prevent DOS or OOME.
             //There is no way we should have more than this number of unique MutliPulse URI's in a LAN
-            throw new IllegalArgumentException("Unique MultiPulse URI limit of " + LIMIT + " reached. Increase using the system property '" + ORG_APACHE_OPENEJB_MULTIPULSE_URI_LIMIT + "'");
+            throw new IllegalArgumentException("Unique MultiPulse URI limit of " +
+                                               LIMIT +
+                                               " reached. Increase using the system property '" +
+                                               ORG_APACHE_OPENEJB_MULTIPULSE_URI_LIMIT +
+                                               "'");
         }
 
         Set<URI> uriSet = knownUris.get(uri);
@@ -232,7 +236,7 @@ public class MulticastPulseClient extend
             //Start threads that listen for multicast packets on our channel.
             //These need to start 'before' we pulse a request.
             final ArrayList<Future> futures = new ArrayList<Future>();
-            final CountDownLatch latch = new CountDownLatch(clientSocketsFinal.length);
+            final CountDownLatch latchListeners = new CountDownLatch(clientSocketsFinal.length);
 
             for (final MulticastSocket socket : clientSocketsFinal) {
 
@@ -240,8 +244,8 @@ public class MulticastPulseClient extend
                     @Override
                     public void run() {
                         try {
-                            latch.countDown();
                             final DatagramPacket response = new DatagramPacket(new byte[2048], 2048);
+                            latchListeners.countDown();
 
                             while (running.get()) {
                                 try {
@@ -354,17 +358,38 @@ public class MulticastPulseClient extend
             }
 
             try {
-                //Give threads a reasonable amount of time to start
-                if (latch.await(5, TimeUnit.SECONDS)) {
+                //Give listener threads a reasonable amount of time to start
+                if (latchListeners.await(5, TimeUnit.SECONDS)) {
 
-                    //Pulse the server - It is thread safe to use same sockets as send/receive synchronization is only on the packet
-                    for (final MulticastSocket socket : clientSocketsFinal) {
-                        try {
-                            socket.send(request);
-                        } catch (Throwable e) {
-                            //Ignore
+                    //Start pulsing request every 20ms - This will ensure we have at least 2 pulses within our minimum timeout
+                    futures.add(0, executor.submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            while (running.get()) {
+                                //Pulse to listening servers - It is thread safe to use same sockets as send/receive synchronization is only on the packet
+                                for (final MulticastSocket socket : clientSocketsFinal) {
+
+                                    if (running.get()) {
+                                        try {
+                                            socket.send(request);
+                                        } catch (Throwable e) {
+                                            //Ignore
+                                        }
+                                    } else {
+                                        break;
+                                    }
+                                }
+
+                                if (running.get()) {
+                                    try {
+                                        Thread.sleep(20);
+                                    } catch (InterruptedException e) {
+                                        break;
+                                    }
+                                }
+                            }
                         }
-                    }
+                    }));
                 } else {
                     timeout = 1;
                 }

Modified: tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/proxy/Jdk13InvocationHandler.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/proxy/Jdk13InvocationHandler.java?rev=1466480&r1=1466479&r2=1466480&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/proxy/Jdk13InvocationHandler.java (original)
+++ tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/proxy/Jdk13InvocationHandler.java Wed Apr 10 13:27:40 2013
@@ -26,7 +26,7 @@ public class Jdk13InvocationHandler impl
     public Jdk13InvocationHandler() {
     }
 
-    public Jdk13InvocationHandler(InvocationHandler delegate) {
+    public Jdk13InvocationHandler(final InvocationHandler delegate) {
         setInvocationHandler(delegate);
     }
 
@@ -34,13 +34,14 @@ public class Jdk13InvocationHandler impl
         return delegate;
     }
 
-    public InvocationHandler setInvocationHandler(InvocationHandler handler) {
-        InvocationHandler old = delegate;
+    public InvocationHandler setInvocationHandler(final InvocationHandler handler) {
+        final InvocationHandler old = delegate;
         delegate = handler;
         return old;
     }
 
-    public Object invoke(Object proxy, Method method, Object... args) throws Throwable {
+    @Override
+    public Object invoke(final Object proxy, final Method method, final Object... args) throws Throwable {
         if (delegate == null) throw new NullPointerException("No invocation handler for proxy " + proxy);
 
         return delegate.invoke(proxy, method, args);