You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/04/29 20:34:24 UTC
[12/14] Use autocrlf consistently for line endings
http://git-wip-us.apache.org/repos/asf/activemq/blob/3f32507f/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarOutputStream.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarOutputStream.java b/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarOutputStream.java
index 26de93f..ddd288d 100644
--- a/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarOutputStream.java
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarOutputStream.java
@@ -1,356 +1,356 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-/*
- * This package is based on the work done by Timothy Gerard Endres
- * (time@ice.com) to whom the Ant project is very grateful for his great code.
- */
-
-package org.apache.activemq.console.command.store.tar;
-
-import java.io.FilterOutputStream;
-import java.io.OutputStream;
-import java.io.IOException;
-
-/**
- * The TarOutputStream writes a UNIX tar archive as an OutputStream.
- * Methods are provided to put entries, and then write their contents
- * by writing to this stream using write().
- *
- */
-public class TarOutputStream extends FilterOutputStream {
- /** Fail if a long file name is required in the archive. */
- public static final int LONGFILE_ERROR = 0;
-
- /** Long paths will be truncated in the archive. */
- public static final int LONGFILE_TRUNCATE = 1;
-
- /** GNU tar extensions are used to store long file names in the archive. */
- public static final int LONGFILE_GNU = 2;
-
- // CheckStyle:VisibilityModifier OFF - bc
- protected boolean debug;
- protected long currSize;
- protected String currName;
- protected long currBytes;
- protected byte[] oneBuf;
- protected byte[] recordBuf;
- protected int assemLen;
- protected byte[] assemBuf;
- protected TarBuffer buffer;
- protected int longFileMode = LONGFILE_ERROR;
- // CheckStyle:VisibilityModifier ON
-
- private boolean closed = false;
-
- /**
- * Constructor for TarInputStream.
- * @param os the output stream to use
- */
- public TarOutputStream(OutputStream os) {
- this(os, TarBuffer.DEFAULT_BLKSIZE, TarBuffer.DEFAULT_RCDSIZE);
- }
-
- /**
- * Constructor for TarInputStream.
- * @param os the output stream to use
- * @param blockSize the block size to use
- */
- public TarOutputStream(OutputStream os, int blockSize) {
- this(os, blockSize, TarBuffer.DEFAULT_RCDSIZE);
- }
-
- /**
- * Constructor for TarInputStream.
- * @param os the output stream to use
- * @param blockSize the block size to use
- * @param recordSize the record size to use
- */
- public TarOutputStream(OutputStream os, int blockSize, int recordSize) {
- super(os);
-
- this.buffer = new TarBuffer(os, blockSize, recordSize);
- this.debug = false;
- this.assemLen = 0;
- this.assemBuf = new byte[recordSize];
- this.recordBuf = new byte[recordSize];
- this.oneBuf = new byte[1];
- }
-
- /**
- * Set the long file mode.
- * This can be LONGFILE_ERROR(0), LONGFILE_TRUNCATE(1) or LONGFILE_GNU(2).
- * This specifies the treatment of long file names (names >= TarConstants.NAMELEN).
- * Default is LONGFILE_ERROR.
- * @param longFileMode the mode to use
- */
- public void setLongFileMode(int longFileMode) {
- this.longFileMode = longFileMode;
- }
-
-
- /**
- * Sets the debugging flag.
- *
- * @param debugF True to turn on debugging.
- */
- public void setDebug(boolean debugF) {
- this.debug = debugF;
- }
-
- /**
- * Sets the debugging flag in this stream's TarBuffer.
- *
- * @param debug True to turn on debugging.
- */
- public void setBufferDebug(boolean debug) {
- buffer.setDebug(debug);
- }
-
- /**
- * Ends the TAR archive without closing the underlying OutputStream.
- * The result is that the two EOF records of nulls are written.
- * @throws IOException on error
- */
- public void finish() throws IOException {
- // See Bugzilla 28776 for a discussion on this
- // http://issues.apache.org/bugzilla/show_bug.cgi?id=28776
- writeEOFRecord();
- writeEOFRecord();
- buffer.flushBlock();
- }
-
- /**
- * Ends the TAR archive and closes the underlying OutputStream.
- * This means that finish() is called followed by calling the
- * TarBuffer's close().
- * @throws IOException on error
- */
- public void close() throws IOException {
- if (!closed) {
- finish();
- buffer.close();
- out.close();
- closed = true;
- }
- }
-
- /**
- * Get the record size being used by this stream's TarBuffer.
- *
- * @return The TarBuffer record size.
- */
- public int getRecordSize() {
- return buffer.getRecordSize();
- }
-
- /**
- * Put an entry on the output stream. This writes the entry's
- * header record and positions the output stream for writing
- * the contents of the entry. Once this method is called, the
- * stream is ready for calls to write() to write the entry's
- * contents. Once the contents are written, closeEntry()
- * <B>MUST</B> be called to ensure that all buffered data
- * is completely written to the output stream.
- *
- * @param entry The TarEntry to be written to the archive.
- * @throws IOException on error
- */
- public void putNextEntry(TarEntry entry) throws IOException {
- if (entry.getName().length() >= TarConstants.NAMELEN) {
-
- if (longFileMode == LONGFILE_GNU) {
- // create a TarEntry for the LongLink, the contents
- // of which are the entry's name
- TarEntry longLinkEntry = new TarEntry(TarConstants.GNU_LONGLINK,
- TarConstants.LF_GNUTYPE_LONGNAME);
-
- longLinkEntry.setSize(entry.getName().length() + 1);
- putNextEntry(longLinkEntry);
- write(entry.getName().getBytes());
- write(0);
- closeEntry();
- } else if (longFileMode != LONGFILE_TRUNCATE) {
- throw new RuntimeException("file name '" + entry.getName()
- + "' is too long ( > "
- + TarConstants.NAMELEN + " bytes)");
- }
- }
-
- entry.writeEntryHeader(recordBuf);
- buffer.writeRecord(recordBuf);
-
- currBytes = 0;
-
- if (entry.isDirectory()) {
- currSize = 0;
- } else {
- currSize = entry.getSize();
- }
- currName = entry.getName();
- }
-
- /**
- * Close an entry. This method MUST be called for all file
- * entries that contain data. The reason is that we must
- * buffer data written to the stream in order to satisfy
- * the buffer's record based writes. Thus, there may be
- * data fragments still being assembled that must be written
- * to the output stream before this entry is closed and the
- * next entry written.
- * @throws IOException on error
- */
- public void closeEntry() throws IOException {
- if (assemLen > 0) {
- for (int i = assemLen; i < assemBuf.length; ++i) {
- assemBuf[i] = 0;
- }
-
- buffer.writeRecord(assemBuf);
-
- currBytes += assemLen;
- assemLen = 0;
- }
-
- if (currBytes < currSize) {
- throw new IOException("entry '" + currName + "' closed at '"
- + currBytes
- + "' before the '" + currSize
- + "' bytes specified in the header were written");
- }
- }
-
- /**
- * Writes a byte to the current tar archive entry.
- *
- * This method simply calls read( byte[], int, int ).
- *
- * @param b The byte written.
- * @throws IOException on error
- */
- public void write(int b) throws IOException {
- oneBuf[0] = (byte) b;
-
- write(oneBuf, 0, 1);
- }
-
- /**
- * Writes bytes to the current tar archive entry.
- *
- * This method simply calls write( byte[], int, int ).
- *
- * @param wBuf The buffer to write to the archive.
- * @throws IOException on error
- */
- public void write(byte[] wBuf) throws IOException {
- write(wBuf, 0, wBuf.length);
- }
-
- /**
- * Writes bytes to the current tar archive entry. This method
- * is aware of the current entry and will throw an exception if
- * you attempt to write bytes past the length specified for the
- * current entry. The method is also (painfully) aware of the
- * record buffering required by TarBuffer, and manages buffers
- * that are not a multiple of recordsize in length, including
- * assembling records from small buffers.
- *
- * @param wBuf The buffer to write to the archive.
- * @param wOffset The offset in the buffer from which to get bytes.
- * @param numToWrite The number of bytes to write.
- * @throws IOException on error
- */
- public void write(byte[] wBuf, int wOffset, int numToWrite) throws IOException {
- if ((currBytes + numToWrite) > currSize) {
- throw new IOException("request to write '" + numToWrite
- + "' bytes exceeds size in header of '"
- + currSize + "' bytes for entry '"
- + currName + "'");
-
- //
- // We have to deal with assembly!!!
- // The programmer can be writing little 32 byte chunks for all
- // we know, and we must assemble complete records for writing.
- // REVIEW Maybe this should be in TarBuffer? Could that help to
- // eliminate some of the buffer copying.
- //
- }
-
- if (assemLen > 0) {
- if ((assemLen + numToWrite) >= recordBuf.length) {
- int aLen = recordBuf.length - assemLen;
-
- System.arraycopy(assemBuf, 0, recordBuf, 0,
- assemLen);
- System.arraycopy(wBuf, wOffset, recordBuf,
- assemLen, aLen);
- buffer.writeRecord(recordBuf);
-
- currBytes += recordBuf.length;
- wOffset += aLen;
- numToWrite -= aLen;
- assemLen = 0;
- } else {
- System.arraycopy(wBuf, wOffset, assemBuf, assemLen,
- numToWrite);
-
- wOffset += numToWrite;
- assemLen += numToWrite;
- numToWrite = 0;
- }
- }
-
- //
- // When we get here we have EITHER:
- // o An empty "assemble" buffer.
- // o No bytes to write (numToWrite == 0)
- //
- while (numToWrite > 0) {
- if (numToWrite < recordBuf.length) {
- System.arraycopy(wBuf, wOffset, assemBuf, assemLen,
- numToWrite);
-
- assemLen += numToWrite;
-
- break;
- }
-
- buffer.writeRecord(wBuf, wOffset);
-
- int num = recordBuf.length;
-
- currBytes += num;
- numToWrite -= num;
- wOffset += num;
- }
- }
-
- /**
- * Write an EOF (end of archive) record to the tar archive.
- * An EOF record consists of a record of all zeros.
- */
- private void writeEOFRecord() throws IOException {
- for (int i = 0; i < recordBuf.length; ++i) {
- recordBuf[i] = 0;
- }
-
- buffer.writeRecord(recordBuf);
- }
-}
-
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/*
+ * This package is based on the work done by Timothy Gerard Endres
+ * (time@ice.com) to whom the Ant project is very grateful for his great code.
+ */
+
+package org.apache.activemq.console.command.store.tar;
+
+import java.io.FilterOutputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+
+/**
+ * The TarOutputStream writes a UNIX tar archive as an OutputStream.
+ * Methods are provided to put entries, and then write their contents
+ * by writing to this stream using write().
+ *
+ */
+public class TarOutputStream extends FilterOutputStream {
+ /** Fail if a long file name is required in the archive. */
+ public static final int LONGFILE_ERROR = 0;
+
+ /** Long paths will be truncated in the archive. */
+ public static final int LONGFILE_TRUNCATE = 1;
+
+ /** GNU tar extensions are used to store long file names in the archive. */
+ public static final int LONGFILE_GNU = 2;
+
+ // CheckStyle:VisibilityModifier OFF - bc
+ protected boolean debug;
+ protected long currSize;
+ protected String currName;
+ protected long currBytes;
+ protected byte[] oneBuf;
+ protected byte[] recordBuf;
+ protected int assemLen;
+ protected byte[] assemBuf;
+ protected TarBuffer buffer;
+ protected int longFileMode = LONGFILE_ERROR;
+ // CheckStyle:VisibilityModifier ON
+
+ private boolean closed = false;
+
+ /**
+ * Constructor for TarInputStream.
+ * @param os the output stream to use
+ */
+ public TarOutputStream(OutputStream os) {
+ this(os, TarBuffer.DEFAULT_BLKSIZE, TarBuffer.DEFAULT_RCDSIZE);
+ }
+
+ /**
+ * Constructor for TarInputStream.
+ * @param os the output stream to use
+ * @param blockSize the block size to use
+ */
+ public TarOutputStream(OutputStream os, int blockSize) {
+ this(os, blockSize, TarBuffer.DEFAULT_RCDSIZE);
+ }
+
+ /**
+ * Constructor for TarInputStream.
+ * @param os the output stream to use
+ * @param blockSize the block size to use
+ * @param recordSize the record size to use
+ */
+ public TarOutputStream(OutputStream os, int blockSize, int recordSize) {
+ super(os);
+
+ this.buffer = new TarBuffer(os, blockSize, recordSize);
+ this.debug = false;
+ this.assemLen = 0;
+ this.assemBuf = new byte[recordSize];
+ this.recordBuf = new byte[recordSize];
+ this.oneBuf = new byte[1];
+ }
+
+ /**
+ * Set the long file mode.
+ * This can be LONGFILE_ERROR(0), LONGFILE_TRUNCATE(1) or LONGFILE_GNU(2).
+ * This specifies the treatment of long file names (names >= TarConstants.NAMELEN).
+ * Default is LONGFILE_ERROR.
+ * @param longFileMode the mode to use
+ */
+ public void setLongFileMode(int longFileMode) {
+ this.longFileMode = longFileMode;
+ }
+
+
+ /**
+ * Sets the debugging flag.
+ *
+ * @param debugF True to turn on debugging.
+ */
+ public void setDebug(boolean debugF) {
+ this.debug = debugF;
+ }
+
+ /**
+ * Sets the debugging flag in this stream's TarBuffer.
+ *
+ * @param debug True to turn on debugging.
+ */
+ public void setBufferDebug(boolean debug) {
+ buffer.setDebug(debug);
+ }
+
+ /**
+ * Ends the TAR archive without closing the underlying OutputStream.
+ * The result is that the two EOF records of nulls are written.
+ * @throws IOException on error
+ */
+ public void finish() throws IOException {
+ // See Bugzilla 28776 for a discussion on this
+ // http://issues.apache.org/bugzilla/show_bug.cgi?id=28776
+ writeEOFRecord();
+ writeEOFRecord();
+ buffer.flushBlock();
+ }
+
+ /**
+ * Ends the TAR archive and closes the underlying OutputStream.
+ * This means that finish() is called followed by calling the
+ * TarBuffer's close().
+ * @throws IOException on error
+ */
+ public void close() throws IOException {
+ if (!closed) {
+ finish();
+ buffer.close();
+ out.close();
+ closed = true;
+ }
+ }
+
+ /**
+ * Get the record size being used by this stream's TarBuffer.
+ *
+ * @return The TarBuffer record size.
+ */
+ public int getRecordSize() {
+ return buffer.getRecordSize();
+ }
+
+ /**
+ * Put an entry on the output stream. This writes the entry's
+ * header record and positions the output stream for writing
+ * the contents of the entry. Once this method is called, the
+ * stream is ready for calls to write() to write the entry's
+ * contents. Once the contents are written, closeEntry()
+ * <B>MUST</B> be called to ensure that all buffered data
+ * is completely written to the output stream.
+ *
+ * @param entry The TarEntry to be written to the archive.
+ * @throws IOException on error
+ */
+ public void putNextEntry(TarEntry entry) throws IOException {
+ if (entry.getName().length() >= TarConstants.NAMELEN) {
+
+ if (longFileMode == LONGFILE_GNU) {
+ // create a TarEntry for the LongLink, the contents
+ // of which are the entry's name
+ TarEntry longLinkEntry = new TarEntry(TarConstants.GNU_LONGLINK,
+ TarConstants.LF_GNUTYPE_LONGNAME);
+
+ longLinkEntry.setSize(entry.getName().length() + 1);
+ putNextEntry(longLinkEntry);
+ write(entry.getName().getBytes());
+ write(0);
+ closeEntry();
+ } else if (longFileMode != LONGFILE_TRUNCATE) {
+ throw new RuntimeException("file name '" + entry.getName()
+ + "' is too long ( > "
+ + TarConstants.NAMELEN + " bytes)");
+ }
+ }
+
+ entry.writeEntryHeader(recordBuf);
+ buffer.writeRecord(recordBuf);
+
+ currBytes = 0;
+
+ if (entry.isDirectory()) {
+ currSize = 0;
+ } else {
+ currSize = entry.getSize();
+ }
+ currName = entry.getName();
+ }
+
+ /**
+ * Close an entry. This method MUST be called for all file
+ * entries that contain data. The reason is that we must
+ * buffer data written to the stream in order to satisfy
+ * the buffer's record based writes. Thus, there may be
+ * data fragments still being assembled that must be written
+ * to the output stream before this entry is closed and the
+ * next entry written.
+ * @throws IOException on error
+ */
+ public void closeEntry() throws IOException {
+ if (assemLen > 0) {
+ for (int i = assemLen; i < assemBuf.length; ++i) {
+ assemBuf[i] = 0;
+ }
+
+ buffer.writeRecord(assemBuf);
+
+ currBytes += assemLen;
+ assemLen = 0;
+ }
+
+ if (currBytes < currSize) {
+ throw new IOException("entry '" + currName + "' closed at '"
+ + currBytes
+ + "' before the '" + currSize
+ + "' bytes specified in the header were written");
+ }
+ }
+
+ /**
+ * Writes a byte to the current tar archive entry.
+ *
+ * This method simply calls read( byte[], int, int ).
+ *
+ * @param b The byte written.
+ * @throws IOException on error
+ */
+ public void write(int b) throws IOException {
+ oneBuf[0] = (byte) b;
+
+ write(oneBuf, 0, 1);
+ }
+
+ /**
+ * Writes bytes to the current tar archive entry.
+ *
+ * This method simply calls write( byte[], int, int ).
+ *
+ * @param wBuf The buffer to write to the archive.
+ * @throws IOException on error
+ */
+ public void write(byte[] wBuf) throws IOException {
+ write(wBuf, 0, wBuf.length);
+ }
+
+ /**
+ * Writes bytes to the current tar archive entry. This method
+ * is aware of the current entry and will throw an exception if
+ * you attempt to write bytes past the length specified for the
+ * current entry. The method is also (painfully) aware of the
+ * record buffering required by TarBuffer, and manages buffers
+ * that are not a multiple of recordsize in length, including
+ * assembling records from small buffers.
+ *
+ * @param wBuf The buffer to write to the archive.
+ * @param wOffset The offset in the buffer from which to get bytes.
+ * @param numToWrite The number of bytes to write.
+ * @throws IOException on error
+ */
+ public void write(byte[] wBuf, int wOffset, int numToWrite) throws IOException {
+ if ((currBytes + numToWrite) > currSize) {
+ throw new IOException("request to write '" + numToWrite
+ + "' bytes exceeds size in header of '"
+ + currSize + "' bytes for entry '"
+ + currName + "'");
+
+ //
+ // We have to deal with assembly!!!
+ // The programmer can be writing little 32 byte chunks for all
+ // we know, and we must assemble complete records for writing.
+ // REVIEW Maybe this should be in TarBuffer? Could that help to
+ // eliminate some of the buffer copying.
+ //
+ }
+
+ if (assemLen > 0) {
+ if ((assemLen + numToWrite) >= recordBuf.length) {
+ int aLen = recordBuf.length - assemLen;
+
+ System.arraycopy(assemBuf, 0, recordBuf, 0,
+ assemLen);
+ System.arraycopy(wBuf, wOffset, recordBuf,
+ assemLen, aLen);
+ buffer.writeRecord(recordBuf);
+
+ currBytes += recordBuf.length;
+ wOffset += aLen;
+ numToWrite -= aLen;
+ assemLen = 0;
+ } else {
+ System.arraycopy(wBuf, wOffset, assemBuf, assemLen,
+ numToWrite);
+
+ wOffset += numToWrite;
+ assemLen += numToWrite;
+ numToWrite = 0;
+ }
+ }
+
+ //
+ // When we get here we have EITHER:
+ // o An empty "assemble" buffer.
+ // o No bytes to write (numToWrite == 0)
+ //
+ while (numToWrite > 0) {
+ if (numToWrite < recordBuf.length) {
+ System.arraycopy(wBuf, wOffset, assemBuf, assemLen,
+ numToWrite);
+
+ assemLen += numToWrite;
+
+ break;
+ }
+
+ buffer.writeRecord(wBuf, wOffset);
+
+ int num = recordBuf.length;
+
+ currBytes += num;
+ numToWrite -= num;
+ wOffset += num;
+ }
+ }
+
+ /**
+ * Write an EOF (end of archive) record to the tar archive.
+ * An EOF record consists of a record of all zeros.
+ */
+ private void writeEOFRecord() throws IOException {
+ for (int i = 0; i < recordBuf.length; ++i) {
+ recordBuf[i] = 0;
+ }
+
+ buffer.writeRecord(recordBuf);
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/activemq/blob/3f32507f/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarUtils.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarUtils.java b/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarUtils.java
index a89caec..94e1e1c 100644
--- a/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarUtils.java
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarUtils.java
@@ -1,206 +1,206 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-/*
- * This package is based on the work done by Timothy Gerard Endres
- * (time@ice.com) to whom the Ant project is very grateful for his great code.
- */
-
-package org.apache.activemq.console.command.store.tar;
-
-/**
- * This class provides static utility methods to work with byte streams.
- *
- */
-// CheckStyle:HideUtilityClassConstructorCheck OFF (bc)
-public class TarUtils {
-
- private static final int BYTE_MASK = 255;
-
- /**
- * Parse an octal string from a header buffer. This is used for the
- * file permission mode value.
- *
- * @param header The header buffer from which to parse.
- * @param offset The offset into the buffer from which to parse.
- * @param length The number of header bytes to parse.
- * @return The long value of the octal string.
- */
- public static long parseOctal(byte[] header, int offset, int length) {
- long result = 0;
- boolean stillPadding = true;
- int end = offset + length;
-
- for (int i = offset; i < end; ++i) {
- if (header[i] == 0) {
- break;
- }
-
- if (header[i] == (byte) ' ' || header[i] == '0') {
- if (stillPadding) {
- continue;
- }
-
- if (header[i] == (byte) ' ') {
- break;
- }
- }
-
- stillPadding = false;
- // CheckStyle:MagicNumber OFF
- result = (result << 3) + (header[i] - '0');
- // CheckStyle:MagicNumber ON
- }
-
- return result;
- }
-
- /**
- * Parse an entry name from a header buffer.
- *
- * @param header The header buffer from which to parse.
- * @param offset The offset into the buffer from which to parse.
- * @param length The number of header bytes to parse.
- * @return The header's entry name.
- */
- public static StringBuffer parseName(byte[] header, int offset, int length) {
- StringBuffer result = new StringBuffer(length);
- int end = offset + length;
-
- for (int i = offset; i < end; ++i) {
- if (header[i] == 0) {
- break;
- }
-
- result.append((char) header[i]);
- }
-
- return result;
- }
-
- /**
- * Determine the number of bytes in an entry name.
- *
- * @param name The header name from which to parse.
- * @param buf The buffer from which to parse.
- * @param offset The offset into the buffer from which to parse.
- * @param length The number of header bytes to parse.
- * @return The number of bytes in a header's entry name.
- */
- public static int getNameBytes(StringBuffer name, byte[] buf, int offset, int length) {
- int i;
-
- for (i = 0; i < length && i < name.length(); ++i) {
- buf[offset + i] = (byte) name.charAt(i);
- }
-
- for (; i < length; ++i) {
- buf[offset + i] = 0;
- }
-
- return offset + length;
- }
-
- /**
- * Parse an octal integer from a header buffer.
- *
- * @param value The header value
- * @param buf The buffer from which to parse.
- * @param offset The offset into the buffer from which to parse.
- * @param length The number of header bytes to parse.
- * @return The integer value of the octal bytes.
- */
- public static int getOctalBytes(long value, byte[] buf, int offset, int length) {
- int idx = length - 1;
-
- buf[offset + idx] = 0;
- --idx;
- buf[offset + idx] = (byte) ' ';
- --idx;
-
- if (value == 0) {
- buf[offset + idx] = (byte) '0';
- --idx;
- } else {
- for (long val = value; idx >= 0 && val > 0; --idx) {
- // CheckStyle:MagicNumber OFF
- buf[offset + idx] = (byte) ((byte) '0' + (byte) (val & 7));
- val = val >> 3;
- // CheckStyle:MagicNumber ON
- }
- }
-
- for (; idx >= 0; --idx) {
- buf[offset + idx] = (byte) ' ';
- }
-
- return offset + length;
- }
-
- /**
- * Parse an octal long integer from a header buffer.
- *
- * @param value The header value
- * @param buf The buffer from which to parse.
- * @param offset The offset into the buffer from which to parse.
- * @param length The number of header bytes to parse.
- * @return The long value of the octal bytes.
- */
- public static int getLongOctalBytes(long value, byte[] buf, int offset, int length) {
- byte[] temp = new byte[length + 1];
-
- getOctalBytes(value, temp, 0, length + 1);
- System.arraycopy(temp, 0, buf, offset, length);
-
- return offset + length;
- }
-
- /**
- * Parse the checksum octal integer from a header buffer.
- *
- * @param value The header value
- * @param buf The buffer from which to parse.
- * @param offset The offset into the buffer from which to parse.
- * @param length The number of header bytes to parse.
- * @return The integer value of the entry's checksum.
- */
- public static int getCheckSumOctalBytes(long value, byte[] buf, int offset, int length) {
- getOctalBytes(value, buf, offset, length);
-
- buf[offset + length - 1] = (byte) ' ';
- buf[offset + length - 2] = 0;
-
- return offset + length;
- }
-
- /**
- * Compute the checksum of a tar entry header.
- *
- * @param buf The tar entry's header buffer.
- * @return The computed checksum.
- */
- public static long computeCheckSum(byte[] buf) {
- long sum = 0;
-
- for (int i = 0; i < buf.length; ++i) {
- sum += BYTE_MASK & buf[i];
- }
-
- return sum;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/*
+ * This package is based on the work done by Timothy Gerard Endres
+ * (time@ice.com) to whom the Ant project is very grateful for his great code.
+ */
+
+package org.apache.activemq.console.command.store.tar;
+
+/**
+ * This class provides static utility methods to work with byte streams.
+ *
+ */
+// CheckStyle:HideUtilityClassConstructorCheck OFF (bc)
+public class TarUtils {
+
+ private static final int BYTE_MASK = 255;
+
+ /**
+ * Parse an octal string from a header buffer. This is used for the
+ * file permission mode value.
+ *
+ * @param header The header buffer from which to parse.
+ * @param offset The offset into the buffer from which to parse.
+ * @param length The number of header bytes to parse.
+ * @return The long value of the octal string.
+ */
+ public static long parseOctal(byte[] header, int offset, int length) {
+ long result = 0;
+ boolean stillPadding = true;
+ int end = offset + length;
+
+ for (int i = offset; i < end; ++i) {
+ if (header[i] == 0) {
+ break;
+ }
+
+ if (header[i] == (byte) ' ' || header[i] == '0') {
+ if (stillPadding) {
+ continue;
+ }
+
+ if (header[i] == (byte) ' ') {
+ break;
+ }
+ }
+
+ stillPadding = false;
+ // CheckStyle:MagicNumber OFF
+ result = (result << 3) + (header[i] - '0');
+ // CheckStyle:MagicNumber ON
+ }
+
+ return result;
+ }
+
+ /**
+ * Parse an entry name from a header buffer.
+ *
+ * @param header The header buffer from which to parse.
+ * @param offset The offset into the buffer from which to parse.
+ * @param length The number of header bytes to parse.
+ * @return The header's entry name.
+ */
+ public static StringBuffer parseName(byte[] header, int offset, int length) {
+ StringBuffer result = new StringBuffer(length);
+ int end = offset + length;
+
+ for (int i = offset; i < end; ++i) {
+ if (header[i] == 0) {
+ break;
+ }
+
+ result.append((char) header[i]);
+ }
+
+ return result;
+ }
+
+ /**
+ * Determine the number of bytes in an entry name.
+ *
+ * @param name The header name from which to parse.
+ * @param buf The buffer from which to parse.
+ * @param offset The offset into the buffer from which to parse.
+ * @param length The number of header bytes to parse.
+ * @return The number of bytes in a header's entry name.
+ */
+ public static int getNameBytes(StringBuffer name, byte[] buf, int offset, int length) {
+ int i;
+
+ for (i = 0; i < length && i < name.length(); ++i) {
+ buf[offset + i] = (byte) name.charAt(i);
+ }
+
+ for (; i < length; ++i) {
+ buf[offset + i] = 0;
+ }
+
+ return offset + length;
+ }
+
+ /**
+ * Parse an octal integer from a header buffer.
+ *
+ * @param value The header value
+ * @param buf The buffer from which to parse.
+ * @param offset The offset into the buffer from which to parse.
+ * @param length The number of header bytes to parse.
+ * @return The integer value of the octal bytes.
+ */
+ public static int getOctalBytes(long value, byte[] buf, int offset, int length) {
+ int idx = length - 1;
+
+ buf[offset + idx] = 0;
+ --idx;
+ buf[offset + idx] = (byte) ' ';
+ --idx;
+
+ if (value == 0) {
+ buf[offset + idx] = (byte) '0';
+ --idx;
+ } else {
+ for (long val = value; idx >= 0 && val > 0; --idx) {
+ // CheckStyle:MagicNumber OFF
+ buf[offset + idx] = (byte) ((byte) '0' + (byte) (val & 7));
+ val = val >> 3;
+ // CheckStyle:MagicNumber ON
+ }
+ }
+
+ for (; idx >= 0; --idx) {
+ buf[offset + idx] = (byte) ' ';
+ }
+
+ return offset + length;
+ }
+
+ /**
+ * Parse an octal long integer from a header buffer.
+ *
+ * @param value The header value
+ * @param buf The buffer from which to parse.
+ * @param offset The offset into the buffer from which to parse.
+ * @param length The number of header bytes to parse.
+ * @return The long value of the octal bytes.
+ */
+ public static int getLongOctalBytes(long value, byte[] buf, int offset, int length) {
+ byte[] temp = new byte[length + 1];
+
+ getOctalBytes(value, temp, 0, length + 1);
+ System.arraycopy(temp, 0, buf, offset, length);
+
+ return offset + length;
+ }
+
+ /**
+ * Parse the checksum octal integer from a header buffer.
+ *
+ * @param value The header value
+ * @param buf The buffer from which to parse.
+ * @param offset The offset into the buffer from which to parse.
+ * @param length The number of header bytes to parse.
+ * @return The integer value of the entry's checksum.
+ */
+ public static int getCheckSumOctalBytes(long value, byte[] buf, int offset, int length) {
+ getOctalBytes(value, buf, offset, length);
+
+ buf[offset + length - 1] = (byte) ' ';
+ buf[offset + length - 2] = 0;
+
+ return offset + length;
+ }
+
+ /**
+ * Compute the checksum of a tar entry header.
+ *
+ * @param buf The tar entry's header buffer.
+ * @return The computed checksum.
+ */
+ public static long computeCheckSum(byte[] buf) {
+ long sum = 0;
+
+ for (int i = 0; i < buf.length; ++i) {
+ sum += BYTE_MASK & buf[i];
+ }
+
+ return sum;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3f32507f/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactJDBCAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactJDBCAdapter.java
index 0ab985e..4187979 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactJDBCAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactJDBCAdapter.java
@@ -28,12 +28,12 @@ import org.apache.activemq.store.jdbc.Statements;
public class TransactJDBCAdapter extends ImageBasedJDBCAdaptor {
@Override
public void setStatements(Statements statements) {
- String lockCreateStatement = "SELECT * FROM " + statements.getFullLockTableName() + " WITH (UPDLOCK, ROWLOCK)";
+ String lockCreateStatement = "SELECT * FROM " + statements.getFullLockTableName() + " WITH (UPDLOCK, ROWLOCK)";
if (statements.isUseLockCreateWhereClause()) {
lockCreateStatement += " WHERE ID = 1";
}
-
+
statements.setLockCreateStatement(lockCreateStatement);
super.setStatements(statements);
http://git-wip-us.apache.org/repos/asf/activemq/blob/3f32507f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/ReplicationTarget.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/ReplicationTarget.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/ReplicationTarget.java
index fe228f7..dae59da 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/ReplicationTarget.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/ReplicationTarget.java
@@ -1,25 +1,25 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.disk.journal;
-
-import org.apache.activemq.util.ByteSequence;
-
-public interface ReplicationTarget {
-
- void replicate(Location location, ByteSequence sequence, boolean sync);
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.journal;
+
+import org.apache.activemq.util.ByteSequence;
+
+public interface ReplicationTarget {
+
+ void replicate(Location location, ByteSequence sequence, boolean sync);
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3f32507f/activemq-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java
index 757fceb..53ef28f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java
@@ -1,357 +1,357 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.command.ActiveMQQueue;
-
-public class ExclusiveConsumerTest extends TestCase {
-
- private static final String VM_BROKER_URL = "vm://localhost?broker.persistent=false&broker.useJmx=true";
-
- public ExclusiveConsumerTest(String name) {
- super(name);
- }
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- }
-
- private Connection createConnection(final boolean start) throws JMSException {
- ConnectionFactory cf = new ActiveMQConnectionFactory(VM_BROKER_URL);
- Connection conn = cf.createConnection();
- if (start) {
- conn.start();
- }
- return conn;
- }
-
- public void testExclusiveConsumerSelectedCreatedFirst() throws JMSException, InterruptedException {
- Connection conn = createConnection(true);
-
- Session exclusiveSession = null;
- Session fallbackSession = null;
- Session senderSession = null;
-
- try {
-
- exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE1?consumer.exclusive=true");
- MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
-
- ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE1");
- MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
-
- ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE1");
-
- MessageProducer producer = senderSession.createProducer(senderQueue);
-
- Message msg = senderSession.createTextMessage("test");
- producer.send(msg);
- // TODO need two send a 2nd message - bug AMQ-1024
- // producer.send(msg);
- Thread.sleep(100);
-
- // Verify exclusive consumer receives the message.
- assertNotNull(exclusiveConsumer.receive(100));
- assertNull(fallbackConsumer.receive(100));
-
- } finally {
- fallbackSession.close();
- senderSession.close();
- conn.close();
- }
-
- }
-
- public void testExclusiveConsumerSelectedCreatedAfter() throws JMSException, InterruptedException {
- Connection conn = createConnection(true);
-
- Session exclusiveSession = null;
- Session fallbackSession = null;
- Session senderSession = null;
-
- try {
-
- exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE5");
- MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
-
- ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE5?consumer.exclusive=true");
- MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
-
- ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE5");
-
- MessageProducer producer = senderSession.createProducer(senderQueue);
-
- Message msg = senderSession.createTextMessage("test");
- producer.send(msg);
- Thread.sleep(100);
-
- // Verify exclusive consumer receives the message.
- assertNotNull(exclusiveConsumer.receive(100));
- assertNull(fallbackConsumer.receive(100));
-
- } finally {
- fallbackSession.close();
- senderSession.close();
- conn.close();
- }
-
- }
-
- public void testFailoverToAnotherExclusiveConsumerCreatedFirst() throws JMSException,
- InterruptedException {
- Connection conn = createConnection(true);
-
- Session exclusiveSession1 = null;
- Session exclusiveSession2 = null;
- Session fallbackSession = null;
- Session senderSession = null;
-
- try {
-
- exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // This creates the exclusive consumer first which avoids AMQ-1024
- // bug.
- ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE2?consumer.exclusive=true");
- MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue);
- MessageConsumer exclusiveConsumer2 = exclusiveSession2.createConsumer(exclusiveQueue);
-
- ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE2");
- MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
-
- ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE2");
-
- MessageProducer producer = senderSession.createProducer(senderQueue);
-
- Message msg = senderSession.createTextMessage("test");
- producer.send(msg);
- Thread.sleep(100);
-
- // Verify exclusive consumer receives the message.
- assertNotNull(exclusiveConsumer1.receive(100));
- assertNull(exclusiveConsumer2.receive(100));
- assertNull(fallbackConsumer.receive(100));
-
- // Close the exclusive consumer to verify the non-exclusive consumer
- // takes over
- exclusiveConsumer1.close();
-
- producer.send(msg);
- producer.send(msg);
-
- assertNotNull(exclusiveConsumer2.receive(100));
- assertNull(fallbackConsumer.receive(100));
-
- } finally {
- fallbackSession.close();
- senderSession.close();
- conn.close();
- }
-
- }
-
- public void testFailoverToAnotherExclusiveConsumerCreatedAfter() throws JMSException,
- InterruptedException {
- Connection conn = createConnection(true);
-
- Session exclusiveSession1 = null;
- Session exclusiveSession2 = null;
- Session fallbackSession = null;
- Session senderSession = null;
-
- try {
-
- exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // This creates the exclusive consumer first which avoids AMQ-1024
- // bug.
- ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE6?consumer.exclusive=true");
- MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue);
-
- ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE6");
- MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
-
- MessageConsumer exclusiveConsumer2 = exclusiveSession2.createConsumer(exclusiveQueue);
-
- ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE6");
-
- MessageProducer producer = senderSession.createProducer(senderQueue);
-
- Message msg = senderSession.createTextMessage("test");
- producer.send(msg);
- Thread.sleep(100);
-
- // Verify exclusive consumer receives the message.
- assertNotNull(exclusiveConsumer1.receive(100));
- assertNull(exclusiveConsumer2.receive(100));
- assertNull(fallbackConsumer.receive(100));
-
- // Close the exclusive consumer to verify the non-exclusive consumer
- // takes over
- exclusiveConsumer1.close();
-
- producer.send(msg);
- producer.send(msg);
-
- assertNotNull(exclusiveConsumer2.receive(1000));
- assertNull(fallbackConsumer.receive(100));
-
- } finally {
- fallbackSession.close();
- senderSession.close();
- conn.close();
- }
-
- }
-
- public void testFailoverToNonExclusiveConsumer() throws JMSException, InterruptedException {
- Connection conn = createConnection(true);
-
- Session exclusiveSession = null;
- Session fallbackSession = null;
- Session senderSession = null;
-
- try {
-
- exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // This creates the exclusive consumer first which avoids AMQ-1024
- // bug.
- ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE3?consumer.exclusive=true");
- MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
-
- ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE3");
- MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
-
- ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE3");
-
- MessageProducer producer = senderSession.createProducer(senderQueue);
-
- Message msg = senderSession.createTextMessage("test");
- producer.send(msg);
- Thread.sleep(100);
-
- // Verify exclusive consumer receives the message.
- assertNotNull(exclusiveConsumer.receive(100));
- assertNull(fallbackConsumer.receive(100));
-
- // Close the exclusive consumer to verify the non-exclusive consumer
- // takes over
- exclusiveConsumer.close();
-
- producer.send(msg);
-
- assertNotNull(fallbackConsumer.receive(100));
-
- } finally {
- fallbackSession.close();
- senderSession.close();
- conn.close();
- }
-
- }
-
- public void testFallbackToExclusiveConsumer() throws JMSException, InterruptedException {
- Connection conn = createConnection(true);
-
- Session exclusiveSession = null;
- Session fallbackSession = null;
- Session senderSession = null;
-
- try {
-
- exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // This creates the exclusive consumer first which avoids AMQ-1024
- // bug.
- ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE4?consumer.exclusive=true");
- MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
-
- ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE4");
- MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
-
- ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE4");
-
- MessageProducer producer = senderSession.createProducer(senderQueue);
-
- Message msg = senderSession.createTextMessage("test");
- producer.send(msg);
- Thread.sleep(100);
-
- // Verify exclusive consumer receives the message.
- assertNotNull(exclusiveConsumer.receive(100));
- assertNull(fallbackConsumer.receive(100));
-
- // Close the exclusive consumer to verify the non-exclusive consumer
- // takes over
- exclusiveConsumer.close();
-
- producer.send(msg);
-
- // Verify other non-exclusive consumer receices the message.
- assertNotNull(fallbackConsumer.receive(100));
-
- // Create exclusive consumer to determine if it will start receiving
- // the messages.
- exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
-
- producer.send(msg);
- assertNotNull(exclusiveConsumer.receive(100));
- assertNull(fallbackConsumer.receive(100));
-
- } finally {
- fallbackSession.close();
- senderSession.close();
- conn.close();
- }
-
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.command.ActiveMQQueue;
+
+public class ExclusiveConsumerTest extends TestCase {
+
+ private static final String VM_BROKER_URL = "vm://localhost?broker.persistent=false&broker.useJmx=true";
+
+ public ExclusiveConsumerTest(String name) {
+ super(name);
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ private Connection createConnection(final boolean start) throws JMSException {
+ ConnectionFactory cf = new ActiveMQConnectionFactory(VM_BROKER_URL);
+ Connection conn = cf.createConnection();
+ if (start) {
+ conn.start();
+ }
+ return conn;
+ }
+
+ public void testExclusiveConsumerSelectedCreatedFirst() throws JMSException, InterruptedException {
+ Connection conn = createConnection(true);
+
+ Session exclusiveSession = null;
+ Session fallbackSession = null;
+ Session senderSession = null;
+
+ try {
+
+ exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE1?consumer.exclusive=true");
+ MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
+
+ ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE1");
+ MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
+
+ ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE1");
+
+ MessageProducer producer = senderSession.createProducer(senderQueue);
+
+ Message msg = senderSession.createTextMessage("test");
+ producer.send(msg);
+ // TODO need two send a 2nd message - bug AMQ-1024
+ // producer.send(msg);
+ Thread.sleep(100);
+
+ // Verify exclusive consumer receives the message.
+ assertNotNull(exclusiveConsumer.receive(100));
+ assertNull(fallbackConsumer.receive(100));
+
+ } finally {
+ fallbackSession.close();
+ senderSession.close();
+ conn.close();
+ }
+
+ }
+
+ public void testExclusiveConsumerSelectedCreatedAfter() throws JMSException, InterruptedException {
+ Connection conn = createConnection(true);
+
+ Session exclusiveSession = null;
+ Session fallbackSession = null;
+ Session senderSession = null;
+
+ try {
+
+ exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE5");
+ MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
+
+ ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE5?consumer.exclusive=true");
+ MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
+
+ ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE5");
+
+ MessageProducer producer = senderSession.createProducer(senderQueue);
+
+ Message msg = senderSession.createTextMessage("test");
+ producer.send(msg);
+ Thread.sleep(100);
+
+ // Verify exclusive consumer receives the message.
+ assertNotNull(exclusiveConsumer.receive(100));
+ assertNull(fallbackConsumer.receive(100));
+
+ } finally {
+ fallbackSession.close();
+ senderSession.close();
+ conn.close();
+ }
+
+ }
+
+ public void testFailoverToAnotherExclusiveConsumerCreatedFirst() throws JMSException,
+ InterruptedException {
+ Connection conn = createConnection(true);
+
+ Session exclusiveSession1 = null;
+ Session exclusiveSession2 = null;
+ Session fallbackSession = null;
+ Session senderSession = null;
+
+ try {
+
+ exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // This creates the exclusive consumer first which avoids AMQ-1024
+ // bug.
+ ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE2?consumer.exclusive=true");
+ MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue);
+ MessageConsumer exclusiveConsumer2 = exclusiveSession2.createConsumer(exclusiveQueue);
+
+ ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE2");
+ MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
+
+ ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE2");
+
+ MessageProducer producer = senderSession.createProducer(senderQueue);
+
+ Message msg = senderSession.createTextMessage("test");
+ producer.send(msg);
+ Thread.sleep(100);
+
+ // Verify exclusive consumer receives the message.
+ assertNotNull(exclusiveConsumer1.receive(100));
+ assertNull(exclusiveConsumer2.receive(100));
+ assertNull(fallbackConsumer.receive(100));
+
+ // Close the exclusive consumer to verify the non-exclusive consumer
+ // takes over
+ exclusiveConsumer1.close();
+
+ producer.send(msg);
+ producer.send(msg);
+
+ assertNotNull(exclusiveConsumer2.receive(100));
+ assertNull(fallbackConsumer.receive(100));
+
+ } finally {
+ fallbackSession.close();
+ senderSession.close();
+ conn.close();
+ }
+
+ }
+
+ public void testFailoverToAnotherExclusiveConsumerCreatedAfter() throws JMSException,
+ InterruptedException {
+ Connection conn = createConnection(true);
+
+ Session exclusiveSession1 = null;
+ Session exclusiveSession2 = null;
+ Session fallbackSession = null;
+ Session senderSession = null;
+
+ try {
+
+ exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // This creates the exclusive consumer first which avoids AMQ-1024
+ // bug.
+ ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE6?consumer.exclusive=true");
+ MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue);
+
+ ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE6");
+ MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
+
+ MessageConsumer exclusiveConsumer2 = exclusiveSession2.createConsumer(exclusiveQueue);
+
+ ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE6");
+
+ MessageProducer producer = senderSession.createProducer(senderQueue);
+
+ Message msg = senderSession.createTextMessage("test");
+ producer.send(msg);
+ Thread.sleep(100);
+
+ // Verify exclusive consumer receives the message.
+ assertNotNull(exclusiveConsumer1.receive(100));
+ assertNull(exclusiveConsumer2.receive(100));
+ assertNull(fallbackConsumer.receive(100));
+
+ // Close the exclusive consumer to verify the non-exclusive consumer
+ // takes over
+ exclusiveConsumer1.close();
+
+ producer.send(msg);
+ producer.send(msg);
+
+ assertNotNull(exclusiveConsumer2.receive(1000));
+ assertNull(fallbackConsumer.receive(100));
+
+ } finally {
+ fallbackSession.close();
+ senderSession.close();
+ conn.close();
+ }
+
+ }
+
+ public void testFailoverToNonExclusiveConsumer() throws JMSException, InterruptedException {
+ Connection conn = createConnection(true);
+
+ Session exclusiveSession = null;
+ Session fallbackSession = null;
+ Session senderSession = null;
+
+ try {
+
+ exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // This creates the exclusive consumer first which avoids AMQ-1024
+ // bug.
+ ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE3?consumer.exclusive=true");
+ MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
+
+ ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE3");
+ MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
+
+ ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE3");
+
+ MessageProducer producer = senderSession.createProducer(senderQueue);
+
+ Message msg = senderSession.createTextMessage("test");
+ producer.send(msg);
+ Thread.sleep(100);
+
+ // Verify exclusive consumer receives the message.
+ assertNotNull(exclusiveConsumer.receive(100));
+ assertNull(fallbackConsumer.receive(100));
+
+ // Close the exclusive consumer to verify the non-exclusive consumer
+ // takes over
+ exclusiveConsumer.close();
+
+ producer.send(msg);
+
+ assertNotNull(fallbackConsumer.receive(100));
+
+ } finally {
+ fallbackSession.close();
+ senderSession.close();
+ conn.close();
+ }
+
+ }
+
+ public void testFallbackToExclusiveConsumer() throws JMSException, InterruptedException {
+ Connection conn = createConnection(true);
+
+ Session exclusiveSession = null;
+ Session fallbackSession = null;
+ Session senderSession = null;
+
+ try {
+
+ exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // This creates the exclusive consumer first which avoids AMQ-1024
+ // bug.
+ ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE4?consumer.exclusive=true");
+ MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
+
+ ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE4");
+ MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
+
+ ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE4");
+
+ MessageProducer producer = senderSession.createProducer(senderQueue);
+
+ Message msg = senderSession.createTextMessage("test");
+ producer.send(msg);
+ Thread.sleep(100);
+
+ // Verify exclusive consumer receives the message.
+ assertNotNull(exclusiveConsumer.receive(100));
+ assertNull(fallbackConsumer.receive(100));
+
+ // Close the exclusive consumer to verify the non-exclusive consumer
+ // takes over
+ exclusiveConsumer.close();
+
+ producer.send(msg);
+
+ // Verify other non-exclusive consumer receices the message.
+ assertNotNull(fallbackConsumer.receive(100));
+
+ // Create exclusive consumer to determine if it will start receiving
+ // the messages.
+ exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
+
+ producer.send(msg);
+ assertNotNull(exclusiveConsumer.receive(100));
+ assertNull(fallbackConsumer.receive(100));
+
+ } finally {
+ fallbackSession.close();
+ senderSession.close();
+ conn.close();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3f32507f/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java
index bd4b724..026a4be 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java
@@ -1,86 +1,86 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import java.io.IOException;
-import java.util.Map;
-
-import javax.jms.JMSException;
-
-import org.apache.activemq.command.ActiveMQObjectMessage;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.util.ByteSequence;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4893Test {
-
- private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4893Test.class);
-
- @Test
- public void testPropertiesInt() throws Exception {
- ActiveMQObjectMessage message = new ActiveMQObjectMessage();
- message.setIntProperty("TestProp", 333);
- fakeUnmarshal(message);
- roundTripProperties(message);
- }
-
- @Test
- public void testPropertiesString() throws Exception {
- ActiveMQObjectMessage message = new ActiveMQObjectMessage();
- message.setStringProperty("TestProp", "Value");
- fakeUnmarshal(message);
- roundTripProperties(message);
- }
-
- @Test
- public void testPropertiesObject() throws Exception {
- ActiveMQObjectMessage message = new ActiveMQObjectMessage();
- message.setObjectProperty("TestProp", "Value");
- fakeUnmarshal(message);
- roundTripProperties(message);
- }
-
- @Test
- public void testPropertiesObjectNoMarshalling() throws Exception {
- ActiveMQObjectMessage message = new ActiveMQObjectMessage();
- message.setObjectProperty("TestProp", "Value");
- roundTripProperties(message);
- }
-
- private void roundTripProperties(ActiveMQObjectMessage message) throws IOException, JMSException {
- ActiveMQObjectMessage copy = new ActiveMQObjectMessage();
- for (Map.Entry<String, Object> prop : message.getProperties().entrySet()) {
- LOG.debug("{} -> {}", prop.getKey(), prop.getValue().getClass());
- copy.setObjectProperty(prop.getKey(), prop.getValue());
- }
- }
-
- private void fakeUnmarshal(ActiveMQObjectMessage message) throws IOException {
- // we need to force the unmarshalled property field to be set so it
- // gives us a hawtbuffer for the string
- OpenWireFormat format = new OpenWireFormat();
- message.beforeMarshall(format);
- message.afterMarshall(format);
-
- ByteSequence seq = message.getMarshalledProperties();
- message.clearProperties();
- message.setMarshalledProperties(seq);
- }
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.io.IOException;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.util.ByteSequence;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4893Test {
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4893Test.class);
+
+ @Test
+ public void testPropertiesInt() throws Exception {
+ ActiveMQObjectMessage message = new ActiveMQObjectMessage();
+ message.setIntProperty("TestProp", 333);
+ fakeUnmarshal(message);
+ roundTripProperties(message);
+ }
+
+ @Test
+ public void testPropertiesString() throws Exception {
+ ActiveMQObjectMessage message = new ActiveMQObjectMessage();
+ message.setStringProperty("TestProp", "Value");
+ fakeUnmarshal(message);
+ roundTripProperties(message);
+ }
+
+ @Test
+ public void testPropertiesObject() throws Exception {
+ ActiveMQObjectMessage message = new ActiveMQObjectMessage();
+ message.setObjectProperty("TestProp", "Value");
+ fakeUnmarshal(message);
+ roundTripProperties(message);
+ }
+
+ @Test
+ public void testPropertiesObjectNoMarshalling() throws Exception {
+ ActiveMQObjectMessage message = new ActiveMQObjectMessage();
+ message.setObjectProperty("TestProp", "Value");
+ roundTripProperties(message);
+ }
+
+ private void roundTripProperties(ActiveMQObjectMessage message) throws IOException, JMSException {
+ ActiveMQObjectMessage copy = new ActiveMQObjectMessage();
+ for (Map.Entry<String, Object> prop : message.getProperties().entrySet()) {
+ LOG.debug("{} -> {}", prop.getKey(), prop.getValue().getClass());
+ copy.setObjectProperty(prop.getKey(), prop.getValue());
+ }
+ }
+
+ private void fakeUnmarshal(ActiveMQObjectMessage message) throws IOException {
+ // we need to force the unmarshalled property field to be set so it
+ // gives us a hawtbuffer for the string
+ OpenWireFormat format = new OpenWireFormat();
+ message.beforeMarshall(format);
+ message.afterMarshall(format);
+
+ ByteSequence seq = message.getMarshalledProperties();
+ message.clearProperties();
+ message.setMarshalledProperties(seq);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/3f32507f/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java
index b1872df..65f30e3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java
@@ -1,21 +1,21 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-public interface Receiver {
- void receive(String s) throws Exception;
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+public interface Receiver {
+ void receive(String s) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3f32507f/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java
index 8d2c1ff..8d1ed68 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java
@@ -1,56 +1,56 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb;
-
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
-import org.apache.activemq.store.kahadb.data.KahaDestination;
-import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
-import org.apache.activemq.store.kahadb.data.KahaEntryType;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.DataByteArrayInputStream;
-import org.apache.activemq.util.DataByteArrayOutputStream;
-
-public class PBMesssagesTest extends TestCase {
-
- @SuppressWarnings("rawtypes")
- public void testKahaAddMessageCommand() throws IOException {
-
- KahaAddMessageCommand expected = new KahaAddMessageCommand();
- expected.setDestination(new KahaDestination().setName("Foo").setType(DestinationType.QUEUE));
- expected.setMessage(new Buffer(new byte[] {1,2,3,4,5,6} ));
- expected.setMessageId("Hello World");
-
- int size = expected.serializedSizeFramed();
- DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
- os.writeByte(expected.type().getNumber());
- expected.writeFramed(os);
- ByteSequence seq = os.toByteSequence();
-
- DataByteArrayInputStream is = new DataByteArrayInputStream(seq);
- KahaEntryType type = KahaEntryType.valueOf(is.readByte());
- JournalCommand message = (JournalCommand)type.createMessage();
- message.mergeFramed(is);
-
- assertEquals(expected, message);
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
+import org.apache.activemq.store.kahadb.data.KahaDestination;
+import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
+import org.apache.activemq.store.kahadb.data.KahaEntryType;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+
+public class PBMesssagesTest extends TestCase {
+
+ @SuppressWarnings("rawtypes")
+ public void testKahaAddMessageCommand() throws IOException {
+
+ KahaAddMessageCommand expected = new KahaAddMessageCommand();
+ expected.setDestination(new KahaDestination().setName("Foo").setType(DestinationType.QUEUE));
+ expected.setMessage(new Buffer(new byte[] {1,2,3,4,5,6} ));
+ expected.setMessageId("Hello World");
+
+ int size = expected.serializedSizeFramed();
+ DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
+ os.writeByte(expected.type().getNumber());
+ expected.writeFramed(os);
+ ByteSequence seq = os.toByteSequence();
+
+ DataByteArrayInputStream is = new DataByteArrayInputStream(seq);
+ KahaEntryType type = KahaEntryType.valueOf(is.readByte());
+ JournalCommand message = (JournalCommand)type.createMessage();
+ message.mergeFramed(is);
+
+ assertEquals(expected, message);
+ }
+
+}