You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by fa...@apache.org on 2014/09/06 13:23:14 UTC
svn commit: r1622849 [8/9] - in
/qpid/proton/branches/fadams-javascript-binding: ./ contrib/
contrib/proton-hawtdispatch/ contrib/proton-hawtdispatch/src/
contrib/proton-hawtdispatch/src/main/
contrib/proton-hawtdispatch/src/main/java/ contrib/proton-h...
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java Sat Sep 6 11:23:10 2014
@@ -63,11 +63,11 @@ public class SenderImpl extends LinkImp
//TODO.
}
- public void free()
+ @Override
+ void doFree()
{
getSession().freeSender(this);
- super.free();
-
+ super.doFree();
}
@Override
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java Sat Sep 6 11:23:10 2014
@@ -45,7 +45,9 @@ public class SessionImpl extends Endpoin
SessionImpl(ConnectionImpl connection)
{
_connection = connection;
+ _connection.incref();
_node = _connection.addSessionEndpoint(this);
+ _connection.put(Event.Type.SESSION_INIT, this);
}
public SenderImpl sender(String name)
@@ -90,25 +92,38 @@ public class SessionImpl extends Endpoin
return getConnectionImpl();
}
- public void free()
- {
- super.free();
+ @Override
+ void postFinal() {
+ _connection.put(Event.Type.SESSION_FINAL, this);
+ _connection.decref();
+ }
+ @Override
+ void doFree() {
_connection.removeSessionEndpoint(_node);
_node = null;
- for(SenderImpl sender : _senders.values())
- {
+ for(SenderImpl sender : _senders.values()) {
sender.free();
}
_senders.clear();
- for(ReceiverImpl receiver : _receivers.values())
- {
+ for(ReceiverImpl receiver : _receivers.values()) {
receiver.free();
}
_receivers.clear();
}
+ void modifyEndpoints() {
+ for (SenderImpl snd : _senders.values()) {
+ snd.modifyEndpoints();
+ }
+
+ for (ReceiverImpl rcv : _receivers.values()) {
+ rcv.modifyEndpoints();
+ }
+ modified();
+ }
+
TransportSession getTransportSession()
{
return _transportSession;
@@ -184,11 +199,14 @@ public class SessionImpl extends Endpoin
}
@Override
- protected void localStateChanged()
+ void localOpen()
{
- EventImpl ev = getConnectionImpl().put(Event.Type.SESSION_LOCAL_STATE);
- if (ev != null) {
- ev.init(this);
- }
+ getConnectionImpl().put(Event.Type.SESSION_OPEN, this);
+ }
+
+ @Override
+ void localClose()
+ {
+ getConnectionImpl().put(Event.Type.SESSION_CLOSE, this);
}
}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Sat Sep 6 11:23:10 2014
@@ -17,14 +17,14 @@
package org.apache.qpid.proton.engine.impl;
-import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.newReadableBuffer;
-import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pour;
import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourArrayToBuffer;
import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourBufferToArray;
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.UnsignedInteger;
@@ -32,6 +32,7 @@ import org.apache.qpid.proton.amqp.Unsig
import org.apache.qpid.proton.amqp.transport.Attach;
import org.apache.qpid.proton.amqp.transport.Begin;
import org.apache.qpid.proton.amqp.transport.Close;
+import org.apache.qpid.proton.amqp.transport.ConnectionError;
import org.apache.qpid.proton.amqp.transport.Detach;
import org.apache.qpid.proton.amqp.transport.Disposition;
import org.apache.qpid.proton.amqp.transport.End;
@@ -44,10 +45,8 @@ import org.apache.qpid.proton.amqp.trans
import org.apache.qpid.proton.codec.AMQPDefinedTypes;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;
-import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.EngineFactory;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.ProtonJTransport;
import org.apache.qpid.proton.engine.Sasl;
@@ -66,7 +65,20 @@ public class TransportImpl extends Endpo
implements ProtonJTransport, FrameBody.FrameBodyHandler<Integer>,
FrameHandler, TransportOutputWriter
{
- private static final byte AMQP_FRAME_TYPE = 0;
+ static final int BUFFER_RELEASE_THRESHOLD = Integer.getInteger("proton.transport_buffer_release_threshold", 2 * 1024 * 1024);
+
+ private static final boolean getBooleanEnv(String name)
+ {
+ String value = System.getenv(name);
+ return "true".equalsIgnoreCase(value) ||
+ "1".equals(value) ||
+ "yes".equalsIgnoreCase(value);
+ }
+
+ private static final boolean FRM_ENABLED = getBooleanEnv("PN_TRACE_FRM");
+
+ // trace levels
+ private int _levels = (FRM_ENABLED ? this.TRACE_FRM : 0);
private FrameParser _frameParser;
@@ -100,16 +112,16 @@ public class TransportImpl extends Endpo
private Open _open;
private SaslImpl _sasl;
private SslImpl _ssl;
- private ProtocolTracer _protocolTracer = null;
-
- private ByteBuffer _lastInputBuffer;
+ private final Ref<ProtocolTracer> _protocolTracer = new Ref(null);
private TransportResult _lastTransportResult = TransportResultFactory.ok();
private boolean _init;
+ private boolean _processingStarted;
private FrameHandler _frameHandler = this;
private boolean _head_closed = false;
+ private TransportException _tail_error = null;
/**
* @deprecated This constructor's visibility will be reduced to the default scope in a future release.
@@ -134,7 +146,6 @@ public class TransportImpl extends Endpo
FrameWriter.AMQP_FRAME_TYPE,
_protocolTracer,
this);
-
}
private void init()
@@ -149,6 +160,11 @@ public class TransportImpl extends Endpo
}
@Override
+ public void trace(int levels) {
+ _levels = levels;
+ }
+
+ @Override
public int getMaxFrameSize()
{
return _maxFrameSize;
@@ -194,9 +210,10 @@ public class TransportImpl extends Endpo
@Override
public void bind(Connection conn)
{
- // TODO - check if already bound
- ((ConnectionImpl) conn).setTransport(this);
_connectionEndpoint = (ConnectionImpl) conn;
+ // TODO - check if already bound
+ _connectionEndpoint.setTransport(this);
+ _connectionEndpoint.incref();
if(getRemoteState() != EndpointState.UNINITIALIZED)
{
@@ -211,6 +228,23 @@ public class TransportImpl extends Endpo
}
@Override
+ public void unbind()
+ {
+ _connectionEndpoint.modifyEndpoints();
+
+ _connectionEndpoint.setTransport(null);
+ _connectionEndpoint.decref();
+
+ for (TransportSession ts: _transportSessionState.values()) {
+ ts.unbind();
+ }
+
+ for (TransportLink tl: _transportLinkState.values()) {
+ tl.unbind();
+ }
+ }
+
+ @Override
public int input(byte[] bytes, int offset, int length)
{
oldApiCheckStateBeforeInput(length).checkIsOk();
@@ -278,8 +312,13 @@ public class TransportImpl extends Endpo
{
if(_sasl == null)
{
+ if(_processingStarted)
+ {
+ throw new IllegalStateException("Sasl can't be initiated after transport has started processing");
+ }
+
init();
- _sasl = new SaslImpl(_remoteMaxFrameSize);
+ _sasl = new SaslImpl(this, _remoteMaxFrameSize);
TransportWrapper transportWrapper = _sasl.wrap(_inputProcessor, _outputProcessor);
_inputProcessor = transportWrapper;
_outputProcessor = transportWrapper;
@@ -334,38 +373,37 @@ public class TransportImpl extends Endpo
&& transportLink.isLocalHandleSet()
&& !_isCloseSent)
{
- if(!(link instanceof SenderImpl)
- || link.getQueued() == 0
- || transportLink.detachReceived()
- || transportSession.endReceived()
- || _closeReceived)
- {
- UnsignedInteger localHandle = transportLink.getLocalHandle();
- transportLink.clearLocalHandle();
- transportSession.freeLocalHandle(localHandle);
+ if((link instanceof SenderImpl)
+ && link.getQueued() > 0
+ && !transportLink.detachReceived()
+ && !transportSession.endReceived()
+ && !_closeReceived) {
+ endpoint = endpoint.transportNext();
+ continue;
+ }
+ UnsignedInteger localHandle = transportLink.getLocalHandle();
+ transportLink.clearLocalHandle();
+ transportSession.freeLocalHandle(localHandle);
- Detach detach = new Detach();
- detach.setHandle(localHandle);
- // TODO - need an API for detaching rather than closing the link
- detach.setClosed(true);
- ErrorCondition localError = link.getCondition();
- if( localError.getCondition() !=null )
- {
- detach.setError(localError);
- }
+ Detach detach = new Detach();
+ detach.setHandle(localHandle);
+ // TODO - need an API for detaching rather than closing the link
+ detach.setClosed(true);
+ ErrorCondition localError = link.getCondition();
+ if( localError.getCondition() !=null )
+ {
+ detach.setError(localError);
+ }
- writeFrame(transportSession.getLocalChannel(), detach, null, null);
- endpoint.clearModified();
- // TODO - temporary hack for PROTON-154, this line should be removed and replaced
- // with proper handling for closed links
- link.free();
- }
+ writeFrame(transportSession.getLocalChannel(), detach, null, null);
}
+ endpoint.clearModified();
+
}
endpoint = endpoint.transportNext();
}
@@ -411,8 +449,6 @@ public class TransportImpl extends Endpo
sender.setDrained(0);
writeFlow(transportSession, transportLink);
-
- endpoint.clearModified();
}
}
@@ -470,7 +506,7 @@ public class TransportImpl extends Endpo
if(!delivery.isDone() &&
(delivery.getDataLength() > 0 || delivery != snd.current()) &&
tpSession.hasOutgoingCredit() && tpLink.hasCredit() &&
- tpLink.getLocalHandle() != null)
+ tpLink.getLocalHandle() != null && !_frameWriter.isFull())
{
UnsignedInteger deliveryId = tpSession.getOutgoingDeliveryId();
TransportDelivery tpDelivery = new TransportDelivery(deliveryId, delivery, tpLink);
@@ -529,6 +565,8 @@ public class TransportImpl extends Endpo
delivery.setDataLength(payload.remaining());
session.incrementOutgoingBytes(-delta);
}
+
+ getConnectionImpl().put(Event.Type.LINK_FLOW, snd);
}
if(wasDone && delivery.getLocalState() != null)
@@ -596,10 +634,6 @@ public class TransportImpl extends Endpo
{
transportLink.addCredit(credits);
writeFlow(transportSession, transportLink);
- if(receiver.getLocalState() == EndpointState.ACTIVE)
- {
- endpoint.clearModified();
- }
}
}
}
@@ -689,10 +723,6 @@ public class TransportImpl extends Endpo
writeFrame(transportSession.getLocalChannel(), attach, null, null);
transportLink.sentAttach();
- if(link.getLocalState() == EndpointState.ACTIVE && (link instanceof SenderImpl || !link.hasCredit()))
- {
- endpoint.clearModified();
- }
}
}
}
@@ -712,15 +742,22 @@ public class TransportImpl extends Endpo
private void processOpen()
{
- if(_connectionEndpoint != null && _connectionEndpoint.getLocalState() != EndpointState.UNINITIALIZED && !_isOpenSent)
- {
+ if ((_tail_error != null ||
+ (_connectionEndpoint != null &&
+ _connectionEndpoint.getLocalState() != EndpointState.UNINITIALIZED)) &&
+ !_isOpenSent) {
Open open = new Open();
- String cid = _connectionEndpoint.getLocalContainerId();
- open.setContainerId(cid == null ? "" : cid);
- open.setHostname(_connectionEndpoint.getHostname());
- open.setDesiredCapabilities(_connectionEndpoint.getDesiredCapabilities());
- open.setOfferedCapabilities(_connectionEndpoint.getOfferedCapabilities());
- open.setProperties(_connectionEndpoint.getProperties());
+ if (_connectionEndpoint != null) {
+ String cid = _connectionEndpoint.getLocalContainerId();
+ open.setContainerId(cid == null ? "" : cid);
+ open.setHostname(_connectionEndpoint.getHostname());
+ open.setDesiredCapabilities(_connectionEndpoint.getDesiredCapabilities());
+ open.setOfferedCapabilities(_connectionEndpoint.getOfferedCapabilities());
+ open.setProperties(_connectionEndpoint.getProperties());
+ } else {
+ open.setContainerId("");
+ }
+
if (_maxFrameSize > 0) {
open.setMaxFrameSize(UnsignedInteger.valueOf(_maxFrameSize));
}
@@ -731,7 +768,6 @@ public class TransportImpl extends Endpo
_isOpenSent = true;
writeFrame(0, open, null, null);
-
}
}
@@ -762,10 +798,6 @@ public class TransportImpl extends Endpo
writeFrame(channelId, begin, null, null);
transportSession.sentBegin();
- if(session.getLocalState() == EndpointState.ACTIVE)
- {
- endpoint.clearModified();
- }
}
}
endpoint = endpoint.transportNext();
@@ -829,21 +861,27 @@ public class TransportImpl extends Endpo
SessionImpl session;
TransportSession transportSession;
- if((endpoint instanceof SessionImpl)
- && (session = (SessionImpl)endpoint).getLocalState() == EndpointState.CLOSED
- && (transportSession = session.getTransportSession()).isLocalChannelSet()
- && !hasSendableMessages(session)
- && !_isCloseSent)
- {
- int channel = freeLocalChannel(transportSession);
- End end = new End();
- ErrorCondition localError = endpoint.getCondition();
- if( localError.getCondition() !=null )
+ if((endpoint instanceof SessionImpl)) {
+ if ((session = (SessionImpl)endpoint).getLocalState() == EndpointState.CLOSED
+ && (transportSession = session.getTransportSession()).isLocalChannelSet()
+ && !_isCloseSent)
{
- end.setError(localError);
+ if (hasSendableMessages(session)) {
+ endpoint = endpoint.transportNext();
+ continue;
+ }
+
+ int channel = freeLocalChannel(transportSession);
+ End end = new End();
+ ErrorCondition localError = endpoint.getCondition();
+ if( localError.getCondition() !=null )
+ {
+ end.setError(localError);
+ }
+
+ writeFrame(channel, end, null, null);
}
- writeFrame(channel, end, null, null);
endpoint.clearModified();
}
@@ -854,6 +892,9 @@ public class TransportImpl extends Endpo
private boolean hasSendableMessages(SessionImpl session)
{
+ if (_connectionEndpoint == null) {
+ return false;
+ }
if(!_closeReceived && (session == null || !session.getTransportSession().endReceived()))
{
@@ -878,14 +919,24 @@ public class TransportImpl extends Endpo
private void processClose()
{
- if(_connectionEndpoint != null && _connectionEndpoint.getLocalState() == EndpointState.CLOSED && !_isCloseSent)
- {
+ if ((_tail_error != null ||
+ (_connectionEndpoint != null &&
+ _connectionEndpoint.getLocalState() == EndpointState.CLOSED)) &&
+ !_isCloseSent) {
if(!hasSendableMessages(null))
{
Close close = new Close();
- ErrorCondition localError = _connectionEndpoint.getCondition();
- if( localError.getCondition() !=null )
+ ErrorCondition localError;
+
+ if (_connectionEndpoint == null) {
+ localError = new ErrorCondition(ConnectionError.FRAMING_ERROR,
+ _tail_error.toString());
+ } else {
+ localError = _connectionEndpoint.getCondition();
+ }
+
+ if(localError.getCondition() != null)
{
close.setError(localError);
}
@@ -893,6 +944,10 @@ public class TransportImpl extends Endpo
_isCloseSent = true;
writeFrame(0, close, null, null);
+
+ if (_connectionEndpoint != null) {
+ _connectionEndpoint.clearModified();
+ }
}
}
}
@@ -912,10 +967,10 @@ public class TransportImpl extends Endpo
}
@Override
- public void free()
- {
- super.free();
- }
+ void postFinal() {}
+
+ @Override
+ void doFree() { }
//==================================================================================================================
// handle incoming amqp data
@@ -967,6 +1022,9 @@ public class TransportImpl extends Endpo
{
// TODO check null
transportSession = _localSessions.get(begin.getRemoteChannel().intValue());
+ if (transportSession == null) {
+ throw new NullPointerException("uncorrelated channel: " + begin.getRemoteChannel());
+ }
session = transportSession.getSession();
}
@@ -975,10 +1033,7 @@ public class TransportImpl extends Endpo
transportSession.setNextIncomingId(begin.getNextOutgoingId());
_remoteSessions.put(channel, transportSession);
- EventImpl ev = _connectionEndpoint.put(Event.Type.SESSION_REMOTE_STATE);
- if (ev != null) {
- ev.init(session);
- }
+ _connectionEndpoint.put(Event.Type.SESSION_REMOTE_OPEN, session);
}
}
@@ -1034,10 +1089,7 @@ public class TransportImpl extends Endpo
}
- EventImpl ev = _connectionEndpoint.put(Event.Type.LINK_REMOTE_STATE);
- if (ev != null) {
- ev.init(link);
- }
+ _connectionEndpoint.put(Event.Type.LINK_REMOTE_OPEN, link);
}
}
@@ -1102,16 +1154,13 @@ public class TransportImpl extends Endpo
LinkImpl link = transportLink.getLink();
transportLink.receivedDetach();
transportSession.freeRemoteHandle(transportLink.getRemoteHandle());
+ _connectionEndpoint.put(Event.Type.LINK_REMOTE_CLOSE, link);
+ transportLink.clearRemoteHandle();
link.setRemoteState(EndpointState.CLOSED);
if(detach.getError() != null)
{
link.getRemoteCondition().copyFrom(detach.getError());
}
-
- EventImpl ev = _connectionEndpoint.put(Event.Type.LINK_REMOTE_STATE);
- if (ev != null) {
- ev.init(link);
- }
}
else
{
@@ -1140,10 +1189,7 @@ public class TransportImpl extends Endpo
session.getRemoteCondition().copyFrom(errorCondition);
}
- EventImpl ev = _connectionEndpoint.put(Event.Type.SESSION_REMOTE_STATE);
- if (ev != null) {
- ev.init(session);
- }
+ _connectionEndpoint.put(Event.Type.SESSION_REMOTE_CLOSE, session);
}
}
@@ -1160,10 +1206,7 @@ public class TransportImpl extends Endpo
_connectionEndpoint.getRemoteCondition().copyFrom(close.getError());
}
- EventImpl ev = _connectionEndpoint.put(Event.Type.CONNECTION_REMOTE_STATE);
- if (ev != null) {
- ev.init(_connectionEndpoint);
- }
+ _connectionEndpoint.put(Event.Type.CONNECTION_REMOTE_CLOSE, _connectionEndpoint);
}
}
@@ -1176,11 +1219,12 @@ public class TransportImpl extends Endpo
throw new IllegalStateException("Transport cannot accept frame: " + frame);
}
- log(this, INCOMING, frame);
+ log(INCOMING, frame);
- if( _protocolTracer != null )
+ ProtocolTracer tracer = _protocolTracer.get();
+ if( tracer != null )
{
- _protocolTracer.receivedFrame(frame);
+ tracer.receivedFrame(frame);
}
frame.getBody().invoke(this,frame.getPayload(), frame.getChannel());
@@ -1188,10 +1232,15 @@ public class TransportImpl extends Endpo
}
@Override
- public void closed()
+ public void closed(TransportException error)
{
- if (!_closeReceived) {
- throw new TransportException("connection aborted");
+ if (!_closeReceived || error != null) {
+ if (error == null) {
+ _tail_error = new TransportException("connection aborted");
+ } else {
+ _tail_error = error;
+ }
+ _head_closed = true;
}
}
@@ -1204,13 +1253,13 @@ public class TransportImpl extends Endpo
@Override
public ProtocolTracer getProtocolTracer()
{
- return _protocolTracer;
+ return _protocolTracer.get();
}
@Override
public void setProtocolTracer(ProtocolTracer protocolTracer)
{
- this._protocolTracer = protocolTracer;
+ this._protocolTracer.set(protocolTracer);
}
@Override
@@ -1260,6 +1309,8 @@ public class TransportImpl extends Endpo
@Override
public void process() throws TransportException
{
+ _processingStarted = true;
+
try {
init();
_inputProcessor.process();
@@ -1303,6 +1354,12 @@ public class TransportImpl extends Endpo
_outputProcessor.close_head();
}
+ public boolean isClosed() {
+ int p = pending();
+ int c = capacity();
+ return p == END_OF_STREAM && c == END_OF_STREAM;
+ }
+
@Override
public String toString()
{
@@ -1337,21 +1394,11 @@ public class TransportImpl extends Endpo
static String INCOMING = "<-";
static String OUTGOING = "->";
- private static final boolean getBooleanEnv(String name)
+ void log(String event, TransportFrame frame)
{
- String value = System.getenv(name);
- return "true".equalsIgnoreCase(value) ||
- "1".equals(value) ||
- "yes".equalsIgnoreCase(value);
- }
-
- private static final boolean ENABLED = getBooleanEnv("PN_TRACE_FRM");
-
- static void log(Object ctx, String event, TransportFrame frame)
- {
- if (ENABLED) {
+ if ((_levels & TRACE_FRM) != 0) {
StringBuilder msg = new StringBuilder();
- msg.append("[").append(System.identityHashCode(ctx)).append(":")
+ msg.append("[").append(System.identityHashCode(this)).append(":")
.append(frame.getChannel()).append("]");
msg.append(" ").append(event).append(" ").append(frame.getBody());
if (frame.getPayload() != null) {
@@ -1366,7 +1413,8 @@ public class TransportImpl extends Endpo
}
@Override
- protected void localStateChanged()
- {
- }
+ void localOpen() {}
+
+ @Override
+ void localClose() {}
}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java Sat Sep 6 11:23:10 2014
@@ -51,6 +51,12 @@ class TransportLink<T extends LinkImpl>
: new TransportSender((SenderImpl)link));
}
+ void unbind()
+ {
+ clearLocalHandle();
+ clearRemoteHandle();
+ }
+
public UnsignedInteger getLocalHandle()
{
return _localHandle;
@@ -58,6 +64,9 @@ class TransportLink<T extends LinkImpl>
public void setLocalHandle(UnsignedInteger localHandle)
{
+ if (_localHandle == null) {
+ _link.incref();
+ }
_localHandle = localHandle;
}
@@ -78,6 +87,9 @@ class TransportLink<T extends LinkImpl>
public void clearLocalHandle()
{
+ if (_localHandle != null) {
+ _link.decref();
+ }
_localHandle = null;
}
@@ -88,9 +100,20 @@ class TransportLink<T extends LinkImpl>
public void setRemoteHandle(UnsignedInteger remoteHandle)
{
+ if (_remoteHandle == null) {
+ _link.incref();
+ }
_remoteHandle = remoteHandle;
}
+ public void clearRemoteHandle()
+ {
+ if (_remoteHandle != null) {
+ _link.decref();
+ }
+ _remoteHandle = null;
+ }
+
public UnsignedInteger getDeliveryCount()
{
return _deliveryCount;
@@ -122,10 +145,7 @@ class TransportLink<T extends LinkImpl>
_remoteLinkCredit = flow.getLinkCredit();
- EventImpl ev = _link.getConnectionImpl().put(Event.Type.LINK_FLOW);
- if (ev != null) {
- ev.init(_link);
- }
+ _link.getConnectionImpl().put(Event.Type.LINK_FLOW, _link);
}
void setLinkCredit(UnsignedInteger linkCredit)
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java Sat Sep 6 11:23:10 2014
@@ -26,23 +26,20 @@ import org.apache.qpid.proton.engine.Tra
class TransportOutputAdaptor implements TransportOutput
{
- private TransportOutputWriter _transportOutputWriter;
+ private static final ByteBuffer _emptyHead = newReadableBuffer(0).asReadOnlyBuffer();
- private final ByteBuffer _outputBuffer;
- private final ByteBuffer _head;
+ private final TransportOutputWriter _transportOutputWriter;
+ private final int _maxFrameSize;
+
+ private ByteBuffer _outputBuffer = null;
+ private ByteBuffer _head = null;
private boolean _output_done = false;
private boolean _head_closed = false;
TransportOutputAdaptor(TransportOutputWriter transportOutputWriter, int maxFrameSize)
{
_transportOutputWriter = transportOutputWriter;
- if (maxFrameSize > 0) {
- _outputBuffer = newWriteableBuffer(maxFrameSize);
- } else {
- _outputBuffer = newWriteableBuffer(4*1024);
- }
- _head = _outputBuffer.asReadOnlyBuffer();
- _head.limit(0);
+ _maxFrameSize = maxFrameSize > 0 ? maxFrameSize : 4*1024;
}
@Override
@@ -52,13 +49,26 @@ class TransportOutputAdaptor implements
return Transport.END_OF_STREAM;
}
+ if(_outputBuffer == null)
+ {
+ init_buffers();
+ }
+
_output_done = _transportOutputWriter.writeInto(_outputBuffer);
_head.limit(_outputBuffer.position());
- if (_output_done && _outputBuffer.position() == 0) {
+ if (_outputBuffer.position() == 0 && _outputBuffer.capacity() > TransportImpl.BUFFER_RELEASE_THRESHOLD)
+ {
+ release_buffers();
+ }
+
+ if (_output_done && (_outputBuffer == null || _outputBuffer.position() == 0))
+ {
return Transport.END_OF_STREAM;
- } else {
- return _outputBuffer.position();
+ }
+ else
+ {
+ return _outputBuffer == null ? 0 : _outputBuffer.position();
}
}
@@ -66,24 +76,40 @@ class TransportOutputAdaptor implements
public ByteBuffer head()
{
pending();
- return _head;
+ return _head != null ? _head : _emptyHead;
}
@Override
public void pop(int bytes)
{
- _outputBuffer.flip();
- _outputBuffer.position(bytes);
- _outputBuffer.compact();
- _head.position(0);
- _head.limit(_outputBuffer.position());
+ if (_outputBuffer != null) {
+ _outputBuffer.flip();
+ _outputBuffer.position(bytes);
+ _outputBuffer.compact();
+ _head.position(0);
+ _head.limit(_outputBuffer.position());
+ if (_outputBuffer.position() == 0 && _outputBuffer.capacity() > TransportImpl.BUFFER_RELEASE_THRESHOLD) {
+ release_buffers();
+ }
+ }
}
@Override
public void close_head()
{
_head_closed = true;
- _transportOutputWriter.closed();
+ _transportOutputWriter.closed(null);
+ release_buffers();
+ }
+
+ private void init_buffers() {
+ _outputBuffer = newWriteableBuffer(_maxFrameSize);
+ _head = _outputBuffer.asReadOnlyBuffer();
+ _head.limit(0);
}
+ private void release_buffers() {
+ _head = null;
+ _outputBuffer = null;
+ }
}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputWriter.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputWriter.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputWriter.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputWriter.java Sat Sep 6 11:23:10 2014
@@ -20,6 +20,8 @@ package org.apache.qpid.proton.engine.im
import java.nio.ByteBuffer;
+import org.apache.qpid.proton.engine.TransportException;
+
interface TransportOutputWriter
{
/**
@@ -28,6 +30,6 @@ interface TransportOutputWriter
*/
boolean writeInto(ByteBuffer outputBuffer);
- void closed();
+ void closed(TransportException error);
}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java Sat Sep 6 11:23:10 2014
@@ -69,6 +69,12 @@ class TransportSession
_session = session;
}
+ void unbind()
+ {
+ unsetLocalChannel();
+ unsetRemoteChannel();
+ }
+
public SessionImpl getSession()
{
return _session;
@@ -81,6 +87,9 @@ class TransportSession
public void setLocalChannel(int localChannel)
{
+ if (!isLocalChannelSet()) {
+ _session.incref();
+ }
_localChannel = localChannel;
}
@@ -91,6 +100,9 @@ class TransportSession
public void setRemoteChannel(int remoteChannel)
{
+ if (!isRemoteChannelSet()) {
+ _session.incref();
+ }
_remoteChannel = remoteChannel;
}
@@ -116,11 +128,17 @@ class TransportSession
public void unsetLocalChannel()
{
+ if (isLocalChannelSet()) {
+ _session.decref();
+ }
_localChannel = -1;
}
public void unsetRemoteChannel()
{
+ if (isRemoteChannelSet()) {
+ _session.decref();
+ }
_remoteChannel = -1;
}
@@ -262,7 +280,7 @@ class TransportSession
_unsettledIncomingDeliveriesById.put(_incomingDeliveryId, delivery);
getSession().incrementIncomingDeliveries(1);
}
- if( transfer.getState()!=null )
+ if( transfer.getState()!=null )
{
delivery.setRemoteDeliveryState(transfer.getState());
}
@@ -308,15 +326,12 @@ class TransportSession
delivery.getLink().modified(false);
}
- EventImpl ev = getSession().getConnection().put(Event.Type.DELIVERY);
- if (ev != null) {
- ev.init(delivery);
- }
+ getSession().getConnection().put(Event.Type.DELIVERY, delivery);
}
public void freeLocalChannel()
{
- _localChannel = -1;
+ unsetLocalChannel();
}
private void setRemoteIncomingWindow(UnsignedInteger incomingWindow)
@@ -394,10 +409,7 @@ class TransportSession
}
delivery.updateWork();
- EventImpl ev = getSession().getConnection().put(Event.Type.DELIVERY);
- if (ev != null) {
- ev.init(delivery);
- }
+ getSession().getConnection().put(Event.Type.DELIVERY, delivery);
}
id = id.add(UnsignedInteger.ONE);
}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java Sat Sep 6 11:23:10 2014
@@ -338,22 +338,13 @@ public class SimpleSslTransportWrapper i
_inputBuffer.flip();
- try
- {
- try {
- unwrapInput();
- } catch (SSLException e) {
- throw new TransportException(e);
- }
- }
- catch (TransportException e)
- {
+ try {
+ unwrapInput();
+ } catch (SSLException e) {
+ _logger.log(Level.WARNING, e.getMessage());
_inputBuffer.position(_inputBuffer.limit());
_tail_closed = true;
- throw e;
- }
- finally
- {
+ } finally {
_inputBuffer.compact();
}
}
@@ -374,17 +365,17 @@ public class SimpleSslTransportWrapper i
try {
wrapOutput();
} catch (SSLException e) {
- throw new TransportException(e);
+ _logger.log(Level.WARNING, e.getMessage());
+ _head_closed = true;
}
_head.limit(_outputBuffer.position());
- if (_head_closed && _outputBuffer.position() == 0)
- {
+ if (_head_closed && _outputBuffer.position() == 0) {
return Transport.END_OF_STREAM;
- } else {
- return _outputBuffer.position();
}
+
+ return _outputBuffer.position();
}
@Override
@@ -408,6 +399,10 @@ public class SimpleSslTransportWrapper i
public void close_head()
{
_underlyingOutput.close_head();
+ int p = pending();
+ if (p > 0) {
+ pop(p);
+ }
}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslDomainImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslDomainImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslDomainImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslDomainImpl.java Sat Sep 6 11:23:10 2014
@@ -19,7 +19,6 @@
package org.apache.qpid.proton.engine.impl.ssl;
import org.apache.qpid.proton.ProtonUnsupportedOperationException;
-import org.apache.qpid.proton.engine.EngineFactory;
import org.apache.qpid.proton.engine.ProtonJSslDomain;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.SslPeerDetails;
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslPeerDetailsImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslPeerDetailsImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslPeerDetailsImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslPeerDetailsImpl.java Sat Sep 6 11:23:10 2014
@@ -18,7 +18,6 @@
*/
package org.apache.qpid.proton.engine.impl.ssl;
-import org.apache.qpid.proton.engine.EngineFactory;
import org.apache.qpid.proton.engine.ProtonJSslPeerDetails;
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/message/Message.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/message/Message.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/message/Message.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/message/Message.java Sat Sep 6 11:23:10 2014
@@ -28,6 +28,8 @@ import org.apache.qpid.proton.amqp.messa
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+
/**
* Represents a Message within Proton.
*
@@ -36,6 +38,27 @@ import org.apache.qpid.proton.amqp.messa
*/
public interface Message
{
+
+ public static final class Factory
+ {
+ public static Message create() {
+ return new MessageImpl();
+ }
+
+ public static Message create(Header header,
+ DeliveryAnnotations deliveryAnnotations,
+ MessageAnnotations messageAnnotations,
+ Properties properties,
+ ApplicationProperties applicationProperties,
+ Section body,
+ Footer footer) {
+ return new MessageImpl(header, deliveryAnnotations,
+ messageAnnotations, properties,
+ applicationProperties, body, footer);
+ }
+ }
+
+
short DEFAULT_PRIORITY = 4;
boolean isDurable();
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java Sat Sep 6 11:23:10 2014
@@ -573,8 +573,15 @@ public class MessageImpl implements Prot
@Override
public int decode(byte[] data, int offset, int length)
{
- DecoderImpl decoder = tlsCodec.get().decoder;
final ByteBuffer buffer = ByteBuffer.wrap(data, offset, length);
+ decode(buffer);
+
+ return length-buffer.remaining();
+ }
+
+ public void decode(ByteBuffer buffer)
+ {
+ DecoderImpl decoder = tlsCodec.get().decoder;
decoder.setByteBuffer(buffer);
_header = null;
@@ -680,9 +687,6 @@ public class MessageImpl implements Prot
}
decoder.setByteBuffer(null);
-
- return length-buffer.remaining();
-
}
@Override
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java Sat Sep 6 11:23:10 2014
@@ -25,6 +25,8 @@ import java.io.IOException;
import org.apache.qpid.proton.TimeoutException;
import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.messenger.impl.MessengerImpl;
+
/**
*
* Messenger defines a high level interface for sending and receiving
@@ -69,6 +71,18 @@ import org.apache.qpid.proton.message.Me
*/
public interface Messenger
{
+
+ public static final class Factory
+ {
+ public static Messenger create() {
+ return new MessengerImpl();
+ }
+
+ public static Messenger create(String name) {
+ return new MessengerImpl(name);
+ }
+ }
+
/**
* Flag for use with reject(), accept() and settle() methods.
*/
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Sat Sep 6 11:23:10 2014
@@ -29,17 +29,14 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.ProtonFactoryLoader;
import org.apache.qpid.proton.InterruptException;
import org.apache.qpid.proton.TimeoutException;
import org.apache.qpid.proton.driver.Connector;
import org.apache.qpid.proton.driver.Driver;
-import org.apache.qpid.proton.driver.DriverFactory;
import org.apache.qpid.proton.driver.Listener;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.EngineFactory;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sasl;
@@ -49,10 +46,8 @@ import org.apache.qpid.proton.engine.Ssl
import org.apache.qpid.proton.engine.Ssl;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.MessageFactory;
import org.apache.qpid.proton.messenger.Messenger;
import org.apache.qpid.proton.messenger.MessengerException;
-import org.apache.qpid.proton.messenger.MessengerFactory;
import org.apache.qpid.proton.messenger.Status;
import org.apache.qpid.proton.messenger.Tracker;
import org.apache.qpid.proton.amqp.messaging.Source;
@@ -1449,14 +1444,16 @@ public class MessengerImpl implements Me
{
_receivers++;
_blocked.add((Receiver)link);
+ link.setContext(Boolean.TRUE);
}
}
// a link is being removed, account for it.
private void linkRemoved(Link _link)
{
- if (_link instanceof Receiver)
+ if (_link instanceof Receiver && (Boolean) _link.getContext())
{
+ _link.setContext(Boolean.FALSE);
Receiver link = (Receiver)_link;
assert _receivers > 0;
_receivers--;
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/resources/cengine.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/resources/cengine.py?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/resources/cengine.py (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/resources/cengine.py Sat Sep 6 11:23:10 2014
@@ -25,7 +25,7 @@ from org.apache.qpid.proton.amqp.transac
from org.apache.qpid.proton.amqp.transport import ErrorCondition, \
SenderSettleMode, ReceiverSettleMode
from org.apache.qpid.proton.engine import EndpointState, Sender, \
- Receiver, TransportException
+ Receiver, Transport, TransportException
from java.util import EnumSet
from jarray import array, zeros
@@ -57,10 +57,10 @@ PN_NONDURABLE = 0
PN_CONFIGURATION = 1
PN_DELIVERIES = 2
-PN_LINK_CLOSE = 0
-PN_SESSION_CLOSE = 1
-PN_CONNECTION_CLOSE = 2
-PN_NEVER = 3
+PN_EXPIRE_WITH_LINK = 0
+PN_EXPIRE_WITH_SESSION = 1
+PN_EXPIRE_WITH_CONNECTION = 2
+PN_EXPIRE_NEVER = 3
PN_DIST_MODE_UNSPECIFIED = 0
PN_DIST_MODE_COPY = 1
@@ -72,10 +72,10 @@ PN_REJECTED = (0x0000000000000025)
PN_RELEASED = (0x0000000000000026)
PN_MODIFIED = (0x0000000000000027)
-PN_TRACE_OFF = (0)
-PN_TRACE_RAW = (1)
-PN_TRACE_FRM = (2)
-PN_TRACE_DRV = (4)
+PN_TRACE_OFF = Transport.TRACE_OFF
+PN_TRACE_RAW = Transport.TRACE_RAW
+PN_TRACE_FRM = Transport.TRACE_FRM
+PN_TRACE_DRV = Transport.TRACE_DRV
def wrap(obj, wrapper):
if obj:
@@ -98,7 +98,11 @@ class pn_condition:
self.description = None
self.info.clear()
else:
- self.name = impl.getCondition().toString()
+ cond = impl.getCondition()
+ if cond is None:
+ self.name = None
+ else:
+ self.name = cond.toString()
self.description = impl.getDescription()
obj2dat(impl.getInfo(), self.info)
@@ -222,6 +226,9 @@ def pn_connection_set_container(conn, na
def pn_connection_remote_container(conn):
return conn.impl.getRemoteContainer()
+def pn_connection_get_hostname(conn):
+ return conn.impl.getHostname()
+
def pn_connection_set_hostname(conn, name):
conn.impl.setHostname(name)
@@ -244,6 +251,9 @@ def pn_connection_close(conn):
conn.on_close()
conn.impl.close()
+def pn_connection_free(conn):
+ conn.impl.free()
+
class pn_session_wrapper(endpoint_wrapper):
pass
@@ -325,7 +335,7 @@ def pn_receiver(ssn, name):
return wrap(ssn.impl.receiver(name), pn_link_wrapper)
def pn_session_free(ssn):
- ssn.impl = None
+ ssn.impl.free()
TERMINUS_TYPES_J2P = {
Source: PN_SOURCE,
@@ -354,17 +364,17 @@ DURABILITY_J2P = {
}
EXPIRY_POLICY_P2J = {
- PN_LINK_CLOSE: TerminusExpiryPolicy.LINK_DETACH,
- PN_SESSION_CLOSE: TerminusExpiryPolicy.SESSION_END,
- PN_CONNECTION_CLOSE: TerminusExpiryPolicy.CONNECTION_CLOSE,
- PN_NEVER: TerminusExpiryPolicy.NEVER
+ PN_EXPIRE_WITH_LINK: TerminusExpiryPolicy.LINK_DETACH,
+ PN_EXPIRE_WITH_SESSION: TerminusExpiryPolicy.SESSION_END,
+ PN_EXPIRE_WITH_CONNECTION: TerminusExpiryPolicy.CONNECTION_CLOSE,
+ PN_EXPIRE_NEVER: TerminusExpiryPolicy.NEVER
}
EXPIRY_POLICY_J2P = {
- TerminusExpiryPolicy.LINK_DETACH: PN_LINK_CLOSE,
- TerminusExpiryPolicy.SESSION_END: PN_SESSION_CLOSE,
- TerminusExpiryPolicy.CONNECTION_CLOSE: PN_CONNECTION_CLOSE,
- TerminusExpiryPolicy.NEVER: PN_NEVER
+ TerminusExpiryPolicy.LINK_DETACH: PN_EXPIRE_WITH_LINK,
+ TerminusExpiryPolicy.SESSION_END: PN_EXPIRE_WITH_SESSION,
+ TerminusExpiryPolicy.CONNECTION_CLOSE: PN_EXPIRE_WITH_CONNECTION,
+ TerminusExpiryPolicy.NEVER: PN_EXPIRE_NEVER
}
DISTRIBUTION_MODE_P2J = {
@@ -385,7 +395,7 @@ class pn_terminus:
self.type = type
self.address = None
self.durability = PN_NONDURABLE
- self.expiry_policy = PN_SESSION_CLOSE
+ self.expiry_policy = PN_EXPIRE_WITH_SESSION
self.distribution_mode = PN_DIST_MODE_UNSPECIFIED
self.timeout = 0
self.dynamic = False
@@ -587,6 +597,9 @@ def pn_link_remote_rcv_settle_mode(link)
def pn_link_is_sender(link):
return isinstance(link.impl, Sender)
+def pn_link_is_receiver(link):
+ return isinstance(link.impl, Receiver)
+
def pn_link_head(conn, mask):
local, remote = mask2set(mask)
return wrap(conn.impl.linkHead(local, remote), pn_link_wrapper)
@@ -652,7 +665,7 @@ def pn_link_current(link):
return wrap(link.impl.current(), pn_delivery_wrapper)
def pn_link_free(link):
- link.impl = None
+ link.impl.free()
def pn_work_head(conn):
return wrap(conn.impl.getWorkHead(), pn_delivery_wrapper)
@@ -802,6 +815,9 @@ def pn_delivery_get_context(dlv):
def pn_delivery_set_context(dlv, ctx):
dlv.context = ctx
+def pn_delivery_partial(dlv):
+ return dlv.impl.isPartial()
+
def pn_delivery_pending(dlv):
return dlv.impl.pending()
@@ -847,7 +863,6 @@ class pn_transport_wrapper:
def __init__(self, impl):
self.impl = impl
- self.error = pn_error(0, None)
def pn_transport():
return wrap(Proton.transport(), pn_transport_wrapper)
@@ -877,15 +892,15 @@ def pn_transport_bind(trans, conn):
trans.impl.bind(conn.impl)
return 0
+def pn_transport_unbind(trans):
+ trans.impl.unbind()
+ return 0
+
def pn_transport_trace(trans, n):
- # XXX
- pass
+ trans.impl.trace(n)
def pn_transport_pending(trans):
- try:
- return trans.impl.pending()
- except TransportException, e:
- return trans.error.set(PN_ERR, str(e))
+ return trans.impl.pending()
def pn_transport_peek(trans, size):
size = min(trans.impl.pending(), size)
@@ -893,6 +908,7 @@ def pn_transport_peek(trans, size):
if size:
bb = trans.impl.head()
bb.get(ba)
+ bb.position(0)
return 0, ba.tostring()
def pn_transport_pop(trans, size):
@@ -906,47 +922,51 @@ def pn_transport_push(trans, input):
if cap < 0:
return cap
elif len(input) > cap:
- return PN_OVERFLOW
- else:
- bb = trans.impl.tail()
- bb.put(array(input, 'b'))
- try:
- trans.impl.process()
- return 0
- except TransportException, e:
- trans.error = pn_error(PN_ERR, str(e))
- return PN_ERR
+ input = input[:cap]
+
+ bb = trans.impl.tail()
+ bb.put(array(input, 'b'))
+ trans.impl.process()
+ return len(input)
def pn_transport_close_head(trans):
- try:
- trans.impl.close_head()
- return 0
- except TransportException, e:
- trans.error = pn_error(PN_ERR, str(e))
- return PN_ERR
+ trans.impl.close_head()
+ return 0
def pn_transport_close_tail(trans):
- try:
- trans.impl.close_tail()
- return 0
- except TransportException, e:
- trans.error = pn_error(PN_ERR, str(e))
- return PN_ERR
+ trans.impl.close_tail()
+ return 0
-def pn_transport_error(trans):
- return trans.error
+def pn_transport_closed(trans):
+ return trans.impl.isClosed()
from org.apache.qpid.proton.engine import Event
-PN_EVENT_CATEGORY_PROTOCOL = Event.Category.PROTOCOL
-
-PN_CONNECTION_LOCAL_STATE = Event.Type.CONNECTION_LOCAL_STATE
-PN_CONNECTION_REMOTE_STATE = Event.Type.CONNECTION_REMOTE_STATE
-PN_SESSION_LOCAL_STATE = Event.Type.SESSION_LOCAL_STATE
-PN_SESSION_REMOTE_STATE = Event.Type.SESSION_REMOTE_STATE
-PN_LINK_LOCAL_STATE = Event.Type.LINK_LOCAL_STATE
-PN_LINK_REMOTE_STATE = Event.Type.LINK_REMOTE_STATE
+PN_EVENT_CATEGORY_CONNECTION = Event.Category.CONNECTION
+PN_EVENT_CATEGORY_SESSION = Event.Category.SESSION
+PN_EVENT_CATEGORY_LINK = Event.Category.LINK
+PN_EVENT_CATEGORY_DELIVERY = Event.Category.DELIVERY
+PN_EVENT_CATEGORY_TRANSPORT = Event.Category.TRANSPORT
+
+PN_CONNECTION_INIT = Event.Type.CONNECTION_INIT
+PN_CONNECTION_OPEN = Event.Type.CONNECTION_OPEN
+PN_CONNECTION_REMOTE_OPEN = Event.Type.CONNECTION_REMOTE_OPEN
+PN_CONNECTION_CLOSE = Event.Type.CONNECTION_CLOSE
+PN_CONNECTION_REMOTE_CLOSE = Event.Type.CONNECTION_REMOTE_CLOSE
+PN_CONNECTION_FINAL = Event.Type.CONNECTION_FINAL
+PN_SESSION_INIT = Event.Type.SESSION_INIT
+PN_SESSION_OPEN = Event.Type.SESSION_OPEN
+PN_SESSION_REMOTE_OPEN = Event.Type.SESSION_REMOTE_OPEN
+PN_SESSION_CLOSE = Event.Type.SESSION_CLOSE
+PN_SESSION_REMOTE_CLOSE = Event.Type.SESSION_REMOTE_CLOSE
+PN_SESSION_FINAL = Event.Type.SESSION_FINAL
+PN_LINK_INIT = Event.Type.LINK_INIT
+PN_LINK_OPEN = Event.Type.LINK_OPEN
+PN_LINK_REMOTE_OPEN = Event.Type.LINK_REMOTE_OPEN
+PN_LINK_CLOSE = Event.Type.LINK_CLOSE
+PN_LINK_REMOTE_CLOSE = Event.Type.LINK_REMOTE_CLOSE
PN_LINK_FLOW = Event.Type.LINK_FLOW
+PN_LINK_FINAL = Event.Type.LINK_FINAL
PN_DELIVERY = Event.Type.DELIVERY
PN_TRANSPORT = Event.Type.TRANSPORT
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/resources/csasl.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/resources/csasl.py?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/resources/csasl.py (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/resources/csasl.py Sat Sep 6 11:23:10 2014
@@ -29,6 +29,7 @@ PN_SASL_AUTH=1
PN_SASL_SYS=2
PN_SASL_PERM=3
PN_SASL_TEMP=4
+PN_SASL_SKIPPED=5
PN_SASL_CONF = 0
PN_SASL_IDLE = 1
@@ -53,7 +54,8 @@ SASL_OUTCOMES_P2J = {
PN_SASL_AUTH: Sasl.PN_SASL_AUTH,
PN_SASL_SYS: Sasl.PN_SASL_SYS,
PN_SASL_PERM: Sasl.PN_SASL_PERM,
- PN_SASL_TEMP: Sasl.PN_SASL_TEMP
+ PN_SASL_TEMP: Sasl.PN_SASL_TEMP,
+ PN_SASL_SKIPPED: Sasl.PN_SASL_SKIPPED
}
SASL_OUTCOMES_J2P = {
@@ -62,7 +64,8 @@ SASL_OUTCOMES_J2P = {
Sasl.PN_SASL_AUTH: PN_SASL_AUTH,
Sasl.PN_SASL_SYS: PN_SASL_SYS,
Sasl.PN_SASL_PERM: PN_SASL_PERM,
- Sasl.PN_SASL_TEMP: PN_SASL_TEMP
+ Sasl.PN_SASL_TEMP: PN_SASL_TEMP,
+ Sasl.PN_SASL_SKIPPED: PN_SASL_SKIPPED
}
def pn_sasl_state(sasl):
@@ -77,6 +80,9 @@ def pn_sasl_client(sasl):
def pn_sasl_server(sasl):
sasl.server()
+def pn_sasl_allow_skip(sasl, allow):
+ sasl.allowSkip(allow)
+
def pn_sasl_done(sasl, outcome):
sasl.done(SASL_OUTCOMES_P2J[outcome])
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java Sat Sep 6 11:23:10 2014
@@ -41,6 +41,7 @@ import org.apache.qpid.proton.amqp.trans
import org.apache.qpid.proton.codec.AMQPDefinedTypes;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;
import org.apache.qpid.proton.engine.TransportResult;
import org.apache.qpid.proton.engine.TransportResult.Status;
@@ -63,9 +64,6 @@ public class FrameParserTest
private final AmqpFramer _amqpFramer = new AmqpFramer();
- @Rule
- public ExpectedException _expectedException = ExpectedException.none();
-
@Before
public void setUp()
{
@@ -80,16 +78,8 @@ public class FrameParserTest
String headerMismatchMessage = "AMQP header mismatch";
ByteBuffer buffer = _frameParser.tail();
buffer.put("hello".getBytes());
- try {
- _frameParser.process();
- fail("expected exception");
- } catch (TransportException e) {
- assertThat(e.getMessage(), containsString(headerMismatchMessage));
- }
-
- _expectedException.expect(TransportException.class);
- _expectedException.expectMessage(headerMismatchMessage);
- _frameParser.tail();
+ _frameParser.process();
+ assertEquals(_frameParser.capacity(), Transport.END_OF_STREAM);
}
@Test
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java Sat Sep 6 11:23:10 2014
@@ -207,4 +207,19 @@ public class TransportImplTest
assertTrue("Expecting second buffer to have bytes", buf.remaining() > 0);
assertTrue("Expecting second buffer to not be full", buf.remaining() < Transport.MIN_MAX_FRAME_SIZE);
}
+
+ @Test
+ public void testAttemptToInitiateSaslAfterProcessingBeginsCausesIllegalStateException()
+ {
+ _transport.process();
+
+ try
+ {
+ _transport.sasl();
+ }
+ catch(IllegalStateException ise)
+ {
+ //expected, sasl must be initiated before processing begins
+ }
+ }
}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java Sat Sep 6 11:23:10 2014
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import org.apache.qpid.proton.engine.TransportException;
+
import java.nio.ByteBuffer;
import org.junit.Test;
@@ -143,7 +145,7 @@ public class TransportOutputAdaptorTest
_cannedOutput = cannedOutput;
}
- public void closed()
+ public void closed(TransportException error)
{
// do nothing
}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java Sat Sep 6 11:23:10 2014
@@ -32,6 +32,7 @@ import java.nio.ByteBuffer;
import javax.net.ssl.SSLException;
+import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;
import org.junit.Before;
import org.junit.Rule;
@@ -132,16 +133,8 @@ public class SimpleSslTransportWrapperTe
_dummySslEngine.rejectNextEncodedPacket(sslException);
_sslWrapper.tail().put("<-A->".getBytes());
- try {
- _sslWrapper.process();
- fail("no exception");
- } catch (TransportException e) {
- assertSame(sslException, e.getCause());
- assertEquals("", _underlyingInput.getAcceptedInput());
- }
-
- _expectedException.expect(TransportException.class);
- _sslWrapper.tail();
+ _sslWrapper.process();
+ assertEquals(_sslWrapper.capacity(), Transport.END_OF_STREAM);
}
@Test
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java Sat Sep 6 11:23:10 2014
@@ -26,7 +26,6 @@ import static org.apache.qpid.proton.eng
import static org.apache.qpid.proton.engine.EndpointState.CLOSED;
import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED;
import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold;
-import static org.apache.qpid.proton.systemtests.engine.ProtonFactoryTestFixture.isProtonJ;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
@@ -34,7 +33,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.logging.Logger;
-import org.apache.qpid.proton.ProtonFactoryLoader;
+import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Section;
@@ -45,10 +44,8 @@ import org.apache.qpid.proton.amqp.trans
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Endpoint;
import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.EngineFactory;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.MessageFactory;
import org.junit.Test;
/**
@@ -80,26 +77,23 @@ public class ProtonEngineExampleTest
private final String _targetAddress = _server.containerId + "-link1-target";
- private final EngineFactory _engineFactory = new ProtonFactoryLoader<EngineFactory>(EngineFactory.class).loadFactory();
- private final MessageFactory _messageFactory = new ProtonFactoryLoader<MessageFactory>(MessageFactory.class).loadFactory();
-
@Test
public void test() throws Exception
{
LOGGER.fine(bold("======== About to create transports"));
- _client.transport = _engineFactory.createTransport();
+ _client.transport = Proton.transport();
ProtocolTracerEnabler.setProtocolTracer(_client.transport, TestLoggingHelper.CLIENT_PREFIX);
- _server.transport = _engineFactory.createTransport();
+ _server.transport = Proton.transport();
ProtocolTracerEnabler.setProtocolTracer(_server.transport, " " + TestLoggingHelper.SERVER_PREFIX);
doOutputInputCycle();
- _client.connection = _engineFactory.createConnection();
+ _client.connection = Proton.connection();
_client.transport.bind(_client.connection);
- _server.connection = _engineFactory.createConnection();
+ _server.connection = Proton.connection();
_server.transport.bind(_server.connection);
@@ -182,7 +176,7 @@ public class ProtonEngineExampleTest
LOGGER.fine(bold("======== About to create a message and send it to the server"));
- _client.message = _messageFactory.createMessage();
+ _client.message = Proton.message();
Section messageBody = new AmqpValue("Hello");
_client.message.setBody(messageBody);
_client.messageData = new byte[BUFFER_SIZE];
@@ -195,12 +189,7 @@ public class ProtonEngineExampleTest
assertEquals("For simplicity, assume the sender can accept all the data",
lengthOfEncodedMessage, numberOfBytesAcceptedBySender);
- if (isProtonJ(_engineFactory))
- {
- // TODO PROTON-261: Proton-c ProtonJNI.pn_delivery_local_state is returning 0, which doesn't map to an
- // value within the C enum.
- assertNull(_client.delivery.getLocalState());
- }
+ assertNull(_client.delivery.getLocalState());
boolean senderAdvanced = _client.sender.advance();
assertTrue("sender has not advanced", senderAdvanced);
@@ -213,11 +202,8 @@ public class ProtonEngineExampleTest
_server.delivery = _server.connection.getWorkHead();
assertEquals("The received delivery should be on our receiver",
_server.receiver, _server.delivery.getLink());
- if (isProtonJ(_engineFactory))
- {
- assertNull(_server.delivery.getLocalState());
- assertNull(_server.delivery.getRemoteState());
- }
+ assertNull(_server.delivery.getLocalState());
+ assertNull(_server.delivery.getRemoteState());
assertFalse(_server.delivery.isPartial());
assertTrue(_server.delivery.isReadable());
@@ -226,7 +212,7 @@ public class ProtonEngineExampleTest
int numberOfBytesProducedByReceiver = _server.receiver.recv(_server.messageData, 0, BUFFER_SIZE);
assertEquals(numberOfBytesAcceptedBySender, numberOfBytesProducedByReceiver);
- _server.message = _messageFactory.createMessage();
+ _server.message = Proton.message();
_server.message.decode(_server.messageData, 0, numberOfBytesProducedByReceiver);
boolean messageProcessed = applicationProcessMessage(_server.message);
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SimpleTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SimpleTest.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SimpleTest.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SimpleTest.java Sat Sep 6 11:23:10 2014
@@ -20,10 +20,9 @@ package org.apache.qpid.proton.systemtes
import static org.junit.Assert.assertEquals;
-import org.apache.qpid.proton.ProtonFactoryLoader;
+import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.EngineFactory;
import org.apache.qpid.proton.engine.Transport;
import org.junit.Test;
@@ -33,14 +32,12 @@ public class SimpleTest
@Test
public void test()
{
- EngineFactory engineFactory = new ProtonFactoryLoader<EngineFactory>(EngineFactory.class).loadFactory();
-
- Connection connection1 = engineFactory.createConnection();
- Connection connection2 = engineFactory.createConnection();;
- Transport transport1 = engineFactory.createTransport();
+ Connection connection1 = Proton.connection();
+ Connection connection2 = Proton.connection();;
+ Transport transport1 = Proton.transport();
transport1.bind(connection1);
- Transport transport2 = engineFactory.createTransport();
+ Transport transport2 = Proton.transport();
transport2.bind(connection2);
assertEquals(EndpointState.UNINITIALIZED, connection1.getLocalState());
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java Sat Sep 6 11:23:10 2014
@@ -23,8 +23,6 @@ import static java.util.EnumSet.of;
import static org.apache.qpid.proton.engine.EndpointState.ACTIVE;
import static org.apache.qpid.proton.engine.EndpointState.CLOSED;
import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED;
-import static org.apache.qpid.proton.systemtests.engine.ProtonFactoryTestFixture.isProtonC;
-import static org.apache.qpid.proton.systemtests.engine.ProtonFactoryTestFixture.isProtonJ;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -34,6 +32,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.Close;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
@@ -41,7 +40,6 @@ import org.apache.qpid.proton.amqp.trans
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Endpoint;
import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.EngineFactory;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;
@@ -59,18 +57,13 @@ public class ConnectionTest
private static final String SERVER_CONTAINER = "serverContainer";
private static final String CLIENT_CONTAINER = "clientContainer";
- private final ProtonFactoryTestFixture _protonFactoryTestFixture = new ProtonFactoryTestFixture();
-
- private EngineFactory _clientFactory = _protonFactoryTestFixture.getFactory1();
- private EngineFactory _serverFactory = _protonFactoryTestFixture.getFactory2();
-
- private final Transport _clientTransport = _clientFactory.createTransport();
- private final Transport _serverTransport = _serverFactory.createTransport();
+ private final Transport _clientTransport = Proton.transport();
+ private final Transport _serverTransport = Proton.transport();
private final TransportPumper _pumper = new TransportPumper(_clientTransport, _serverTransport);
- private final Connection _clientConnection = _clientFactory.createConnection();
- private final Connection _serverConnection = _serverFactory.createConnection();
+ private final Connection _clientConnection = Proton.connection();
+ private final Connection _serverConnection = Proton.connection();
private final AmqpFramer _framer = new AmqpFramer();
@@ -87,7 +80,7 @@ public class ConnectionTest
/** Container id is a mandatory field so this should cause an error */
- @Test(expected=TransportException.class)
+ @Test
public void testReceiptOfOpenWithoutContainerId_causesTODO()
{
_pumper.pumpAll();
@@ -97,7 +90,7 @@ public class ConnectionTest
int serverConsumed = _serverTransport.input(openFrameBuffer, 0, openFrameBuffer.length);
assertEquals(openFrameBuffer.length, serverConsumed);
- assumeTrue(isProtonJ(_serverFactory));
+ assertEquals(_serverTransport.capacity(), Transport.END_OF_STREAM);
}
/**
@@ -268,10 +261,7 @@ public class ConnectionTest
_pumper.pumpOnceFromClientToServer();
assertEnpointState(_clientConnection, CLOSED, UNINITIALIZED);
- if (!isProtonC(_serverFactory))
- {
- assertEnpointState(_serverConnection, UNINITIALIZED, CLOSED);
- }
+ assertEnpointState(_serverConnection, UNINITIALIZED, CLOSED);
}
/**
@@ -341,9 +331,6 @@ public class ConnectionTest
@SuppressWarnings({ "rawtypes", "unchecked" })
public void testCloseConnectionWithErrorCode_causesCloseFrameContainingErrorCodeToBeSent()
{
- // TODO Proton-c fails if no remote condition is set
- assumeTrue(isProtonJ(_clientFactory) && isProtonJ(_serverFactory));
-
bindAndOpenConnections();
/*
@@ -411,8 +398,6 @@ public class ConnectionTest
Close surprisingClose = new Close();
byte[] buf = _framer.generateFrame(0, surprisingClose);
- assumeTrue(isProtonJ(_serverFactory));
- // TODO Proton-C: function pn_do_close causes a SEGV fault if you try and close an unopened connection
_serverTransport.input(buf, 0, buf.length);
// TODO server should indicate error
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/TransportTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/TransportTest.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/TransportTest.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/TransportTest.java Sat Sep 6 11:23:10 2014
@@ -22,7 +22,7 @@ package org.apache.qpid.proton.systemtes
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import org.apache.qpid.proton.engine.EngineFactory;
+import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;
import org.junit.Ignore;
@@ -34,9 +34,7 @@ import org.junit.Test;
*/
public class TransportTest
{
- private final ProtonFactoryTestFixture _protonFactoryTestFixture = new ProtonFactoryTestFixture();
- private final EngineFactory _factory = _protonFactoryTestFixture.getFactory1();
- private final Transport _transport = _factory.createTransport();
+ private final Transport _transport = Proton.transport();
/**
* Note that Proton does not yet give the application explicit control over protocol version negotiation
Modified: qpid/proton/branches/fadams-javascript-binding/tests/java/org/apache/qpid/proton/InteropTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/tests/java/org/apache/qpid/proton/InteropTest.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/tests/java/org/apache/qpid/proton/InteropTest.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/tests/java/org/apache/qpid/proton/InteropTest.java Sat Sep 6 11:23:10 2014
@@ -19,13 +19,11 @@
package org.apache.qpid.proton;
import org.apache.qpid.proton.TestDecoder;
-import org.apache.qpid.proton.ProtonFactoryLoader;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.MessageFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertArrayEquals;
@@ -69,8 +67,7 @@ public class InteropTest
Message decodeMessage(String name) throws IOException
{
byte[] data = getBytes(name);
- MessageFactory mf = new ProtonFactoryLoader<MessageFactory>(MessageFactory.class).loadFactory();
- Message m = mf.createMessage();
+ Message m = Proton.message();
m.decode(data, 0, data.length);
return m;
}
Modified: qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/common.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/common.py?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/common.py (original)
+++ qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/common.py Sat Sep 6 11:23:10 2014
@@ -21,7 +21,7 @@ from random import randint
from threading import Thread
from socket import socket, AF_INET, SOCK_STREAM
from subprocess import Popen,PIPE,STDOUT
-import sys, os
+import sys, os, string
from proton import Driver, Connection, Transport, SASL, Endpoint, Delivery, \
SSLDomain, SSLUnavailable
@@ -46,42 +46,37 @@ def free_tcp_ports(count=1):
s.close()
return ports
+def pump_uni(src, dst, buffer_size=1024):
+ p = src.pending()
+ c = dst.capacity()
+
+ if c < 0:
+ if p < 0:
+ return False
+ else:
+ src.close_head()
+ return True
+
+ if p < 0:
+ dst.close_tail()
+ elif p == 0 or c == 0:
+ return False
+ else:
+ bytes = src.peek(min(c, buffer_size))
+ dst.push(bytes)
+ src.pop(len(bytes))
+
+ return True
def pump(transport1, transport2, buffer_size=1024):
""" Transfer all pending bytes between two Proton engines
- by repeatedly calling input and output.
+ by repeatedly calling peek/pop and push.
Asserts that each engine accepts some bytes every time
(unless it's already closed).
"""
-
- out1_leftover_by_t2 = ""
- out2_leftover_by_t1 = ""
- i = 0
-
- while True:
- out1 = out1_leftover_by_t2 + (transport1.output(buffer_size) or "")
- out2 = out2_leftover_by_t1 + (transport2.output(buffer_size) or "")
-
- if out1:
- number_t2_consumed = transport2.input(out1)
- if number_t2_consumed is None:
- # special None return value means input is closed so discard the leftovers
- out1_leftover_by_t2 = ""
- else:
- assert number_t2_consumed > 0, (number_t2_consumed, len(out1), out1[:100])
- out1_leftover_by_t2 = out1[number_t2_consumed:]
-
- if out2:
- number_t1_consumed = transport1.input(out2)
- if number_t1_consumed is None:
- # special None return value means input is closed so discard the leftovers
- out2_leftover_by_t1 = ""
- else:
- assert number_t1_consumed > 0, (number_t1_consumed, len(out1), out1[:100])
- out2_leftover_by_t1 = out2[number_t1_consumed:]
-
- if not out1 and not out2: break
- i = i + 1
+ while (pump_uni(transport1, transport2, buffer_size) or
+ pump_uni(transport2, transport1, buffer_size)):
+ pass
def isSSLPresent():
""" True if a suitable SSL library is available.
@@ -335,6 +330,16 @@ class MessengerApp(object):
self.password = None
self._output = None
+ def findfile(self, filename, searchpath):
+ """Find filename in the searchpath
+ return absolute path to the file or None
+ """
+ paths = string.split(searchpath, os.pathsep)
+ for path in paths:
+ if os.path.exists(os.path.join(path, filename)):
+ return os.path.abspath(os.path.join(path, filename))
+ return None
+
def start(self, verbose=False):
""" Begin executing the test """
cmd = self.cmdline()
@@ -343,8 +348,20 @@ class MessengerApp(object):
print("COMMAND='%s'" % str(cmd))
#print("ENV='%s'" % str(os.environ.copy()))
try:
+ if os.name=="nt":
+ # Windows handles python launch by replacing script 'filename' with
+ # 'python abspath-to-filename' in cmdline arg list.
+ if cmd[0].endswith('.py'):
+ foundfile = self.findfile(cmd[0], os.environ['PATH'])
+ if foundfile is None:
+ foundfile = self.findfile(cmd[0], os.environ['PYTHONPATH'])
+ assert foundfile is not None, "Unable to locate file '%s' in PATH or PYTHONPATH" % cmd[0]
+ del cmd[0:1]
+ cmd.insert(0, foundfile)
+ cmd.insert(0, sys.executable)
self._process = Popen(cmd, stdout=PIPE, stderr=STDOUT, bufsize=4096)
except OSError, e:
+ print("ERROR: '%s'" % e)
assert False, "Unable to execute command '%s', is it in your PATH?" % cmd[0]
self._ready() # wait for it to initialize
@@ -514,7 +531,7 @@ class MessengerReceiver(MessengerApp):
def _ready(self):
""" wait for subscriptions to complete setup. """
r = self._process.stdout.readline()
- assert r == "READY\n", "Unexpected input while waiting for receiver to initialize: %s" % r
+ assert r == "READY" + os.linesep, "Unexpected input while waiting for receiver to initialize: %s" % r
class MessengerSenderC(MessengerSender):
def __init__(self):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org