You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/26 22:28:50 UTC
svn commit: r1402652 [2/2] - in /activemq/trunk/activemq-console: ./
src/main/java/org/apache/activemq/console/command/
src/main/java/org/apache/activemq/console/command/store/
src/main/java/org/apache/activemq/console/command/store/tar/ src/main/proto/
Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarOutputStream.java?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarOutputStream.java (added)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarOutputStream.java Fri Oct 26 20:28:49 2012
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/*
+ * This package is based on the work done by Timothy Gerard Endres
+ * (time@ice.com) to whom the Ant project is very grateful for his great code.
+ */
+
+package org.apache.activemq.console.command.store.tar;
+
+import java.io.FilterOutputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+
+/**
+ * The TarOutputStream writes a UNIX tar archive as an OutputStream.
+ * Methods are provided to put entries, and then write their contents
+ * by writing to this stream using write().
+ *
+ */
+public class TarOutputStream extends FilterOutputStream {
+ /** Fail if a long file name is required in the archive. */
+ public static final int LONGFILE_ERROR = 0;
+
+ /** Long paths will be truncated in the archive. */
+ public static final int LONGFILE_TRUNCATE = 1;
+
+ /** GNU tar extensions are used to store long file names in the archive. */
+ public static final int LONGFILE_GNU = 2;
+
+ // CheckStyle:VisibilityModifier OFF - bc
+ protected boolean debug;
+ protected long currSize;
+ protected String currName;
+ protected long currBytes;
+ protected byte[] oneBuf;
+ protected byte[] recordBuf;
+ protected int assemLen;
+ protected byte[] assemBuf;
+ protected TarBuffer buffer;
+ protected int longFileMode = LONGFILE_ERROR;
+ // CheckStyle:VisibilityModifier ON
+
+ private boolean closed = false;
+
+ /**
+ * Constructor for TarInputStream.
+ * @param os the output stream to use
+ */
+ public TarOutputStream(OutputStream os) {
+ this(os, TarBuffer.DEFAULT_BLKSIZE, TarBuffer.DEFAULT_RCDSIZE);
+ }
+
+ /**
+ * Constructor for TarInputStream.
+ * @param os the output stream to use
+ * @param blockSize the block size to use
+ */
+ public TarOutputStream(OutputStream os, int blockSize) {
+ this(os, blockSize, TarBuffer.DEFAULT_RCDSIZE);
+ }
+
+ /**
+ * Constructor for TarInputStream.
+ * @param os the output stream to use
+ * @param blockSize the block size to use
+ * @param recordSize the record size to use
+ */
+ public TarOutputStream(OutputStream os, int blockSize, int recordSize) {
+ super(os);
+
+ this.buffer = new TarBuffer(os, blockSize, recordSize);
+ this.debug = false;
+ this.assemLen = 0;
+ this.assemBuf = new byte[recordSize];
+ this.recordBuf = new byte[recordSize];
+ this.oneBuf = new byte[1];
+ }
+
+ /**
+ * Set the long file mode.
+ * This can be LONGFILE_ERROR(0), LONGFILE_TRUNCATE(1) or LONGFILE_GNU(2).
+ * This specifies the treatment of long file names (names >= TarConstants.NAMELEN).
+ * Default is LONGFILE_ERROR.
+ * @param longFileMode the mode to use
+ */
+ public void setLongFileMode(int longFileMode) {
+ this.longFileMode = longFileMode;
+ }
+
+
+ /**
+ * Sets the debugging flag.
+ *
+ * @param debugF True to turn on debugging.
+ */
+ public void setDebug(boolean debugF) {
+ this.debug = debugF;
+ }
+
+ /**
+ * Sets the debugging flag in this stream's TarBuffer.
+ *
+ * @param debug True to turn on debugging.
+ */
+ public void setBufferDebug(boolean debug) {
+ buffer.setDebug(debug);
+ }
+
+ /**
+ * Ends the TAR archive without closing the underlying OutputStream.
+ * The result is that the two EOF records of nulls are written.
+ * @throws IOException on error
+ */
+ public void finish() throws IOException {
+ // See Bugzilla 28776 for a discussion on this
+ // http://issues.apache.org/bugzilla/show_bug.cgi?id=28776
+ writeEOFRecord();
+ writeEOFRecord();
+ buffer.flushBlock();
+ }
+
+ /**
+ * Ends the TAR archive and closes the underlying OutputStream.
+ * This means that finish() is called followed by calling the
+ * TarBuffer's close().
+ * @throws IOException on error
+ */
+ public void close() throws IOException {
+ if (!closed) {
+ finish();
+ buffer.close();
+ out.close();
+ closed = true;
+ }
+ }
+
+ /**
+ * Get the record size being used by this stream's TarBuffer.
+ *
+ * @return The TarBuffer record size.
+ */
+ public int getRecordSize() {
+ return buffer.getRecordSize();
+ }
+
+ /**
+ * Put an entry on the output stream. This writes the entry's
+ * header record and positions the output stream for writing
+ * the contents of the entry. Once this method is called, the
+ * stream is ready for calls to write() to write the entry's
+ * contents. Once the contents are written, closeEntry()
+ * <B>MUST</B> be called to ensure that all buffered data
+ * is completely written to the output stream.
+ *
+ * @param entry The TarEntry to be written to the archive.
+ * @throws IOException on error
+ */
+ public void putNextEntry(TarEntry entry) throws IOException {
+ if (entry.getName().length() >= TarConstants.NAMELEN) {
+
+ if (longFileMode == LONGFILE_GNU) {
+ // create a TarEntry for the LongLink, the contents
+ // of which are the entry's name
+ TarEntry longLinkEntry = new TarEntry(TarConstants.GNU_LONGLINK,
+ TarConstants.LF_GNUTYPE_LONGNAME);
+
+ longLinkEntry.setSize(entry.getName().length() + 1);
+ putNextEntry(longLinkEntry);
+ write(entry.getName().getBytes());
+ write(0);
+ closeEntry();
+ } else if (longFileMode != LONGFILE_TRUNCATE) {
+ throw new RuntimeException("file name '" + entry.getName()
+ + "' is too long ( > "
+ + TarConstants.NAMELEN + " bytes)");
+ }
+ }
+
+ entry.writeEntryHeader(recordBuf);
+ buffer.writeRecord(recordBuf);
+
+ currBytes = 0;
+
+ if (entry.isDirectory()) {
+ currSize = 0;
+ } else {
+ currSize = entry.getSize();
+ }
+ currName = entry.getName();
+ }
+
+ /**
+ * Close an entry. This method MUST be called for all file
+ * entries that contain data. The reason is that we must
+ * buffer data written to the stream in order to satisfy
+ * the buffer's record based writes. Thus, there may be
+ * data fragments still being assembled that must be written
+ * to the output stream before this entry is closed and the
+ * next entry written.
+ * @throws IOException on error
+ */
+ public void closeEntry() throws IOException {
+ if (assemLen > 0) {
+ for (int i = assemLen; i < assemBuf.length; ++i) {
+ assemBuf[i] = 0;
+ }
+
+ buffer.writeRecord(assemBuf);
+
+ currBytes += assemLen;
+ assemLen = 0;
+ }
+
+ if (currBytes < currSize) {
+ throw new IOException("entry '" + currName + "' closed at '"
+ + currBytes
+ + "' before the '" + currSize
+ + "' bytes specified in the header were written");
+ }
+ }
+
+ /**
+ * Writes a byte to the current tar archive entry.
+ *
+ * This method simply calls read( byte[], int, int ).
+ *
+ * @param b The byte written.
+ * @throws IOException on error
+ */
+ public void write(int b) throws IOException {
+ oneBuf[0] = (byte) b;
+
+ write(oneBuf, 0, 1);
+ }
+
+ /**
+ * Writes bytes to the current tar archive entry.
+ *
+ * This method simply calls write( byte[], int, int ).
+ *
+ * @param wBuf The buffer to write to the archive.
+ * @throws IOException on error
+ */
+ public void write(byte[] wBuf) throws IOException {
+ write(wBuf, 0, wBuf.length);
+ }
+
+ /**
+ * Writes bytes to the current tar archive entry. This method
+ * is aware of the current entry and will throw an exception if
+ * you attempt to write bytes past the length specified for the
+ * current entry. The method is also (painfully) aware of the
+ * record buffering required by TarBuffer, and manages buffers
+ * that are not a multiple of recordsize in length, including
+ * assembling records from small buffers.
+ *
+ * @param wBuf The buffer to write to the archive.
+ * @param wOffset The offset in the buffer from which to get bytes.
+ * @param numToWrite The number of bytes to write.
+ * @throws IOException on error
+ */
+ public void write(byte[] wBuf, int wOffset, int numToWrite) throws IOException {
+ if ((currBytes + numToWrite) > currSize) {
+ throw new IOException("request to write '" + numToWrite
+ + "' bytes exceeds size in header of '"
+ + currSize + "' bytes for entry '"
+ + currName + "'");
+
+ //
+ // We have to deal with assembly!!!
+ // The programmer can be writing little 32 byte chunks for all
+ // we know, and we must assemble complete records for writing.
+ // REVIEW Maybe this should be in TarBuffer? Could that help to
+ // eliminate some of the buffer copying.
+ //
+ }
+
+ if (assemLen > 0) {
+ if ((assemLen + numToWrite) >= recordBuf.length) {
+ int aLen = recordBuf.length - assemLen;
+
+ System.arraycopy(assemBuf, 0, recordBuf, 0,
+ assemLen);
+ System.arraycopy(wBuf, wOffset, recordBuf,
+ assemLen, aLen);
+ buffer.writeRecord(recordBuf);
+
+ currBytes += recordBuf.length;
+ wOffset += aLen;
+ numToWrite -= aLen;
+ assemLen = 0;
+ } else {
+ System.arraycopy(wBuf, wOffset, assemBuf, assemLen,
+ numToWrite);
+
+ wOffset += numToWrite;
+ assemLen += numToWrite;
+ numToWrite = 0;
+ }
+ }
+
+ //
+ // When we get here we have EITHER:
+ // o An empty "assemble" buffer.
+ // o No bytes to write (numToWrite == 0)
+ //
+ while (numToWrite > 0) {
+ if (numToWrite < recordBuf.length) {
+ System.arraycopy(wBuf, wOffset, assemBuf, assemLen,
+ numToWrite);
+
+ assemLen += numToWrite;
+
+ break;
+ }
+
+ buffer.writeRecord(wBuf, wOffset);
+
+ int num = recordBuf.length;
+
+ currBytes += num;
+ numToWrite -= num;
+ wOffset += num;
+ }
+ }
+
+ /**
+ * Write an EOF (end of archive) record to the tar archive.
+ * An EOF record consists of a record of all zeros.
+ */
+ private void writeEOFRecord() throws IOException {
+ for (int i = 0; i < recordBuf.length; ++i) {
+ recordBuf[i] = 0;
+ }
+
+ buffer.writeRecord(recordBuf);
+ }
+}
+
+
Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarUtils.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarUtils.java?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarUtils.java (added)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarUtils.java Fri Oct 26 20:28:49 2012
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/*
+ * This package is based on the work done by Timothy Gerard Endres
+ * (time@ice.com) to whom the Ant project is very grateful for his great code.
+ */
+
+package org.apache.activemq.console.command.store.tar;
+
+/**
+ * This class provides static utility methods to work with byte streams.
+ *
+ */
+// CheckStyle:HideUtilityClassConstructorCheck OFF (bc)
+public class TarUtils {
+
+ private static final int BYTE_MASK = 255;
+
+ /**
+ * Parse an octal string from a header buffer. This is used for the
+ * file permission mode value.
+ *
+ * @param header The header buffer from which to parse.
+ * @param offset The offset into the buffer from which to parse.
+ * @param length The number of header bytes to parse.
+ * @return The long value of the octal string.
+ */
+ public static long parseOctal(byte[] header, int offset, int length) {
+ long result = 0;
+ boolean stillPadding = true;
+ int end = offset + length;
+
+ for (int i = offset; i < end; ++i) {
+ if (header[i] == 0) {
+ break;
+ }
+
+ if (header[i] == (byte) ' ' || header[i] == '0') {
+ if (stillPadding) {
+ continue;
+ }
+
+ if (header[i] == (byte) ' ') {
+ break;
+ }
+ }
+
+ stillPadding = false;
+ // CheckStyle:MagicNumber OFF
+ result = (result << 3) + (header[i] - '0');
+ // CheckStyle:MagicNumber ON
+ }
+
+ return result;
+ }
+
+ /**
+ * Parse an entry name from a header buffer.
+ *
+ * @param header The header buffer from which to parse.
+ * @param offset The offset into the buffer from which to parse.
+ * @param length The number of header bytes to parse.
+ * @return The header's entry name.
+ */
+ public static StringBuffer parseName(byte[] header, int offset, int length) {
+ StringBuffer result = new StringBuffer(length);
+ int end = offset + length;
+
+ for (int i = offset; i < end; ++i) {
+ if (header[i] == 0) {
+ break;
+ }
+
+ result.append((char) header[i]);
+ }
+
+ return result;
+ }
+
+ /**
+ * Determine the number of bytes in an entry name.
+ *
+ * @param name The header name from which to parse.
+ * @param buf The buffer from which to parse.
+ * @param offset The offset into the buffer from which to parse.
+ * @param length The number of header bytes to parse.
+ * @return The number of bytes in a header's entry name.
+ */
+ public static int getNameBytes(StringBuffer name, byte[] buf, int offset, int length) {
+ int i;
+
+ for (i = 0; i < length && i < name.length(); ++i) {
+ buf[offset + i] = (byte) name.charAt(i);
+ }
+
+ for (; i < length; ++i) {
+ buf[offset + i] = 0;
+ }
+
+ return offset + length;
+ }
+
+ /**
+ * Parse an octal integer from a header buffer.
+ *
+ * @param value The header value
+ * @param buf The buffer from which to parse.
+ * @param offset The offset into the buffer from which to parse.
+ * @param length The number of header bytes to parse.
+ * @return The integer value of the octal bytes.
+ */
+ public static int getOctalBytes(long value, byte[] buf, int offset, int length) {
+ int idx = length - 1;
+
+ buf[offset + idx] = 0;
+ --idx;
+ buf[offset + idx] = (byte) ' ';
+ --idx;
+
+ if (value == 0) {
+ buf[offset + idx] = (byte) '0';
+ --idx;
+ } else {
+ for (long val = value; idx >= 0 && val > 0; --idx) {
+ // CheckStyle:MagicNumber OFF
+ buf[offset + idx] = (byte) ((byte) '0' + (byte) (val & 7));
+ val = val >> 3;
+ // CheckStyle:MagicNumber ON
+ }
+ }
+
+ for (; idx >= 0; --idx) {
+ buf[offset + idx] = (byte) ' ';
+ }
+
+ return offset + length;
+ }
+
+ /**
+ * Parse an octal long integer from a header buffer.
+ *
+ * @param value The header value
+ * @param buf The buffer from which to parse.
+ * @param offset The offset into the buffer from which to parse.
+ * @param length The number of header bytes to parse.
+ * @return The long value of the octal bytes.
+ */
+ public static int getLongOctalBytes(long value, byte[] buf, int offset, int length) {
+ byte[] temp = new byte[length + 1];
+
+ getOctalBytes(value, temp, 0, length + 1);
+ System.arraycopy(temp, 0, buf, offset, length);
+
+ return offset + length;
+ }
+
+ /**
+ * Parse the checksum octal integer from a header buffer.
+ *
+ * @param value The header value
+ * @param buf The buffer from which to parse.
+ * @param offset The offset into the buffer from which to parse.
+ * @param length The number of header bytes to parse.
+ * @return The integer value of the entry's checksum.
+ */
+ public static int getCheckSumOctalBytes(long value, byte[] buf, int offset, int length) {
+ getOctalBytes(value, buf, offset, length);
+
+ buf[offset + length - 1] = (byte) ' ';
+ buf[offset + length - 2] = 0;
+
+ return offset + length;
+ }
+
+ /**
+ * Compute the checksum of a tar entry header.
+ *
+ * @param buf The tar entry's header buffer.
+ * @return The computed checksum.
+ */
+ public static long computeCheckSum(byte[] buf) {
+ long sum = 0;
+
+ for (int i = 0; i < buf.length; ++i) {
+ sum += BYTE_MASK & buf[i];
+ }
+
+ return sum;
+ }
+}
Added: activemq/trunk/activemq-console/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/proto/data.proto?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/proto/data.proto (added)
+++ activemq/trunk/activemq-console/src/main/proto/data.proto Fri Oct 26 20:28:49 2012
@@ -0,0 +1,60 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+package org.apache.activemq.console.command.store.proto;
+
+option java_multiple_files = true;
+
+///////////////////////////////////////////////////////////////
+// Message related operations.
+///////////////////////////////////////////////////////////////
+
+message MessagePB {
+ required int64 messageKey=1;
+ required bytes codec = 2 [java_override_type = "AsciiBuffer"];
+ optional int32 size = 3;
+ optional bytes value = 4;
+ optional sint64 expiration = 5;
+ optional int32 compression = 6;
+
+ optional bytes direct_data = 10;
+ optional bytes direct_file = 12;
+ optional int64 direct_offset = 13;
+ optional int32 direct_size = 14;
+}
+
+message QueuePB {
+ required int64 key=1;
+ optional bytes binding_kind = 2 [java_override_type = "AsciiBuffer"];
+ optional bytes binding_data = 3;
+}
+
+message QueueEntryPB {
+ required int64 queueKey=1;
+ required int64 queueSeq=2;
+ required int64 messageKey=3;
+ optional int32 size=4;
+ optional bytes attachment=5;
+ optional int32 redeliveries = 6;
+ optional sint64 expiration=7;
+ optional bytes messageLocator=8;
+ repeated bytes sender=9;
+}
+
+message MapEntryPB {
+ required bytes key = 1;
+ optional bytes value = 2;
+}