You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/12/10 15:45:16 UTC
svn commit: r1549852 - in /qpid/branches/0.26/qpid/java: ./ amqp-1-0-common/
amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/
amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/
Author: rgodfrey
Date: Tue Dec 10 14:45:16 2013
New Revision: 1549852
URL: http://svn.apache.org/r1549852
Log:
Merged r1549670 to 0.26 branch
Modified:
qpid/branches/0.26/qpid/java/ (props changed)
qpid/branches/0.26/qpid/java/amqp-1-0-common/ (props changed)
qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java
Propchange: qpid/branches/0.26/qpid/java/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java:r1549670
Propchange: qpid/branches/0.26/qpid/java/amqp-1-0-common/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/amqp-1-0-common:r1549670
Modified: qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java?rev=1549852&r1=1549851&r2=1549852&view=diff
==============================================================================
--- qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java (original)
+++ qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java Tue Dec 10 14:45:16 2013
@@ -272,13 +272,14 @@ public class FrameHandler implements Pro
{
_connection.handleError(frameParsingError);
}
- return this;
}
catch(RuntimeException e)
{
+ // This exception is unexpected. The upper layer should handle the error condition gracefully.
+ _connection.handleError(this.createError(ConnectionError.CONNECTION_FORCED, e.toString()));
e.printStackTrace();
- throw e;
}
+ return this;
}
private static String toHex(ByteBuffer in)
Modified: qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1549852&r1=1549851&r2=1549852&view=diff
==============================================================================
--- qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java (original)
+++ qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java Tue Dec 10 14:45:16 2013
@@ -24,6 +24,7 @@ package org.apache.qpid.amqp_1_0.transpo
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+
import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructorRegistry;
import org.apache.qpid.amqp_1_0.codec.ValueWriter;
import org.apache.qpid.amqp_1_0.framing.AMQFrame;
@@ -39,11 +40,11 @@ import org.apache.qpid.amqp_1_0.type.tra
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import javax.security.sasl.SaslServerFactory;
+
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
@@ -290,8 +291,16 @@ public class ConnectionEndpoint implemen
private SessionEndpoint getSession(final short channel)
{
- // TODO assert existence, check channel state
- return _receivingSessions[channel];
+ SessionEndpoint session = _receivingSessions[channel];
+ if (session == null)
+ {
+ Error error = new Error();
+ error.setCondition(ConnectionError.FRAMING_ERROR);
+ error.setDescription("Frame received on channel " + channel + " which is not known as a begun session.");
+ this.handleError(error);
+ }
+
+ return session;
}
@@ -470,6 +479,11 @@ public class ConnectionEndpoint implemen
endpoint.setReceivingChannel(channel);
endpoint.setNextIncomingId(begin.getNextOutgoingId());
endpoint.setOutgoingSessionCredit(begin.getIncomingWindow());
+
+ if (endpoint.getState() == SessionState.END_SENT)
+ {
+ _sendingSessions[myChannelId] = null;
+ }
}
else
{
@@ -551,41 +565,59 @@ public class ConnectionEndpoint implemen
}
- public synchronized void sendEnd(short channel, End end)
+ public synchronized void sendEnd(short channel, End end, boolean remove)
{
send(channel, end);
- _sendingSessions[channel] = null;
+ if (remove)
+ {
+ _sendingSessions[channel] = null;
+ }
}
public synchronized void receiveAttach(short channel, Attach attach)
{
SessionEndpoint endPoint = getSession(channel);
- endPoint.receiveAttach(attach);
+ if (endPoint != null)
+ {
+ endPoint.receiveAttach(attach);
+ }
}
public synchronized void receiveDetach(short channel, Detach detach)
{
SessionEndpoint endPoint = getSession(channel);
- endPoint.receiveDetach(detach);
+ if (endPoint != null)
+ {
+ endPoint.receiveDetach(detach);
+ }
}
public synchronized void receiveTransfer(short channel, Transfer transfer)
{
SessionEndpoint endPoint = getSession(channel);
- endPoint.receiveTransfer(transfer);
+ if (endPoint != null)
+ {
+ endPoint.receiveTransfer(transfer);
+ }
}
public synchronized void receiveDisposition(short channel, Disposition disposition)
{
SessionEndpoint endPoint = getSession(channel);
- endPoint.receiveDisposition(disposition);
+ if (endPoint != null)
+ {
+ endPoint.receiveDisposition(disposition);
+ }
}
public synchronized void receiveFlow(short channel, Flow flow)
{
SessionEndpoint endPoint = getSession(channel);
- endPoint.receiveFlow(flow);
+ if (endPoint != null)
+ {
+ endPoint.receiveFlow(flow);
+ }
}
@@ -667,8 +699,9 @@ public class ConnectionEndpoint implemen
Close close = new Close();
close.setError(error);
send((short) 0, close);
+
+ this.setClosedForOutput(true);
}
- _closedForInput = true;
}
private final Logger _logger = Logger.getLogger("FRM");
Modified: qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java?rev=1549852&r1=1549851&r2=1549852&view=diff
==============================================================================
--- qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java (original)
+++ qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java Tue Dec 10 14:45:16 2013
@@ -118,6 +118,9 @@ public class SessionEndpoint
case BEGIN_SENT:
_state = SessionState.ACTIVE;
break;
+ case END_PIPE:
+ _state = SessionState.END_SENT;
+ break;
default:
// TODO error
@@ -158,6 +161,10 @@ public class SessionEndpoint
{
switch(_state)
{
+ case BEGIN_SENT:
+ _connection.sendEnd(getSendingChannel(), new End(), false);
+ _state = SessionState.END_PIPE;
+ break;
case END_SENT:
_state = SessionState.ENDED;
break;
@@ -165,7 +172,7 @@ public class SessionEndpoint
detachLinks();
_sessionEventListener.remoteEnd(end);
short sendChannel = getSendingChannel();
- _connection.sendEnd(sendChannel, new End());
+ _connection.sendEnd(sendChannel, new End(), true);
_state = end == null ? SessionState.END_SENT : SessionState.ENDED;
break;
default:
@@ -175,7 +182,7 @@ public class SessionEndpoint
error.setCondition(AmqpError.ILLEGAL_STATE);
error.setDescription("END called on Session which has not been opened");
reply.setError(error);
- _connection.sendEnd(sendChannel, reply);
+ _connection.sendEnd(sendChannel, reply, true);
break;
Modified: qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java?rev=1549852&r1=1549851&r2=1549852&view=diff
==============================================================================
--- qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java (original)
+++ qpid/branches/0.26/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java Tue Dec 10 14:45:16 2013
@@ -26,6 +26,7 @@ public enum SessionState
ACTIVE,
INACTIVE,
BEGIN_SENT,
+ END_PIPE,
BEGIN_RECVD,
END_SENT,
END_RECVD,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org