You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/29 23:58:33 UTC

git commit: Remove all Proton-JMS and use just Proton-J constructs.

Repository: qpid-jms
Updated Branches:
  refs/heads/master 6485baab5 -> d9803914a


Remove all Proton-JMS and use just Proton-J constructs.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/d9803914
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/d9803914
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/d9803914

Branch: refs/heads/master
Commit: d9803914a0144c50101e84f41c8a0bcf525219bd
Parents: 6485baa
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Sep 29 17:57:45 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Sep 29 17:57:45 2014 -0400

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 qpid-jms-client/pom.xml                         |   2 +-
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |  18 +--
 .../jms/provider/amqp/AmqpFixedProducer.java    |  53 +------
 .../qpid/jms/provider/amqp/AmqpJMSVendor.java   | 159 -------------------
 5 files changed, 12 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9803914/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8d5a83e..3c7b754 100644
--- a/pom.xml
+++ b/pom.xml
@@ -115,7 +115,7 @@
       </dependency>
       <dependency>
         <groupId>org.apache.qpid</groupId>
-        <artifactId>proton-jms</artifactId>
+        <artifactId>proton-j</artifactId>
         <version>${proton-version}</version>
       </dependency>
       <dependency>

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9803914/qpid-jms-client/pom.xml
----------------------------------------------------------------------
diff --git a/qpid-jms-client/pom.xml b/qpid-jms-client/pom.xml
index 3fca330..c3ca09b 100644
--- a/qpid-jms-client/pom.xml
+++ b/qpid-jms-client/pom.xml
@@ -47,7 +47,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.qpid</groupId>
-      <artifactId>proton-jms</artifactId>
+      <artifactId>proton-j</artifactId>
     </dependency>
     <dependency>
       <groupId>io.vertx</groupId>

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9803914/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 9c04a5a..41fef20 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -32,8 +32,8 @@ import org.apache.qpid.jms.meta.JmsConsumerId;
 import org.apache.qpid.jms.meta.JmsConsumerInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
-import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageBuilder;
 import org.apache.qpid.jms.provider.ProviderListener;
+import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageBuilder;
 import org.apache.qpid.jms.util.IOExceptionSupport;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
@@ -49,7 +49,6 @@ import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.jms.EncodedMessage;
 import org.apache.qpid.proton.message.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -305,13 +304,9 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
     }
 
     private void processDelivery(Delivery incoming) throws Exception {
-        EncodedMessage encoded = readIncomingMessage(incoming);
         JmsMessage message = null;
         try {
-            Message protonMessage = Message.Factory.create();
-            protonMessage.decode(encoded.getArray(), encoded.getArrayOffset(), encoded.getLength());
-
-            message = (JmsMessage) AmqpJmsMessageBuilder.createJmsMessage(this, protonMessage);
+            message = AmqpJmsMessageBuilder.createJmsMessage(this, decodeIncomingMessage(incoming));
         } catch (Exception e) {
             LOG.warn("Error on transform: {}", e.getMessage());
             // TODO - We could signal provider error but not sure we want to fail
@@ -421,7 +416,8 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
         }
     }
 
