You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/12/10 15:45:16 UTC

svn commit: r1549852 - in /qpid/branches/0.26/qpid/java: ./ amqp-1-0-common/ amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/

Author: rgodfrey
Date: Tue Dec 10 14:45:16 2013
New Revision: 1549852

URL: http://svn.apache.org/r1549852
Log:
Merged r1549670 to 0.26 branch

Modified:
    qpid/branches/0.26/qpid/java/   (props changed)
    qpid/branches/0.26/qpid/java/amqp-1-0-common/   (props changed)
    qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
    qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
    qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
    qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java

Propchange: qpid/branches/0.26/qpid/java/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java:r1549670

Propchange: qpid/branches/0.26/qpid/java/amqp-1-0-common/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/amqp-1-0-common:r1549670

Modified: qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java?rev=1549852&r1=1549851&r2=1549852&view=diff
==============================================================================
--- qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java (original)
+++ qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java Tue Dec 10 14:45:16 2013
@@ -272,13 +272,14 @@ public class FrameHandler implements Pro
         {
             _connection.handleError(frameParsingError);
         }
-        return this;
         }
         catch(RuntimeException e)
         {
+            // This exception is unexpected. The up layer should handle error condition gracefully
+            _connection.handleError(this.createError(ConnectionError.CONNECTION_FORCED, e.toString()));
             e.printStackTrace();
-            throw e;
         }
+        return this;
     }
 
     private static String toHex(ByteBuffer in)

Modified: qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1549852&r1=1549851&r2=1549852&view=diff
==============================================================================
--- qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java (original)
+++ qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java Tue Dec 10 14:45:16 2013
@@ -24,6 +24,7 @@ package org.apache.qpid.amqp_1_0.transpo
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructorRegistry;
 import org.apache.qpid.amqp_1_0.codec.ValueWriter;
 import org.apache.qpid.amqp_1_0.framing.AMQFrame;
@@ -39,11 +40,11 @@ import org.apache.qpid.amqp_1_0.type.tra
 import org.apache.qpid.amqp_1_0.type.transport.Error;
 import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
 
-
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 import javax.security.sasl.SaslServerFactory;
+
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
@@ -290,8 +291,16 @@ public class ConnectionEndpoint implemen
 
     private SessionEndpoint getSession(final short channel)
     {
-        // TODO assert existence, check channel state
-        return _receivingSessions[channel];
+        SessionEndpoint session = _receivingSessions[channel];
+        if (session == null)
+        {
+            Error error = new Error();
+            error.setCondition(ConnectionError.FRAMING_ERROR);
+            error.setDescription("Frame received on channel " + channel + " which is not known as a begun session.");
+            this.handleError(error);
+        }
+        
+        return session;
     }
 
 
@@ -470,6 +479,11 @@ public class ConnectionEndpoint implemen
                     endpoint.setReceivingChannel(channel);
                     endpoint.setNextIncomingId(begin.getNextOutgoingId());
                     endpoint.setOutgoingSessionCredit(begin.getIncomingWindow());
+                    
+                    if (endpoint.getState() == SessionState.END_SENT)
+                    {
+                        _sendingSessions[myChannelId] = null;
+                    }
                 }
                 else
                 {
@@ -551,41 +565,59 @@ public class ConnectionEndpoint implemen
     }
 
 
-    public synchronized void sendEnd(short channel, End end)
+    public synchronized void sendEnd(short channel, End end, boolean remove)
     {
         send(channel, end);
-        _sendingSessions[channel] = null;
+        if (remove)
+        {
+            _sendingSessions[channel] = null;
+        }
     }
 
     public synchronized void receiveAttach(short channel, Attach attach)
     {
         SessionEndpoint endPoint = getSession(channel);
-        endPoint.receiveAttach(attach);
+        if (endPoint != null)
+        {
+            endPoint.receiveAttach(attach);
+        }
     }
 
 
     public synchronized void receiveDetach(short channel, Detach detach)
     {
         SessionEndpoint endPoint = getSession(channel);
-        endPoint.receiveDetach(detach);
+        if (endPoint != null)
+        {
+            endPoint.receiveDetach(detach);
+        }
     }
 
     public synchronized void receiveTransfer(short channel, Transfer transfer)
     {
         SessionEndpoint endPoint = getSession(channel);
-        endPoint.receiveTransfer(transfer);
+        if (endPoint != null)
+        {
+            endPoint.receiveTransfer(transfer);
+        }
     }
 
     public synchronized void receiveDisposition(short channel, Disposition disposition)
     {
         SessionEndpoint endPoint = getSession(channel);
-        endPoint.receiveDisposition(disposition);
+        if (endPoint != null)
+        {
+            endPoint.receiveDisposition(disposition);
+        }
     }
 
     public synchronized void receiveFlow(short channel, Flow flow)
     {
         SessionEndpoint endPoint = getSession(channel);
-        endPoint.receiveFlow(flow);
+        if (endPoint != null)
+        {
+            endPoint.receiveFlow(flow);
+        }
     }
 
 
@@ -667,8 +699,9 @@ public class ConnectionEndpoint implemen
             Close close = new Close();
             close.setError(error);
             send((short) 0, close);
+            
+            this.setClosedForOutput(true);
         }
-        _closedForInput = true;
     }
 
     private final Logger _logger = Logger.getLogger("FRM");

Modified: qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java?rev=1549852&r1=1549851&r2=1549852&view=diff
==============================================================================
--- qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java (original)
+++ qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java Tue Dec 10 14:45:16 2013
@@ -118,6 +118,9 @@ public class SessionEndpoint
             case BEGIN_SENT:
                 _state = SessionState.ACTIVE;
                 break;
+            case END_PIPE:
+                _state = SessionState.END_SENT;
+                break;
             default:
                 // TODO error
 
@@ -158,6 +161,10 @@ public class SessionEndpoint
         {
             switch(_state)
             {
+                case BEGIN_SENT:
+                    _connection.sendEnd(getSendingChannel(), new End(), false);
+                    _state = SessionState.END_PIPE;
+                    break;
                 case END_SENT:
                     _state = SessionState.ENDED;
                     break;
@@ -165,7 +172,7 @@ public class SessionEndpoint
                     detachLinks();
                     _sessionEventListener.remoteEnd(end);
                     short sendChannel = getSendingChannel();
-                    _connection.sendEnd(sendChannel, new End());
+                    _connection.sendEnd(sendChannel, new End(), true);
                     _state = end == null ? SessionState.END_SENT : SessionState.ENDED;
                     break;
                 default:
@@ -175,7 +182,7 @@ public class SessionEndpoint
                     error.setCondition(AmqpError.ILLEGAL_STATE);
                     error.setDescription("END called on Session which has not been opened");
                     reply.setError(error);
-                    _connection.sendEnd(sendChannel, reply);
+                    _connection.sendEnd(sendChannel, reply, true);
                     break;
 
 

Modified: qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java?rev=1549852&r1=1549851&r2=1549852&view=diff
==============================================================================
--- qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java (original)
+++ qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java Tue Dec 10 14:45:16 2013
@@ -26,6 +26,7 @@ public enum SessionState
     ACTIVE,
     INACTIVE,
     BEGIN_SENT,
+    END_PIPE,
     BEGIN_RECVD,
     END_SENT,
     END_RECVD,



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org