You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/10/20 15:56:01 UTC
[1/2] git commit: https://issues.apache.org/jira/browse/AMQ-5403
Repository: activemq
Updated Branches:
refs/heads/trunk 004568234 -> 4881a848d
https://issues.apache.org/jira/browse/AMQ-5403
Remove now unnecessary workaround for older proton-j TTL issue.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5a6129b5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5a6129b5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5a6129b5
Branch: refs/heads/trunk
Commit: 5a6129b51241449a7310bbd812b24274ec8d7914
Parents: 0045682
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Oct 20 09:42:56 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Oct 20 09:42:56 2014 -0400
----------------------------------------------------------------------
.../activemq/transport/amqp/AmqpProtocolConverter.java | 11 +----------
1 file changed, 1 insertion(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/5a6129b5/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index fa49665..1f8bb36 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -645,19 +645,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
message.setTransactionId(new LocalTransactionId(connectionId, txid));
}
- // Lets handle the case where the expiration was set, but the timestamp
- // was not set by the client. Lets assign the timestamp now, and adjust
- // the expiration.
- if (message.getExpiration() != 0) {
- if (message.getTimestamp() == 0) {
- message.setTimestamp(System.currentTimeMillis());
- message.setExpiration(message.getTimestamp() + message.getExpiration());
- }
- }
-
message.onSend();
if (!delivery.remotelySettled()) {
sendToActiveMQ(message, new ResponseHandler() {
+
@Override
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
[2/2] git commit: https://issues.apache.org/jira/browse/AMQ-5402
Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5402
Add support for encoding the destination type in transformed messages as
a byte value to support future JMS->AMQP spec mappings.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4881a848
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4881a848
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4881a848
Branch: refs/heads/trunk
Commit: 4881a848dc9c8170ab82267a6bdedd4d3adcc372
Parents: 5a6129b
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Oct 20 09:49:36 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Oct 20 09:49:36 2014 -0400
----------------------------------------------------------------------
.../transport/amqp/AmqpProtocolConverter.java | 19 +++++++++++++++++++
.../activemq/transport/amqp/AmqpWireFormat.java | 9 +++++++++
2 files changed, 28 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/4881a848/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 1f8bb36..80b47cc 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -118,12 +118,14 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("x-opt-anonymous-relay");
+ private static final Symbol JMS_MAPPING_VERSION = Symbol.valueOf("x-opt-jms-mapping-version");
private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
protected int prefetch;
protected Transport protonTransport = Proton.transport();
protected Connection protonConnection = Proton.connection();
protected Collector eventCollector = new CollectorImpl();
+ protected boolean useByteDestinationTypeAnnotation;
public AmqpProtocolConverter(AmqpTransport transport) {
this.amqpTransport = transport;
@@ -134,6 +136,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
this.protonTransport.setMaxFrameSize(maxFrameSize);
}
+ useByteDestinationTypeAnnotation = transport.getWireFormat().isUseByteDestinationTypeAnnotation();
+
this.protonTransport.bind(this.protonConnection);
// NOTE: QPid JMS client has a bug where the channel max is stored as a
@@ -456,6 +460,17 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
connectionInfo.setClientId(clientId);
}
+ Map<Symbol, Object> props = protonConnection.getRemoteProperties();
+ if (props != null) {
+ if (props.containsKey(JMS_MAPPING_VERSION)) {
+ useByteDestinationTypeAnnotation = true;
+ }
+ }
+
+ if (useByteDestinationTypeAnnotation) {
+ outboundTransformer.setUseByteDestinationTypeAnnotations(true);
+ }
+
connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
sendToActiveMQ(connectionInfo, new ResponseHandler() {
@@ -529,6 +544,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
LOG.warn("Unknown transformer type {} using native one instead", transformer);
inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
}
+
+ if (useByteDestinationTypeAnnotation) {
+ inboundTransformer.setUseByteDestinationTypeAnnotations(true);
+ }
}
return inboundTransformer;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/4881a848/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
index f6c2880..b58273d 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
@@ -41,6 +41,7 @@ public class AmqpWireFormat implements WireFormat {
private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE;
private String anonymousNodeName = "$relay";
+ private boolean useByteDestinationTypeAnnotation = false;
@Override
public ByteSequence marshal(Object command) throws IOException {
@@ -135,4 +136,12 @@ public class AmqpWireFormat implements WireFormat {
public void setAnonymousNodeName(String anonymousNodeName) {
this.anonymousNodeName = anonymousNodeName;
}
+
+ public boolean isUseByteDestinationTypeAnnotation() {
+ return useByteDestinationTypeAnnotation;
+ }
+
+ public void setUseByteDestinationTypeAnnotation(boolean useByteDestinationTypeAnnotation) {
+ this.useByteDestinationTypeAnnotation = useByteDestinationTypeAnnotation;
+ }
}