You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/11/10 18:54:15 UTC
[5/7] qpid-jms git commit: encapsulate the endpoint field
encapsulate the endpoint field
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f2b3a5da
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f2b3a5da
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f2b3a5da
Branch: refs/heads/master
Commit: f2b3a5da8217e7dcd228ae8ee7ce32253b0780e4
Parents: da94884
Author: Robert Gemmell <ro...@apache.org>
Authored: Mon Nov 10 15:21:03 2014 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Mon Nov 10 17:48:37 2014 +0000
----------------------------------------------------------------------
.../jms/provider/amqp/AmqpAbstractResource.java | 42 +++++++++++---------
.../qpid/jms/provider/amqp/AmqpConnection.java | 16 ++++----
.../provider/amqp/AmqpConnectionSession.java | 7 ++--
.../qpid/jms/provider/amqp/AmqpConsumer.java | 39 +++++++++---------
.../jms/provider/amqp/AmqpFixedProducer.java | 32 ++++++++-------
.../jms/provider/amqp/AmqpQueueBrowser.java | 20 +++++-----
.../qpid/jms/provider/amqp/AmqpSession.java | 4 +-
.../provider/amqp/AmqpTemporaryDestination.java | 19 +++++----
.../provider/amqp/AmqpTransactionContext.java | 25 +++++++-----
9 files changed, 112 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index 6def062..933683a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -45,7 +45,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
protected AsyncResult openRequest;
protected AsyncResult closeRequest;
- protected E endpoint;
+ private E endpoint;
protected R resource;
/**
@@ -68,19 +68,19 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
*/
public AmqpAbstractResource(R resource, E endpoint) {
this.resource = resource;
- this.endpoint = endpoint;
+ setEndpoint(endpoint);
}
@Override
public void open(AsyncResult request) {
this.openRequest = request;
doOpen();
- this.endpoint.setContext(this);
+ getEndpoint().setContext(this);
}
@Override
public boolean isOpen() {
- return this.endpoint.getRemoteState() == EndpointState.ACTIVE;
+ return getEndpoint().getRemoteState() == EndpointState.ACTIVE;
}
@Override
@@ -99,7 +99,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
@Override
public void close(AsyncResult request) {
// If already closed signal success or else the caller might never get notified.
- if (endpoint.getLocalState() == EndpointState.CLOSED) {
+ if (getEndpoint().getLocalState() == EndpointState.CLOSED) {
request.onSuccess();
return;
}
@@ -110,7 +110,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
@Override
public boolean isClosed() {
- return this.endpoint.getLocalState() == EndpointState.CLOSED;
+ return getEndpoint().getLocalState() == EndpointState.CLOSED;
}
@Override
@@ -120,8 +120,8 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
@Override
public void closed() {
- this.endpoint.close();
- this.endpoint.free();
+ getEndpoint().close();
+ getEndpoint().free();
if (this.closeRequest != null) {
this.closeRequest.onSuccess();
@@ -166,34 +166,38 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
return this.endpoint;
}
+ public void setEndpoint(E endpoint) {
+ this.endpoint = endpoint;
+ }
+
public R getJmsResource() {
return this.resource;
}
public EndpointState getLocalState() {
- if (endpoint == null) {
+ if (getEndpoint() == null) {
return EndpointState.UNINITIALIZED;
}
- return this.endpoint.getLocalState();
+ return getEndpoint().getLocalState();
}
public EndpointState getRemoteState() {
- if (endpoint == null) {
+ if (getEndpoint() == null) {
return EndpointState.UNINITIALIZED;
}
- return this.endpoint.getRemoteState();
+ return getEndpoint().getRemoteState();
}
@Override
public boolean hasRemoteError() {
- return endpoint.getRemoteCondition().getCondition() != null;
+ return getEndpoint().getRemoteCondition().getCondition() != null;
}
@Override
public Exception getRemoteError() {
String message = getRemoteErrorMessage();
Exception remoteError = null;
- Symbol error = endpoint.getRemoteCondition().getCondition();
+ Symbol error = getEndpoint().getRemoteCondition().getCondition();
if (error != null) {
if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) {
remoteError = new JMSSecurityException(message);
@@ -208,8 +212,8 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
@Override
public String getRemoteErrorMessage() {
String message = "Received unkown error from remote peer";
- if (endpoint.getRemoteCondition() != null) {
- ErrorCondition error = endpoint.getRemoteCondition();
+ if (getEndpoint().getRemoteCondition() != null) {
+ ErrorCondition error = getEndpoint().getRemoteCondition();
if (error.getDescription() != null && !error.getDescription().isEmpty()) {
message = error.getDescription();
}
@@ -220,7 +224,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
@Override
public void processStateChange() throws IOException {
- EndpointState remoteState = endpoint.getRemoteState();
+ EndpointState remoteState = getEndpoint().getRemoteState();
if (remoteState == EndpointState.ACTIVE) {
if (isAwaitingOpen()) {
@@ -258,7 +262,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
* updates.
*/
protected void doOpen() {
- endpoint.open();
+ getEndpoint().open();
}
/**
@@ -267,6 +271,6 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
* standard close path such as endpoint detach etc.
*/
protected void doClose() {
- endpoint.close();
+ getEndpoint().close();
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index 8d5a458..c500d74 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -94,8 +94,8 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
@Override
protected void doOpen() {
- this.endpoint.setContainer(resource.getClientId());
- this.endpoint.setHostname(remoteURI.getHost());
+ getEndpoint().setContainer(resource.getClientId());
+ getEndpoint().setHostname(remoteURI.getHost());
super.doOpen();
}
@@ -132,7 +132,7 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
connected = true;
this.properties = new AmqpConnectionProperties(
- endpoint.getRemoteOfferedCapabilities(), endpoint.getRemoteProperties());
+ getEndpoint().getRemoteOfferedCapabilities(), getEndpoint().getRemoteProperties());
connectionSession.open(new AsyncResult() {
@@ -155,14 +155,14 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
});
}
- EndpointState localState = endpoint.getLocalState();
- EndpointState remoteState = endpoint.getRemoteState();
+ EndpointState localState = getEndpoint().getLocalState();
+ EndpointState remoteState = getEndpoint().getRemoteState();
// We are still active (connected or not) and something on the remote end has
// closed us, signal an error if one was sent.
if (localState == EndpointState.ACTIVE && remoteState != EndpointState.ACTIVE) {
- if (endpoint.getRemoteCondition().getCondition() != null) {
- LOG.info("Error condition detected on Connection open {}.", endpoint.getRemoteCondition().getCondition());
+ if (getEndpoint().getRemoteCondition().getCondition() != null) {
+ LOG.info("Error condition detected on Connection open {}.", getEndpoint().getRemoteCondition().getCondition());
Exception remoteError = getRemoteError();
if (isAwaitingOpen()) {
openRequest.onFailure(remoteError);
@@ -214,7 +214,7 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
}
public Connection getProtonConnection() {
- return this.endpoint;
+ return this.getEndpoint();
}
public URI getRemoteURI() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
index 3cfc0c6..e08d1b9 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
@@ -81,9 +81,10 @@ public class AmqpConnectionSession extends AmqpSession {
@Override
protected void doOpen() {
- endpoint.setTarget(new Target());
- endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
- endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+ Receiver receiver = getEndpoint();
+ receiver.setTarget(new Target());
+ receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
super.doOpen();
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index cb766bf..1a9be73 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -88,7 +88,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
* Starts the consumer by setting the link credit to the given prefetch value.
*/
public void start(AsyncResult request) {
- this.endpoint.flow(resource.getPrefetchSize());
+ getEndpoint().flow(resource.getPrefetchSize());
request.onSuccess();
}
@@ -110,15 +110,18 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
receiverName = resource.getSubscriptionName();
}
- endpoint = session.getProtonSession().receiver(receiverName);
- endpoint.setSource(source);
- endpoint.setTarget(target);
+ Receiver receiver = session.getProtonSession().receiver(receiverName);
+ receiver.setSource(source);
+ receiver.setTarget(target);
if (isPresettle()) {
- endpoint.setSenderSettleMode(SenderSettleMode.SETTLED);
+ receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
} else {
- endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
}
- endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+ receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+ setEndpoint(receiver);
+
super.doOpen();
}
@@ -251,9 +254,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
return;
}
- int currentCredit = endpoint.getCredit();
+ int currentCredit = getEndpoint().getCredit();
if (currentCredit <= resource.getPrefetchSize() * 0.2) {
- endpoint.flow(resource.getPrefetchSize() - currentCredit);
+ getEndpoint().flow(resource.getPrefetchSize() - currentCredit);
}
}
@@ -282,9 +285,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
* @param timeout
*/
public void pull(long timeout) {
- if (resource.getPrefetchSize() == 0 && endpoint.getCredit() == 0) {
+ if (resource.getPrefetchSize() == 0 && getEndpoint().getCredit() == 0) {
// expand the credit window by one.
- endpoint.flow(1);
+ getEndpoint().flow(1);
}
}
@@ -292,7 +295,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
public void processDeliveryUpdates() throws IOException {
Delivery incoming = null;
do {
- incoming = endpoint.current();
+ incoming = getEndpoint().current();
if (incoming != null && incoming.isReadable() && !incoming.isPartial()) {
LOG.trace("{} has incoming Message(s).", this);
try {
@@ -300,7 +303,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
- endpoint.advance();
+ getEndpoint().advance();
} else {
LOG.trace("{} has a partial incoming Message(s), deferring.", this);
incoming = null;
@@ -348,9 +351,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
@Override
protected void doClose() {
if (resource.isDurable()) {
- this.endpoint.detach();
+ getEndpoint().detach();
} else {
- this.endpoint.close();
+ getEndpoint().close();
}
}
@@ -371,7 +374,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
public Receiver getProtonReceiver() {
- return this.endpoint;
+ return this.getEndpoint();
}
public boolean isBrowser() {
@@ -398,7 +401,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
incoming.disposition(disposition);
incoming.settle();
if (expandCredit) {
- endpoint.flow(1);
+ getEndpoint().flow(1);
}
}
@@ -420,7 +423,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
protected Message decodeIncomingMessage(Delivery incoming) {
int count;
- while ((count = endpoint.recv(incomingBuffer.array(), incomingBuffer.writerIndex(), incomingBuffer.writableBytes())) > 0) {
+ while ((count = getEndpoint().recv(incomingBuffer.array(), incomingBuffer.writerIndex(), incomingBuffer.writableBytes())) > 0) {
incomingBuffer.writerIndex(incomingBuffer.writerIndex() + count);
if (!incomingBuffer.isWritable()) {
incomingBuffer.capacity((int) (incomingBuffer.capacity() * 1.5));
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 7eb6683..8cc5a74 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -86,7 +86,7 @@ public class AmqpFixedProducer extends AmqpProducer {
// TODO - Handle the case where remote has no credit which means we can't send to it.
// We need to hold the send until remote credit becomes available but we should
// also have a send timeout option and filter timed out sends.
- if (endpoint.getCredit() <= 0) {
+ if (getEndpoint().getCredit() <= 0) {
LOG.trace("Holding Message send until credit is available.");
// Once a message goes into a held mode we no longer can send it async, so
// we clear the async flag if set to avoid the sender never getting notified.
@@ -108,9 +108,9 @@ public class AmqpFixedProducer extends AmqpProducer {
Delivery delivery = null;
if (presettle) {
- delivery = endpoint.delivery(EMPTY_BYTE_ARRAY, 0, 0);
+ delivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
} else {
- delivery = endpoint.delivery(tag, 0, tag.length);
+ delivery = getEndpoint().delivery(tag, 0, tag.length);
}
delivery.setContext(request);
@@ -129,7 +129,7 @@ public class AmqpFixedProducer extends AmqpProducer {
delivery.settle();
} else {
pending.add(delivery);
- endpoint.advance();
+ getEndpoint().advance();
}
if (envelope.isSendAsync() || presettle) {
@@ -152,7 +152,7 @@ public class AmqpFixedProducer extends AmqpProducer {
int sentSoFar = 0;
while (true) {
- int sent = endpoint.send(encodeBuffer, sentSoFar, encodedSize - sentSoFar);
+ int sent = getEndpoint().send(encodeBuffer, sentSoFar, encodedSize - sentSoFar);
if (sent > 0) {
sentSoFar += sent;
if ((encodedSize - sentSoFar) == 0) {
@@ -166,8 +166,8 @@ public class AmqpFixedProducer extends AmqpProducer {
@Override
public void processFlowUpdates() throws IOException {
- if (!pendingSends.isEmpty() && endpoint.getCredit() > 0) {
- while (endpoint.getCredit() > 0 && !pendingSends.isEmpty()) {
+ if (!pendingSends.isEmpty() && getEndpoint().getCredit() > 0) {
+ while (getEndpoint().getCredit() > 0 && !pendingSends.isEmpty()) {
LOG.trace("Dispatching previously held send");
PendingSend held = pendingSends.pop();
try {
@@ -248,15 +248,19 @@ public class AmqpFixedProducer extends AmqpProducer {
target.setAddress(targetAddress);
String senderName = sourceAddress + ":" + targetAddress;
- endpoint = session.getProtonSession().sender(senderName);
- endpoint.setSource(source);
- endpoint.setTarget(target);
+
+ Sender sender = session.getProtonSession().sender(senderName);
+ sender.setSource(source);
+ sender.setTarget(target);
if (presettle) {
- endpoint.setSenderSettleMode(SenderSettleMode.SETTLED);
+ sender.setSenderSettleMode(SenderSettleMode.SETTLED);
} else {
- endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
}
- endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+ sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+ setEndpoint(sender);
+
super.doOpen();
}
@@ -265,7 +269,7 @@ public class AmqpFixedProducer extends AmqpProducer {
}
public Sender getProtonSender() {
- return this.endpoint;
+ return this.getEndpoint();
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
index 6434bbd..5daeb02 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
@@ -46,7 +46,7 @@ public class AmqpQueueBrowser extends AmqpConsumer {
*/
@Override
public void start(AsyncResult request) {
- this.endpoint.flow(resource.getPrefetchSize());
+ getEndpoint().flow(resource.getPrefetchSize());
request.onSuccess();
}
@@ -64,17 +64,17 @@ public class AmqpQueueBrowser extends AmqpConsumer {
*/
@Override
public void pull(long timeout) {
- if (!endpoint.getDrain() && endpoint.current() == null && endpoint.getUnsettled() == 0) {
+ if (!getEndpoint().getDrain() && getEndpoint().current() == null && getEndpoint().getUnsettled() == 0) {
LOG.trace("QueueBrowser {} will try to drain remote.", getConsumerId());
- this.endpoint.drain(resource.getPrefetchSize());
+ getEndpoint().drain(resource.getPrefetchSize());
} else {
- endpoint.setDrain(false);
+ getEndpoint().setDrain(false);
}
}
@Override
public void processFlowUpdates() throws IOException {
- if (endpoint.getDrain() && endpoint.getCredit() == endpoint.getRemoteCredit()) {
+ if (getEndpoint().getDrain() && getEndpoint().getCredit() == getEndpoint().getRemoteCredit()) {
JmsInboundMessageDispatch browseDone = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
browseDone.setConsumerId(getConsumerId());
try {
@@ -83,7 +83,7 @@ public class AmqpQueueBrowser extends AmqpConsumer {
throw IOExceptionSupport.create(e);
}
} else {
- endpoint.setDrain(false);
+ getEndpoint().setDrain(false);
}
super.processFlowUpdates();
@@ -91,14 +91,14 @@ public class AmqpQueueBrowser extends AmqpConsumer {
@Override
public void processDeliveryUpdates() throws IOException {
- if (endpoint.getDrain() && endpoint.current() != null) {
+ if (getEndpoint().getDrain() && getEndpoint().current() != null) {
LOG.trace("{} incoming delivery, cancel drain.", getConsumerId());
- endpoint.setDrain(false);
+ getEndpoint().setDrain(false);
}
super.processDeliveryUpdates();
- if (endpoint.getDrain() && endpoint.getCredit() == endpoint.getRemoteCredit()) {
+ if (getEndpoint().getDrain() && getEndpoint().getCredit() == getEndpoint().getRemoteCredit()) {
JmsInboundMessageDispatch browseDone = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
browseDone.setConsumerId(getConsumerId());
try {
@@ -107,7 +107,7 @@ public class AmqpQueueBrowser extends AmqpConsumer {
throw IOExceptionSupport.create(e);
}
} else {
- endpoint.setDrain(false);
+ getEndpoint().setDrain(false);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 5f7ad07..3ffd792 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -67,7 +67,7 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> {
@Override
protected void doOpen() {
- this.endpoint.setIncomingCapacity(Integer.MAX_VALUE);
+ this.getEndpoint().setIncomingCapacity(Integer.MAX_VALUE);
this.connection.addSession(this);
super.doOpen();
}
@@ -292,7 +292,7 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> {
}
public Session getProtonSession() {
- return this.endpoint;
+ return this.getEndpoint();
}
boolean isTransacted() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
index 2cf1cb9..fca448d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
@@ -58,7 +58,7 @@ public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsDestinatio
// TODO - We might want to check on our producer to see if it becomes closed
// which might indicate that the broker purged the temporary destination.
- EndpointState remoteState = endpoint.getRemoteState();
+ EndpointState remoteState = getEndpoint().getRemoteState();
if (remoteState == EndpointState.ACTIVE) {
LOG.trace("Temporary Destination: {} is now open", this.resource);
opened();
@@ -73,7 +73,7 @@ public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsDestinatio
// Once our producer is opened we can read the updated name from the target address.
String oldDestinationName = resource.getName();
- String destinationName = this.endpoint.getRemoteTarget().getAddress();
+ String destinationName = getEndpoint().getRemoteTarget().getAddress();
this.resource.setName(destinationName);
@@ -100,11 +100,14 @@ public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsDestinatio
target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
String senderName = sourceAddress;
- endpoint = session.getProtonSession().sender(senderName);
- endpoint.setSource(source);
- endpoint.setTarget(target);
- endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
- endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+ Sender sender = session.getProtonSession().sender(senderName);
+ sender.setSource(source);
+ sender.setTarget(target);
+ sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+ setEndpoint(sender);
this.connection.addTemporaryDestination(this);
@@ -127,7 +130,7 @@ public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsDestinatio
}
public Sender getProtonSender() {
- return this.endpoint;
+ return getEndpoint();
}
public JmsDestination getJmsDestination() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index 5492976..a138c97 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -135,11 +135,15 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
Source source = new Source();
String coordinatorName = resource.getSessionId().toString();
- endpoint = session.getProtonSession().sender(coordinatorName);
- endpoint.setSource(source);
- endpoint.setTarget(coordinator);
- endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
- endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+ Sender sender = session.getProtonSession().sender(coordinatorName);
+ sender.setSource(source);
+ sender.setTarget(coordinator);
+ sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+ setEndpoint(sender);
+
super.doOpen();
}
@@ -152,7 +156,7 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
Declare declare = new Declare();
message.setBody(new AmqpValue(declare));
- pendingDelivery = endpoint.delivery(tagGenerator.getNextTag());
+ pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
pendingRequest = request;
current = txId;
@@ -172,7 +176,7 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
discharge.setTxnId((Binary) current.getProviderHint());
message.setBody(new AmqpValue(discharge));
- pendingDelivery = endpoint.delivery(tagGenerator.getNextTag());
+ pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
pendingDelivery.setContext(COMMIT_MARKER);
pendingRequest = request;
@@ -192,7 +196,7 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
discharge.setTxnId((Binary) current.getProviderHint());
message.setBody(new AmqpValue(discharge));
- pendingDelivery = endpoint.delivery(tagGenerator.getNextTag());
+ pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
pendingDelivery.setContext(ROLLBACK_MARKER);
pendingRequest = request;
@@ -260,7 +264,8 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
}
}
- this.endpoint.send(buffer, 0, encodedSize);
- this.endpoint.advance();
+ Sender sender = getEndpoint();
+ sender.send(buffer, 0, encodedSize);
+ sender.advance();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org