You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/09/25 17:32:24 UTC

svn commit: r1526207 - in /qpid/trunk/qpid/java: amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ amqp-1-...

Author: rgodfrey
Date: Wed Sep 25 15:32:23 2013
New Revision: 1526207

URL: http://svn.apache.org/r1526207
Log:
QPID-5113 :  JMS Client - JMS Connection exception listener is not notified when AMQ broker is killed (patch from Michael Samson)

Added:
    qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java   (with props)
Modified:
    qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
    qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
    qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
    qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java?rev=1526207&r1=1526206&r2=1526207&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java Wed Sep 25 15:32:23 2013
@@ -107,6 +107,7 @@ public class ConnectionImpl implements C
                 {
                     _conn = new org.apache.qpid.amqp_1_0.client.Connection(_host,
                             _port, _username, _password, container, _remoteHost, _ssl);
+                    _conn.setConnectionErrorTask(new ConnectionErrorTask());
                     // TODO - retrieve negotiated AMQP version
                     _connectionMetaData = new ConnectionMetaDataImpl(1,0,0);
                 }
@@ -234,8 +235,8 @@ public class ConnectionImpl implements C
     public void setClientID(final String value) throws JMSException
     {
         checkNotConnected("Cannot set client-id to \""
-                                        + value
-                                        + "\"; client-id must be set before the connection is used");
+                          + value
+                          + "\"; client-id must be set before the connection is used");
         if( _clientId !=null )
         {
             throw new IllegalStateException("client-id has already been set");
@@ -534,4 +535,32 @@ public class ConnectionImpl implements C
         return _syncPublish;
     }
 
+    private class ConnectionErrorTask implements Runnable
+    {
+
+        @Override
+        public void run()
+        {
+
+            try
+            {
+                final ExceptionListener exceptionListener = getExceptionListener();
+
+                if(exceptionListener != null)
+                {
+                    final org.apache.qpid.amqp_1_0.type.transport.Error connectionError = _conn.getConnectionError();
+                    if(connectionError != null)
+                    {
+                        exceptionListener.onException(new JMSException(connectionError.getDescription(),
+                                connectionError.getCondition().toString()));
+                    }
+                }
+            }
+            catch (JMSException ignored)
+            {
+                // ignored
+            }
+        }
+    }
+
 }

Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java?rev=1526207&r1=1526206&r2=1526207&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java Wed Sep 25 15:32:23 2013
@@ -28,7 +28,10 @@ import java.nio.ByteBuffer;
 import java.security.Principal;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+
 import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.qpid.amqp_1_0.framing.SocketExceptionHandler;
 import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
 import org.apache.qpid.amqp_1_0.transport.AMQPTransport;
 import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
@@ -38,8 +41,10 @@ import org.apache.qpid.amqp_1_0.type.Bin
 import org.apache.qpid.amqp_1_0.type.FrameBody;
 import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
 import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
 
-public class Connection
+public class Connection implements SocketExceptionHandler
 {
     private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
     private static final int MAX_FRAME_SIZE = 65536;
@@ -47,6 +52,8 @@ public class Connection
     private String _address;
     private ConnectionEndpoint _conn;
     private int _sessionCount;
+    private Runnable _connectionErrorTask;
+    private Error _socketError;
 
 
     public Connection(final String address,
@@ -223,7 +230,7 @@ public class Connection
             }
 
 
-            ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn);
+            ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn, this);
             Thread outputThread = new Thread(outputHandler);
             outputThread.setDaemon(true);
             outputThread.start();
@@ -409,4 +416,38 @@ public class Connection
             }
         }
     }
+
+    /**
+     * Set the connection error task that will be used as a callback for any socket read/write errors.
+     *
+     * @param connectionErrorTask connection error task
+     */
+    public void setConnectionErrorTask(Runnable connectionErrorTask)
+    {
+        _connectionErrorTask = connectionErrorTask;
+    }
+
+    /**
+     * Return the connection error for any socket read/write error that has occurred
+     *
+     * @return connection error
+     */
+    public Error getConnectionError()
+    {
+        return _socketError;
+    }
+
+    @Override
+    public void processSocketException(Exception exception)
+    {
+        Error socketError = new Error();
+        socketError.setDescription(exception.getClass() + ": " + exception.getMessage());
+        socketError.setCondition(ConnectionError.SOCKET_ERROR);
+        _socketError = socketError;
+        if(_connectionErrorTask != null)
+        {
+            Thread thread = new Thread(_connectionErrorTask);
+            thread.run();
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java?rev=1526207&r1=1526206&r2=1526207&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java Wed Sep 25 15:32:23 2013
@@ -386,12 +386,14 @@ public class ConnectionHandler
         private BytesSource _bytesSource;
         private boolean _closed;
         private ConnectionEndpoint _conn;
+        private SocketExceptionHandler _exceptionHandler;
 
-        public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn)
+        public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn, SocketExceptionHandler exceptionHandler)
             {
                 _outputStream = outputStream;
                 _bytesSource = source;
                 _conn = conn;
+                _exceptionHandler = exceptionHandler;
             }
 
             public void run()
@@ -421,7 +423,7 @@ public class ConnectionHandler
             catch (IOException e)
             {
                 _closed = true;
-                e.printStackTrace();  //TODO
+                _exceptionHandler.processSocketException(e);
             }
         }
     }

Added: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java?rev=1526207&view=auto
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java (added)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java Wed Sep 25 15:32:23 2013
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.framing;
+
+/**
+ * Callback interface for processing socket exceptions.
+ */
+public interface SocketExceptionHandler
+{
+
+  public void processSocketException(Exception exception);
+
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java?rev=1526207&r1=1526206&r2=1526207&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java Wed Sep 25 15:32:23 2013
@@ -43,6 +43,8 @@ public class ConnectionError
     
     public static final ConnectionError REDIRECT = new ConnectionError(Symbol.valueOf("amqp:connection:redirect"));
     
+    public static final ConnectionError SOCKET_ERROR = new ConnectionError(Symbol.valueOf("amqp:connection:socket-error"));
+    
 
 
     private ConnectionError(Symbol val)
@@ -73,6 +75,11 @@ public class ConnectionError
             return "redirect";
         }
         
+        if(this == SOCKET_ERROR)
+        {
+            return "socket-error";
+        }
+        
         else
         {
             return String.valueOf(_val);
@@ -97,6 +104,11 @@ public class ConnectionError
         {
             return REDIRECT;
         }
+        
+        if(SOCKET_ERROR._val.equals(val))
+        {
+            return SOCKET_ERROR;
+        }
     
         // TODO ERROR
         return null;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org