You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/06/20 17:43:54 UTC

svn commit: r1495058 - in /activemq/trunk/activemq-amqp/src: main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java test/resources/test.properties

Author: chirino
Date: Thu Jun 20 15:43:54 2013
New Revision: 1495058

URL: http://svn.apache.org/r1495058
Log:
Fixes AMQ-4376 - JoramJmsTest QueueBrowserTest.testSenderBrowser fails intermittently.

Added:
    activemq/trunk/activemq-amqp/src/test/resources/test.properties
Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1495058&r1=1495057&r2=1495058&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Thu Jun 20 15:43:54 2013
@@ -136,7 +136,7 @@ class AmqpProtocolConverter {
                 public void receivedFrame(TransportFrame transportFrame) {
                     if (TRACE_FRAMES.isTraceEnabled()) {
                         TRACE_FRAMES.trace(String.format("%s | RECV: %s",
-                            AmqpProtocolConverter.this.amqpTransport.getRemoteAddress(), transportFrame.getBody()));
+                                AmqpProtocolConverter.this.amqpTransport.getRemoteAddress(), transportFrame.getBody()));
                     }
                 }
 
@@ -144,7 +144,7 @@ class AmqpProtocolConverter {
                 public void sentFrame(TransportFrame transportFrame) {
                     if (TRACE_FRAMES.isTraceEnabled()) {
                         TRACE_FRAMES.trace(String.format("%s | SENT: %s",
-                            AmqpProtocolConverter.this.amqpTransport.getRemoteAddress(), transportFrame.getBody()));
+                                AmqpProtocolConverter.this.amqpTransport.getRemoteAddress(), transportFrame.getBody()));
                     }
                 }
             });
@@ -741,6 +741,9 @@ class AmqpProtocolConverter {
         private final Sender sender;
         private final boolean presettle;
         private boolean closed;
+        public ConsumerInfo info;
+        private boolean endOfBrowse = false;
+
 
         public ConsumerContext(ConsumerId consumerId, Sender sender) {
             this.consumerId = consumerId;
@@ -825,7 +828,8 @@ class AmqpProtocolConverter {
                     final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
                     if (jms == null) {
                         // It's the end of browse signal.
-                        sender.drained();
+                        endOfBrowse = true;
+                        drainCheck();
                     } else {
                         jms.setRedeliveryCounter(md.getRedeliveryCounter());
                         jms.setReadOnlyBody(true);
@@ -900,6 +904,11 @@ class AmqpProtocolConverter {
 
         @Override
         public void drainCheck() {
+            // If we are a browser, let's not report that we are drained until
+            // we hit the end-of-browse message.
+            if( info.isBrowser() && !endOfBrowse)
+                return;
+
             if (outbound.isEmpty()) {
                 sender.drained();
             }
@@ -1018,6 +1027,7 @@ class AmqpProtocolConverter {
 
             subscriptionsByConsumerId.put(id, consumerContext);
             ConsumerInfo consumerInfo = new ConsumerInfo(id);
+            consumerContext.info = consumerInfo;
             consumerInfo.setSelector(selector);
             consumerInfo.setNoRangeAcks(true);
             consumerInfo.setDestination(dest);

Added: activemq/trunk/activemq-amqp/src/test/resources/test.properties
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/resources/test.properties?rev=1495058&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/resources/test.properties (added)
+++ activemq/trunk/activemq-amqp/src/test/resources/test.properties Thu Jun 20 15:43:54 2013
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+timeout=10000
\ No newline at end of file