You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/29 23:58:33 UTC
git commit: Remove all Proton-JMS and use just Proton-J constructs.
Repository: qpid-jms
Updated Branches:
refs/heads/master 6485baab5 -> d9803914a
Remove all Proton-JMS and use just Proton-J constructs.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/d9803914
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/d9803914
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/d9803914
Branch: refs/heads/master
Commit: d9803914a0144c50101e84f41c8a0bcf525219bd
Parents: 6485baa
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Sep 29 17:57:45 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Sep 29 17:57:45 2014 -0400
----------------------------------------------------------------------
pom.xml | 2 +-
qpid-jms-client/pom.xml | 2 +-
.../qpid/jms/provider/amqp/AmqpConsumer.java | 18 +--
.../jms/provider/amqp/AmqpFixedProducer.java | 53 +------
.../qpid/jms/provider/amqp/AmqpJMSVendor.java | 159 -------------------
5 files changed, 12 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9803914/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8d5a83e..3c7b754 100644
--- a/pom.xml
+++ b/pom.xml
@@ -115,7 +115,7 @@
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
- <artifactId>proton-jms</artifactId>
+ <artifactId>proton-j</artifactId>
<version>${proton-version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9803914/qpid-jms-client/pom.xml
----------------------------------------------------------------------
diff --git a/qpid-jms-client/pom.xml b/qpid-jms-client/pom.xml
index 3fca330..c3ca09b 100644
--- a/qpid-jms-client/pom.xml
+++ b/qpid-jms-client/pom.xml
@@ -47,7 +47,7 @@
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
- <artifactId>proton-jms</artifactId>
+ <artifactId>proton-j</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9803914/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 9c04a5a..41fef20 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -32,8 +32,8 @@ import org.apache.qpid.jms.meta.JmsConsumerId;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
-import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageBuilder;
import org.apache.qpid.jms.provider.ProviderListener;
+import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageBuilder;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
@@ -49,7 +49,6 @@ import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.jms.EncodedMessage;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -305,13 +304,9 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
}
private void processDelivery(Delivery incoming) throws Exception {
- EncodedMessage encoded = readIncomingMessage(incoming);
JmsMessage message = null;
try {
- Message protonMessage = Message.Factory.create();
- protonMessage.decode(encoded.getArray(), encoded.getArrayOffset(), encoded.getLength());
-
- message = (JmsMessage) AmqpJmsMessageBuilder.createJmsMessage(this, protonMessage);
+ message = AmqpJmsMessageBuilder.createJmsMessage(this, decodeIncomingMessage(incoming));
} catch (Exception e) {
LOG.warn("Error on transform: {}", e.getMessage());
// TODO - We could signal provider error but not sure we want to fail
@@ -421,7 +416,8 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
}
}
- protected EncodedMessage readIncomingMessage(Delivery incoming) {
+ // TODO - Find more efficient ways to produce the Message instance.
+ protected Message decodeIncomingMessage(Delivery incoming) {
byte[] buffer;
int count;
@@ -429,11 +425,13 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
streamBuffer.write(incomingBuffer, 0, count);
}
+ // TODO - This will copy, replace with something better later. Pooled Netty Buffer ?
buffer = streamBuffer.toByteArray();
try {
- //TODO: get rid of EncodedMessage usage
- return new EncodedMessage(incoming.getMessageFormat(), buffer, 0, buffer.length);
+ Message protonMessage = Message.Factory.create();
+ protonMessage.decode(buffer, 0, buffer.length);
+ return protonMessage;
} finally {
streamBuffer.reset();
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9803914/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 4271fc9..676d386 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -45,9 +45,6 @@ import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.jms.AutoOutboundTransformer;
-import org.apache.qpid.proton.jms.EncodedMessage;
-import org.apache.qpid.proton.jms.OutboundTransformer;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,9 +63,6 @@ public class AmqpFixedProducer extends AmqpProducer {
private final Set<Delivery> pending = new LinkedHashSet<Delivery>();
private final LinkedList<PendingSend> pendingSends = new LinkedList<PendingSend>();
private byte[] encodeBuffer = new byte[1024 * 8];
-
- private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(AmqpJMSVendor.INSTANCE);
- private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
private boolean presettle = false;
public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info) {
@@ -131,12 +125,8 @@ public class AmqpFixedProducer extends AmqpProducer {
JmsMessage message = envelope.getMessage();
message.setReadOnlyBody(true);
- if (facade instanceof AmqpJmsMessageFacade) {
- AmqpJmsMessageFacade amqpMessage = (AmqpJmsMessageFacade) facade;
- encodeAndSend(amqpMessage.getAmqpMessage(), delivery);
- } else {
- encodeAndSendTransformed(envelope.getMessage(), delivery);
- }
+ AmqpJmsMessageFacade amqpMessage = (AmqpJmsMessageFacade) facade;
+ encodeAndSend(amqpMessage.getAmqpMessage(), delivery);
if (presettle) {
delivery.settle();
@@ -177,45 +167,6 @@ public class AmqpFixedProducer extends AmqpProducer {
}
}
- private void encodeAndSendTransformed(JmsMessage message, Delivery delivery) throws IOException, JMSException {
-
- byte[] sendBuffer = null;
- int sendBufferSize = 0;
- int sendBufferOffset = 0;
-
- EncodedMessage amqp = null;
-
- // Needed by the transformer process.
- if (!message.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
- message.setProperty(MESSAGE_FORMAT_KEY, 0);
- }
-
- try {
- amqp = outboundTransformer.transform(message);
- } catch (Exception e) {
- throw IOExceptionSupport.create(e);
- }
-
- if (amqp != null && amqp.getLength() > 0) {
- sendBuffer = amqp.getArray();
- sendBufferOffset = amqp.getArrayOffset();
- sendBufferSize = amqp.getLength();
- int sentSoFar = 0;
-
- while (true) {
- int sent = endpoint.send(sendBuffer, sendBufferOffset + sentSoFar, sendBufferSize - sentSoFar);
- if (sent > 0) {
- sentSoFar += sent;
- if ((sendBufferSize - sentSoFar) == 0) {
- break;
- }
- } else {
- LOG.warn("{} failed to send any data from current Message.", this);
- }
- }
- }
- }
-
@Override
public void processFlowUpdates() throws IOException {
if (!pendingSends.isEmpty() && endpoint.getCredit() > 0) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9803914/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java
deleted file mode 100644
index fbe2b21..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.provider.amqp;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.apache.qpid.jms.JmsDestination;
-import org.apache.qpid.jms.JmsQueue;
-import org.apache.qpid.jms.JmsTemporaryQueue;
-import org.apache.qpid.jms.JmsTemporaryTopic;
-import org.apache.qpid.jms.JmsTopic;
-import org.apache.qpid.jms.message.JmsMessage;
-import org.apache.qpid.jms.message.JmsMessageFactory;
-import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFactory;
-import org.apache.qpid.proton.jms.JMSVendor;
-
-/**
- * Vendor instance used with Proton-J JMS Transformer bits.
- *
- * TODO - This can go once we have our own message wrappers all working.
- */
-public class AmqpJMSVendor extends JMSVendor {
-
- public static final AmqpJMSVendor INSTANCE = new AmqpJMSVendor();
-
- private final JmsMessageFactory factory = new AmqpJmsMessageFactory();
-
- private AmqpJMSVendor() {
- }
-
- @Override
- public BytesMessage createBytesMessage() {
- try {
- return factory.createBytesMessage();
- } catch (JMSException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public StreamMessage createStreamMessage() {
- try {
- return factory.createStreamMessage();
- } catch (JMSException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Message createMessage() {
- try {
- return factory.createMessage();
- } catch (JMSException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public TextMessage createTextMessage() {
- try {
- return factory.createTextMessage();
- } catch (JMSException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public ObjectMessage createObjectMessage() {
- try {
- return factory.createObjectMessage();
- } catch (JMSException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public MapMessage createMapMessage() {
- try {
- return factory.createMapMessage();
- } catch (JMSException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Destination createDestination(String name) {
- return super.createDestination(name, Destination.class);
- }
-
- @Override
- public <T extends Destination> T createDestination(String name, Class<T> kind) {
- if (kind == Queue.class) {
- return kind.cast(new JmsQueue(name));
- }
- if (kind == Topic.class) {
- return kind.cast(new JmsTopic(name));
- }
- if (kind == TemporaryQueue.class) {
- return kind.cast(new JmsTemporaryQueue(name));
- }
- if (kind == TemporaryTopic.class) {
- return kind.cast(new JmsTemporaryTopic(name));
- }
-
- return kind.cast(new JmsQueue(name));
- }
-
- @Override
- public void setJMSXUserID(Message msg, String value) {
- ((JmsMessage) msg).getFacade().setUserId(value);
- }
-
- @Override
- public void setJMSXGroupID(Message msg, String value) {
- ((JmsMessage) msg).getFacade().setGroupId(value);
- }
-
- @Override
- public void setJMSXGroupSequence(Message msg, int value) {
- ((JmsMessage) msg).getFacade().setGroupSequence(value);
- }
-
- @Override
- public void setJMSXDeliveryCount(Message msg, long value) {
- // Delivery count tracks total deliveries which is always one higher than
- // re-delivery count since first delivery counts to.
- ((JmsMessage) msg).getFacade().setRedeliveryCounter((int) (value == 0 ? value : value - 1));
- }
-
- @Override
- public String toAddress(Destination dest) {
- return ((JmsDestination) dest).getName();
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org