-    protected EncodedMessage readIncomingMessage(Delivery incoming) {
+    // TODO - Find more efficient ways to produce the Message instance.
+    protected Message decodeIncomingMessage(Delivery incoming) {
         byte[] buffer;
         int count;
 
@@ -429,11 +425,13 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
             streamBuffer.write(incomingBuffer, 0, count);
         }
 
+        // TODO - This will copy, replace with something better later.  Pooled Netty Buffer ?
         buffer = streamBuffer.toByteArray();
 
         try {
-            //TODO: get rid of EncodedMessage usage
-            return new EncodedMessage(incoming.getMessageFormat(), buffer, 0, buffer.length);
+            Message protonMessage = Message.Factory.create();
+            protonMessage.decode(buffer, 0, buffer.length);
+            return protonMessage;
         } finally {
             streamBuffer.reset();
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9803914/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 4271fc9..676d386 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -45,9 +45,6 @@ import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.jms.AutoOutboundTransformer;
-import org.apache.qpid.proton.jms.EncodedMessage;
-import org.apache.qpid.proton.jms.OutboundTransformer;
 import org.apache.qpid.proton.message.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,9 +63,6 @@ public class AmqpFixedProducer extends AmqpProducer {
     private final Set<Delivery> pending = new LinkedHashSet<Delivery>();
     private final LinkedList<PendingSend> pendingSends = new LinkedList<PendingSend>();
     private byte[] encodeBuffer = new byte[1024 * 8];
-
-    private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(AmqpJMSVendor.INSTANCE);
-    private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
     private boolean presettle = false;
 
     public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info) {
@@ -131,12 +125,8 @@ public class AmqpFixedProducer extends AmqpProducer {
         JmsMessage message = envelope.getMessage();
         message.setReadOnlyBody(true);
 
-        if (facade instanceof AmqpJmsMessageFacade) {
-            AmqpJmsMessageFacade amqpMessage = (AmqpJmsMessageFacade) facade;
-            encodeAndSend(amqpMessage.getAmqpMessage(), delivery);
-        } else {
-            encodeAndSendTransformed(envelope.getMessage(), delivery);
-        }
+        AmqpJmsMessageFacade amqpMessage = (AmqpJmsMessageFacade) facade;
+        encodeAndSend(amqpMessage.getAmqpMessage(), delivery);
 
         if (presettle) {
             delivery.settle();
@@ -177,45 +167,6 @@ public class AmqpFixedProducer extends AmqpProducer {
         }
     }
 
-    private void encodeAndSendTransformed(JmsMessage message, Delivery delivery) throws IOException, JMSException {
-
-        byte[] sendBuffer = null;
-        int sendBufferSize = 0;
-        int sendBufferOffset = 0;
-
-        EncodedMessage amqp = null;
-
-        // Needed by the transformer process.
-        if (!message.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
-            message.setProperty(MESSAGE_FORMAT_KEY, 0);
-        }
-
-        try {
-            amqp = outboundTransformer.transform(message);
-        } catch (Exception e) {
-            throw IOExceptionSupport.create(e);
-        }
-
-        if (amqp != null && amqp.getLength() > 0) {
-            sendBuffer = amqp.getArray();
-            sendBufferOffset = amqp.getArrayOffset();
-            sendBufferSize = amqp.getLength();
-            int sentSoFar = 0;
-
-            while (true) {
-                int sent = endpoint.send(sendBuffer, sendBufferOffset + sentSoFar, sendBufferSize - sentSoFar);
-                if (sent > 0) {
-                    sentSoFar += sent;
-                    if ((sendBufferSize - sentSoFar) == 0) {
-                        break;
-                    }
-                } else {
-                    LOG.warn("{} failed to send any data from current Message.", this);
-                }
-            }
-        }
-    }
-
     @Override
     public void processFlowUpdates() throws IOException {
         if (!pendingSends.isEmpty() && endpoint.getCredit() > 0) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9803914/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java
deleted file mode 100644
index fbe2b21..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.provider.amqp;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.apache.qpid.jms.JmsDestination;
-import org.apache.qpid.jms.JmsQueue;
-import org.apache.qpid.jms.JmsTemporaryQueue;
-import org.apache.qpid.jms.JmsTemporaryTopic;
-import org.apache.qpid.jms.JmsTopic;
-import org.apache.qpid.jms.message.JmsMessage;
-import org.apache.qpid.jms.message.JmsMessageFactory;
-import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFactory;
-import org.apache.qpid.proton.jms.JMSVendor;
-
-/**
- * Vendor instance used with Proton-J JMS Transformer bits.
- *
- * TODO - This can go once we have our own message wrappers all working.
- */
-public class AmqpJMSVendor extends JMSVendor {
-
-    public static final AmqpJMSVendor INSTANCE = new AmqpJMSVendor();
-
-    private final JmsMessageFactory factory = new AmqpJmsMessageFactory();
-
-    private AmqpJMSVendor() {
-    }
-
-    @Override
-    public BytesMessage createBytesMessage() {
-        try {
-            return factory.createBytesMessage();
-        } catch (JMSException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public StreamMessage createStreamMessage() {
-        try {
-            return factory.createStreamMessage();
-        } catch (JMSException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public Message createMessage() {
-        try {
-            return factory.createMessage();
-        } catch (JMSException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public TextMessage createTextMessage() {
-        try {
-            return factory.createTextMessage();
-        } catch (JMSException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public ObjectMessage createObjectMessage() {
-        try {
-            return factory.createObjectMessage();
-        } catch (JMSException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public MapMessage createMapMessage() {
-        try {
-            return factory.createMapMessage();
-        } catch (JMSException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public Destination createDestination(String name) {
-        return super.createDestination(name, Destination.class);
-    }
-
-    @Override
-    public <T extends Destination> T createDestination(String name, Class<T> kind) {
-        if (kind == Queue.class) {
-            return kind.cast(new JmsQueue(name));
-        }
-        if (kind == Topic.class) {
-            return kind.cast(new JmsTopic(name));
-        }
-        if (kind == TemporaryQueue.class) {
-            return kind.cast(new JmsTemporaryQueue(name));
-        }
-        if (kind == TemporaryTopic.class) {
-            return kind.cast(new JmsTemporaryTopic(name));
-        }
-
-        return kind.cast(new JmsQueue(name));
-    }
-
-    @Override
-    public void setJMSXUserID(Message msg, String value) {
-        ((JmsMessage) msg).getFacade().setUserId(value);
-    }
-
-    @Override
-    public void setJMSXGroupID(Message msg, String value) {
-        ((JmsMessage) msg).getFacade().setGroupId(value);
-    }
-
-    @Override
-    public void setJMSXGroupSequence(Message msg, int value) {
-        ((JmsMessage) msg).getFacade().setGroupSequence(value);
-    }
-
-    @Override
-    public void setJMSXDeliveryCount(Message msg, long value) {
-        // Delivery count tracks total deliveries which is always one higher than
-        // re-delivery count since first delivery counts to.
-        ((JmsMessage) msg).getFacade().setRedeliveryCounter((int) (value == 0 ? value : value - 1));
-    }
-
-    @Override
-    public String toAddress(Destination dest) {
-        return ((JmsDestination) dest).getName();
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org