You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/07/12 16:34:12 UTC

qpid-jms git commit: QPIDJMS-401 Clean up some older code in the AMQP provider

Repository: qpid-jms
Updated Branches:
  refs/heads/master fa1334718 -> a21542cca


QPIDJMS-401 Clean up some older code in the AMQP provider

Remove some now unused code paths and tests that no longer apply to how
we manage consumer and producer resources.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/a21542cc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/a21542cc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/a21542cc

Branch: refs/heads/master
Commit: a21542cca7ca3098f03e9f7441fea5d23a1cd42e
Parents: fa13347
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jul 12 12:33:52 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jul 12 12:33:52 2018 -0400

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpProvider.java    | 36 ++++----------------
 .../qpid/jms/provider/amqp/AmqpSession.java     | 11 +++---
 2 files changed, 10 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a21542cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 890ebdb..a4dbc39 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -89,7 +89,6 @@ import org.slf4j.LoggerFactory;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
-import io.netty.util.ReferenceCountUtil;
 
 /**
  * An AMQP v1.0 Provider.
@@ -633,17 +632,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
             public void run() {
                 try {
                     checkClosedOrFailed();
-
                     JmsProducerId producerId = envelope.getProducerId();
-                    AmqpProducer producer = null;
-
-                    if (producerId.getProviderHint() instanceof AmqpFixedProducer) {
-                        producer = (AmqpFixedProducer) producerId.getProviderHint();
-                    } else {
-                        AmqpSession session = connection.getSession(producerId.getParentId());
-                        producer = session.getProducer(producerId);
-                    }
-
+                    AmqpProducer producer = (AmqpProducer) producerId.getProviderHint();
                     producer.send(envelope, request);
                 } catch (Throwable t) {
                     request.onFailure(t);
@@ -683,14 +673,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                     checkClosedOrFailed();
 
                     JmsConsumerId consumerId = envelope.getConsumerId();
-                    AmqpConsumer consumer = null;
-
-                    if (consumerId.getProviderHint() instanceof AmqpConsumer) {
-                        consumer = (AmqpConsumer) consumerId.getProviderHint();
-                    } else {
-                        AmqpSession session = connection.getSession(consumerId.getParentId());
-                        consumer = session.getConsumer(consumerId);
-                    }
+                    AmqpConsumer consumer = (AmqpConsumer) consumerId.getProviderHint();
 
                     consumer.acknowledge(envelope, ackType);
 
@@ -794,15 +777,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
             public void run() {
                 try {
                     checkClosedOrFailed();
-                    AmqpConsumer consumer = null;
-
-                    if (consumerId.getProviderHint() instanceof AmqpConsumer) {
-                        consumer = (AmqpConsumer) consumerId.getProviderHint();
-                    } else {
-                        AmqpSession session = connection.getSession(consumerId.getParentId());
-                        consumer = session.getConsumer(consumerId);
-                    }
-
+                    AmqpConsumer consumer = (AmqpConsumer) consumerId.getProviderHint();
                     consumer.pull(timeout, request);
                     pumpToProtonTransport(request);
                 } catch (Throwable t) {
@@ -834,7 +809,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     public void onData(final ByteBuf input) {
 
         // We need to retain until the serializer gets around to processing it.
-        ReferenceCountUtil.retain(input);
+        input.retain();
 
         serializer.execute(new Runnable() {
 
@@ -853,7 +828,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                         protonTransport.process();
                     } while (input.isReadable());
 
-                    ReferenceCountUtil.release(input);
+                    // Free for pooled memory to be put back now.
+                    input.release();
 
                     // Process the state changes from the latest data and then answer back
                     // any pending updates to the Broker.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a21542cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 61299c7..83f12f7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -94,15 +94,13 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
     }
 
     public AmqpProducer getProducer(JmsProducerInfo producerInfo) {
-        return getProducer(producerInfo.getId());
-    }
+        JmsProducerId producerId = producerInfo.getId();
 
-    public AmqpProducer getProducer(JmsProducerId producerId) {
         if (producerId.getProviderHint() instanceof AmqpProducer) {
             return (AmqpProducer) producerId.getProviderHint();
         }
 
-        return null;
+        return producers.get(producerId);
     }
 
     public void createConsumer(JmsConsumerInfo consumerInfo, AsyncResult request) {
@@ -111,13 +109,12 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
     }
 
     public AmqpConsumer getConsumer(JmsConsumerInfo consumerInfo) {
-        return getConsumer(consumerInfo.getId());
-    }
+        JmsConsumerId consumerId = consumerInfo.getId();
 
-    public AmqpConsumer getConsumer(JmsConsumerId consumerId) {
         if (consumerId.getProviderHint() instanceof AmqpConsumer) {
             return (AmqpConsumer) consumerId.getProviderHint();
         }
+
         return consumers.get(consumerId);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org