You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/06/20 17:43:54 UTC
svn commit: r1495058 - in /activemq/trunk/activemq-amqp/src:
main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
test/resources/test.properties
Author: chirino
Date: Thu Jun 20 15:43:54 2013
New Revision: 1495058
URL: http://svn.apache.org/r1495058
Log:
Fixes AMQ-4376 - JoramJmsTest QueueBrowserTest.testSenderBrowser fails intermittently.
Added:
activemq/trunk/activemq-amqp/src/test/resources/test.properties
Modified:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1495058&r1=1495057&r2=1495058&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Thu Jun 20 15:43:54 2013
@@ -136,7 +136,7 @@ class AmqpProtocolConverter {
public void receivedFrame(TransportFrame transportFrame) {
if (TRACE_FRAMES.isTraceEnabled()) {
TRACE_FRAMES.trace(String.format("%s | RECV: %s",
- AmqpProtocolConverter.this.amqpTransport.getRemoteAddress(), transportFrame.getBody()));
+ AmqpProtocolConverter.this.amqpTransport.getRemoteAddress(), transportFrame.getBody()));
}
}
@@ -144,7 +144,7 @@ class AmqpProtocolConverter {
public void sentFrame(TransportFrame transportFrame) {
if (TRACE_FRAMES.isTraceEnabled()) {
TRACE_FRAMES.trace(String.format("%s | SENT: %s",
- AmqpProtocolConverter.this.amqpTransport.getRemoteAddress(), transportFrame.getBody()));
+ AmqpProtocolConverter.this.amqpTransport.getRemoteAddress(), transportFrame.getBody()));
}
}
});
@@ -741,6 +741,9 @@ class AmqpProtocolConverter {
private final Sender sender;
private final boolean presettle;
private boolean closed;
+ public ConsumerInfo info;
+ private boolean endOfBrowse = false;
+
public ConsumerContext(ConsumerId consumerId, Sender sender) {
this.consumerId = consumerId;
@@ -825,7 +828,8 @@ class AmqpProtocolConverter {
final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
if (jms == null) {
// It's the end of browse signal.
- sender.drained();
+ endOfBrowse = true;
+ drainCheck();
} else {
jms.setRedeliveryCounter(md.getRedeliveryCounter());
jms.setReadOnlyBody(true);
@@ -900,6 +904,11 @@ class AmqpProtocolConverter {
@Override
public void drainCheck() {
+ // If we are a browser, let's not report that we are drained until
+ // we receive the end-of-browse message.
+ if( info.isBrowser() && !endOfBrowse)
+ return;
+
if (outbound.isEmpty()) {
sender.drained();
}
@@ -1018,6 +1027,7 @@ class AmqpProtocolConverter {
subscriptionsByConsumerId.put(id, consumerContext);
ConsumerInfo consumerInfo = new ConsumerInfo(id);
+ consumerContext.info = consumerInfo;
consumerInfo.setSelector(selector);
consumerInfo.setNoRangeAcks(true);
consumerInfo.setDestination(dest);
Added: activemq/trunk/activemq-amqp/src/test/resources/test.properties
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/resources/test.properties?rev=1495058&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/resources/test.properties (added)
+++ activemq/trunk/activemq-amqp/src/test/resources/test.properties Thu Jun 20 15:43:54 2013
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+timeout=10000
\ No newline at end of file