You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@activemq.apache.org by ke...@apache.org on 2014/02/18 10:52:53 UTC
git commit: Fix for https://issues.apache.org/jira/browse/AMQ-5042.
Handles receiving multiple frames at once from an nio channel
Repository: activemq
Updated Branches:
refs/heads/trunk b97fa15d5 -> da63f3f20
Fix for https://issues.apache.org/jira/browse/AMQ-5042. Handles receiving multiple frames at once from an nio channel
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/da63f3f2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/da63f3f2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/da63f3f2
Branch: refs/heads/trunk
Commit: da63f3f20a348b29b43ef84bb7a4f6b02d2cd35c
Parents: b97fa15
Author: Kevin Earls <ke...@kevinearls.com>
Authored: Tue Feb 18 10:52:37 2014 +0100
Committer: Kevin Earls <ke...@kevinearls.com>
Committed: Tue Feb 18 10:52:37 2014 +0100
----------------------------------------------------------------------
.../transport/amqp/AmqpNioTransport.java | 28 ++++-
.../transport/amqp/JMSClientNioTest.java | 112 ++-----------------
.../activemq/transport/amqp/JMSClientTest.java | 25 ++++-
3 files changed, 52 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/da63f3f2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
index 665bf88..dfb5f60 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.amqp;
+import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
@@ -29,7 +30,6 @@ import java.nio.channels.SocketChannel;
import javax.net.SocketFactory;
-import org.apache.activemq.transport.nio.NIOInputStream;
import org.apache.activemq.transport.nio.NIOOutputStream;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
@@ -38,11 +38,15 @@ import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO
*/
public class AmqpNioTransport extends TcpTransport {
+ private DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new byte[]{'A', 'M', 'Q', 'P'}));
+ private final Integer AMQP_HEADER_VALUE = amqpHeaderValue.readInt();
private SocketChannel channel;
private SelectorSelection selection;
@@ -120,10 +124,28 @@ public class AmqpNioTransport extends TcpTransport {
}
}
- doConsume(AmqpSupport.toBuffer(inputBuffer));
+ while(inputBuffer.position() < inputBuffer.limit()) {
+ inputBuffer.mark();
+ int commandSize = inputBuffer.getInt();
+ inputBuffer.reset();
+
+ // handles buffers starting with 'A','M','Q','P' rather than size
+ if (commandSize == AMQP_HEADER_VALUE) {
+ doConsume(AmqpSupport.toBuffer(inputBuffer));
+ break;
+ }
+
+ byte[] bytes = new byte[commandSize];
+ ByteBuffer commandBuffer = ByteBuffer.allocate(commandSize);
+ inputBuffer.get(bytes, 0, commandSize);
+ commandBuffer.put(bytes);
+ commandBuffer.flip();
+ doConsume(AmqpSupport.toBuffer(commandBuffer));
+ commandBuffer.clear();
+ }
+
// clear the buffer
inputBuffer.clear();
-
}
} catch (IOException e) {
onException(e);
http://git-wip-us.apache.org/repos/asf/activemq/blob/da63f3f2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java
index 42420e2..9004aa0 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java
@@ -18,117 +18,19 @@ package org.apache.activemq.transport.amqp;
import javax.jms.JMSException;
+import org.junit.Ignore;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
/**
* Test the JMS client when connected to the NIO transport.
*/
public class JMSClientNioTest extends JMSClientTest {
-
- @Override
- @Test
- public void testProducerConsume() throws Exception {
- }
-
- @Override
- @Test
- public void testTransactedConsumer() throws Exception {
- }
-
- @Override
- @Test
- public void testRollbackRececeivedMessage() throws Exception {
- }
-
- @Override
- @Test
- public void testTXConsumerAndLargeNumberOfMessages() throws Exception {
- }
-
- @Override
- @Test
- public void testSelectors() throws Exception {
- }
-
- @Override
- @Test(timeout=30000)
- public void testProducerThrowsWhenBrokerStops() throws Exception {
- }
-
- @Override
- @Test(timeout=30000)
- public void testProducerCreateThrowsWhenBrokerStops() throws Exception {
- }
-
- @Override
- @Test(timeout=30000)
- public void testConsumerCreateThrowsWhenBrokerStops() throws Exception {
- }
-
- @Override
- @Test(timeout=30000)
- public void testConsumerReceiveNoWaitThrowsWhenBrokerStops() throws Exception {
- }
-
- @Override
- @Test(timeout=30000)
- public void testConsumerReceiveTimedThrowsWhenBrokerStops() throws Exception {
- }
-
- @Override
- @Test(timeout=30000)
- public void testConsumerReceiveReturnsBrokerStops() throws Exception {
- }
-
- @Override
- @Test(timeout=30000)
- public void testBrokerRestartWontHangConnectionClose() throws Exception {
- }
-
- @Override
- @Test(timeout=120000)
- public void testProduceAndConsumeLargeNumbersOfMessages() throws JMSException {
- }
-
- @Override
- @Test(timeout=30000)
- public void testSyncSends() throws Exception {
- }
-
- @Override
- @Test(timeout=30000)
- public void testDurableConsumerAsync() throws Exception {
- }
-
- @Override
- @Test(timeout=30000)
- public void testDurableConsumerSync() throws Exception {
- }
-
- @Override
- @Test(timeout=30000)
- public void testTopicConsumerAsync() throws Exception {
- }
-
- @Override
- @Test(timeout=45000)
- public void testTopicConsumerSync() throws Exception {
- }
-
- @Override
- @Test(timeout=60000)
- public void testConnectionsAreClosed() throws Exception {
- }
-
- @Override
- @Test(timeout=30000)
- public void testExecptionListenerCalledOnBrokerStop() throws Exception {
- }
-
- @Override
- @Test(timeout=30000)
- public void testSessionTransactedCommit() throws JMSException, InterruptedException {
- }
+ protected static final Logger LOG = LoggerFactory.getLogger(JMSClientNioTest.class);
@Override
protected int getBrokerPort() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/da63f3f2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index cb18f73..27719f9 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -1,4 +1,4 @@
-/**
+/** >>>>>> pumping
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -47,23 +47,36 @@ import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.activemq.util.Wait;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
+import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.objectweb.jtests.jms.framework.TestConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class JMSClientTest extends AmqpTestSupport {
-
+ protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class);
@Rule public TestName name = new TestName();
+ java.util.logging.Logger frameLoggger = java.util.logging.Logger.getLogger("FRM");
+
@Override
@Before
public void setUp() throws Exception {
- LOG.debug("Starting test {}", name.getMethodName());
+ LOG.debug("in setUp of {}", name.getMethodName());
super.setUp();
}
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ LOG.debug("in tearDown of {}", name.getMethodName());
+ super.tearDown();
+ Thread.sleep(500);
+ }
+
@SuppressWarnings("rawtypes")
@Test(timeout=30000)
public void testProducerConsume() throws Exception {
@@ -169,7 +182,7 @@ public class JMSClientTest extends AmqpTestSupport {
connection.close();
}
- @Test(timeout=30000)
+ @Test(timeout=60000)
public void testTXConsumerAndLargeNumberOfMessages() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
@@ -760,7 +773,9 @@ public class JMSClientTest extends AmqpTestSupport {
private Connection createConnection(String clientId, boolean syncPublish) throws JMSException {
- final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", getBrokerPort(), "admin", "password");
+ int brokerPort = getBrokerPort();
+ LOG.debug("Creating connection on port {}", brokerPort);
+ final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", brokerPort, "admin", "password");
factory.setSyncPublish(syncPublish);
factory.setTopicPrefix("topic://");