You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ke...@apache.org on 2014/02/18 10:52:53 UTC

git commit: Fix for https://issues.apache.org/jira/browse/AMQ-5042. Handles receiving multiple frames at once from an nio channel

Repository: activemq
Updated Branches:
  refs/heads/trunk b97fa15d5 -> da63f3f20


Fix for https://issues.apache.org/jira/browse/AMQ-5042.  Handles receiving multiple frames at once from an nio channel


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/da63f3f2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/da63f3f2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/da63f3f2

Branch: refs/heads/trunk
Commit: da63f3f20a348b29b43ef84bb7a4f6b02d2cd35c
Parents: b97fa15
Author: Kevin Earls <ke...@kevinearls.com>
Authored: Tue Feb 18 10:52:37 2014 +0100
Committer: Kevin Earls <ke...@kevinearls.com>
Committed: Tue Feb 18 10:52:37 2014 +0100

----------------------------------------------------------------------
 .../transport/amqp/AmqpNioTransport.java        |  28 ++++-
 .../transport/amqp/JMSClientNioTest.java        | 112 ++-----------------
 .../activemq/transport/amqp/JMSClientTest.java  |  25 ++++-
 3 files changed, 52 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/da63f3f2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
index 665bf88..dfb5f60 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.amqp;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -29,7 +30,6 @@ import java.nio.channels.SocketChannel;
 
 import javax.net.SocketFactory;
 
-import org.apache.activemq.transport.nio.NIOInputStream;
 import org.apache.activemq.transport.nio.NIOOutputStream;
 import org.apache.activemq.transport.nio.SelectorManager;
 import org.apache.activemq.transport.nio.SelectorSelection;
@@ -38,11 +38,15 @@ import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.wireformat.WireFormat;
 import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO
  */
 public class AmqpNioTransport extends TcpTransport {
+    private DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new byte[]{'A', 'M', 'Q', 'P'}));
+    private final Integer AMQP_HEADER_VALUE = amqpHeaderValue.readInt();
 
     private SocketChannel channel;
     private SelectorSelection selection;
@@ -120,10 +124,28 @@ public class AmqpNioTransport extends TcpTransport {
                     }
                 }
 
-                doConsume(AmqpSupport.toBuffer(inputBuffer));
+                while(inputBuffer.position() < inputBuffer.limit()) {
+                    inputBuffer.mark();
+                    int commandSize = inputBuffer.getInt();
+                    inputBuffer.reset();
+
+                    // the AMQP protocol header starts with 'A','M','Q','P' instead of a 4-byte frame size, so pass the buffer on as-is
+                    if (commandSize == AMQP_HEADER_VALUE) {
+                        doConsume(AmqpSupport.toBuffer(inputBuffer));
+                        break;
+                    }
+
+                    byte[] bytes = new byte[commandSize];
+                    ByteBuffer commandBuffer = ByteBuffer.allocate(commandSize);
+                    inputBuffer.get(bytes, 0, commandSize);
+                    commandBuffer.put(bytes);
+                    commandBuffer.flip();
+                    doConsume(AmqpSupport.toBuffer(commandBuffer));
+                    commandBuffer.clear();
+                }
+
                 // clear the buffer
                 inputBuffer.clear();
