You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ac...@apache.org on 2008/02/19 05:37:56 UTC
svn commit: r628988 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ActiveMQInputStream.java
main/java/org/apache/activemq/ActiveMQOutputStream.java
test/java/org/apache/activemq/ActiveMQInputStreamTest.java
Author: aco
Date: Mon Feb 18 20:37:55 2008
New Revision: 628988
URL: http://svn.apache.org/viewvc?rev=628988&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-1580
- Check for 0 length buffer when receiving byte messages
- Don't send an empty byte message if there is no data to flush
- Added test case from AMQ-1580
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java?rev=628988&r1=628987&r2=628988&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java Mon Feb 18 20:37:55 2008
@@ -174,15 +174,16 @@
public int read() throws IOException {
fillBuffer();
- if (eosReached) {
+ if (eosReached || buffer.length == 0) {
return -1;
}
+
return buffer[pos++] & 0xff;
}
public int read(byte[] b, int off, int len) throws IOException {
fillBuffer();
- if (eosReached) {
+ if (eosReached || buffer.length == 0) {
return -1;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java?rev=628988&r1=628987&r2=628988&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java Mon Feb 18 20:37:55 2008
@@ -94,7 +94,7 @@
}
public synchronized void write(int b) throws IOException {
- buffer[count++] = (byte)b;
+ buffer[count++] = (byte) b;
if (count == buffer.length) {
flushBuffer();
}
@@ -120,14 +120,16 @@
}
private void flushBuffer() throws IOException {
- try {
- ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
- msg.writeBytes(buffer, 0, count);
- send(msg, false);
- } catch (JMSException e) {
- throw IOExceptionSupport.create(e);
+ if (count != 0) {
+ try {
+ ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
+ msg.writeBytes(buffer, 0, count);
+ send(msg, false);
+ } catch (JMSException e) {
+ throw IOExceptionSupport.create(e);
+ }
+ count = 0;
}
- count = 0;
}
/**
@@ -137,7 +139,7 @@
private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
if (properties != null) {
for (Iterator iter = properties.keySet().iterator(); iter.hasNext();) {
- String key = (String)iter.next();
+ String key = (String) iter.next();
Object value = properties.get(key);
msg.setObjectProperty(key, value);
}
@@ -147,7 +149,7 @@
if (eosMessage) {
msg.setGroupSequence(-1);
} else {
- msg.setGroupSequence((int)messageSequence);
+ msg.setGroupSequence((int) messageSequence);
}
MessageId id = new MessageId(info.getProducerId(), messageSequence++);
connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage);
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java?rev=628988&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java Mon Feb 18 20:37:55 2008
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class ActiveMQInputStreamTest extends TestCase {
+
+ private static final Log LOG = LogFactory.getLog(ActiveMQInputStreamTest.class);
+
+ private static final String BROKER_URL = "tcp://localhost:61616";
+ private static final String DESTINATION = "destination";
+ private static final int STREAM_LENGTH = 64 * 1024 + 0; // change 0 to 1 to make it not crash
+
+ public void testInputStreamMatchesDefaultChuckSize() throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.setUseJmx(false);
+ broker.setPersistent(false);
+ broker.setDestinations(new ActiveMQDestination[] {
+ ActiveMQDestination.createDestination(DESTINATION, ActiveMQDestination.QUEUE_TYPE),
+ });
+ broker.addConnector(BROKER_URL);
+ broker.start();
+
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
+ ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue destination = session.createQueue(DESTINATION);
+
+ OutputStream out = null;
+ try {
+ out = connection.createOutputStream(destination);
+ LOG.debug("writing...");
+ for (int i = 0; i < STREAM_LENGTH; ++i) {
+ out.write(0);
+ }
+ LOG.debug("wrote " + STREAM_LENGTH + " bytes");
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+
+ InputStream in = null;
+ try {
+ in = connection.createInputStream(destination);
+ LOG.debug("reading...");
+ int count = 0;
+ while (-1 != in.read()) {
+ ++count;
+ }
+ LOG.debug("read " + count + " bytes");
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ }
+
+ connection.close();
+ broker.stop();
+ }
+}