You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ac...@apache.org on 2008/02/19 05:37:56 UTC

svn commit: r628988 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ActiveMQInputStream.java main/java/org/apache/activemq/ActiveMQOutputStream.java test/java/org/apache/activemq/ActiveMQInputStreamTest.java

Author: aco
Date: Mon Feb 18 20:37:55 2008
New Revision: 628988

URL: http://svn.apache.org/viewvc?rev=628988&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-1580
- Check for 0 length buffer when receiving byte messages
- Don't send an empty byte message if there is no data to flush
- Added test case from AMQ-1580

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java?rev=628988&r1=628987&r2=628988&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java Mon Feb 18 20:37:55 2008
@@ -174,15 +174,16 @@
 
     public int read() throws IOException {
         fillBuffer();
-        if (eosReached) {
+        if (eosReached || buffer.length == 0) {
             return -1;
         }
+
         return buffer[pos++] & 0xff;
     }
 
     public int read(byte[] b, int off, int len) throws IOException {
         fillBuffer();
-        if (eosReached) {
+        if (eosReached || buffer.length == 0) {
             return -1;
         }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java?rev=628988&r1=628987&r2=628988&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java Mon Feb 18 20:37:55 2008
@@ -94,7 +94,7 @@
     }
 
     public synchronized void write(int b) throws IOException {
-        buffer[count++] = (byte)b;
+        buffer[count++] = (byte) b;
         if (count == buffer.length) {
             flushBuffer();
         }
@@ -120,14 +120,16 @@
     }
 
     private void flushBuffer() throws IOException {
-        try {
-            ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
-            msg.writeBytes(buffer, 0, count);
-            send(msg, false);
-        } catch (JMSException e) {
-            throw IOExceptionSupport.create(e);
+        if (count != 0) {
+            try {
+                ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
+                msg.writeBytes(buffer, 0, count);
+                send(msg, false);
+            } catch (JMSException e) {
+                throw IOExceptionSupport.create(e);
+            }
+            count = 0;
         }
-        count = 0;
     }
 
     /**
@@ -137,7 +139,7 @@
     private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
         if (properties != null) {
             for (Iterator iter = properties.keySet().iterator(); iter.hasNext();) {
-                String key = (String)iter.next();
+                String key = (String) iter.next();
                 Object value = properties.get(key);
                 msg.setObjectProperty(key, value);
             }
@@ -147,7 +149,7 @@
         if (eosMessage) {
             msg.setGroupSequence(-1);
         } else {
-            msg.setGroupSequence((int)messageSequence);
+            msg.setGroupSequence((int) messageSequence);
         }
         MessageId id = new MessageId(info.getProducerId(), messageSequence++);
         connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage);

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java?rev=628988&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java Mon Feb 18 20:37:55 2008
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class ActiveMQInputStreamTest extends TestCase {
+
+    private static final Log LOG = LogFactory.getLog(ActiveMQInputStreamTest.class);
+
+    private static final String BROKER_URL = "tcp://localhost:61616";
+    private static final String DESTINATION = "destination";
+    private static final int STREAM_LENGTH = 64 * 1024 + 0; // change 0 to 1 to make it not crash
+
+    public void testInputStreamMatchesDefaultChuckSize() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.setPersistent(false);
+        broker.setDestinations(new ActiveMQDestination[] {
+            ActiveMQDestination.createDestination(DESTINATION, ActiveMQDestination.QUEUE_TYPE),
+        });
+        broker.addConnector(BROKER_URL);
+        broker.start();
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
+        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue destination = session.createQueue(DESTINATION);
+
+        OutputStream out = null;
+        try {
+            out = connection.createOutputStream(destination);
+            LOG.debug("writing...");
+            for (int i = 0; i < STREAM_LENGTH; ++i) {
+                out.write(0);
+            }
+            LOG.debug("wrote " + STREAM_LENGTH + " bytes");
+        } finally {
+            if (out != null) {
+                out.close();
+            }
+        }
+
+        InputStream in = null;
+        try {
+            in = connection.createInputStream(destination);
+            LOG.debug("reading...");
+            int count = 0;
+            while (-1 != in.read()) {
+                ++count;
+            }
+            LOG.debug("read " + count + " bytes");
+        } finally {
+            if (in != null) {
+                in.close();
+            }
+        }
+
+        connection.close();
+        broker.stop();
+    }
+}