-
             }
         } catch (IOException e) {
             onException(e);

http://git-wip-us.apache.org/repos/asf/activemq/blob/da63f3f2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java
index 42420e2..9004aa0 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java
@@ -18,117 +18,19 @@ package org.apache.activemq.transport.amqp;
 
 import javax.jms.JMSException;
 
+import org.junit.Ignore;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 
 /**
  * Test the JMS client when connected to the NIO transport.
  */
 public class JMSClientNioTest extends JMSClientTest {
-
-    @Override
-    @Test
-    public void testProducerConsume() throws Exception {
-    }
-
-    @Override
-    @Test
-    public void testTransactedConsumer() throws Exception {
-    }
-
-    @Override
-    @Test
-    public void testRollbackRececeivedMessage() throws Exception {
-    }
-
-    @Override
-    @Test
-    public void testTXConsumerAndLargeNumberOfMessages() throws Exception {
-    }
-
-    @Override
-    @Test
-    public void testSelectors() throws Exception {
-    }
-
-    @Override
-    @Test(timeout=30000)
-    public void testProducerThrowsWhenBrokerStops() throws Exception {
-    }
-
-    @Override
-    @Test(timeout=30000)
-    public void testProducerCreateThrowsWhenBrokerStops() throws Exception {
-    }
-
-    @Override
-    @Test(timeout=30000)
-    public void testConsumerCreateThrowsWhenBrokerStops() throws Exception {
-    }
-
-    @Override
-    @Test(timeout=30000)
-    public void testConsumerReceiveNoWaitThrowsWhenBrokerStops() throws Exception {
-    }
-
-    @Override
-    @Test(timeout=30000)
-    public void testConsumerReceiveTimedThrowsWhenBrokerStops() throws Exception {
-    }
-
-    @Override
-    @Test(timeout=30000)
-    public void testConsumerReceiveReturnsBrokerStops() throws Exception {
-    }
-
-    @Override
-    @Test(timeout=30000)
-    public void testBrokerRestartWontHangConnectionClose() throws Exception {
-    }
-
-    @Override
-    @Test(timeout=120000)
-    public void testProduceAndConsumeLargeNumbersOfMessages() throws JMSException {
-    }
-
-    @Override
-    @Test(timeout=30000)
-    public void testSyncSends() throws Exception {
-    }
-
-    @Override
-    @Test(timeout=30000)
-    public void testDurableConsumerAsync() throws Exception {
-    }
-
-    @Override
-    @Test(timeout=30000)
-    public void testDurableConsumerSync() throws Exception {
-    }
-
-    @Override
-    @Test(timeout=30000)
-    public void testTopicConsumerAsync() throws Exception {
-    }
-
-    @Override
-    @Test(timeout=45000)
-    public void testTopicConsumerSync() throws Exception {
-    }
-
-    @Override
-    @Test(timeout=60000)
-    public void testConnectionsAreClosed() throws Exception {
-    }
-
-    @Override
-    @Test(timeout=30000)
-    public void testExecptionListenerCalledOnBrokerStop() throws Exception {
-    }
-
-    @Override
-    @Test(timeout=30000)
-    public void testSessionTransactedCommit() throws JMSException, InterruptedException {
-    }
+    protected static final Logger LOG = LoggerFactory.getLogger(JMSClientNioTest.class);
 
     @Override
     protected int getBrokerPort() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/da63f3f2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index cb18f73..27719f9 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -1,4 +1,4 @@
-/**
+/**                           >>>>>> pumping
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -47,23 +47,36 @@ import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
 import org.apache.activemq.util.Wait;
 import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 import org.objectweb.jtests.jms.framework.TestConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class JMSClientTest extends AmqpTestSupport {
-
+    protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class);
     @Rule public TestName name = new TestName();
+    java.util.logging.Logger frameLoggger = java.util.logging.Logger.getLogger("FRM");
+
 
     @Override
     @Before
     public void setUp() throws Exception {
-        LOG.debug("Starting test {}", name.getMethodName());
+        LOG.debug("in setUp of {}", name.getMethodName());
         super.setUp();
     }
 
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        LOG.debug("in tearDown of {}", name.getMethodName());
+        super.tearDown();
+        Thread.sleep(500);
+    }
+
     @SuppressWarnings("rawtypes")
     @Test(timeout=30000)
     public void testProducerConsume() throws Exception {
@@ -169,7 +182,7 @@ public class JMSClientTest extends AmqpTestSupport {
         connection.close();
     }
 
-    @Test(timeout=30000)
+    @Test(timeout=60000)
     public void testTXConsumerAndLargeNumberOfMessages() throws Exception {
 
         ActiveMQAdmin.enableJMSFrameTracing();
@@ -760,7 +773,9 @@ public class JMSClientTest extends AmqpTestSupport {
 
     private Connection createConnection(String clientId, boolean syncPublish) throws JMSException {
 
-        final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", getBrokerPort(), "admin", "password");
+        int brokerPort = getBrokerPort();
+        LOG.debug("Creating connection on port {}", brokerPort);
+        final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", brokerPort, "admin", "password");
 
         factory.setSyncPublish(syncPublish);
         factory.setTopicPrefix("topic://");