You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/26 22:28:50 UTC

svn commit: r1402652 [2/2] - in /activemq/trunk/activemq-console: ./ src/main/java/org/apache/activemq/console/command/ src/main/java/org/apache/activemq/console/command/store/ src/main/java/org/apache/activemq/console/command/store/tar/ src/main/proto/

Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarOutputStream.java?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarOutputStream.java (added)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarOutputStream.java Fri Oct 26 20:28:49 2012
@@ -0,0 +1,356 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+/*
+ * This package is based on the work done by Timothy Gerard Endres
+ * (time@ice.com) to whom the Ant project is very grateful for his great code.
+ */
+
+package org.apache.activemq.console.command.store.tar;
+
+import java.io.FilterOutputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+
+/**
+ * The TarOutputStream writes a UNIX tar archive as an OutputStream.
+ * Methods are provided to put entries, and then write their contents
+ * by writing to this stream using write().
+ *
+ */
+public class TarOutputStream extends FilterOutputStream {
+    /** Fail if a long file name is required in the archive. */
+    public static final int LONGFILE_ERROR = 0;
+
+    /** Long paths will be truncated in the archive. */
+    public static final int LONGFILE_TRUNCATE = 1;
+
+    /** GNU tar extensions are used to store long file names in the archive. */
+    public static final int LONGFILE_GNU = 2;
+
+    // CheckStyle:VisibilityModifier OFF - bc
+    protected boolean   debug;
+    protected long      currSize;
+    protected String    currName;
+    protected long      currBytes;
+    protected byte[]    oneBuf;
+    protected byte[]    recordBuf;
+    protected int       assemLen;
+    protected byte[]    assemBuf;
+    protected TarBuffer buffer;
+    protected int       longFileMode = LONGFILE_ERROR;
+    // CheckStyle:VisibilityModifier ON
+
+    private boolean closed = false;
+
+    /**
+     * Constructor for TarInputStream.
+     * @param os the output stream to use
+     */
+    public TarOutputStream(OutputStream os) {
+        this(os, TarBuffer.DEFAULT_BLKSIZE, TarBuffer.DEFAULT_RCDSIZE);
+    }
+
+    /**
+     * Constructor for TarInputStream.
+     * @param os the output stream to use
+     * @param blockSize the block size to use
+     */
+    public TarOutputStream(OutputStream os, int blockSize) {
+        this(os, blockSize, TarBuffer.DEFAULT_RCDSIZE);
+    }
+
+    /**
+     * Constructor for TarInputStream.
+     * @param os the output stream to use
+     * @param blockSize the block size to use
+     * @param recordSize the record size to use
+     */
+    public TarOutputStream(OutputStream os, int blockSize, int recordSize) {
+        super(os);
+
+        this.buffer = new TarBuffer(os, blockSize, recordSize);
+        this.debug = false;
+        this.assemLen = 0;
+        this.assemBuf = new byte[recordSize];
+        this.recordBuf = new byte[recordSize];
+        this.oneBuf = new byte[1];
+    }
+
+    /**
+     * Set the long file mode.
+     * This can be LONGFILE_ERROR(0), LONGFILE_TRUNCATE(1) or LONGFILE_GNU(2).
+     * This specifies the treatment of long file names (names >= TarConstants.NAMELEN).
+     * Default is LONGFILE_ERROR.
+     * @param longFileMode the mode to use
+     */
+    public void setLongFileMode(int longFileMode) {
+        this.longFileMode = longFileMode;
+    }
+
+
+    /**
+     * Sets the debugging flag.
+     *
+     * @param debugF True to turn on debugging.
+     */
+    public void setDebug(boolean debugF) {
+        this.debug = debugF;
+    }
+
+    /**
+     * Sets the debugging flag in this stream's TarBuffer.
+     *
+     * @param debug True to turn on debugging.
+     */
+    public void setBufferDebug(boolean debug) {
+        buffer.setDebug(debug);
+    }
+
+    /**
+     * Ends the TAR archive without closing the underlying OutputStream.
+     * The result is that the two EOF records of nulls are written.
+     * @throws IOException on error
+     */
+    public void finish() throws IOException {
+        // See Bugzilla 28776 for a discussion on this
+        // http://issues.apache.org/bugzilla/show_bug.cgi?id=28776
+        writeEOFRecord();
+        writeEOFRecord();
+        buffer.flushBlock();
+    }
+
+    /**
+     * Ends the TAR archive and closes the underlying OutputStream.
+     * This means that finish() is called followed by calling the
+     * TarBuffer's close().
+     * @throws IOException on error
+     */
+    public void close() throws IOException {
+        if (!closed) {
+            finish();
+            buffer.close();
+            out.close();
+            closed = true;
+        }
+    }
+
+    /**
+     * Get the record size being used by this stream's TarBuffer.
+     *
+     * @return The TarBuffer record size.
+     */
+    public int getRecordSize() {
+        return buffer.getRecordSize();
+    }
+
+    /**
+     * Put an entry on the output stream. This writes the entry's
+     * header record and positions the output stream for writing
+     * the contents of the entry. Once this method is called, the
+     * stream is ready for calls to write() to write the entry's
+     * contents. Once the contents are written, closeEntry()
+     * <B>MUST</B> be called to ensure that all buffered data
+     * is completely written to the output stream.
+     *
+     * @param entry The TarEntry to be written to the archive.
+     * @throws IOException on error
+     */
+    public void putNextEntry(TarEntry entry) throws IOException {
+        if (entry.getName().length() >= TarConstants.NAMELEN) {
+
+            if (longFileMode == LONGFILE_GNU) {
+                // create a TarEntry for the LongLink, the contents
+                // of which are the entry's name
+                TarEntry longLinkEntry = new TarEntry(TarConstants.GNU_LONGLINK,
+                                                      TarConstants.LF_GNUTYPE_LONGNAME);
+
+                longLinkEntry.setSize(entry.getName().length() + 1);
+                putNextEntry(longLinkEntry);
+                write(entry.getName().getBytes());
+                write(0);
+                closeEntry();
+            } else if (longFileMode != LONGFILE_TRUNCATE) {
+                throw new RuntimeException("file name '" + entry.getName()
+                                             + "' is too long ( > "
+                                             + TarConstants.NAMELEN + " bytes)");
+            }
+        }
+
+        entry.writeEntryHeader(recordBuf);
+        buffer.writeRecord(recordBuf);
+
+        currBytes = 0;
+
+        if (entry.isDirectory()) {
+            currSize = 0;
+        } else {
+            currSize = entry.getSize();
+        }
+        currName = entry.getName();
+    }
+
+    /**
+     * Close an entry. This method MUST be called for all file
+     * entries that contain data. The reason is that we must
+     * buffer data written to the stream in order to satisfy
+     * the buffer's record based writes. Thus, there may be
+     * data fragments still being assembled that must be written
+     * to the output stream before this entry is closed and the
+     * next entry written.
+     * @throws IOException on error
+     */
+    public void closeEntry() throws IOException {
+        if (assemLen > 0) {
+            for (int i = assemLen; i < assemBuf.length; ++i) {
+                assemBuf[i] = 0;
+            }
+
+            buffer.writeRecord(assemBuf);
+
+            currBytes += assemLen;
+            assemLen = 0;
+        }
+
+        if (currBytes < currSize) {
+            throw new IOException("entry '" + currName + "' closed at '"
+                                  + currBytes
+                                  + "' before the '" + currSize
+                                  + "' bytes specified in the header were written");
+        }
+    }
+
+    /**
+     * Writes a byte to the current tar archive entry.
+     *
+     * This method simply calls read( byte[], int, int ).
+     *
+     * @param b The byte written.
+     * @throws IOException on error
+     */
+    public void write(int b) throws IOException {
+        oneBuf[0] = (byte) b;
+
+        write(oneBuf, 0, 1);
+    }
+
+    /**
+     * Writes bytes to the current tar archive entry.
+     *
+     * This method simply calls write( byte[], int, int ).
+     *
+     * @param wBuf The buffer to write to the archive.
+     * @throws IOException on error
+     */
+    public void write(byte[] wBuf) throws IOException {
+        write(wBuf, 0, wBuf.length);
+    }
+
+    /**
+     * Writes bytes to the current tar archive entry. This method
+     * is aware of the current entry and will throw an exception if
+     * you attempt to write bytes past the length specified for the
+     * current entry. The method is also (painfully) aware of the
+     * record buffering required by TarBuffer, and manages buffers
+     * that are not a multiple of recordsize in length, including
+     * assembling records from small buffers.
+     *
+     * @param wBuf The buffer to write to the archive.
+     * @param wOffset The offset in the buffer from which to get bytes.
+     * @param numToWrite The number of bytes to write.
+     * @throws IOException on error
+     */
+    public void write(byte[] wBuf, int wOffset, int numToWrite) throws IOException {
+        if ((currBytes + numToWrite) > currSize) {
+            throw new IOException("request to write '" + numToWrite
+                                  + "' bytes exceeds size in header of '"
+                                  + currSize + "' bytes for entry '"
+                                  + currName + "'");
+
+            //
+            // We have to deal with assembly!!!
+            // The programmer can be writing little 32 byte chunks for all
+            // we know, and we must assemble complete records for writing.
+            // REVIEW Maybe this should be in TarBuffer? Could that help to
+            // eliminate some of the buffer copying.
+            //
+        }
+
+        if (assemLen > 0) {
+            if ((assemLen + numToWrite) >= recordBuf.length) {
+                int aLen = recordBuf.length - assemLen;
+
+                System.arraycopy(assemBuf, 0, recordBuf, 0,
+                                 assemLen);
+                System.arraycopy(wBuf, wOffset, recordBuf,
+                                 assemLen, aLen);
+                buffer.writeRecord(recordBuf);
+
+                currBytes += recordBuf.length;
+                wOffset += aLen;
+                numToWrite -= aLen;
+                assemLen = 0;
+            } else {
+                System.arraycopy(wBuf, wOffset, assemBuf, assemLen,
+                                 numToWrite);
+
+                wOffset += numToWrite;
+                assemLen += numToWrite;
+                numToWrite = 0;
+            }
+        }
+
+        //
+        // When we get here we have EITHER:
+        // o An empty "assemble" buffer.
+        // o No bytes to write (numToWrite == 0)
+        //
+        while (numToWrite > 0) {
+            if (numToWrite < recordBuf.length) {
+                System.arraycopy(wBuf, wOffset, assemBuf, assemLen,
+                                 numToWrite);
+
+                assemLen += numToWrite;
+
+                break;
+            }
+
+            buffer.writeRecord(wBuf, wOffset);
+
+            int num = recordBuf.length;
+
+            currBytes += num;
+            numToWrite -= num;
+            wOffset += num;
+        }
+    }
+
+    /**
+     * Write an EOF (end of archive) record to the tar archive.
+     * An EOF record consists of a record of all zeros.
+     */
+    private void writeEOFRecord() throws IOException {
+        for (int i = 0; i < recordBuf.length; ++i) {
+            recordBuf[i] = 0;
+        }
+
+        buffer.writeRecord(recordBuf);
+    }
+}
+
+

Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarUtils.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarUtils.java?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarUtils.java (added)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarUtils.java Fri Oct 26 20:28:49 2012
@@ -0,0 +1,206 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+/*
+ * This package is based on the work done by Timothy Gerard Endres
+ * (time@ice.com) to whom the Ant project is very grateful for his great code.
+ */
+
+package org.apache.activemq.console.command.store.tar;
+
+/**
+ * This class provides static utility methods to work with byte streams.
+ *
+ */
+// CheckStyle:HideUtilityClassConstructorCheck OFF (bc)
+public class TarUtils {
+
+    private static final int BYTE_MASK = 255;
+
+    /**
+     * Parse an octal string from a header buffer. This is used for the
+     * file permission mode value.
+     *
+     * @param header The header buffer from which to parse.
+     * @param offset The offset into the buffer from which to parse.
+     * @param length The number of header bytes to parse.
+     * @return The long value of the octal string.
+     */
+    public static long parseOctal(byte[] header, int offset, int length) {
+        long    result = 0;
+        boolean stillPadding = true;
+        int     end = offset + length;
+
+        for (int i = offset; i < end; ++i) {
+            if (header[i] == 0) {
+                break;
+            }
+
+            if (header[i] == (byte) ' ' || header[i] == '0') {
+                if (stillPadding) {
+                    continue;
+                }
+
+                if (header[i] == (byte) ' ') {
+                    break;
+                }
+            }
+
+            stillPadding = false;
+            // CheckStyle:MagicNumber OFF
+            result = (result << 3) + (header[i] - '0');
+            // CheckStyle:MagicNumber ON
+        }
+
+        return result;
+    }
+
+    /**
+     * Parse an entry name from a header buffer.
+     *
+     * @param header The header buffer from which to parse.
+     * @param offset The offset into the buffer from which to parse.
+     * @param length The number of header bytes to parse.
+     * @return The header's entry name.
+     */
+    public static StringBuffer parseName(byte[] header, int offset, int length) {
+        StringBuffer result = new StringBuffer(length);
+        int          end = offset + length;
+
+        for (int i = offset; i < end; ++i) {
+            if (header[i] == 0) {
+                break;
+            }
+
+            result.append((char) header[i]);
+        }
+
+        return result;
+    }
+
+    /**
+     * Determine the number of bytes in an entry name.
+     *
+     * @param name The header name from which to parse.
+     * @param buf The buffer from which to parse.
+     * @param offset The offset into the buffer from which to parse.
+     * @param length The number of header bytes to parse.
+     * @return The number of bytes in a header's entry name.
+     */
+    public static int getNameBytes(StringBuffer name, byte[] buf, int offset, int length) {
+        int i;
+
+        for (i = 0; i < length && i < name.length(); ++i) {
+            buf[offset + i] = (byte) name.charAt(i);
+        }
+
+        for (; i < length; ++i) {
+            buf[offset + i] = 0;
+        }
+
+        return offset + length;
+    }
+
+    /**
+     * Parse an octal integer from a header buffer.
+     *
+     * @param value The header value
+     * @param buf The buffer from which to parse.
+     * @param offset The offset into the buffer from which to parse.
+     * @param length The number of header bytes to parse.
+     * @return The integer value of the octal bytes.
+     */
+    public static int getOctalBytes(long value, byte[] buf, int offset, int length) {
+        int    idx = length - 1;
+
+        buf[offset + idx] = 0;
+        --idx;
+        buf[offset + idx] = (byte) ' ';
+        --idx;
+
+        if (value == 0) {
+            buf[offset + idx] = (byte) '0';
+            --idx;
+        } else {
+            for (long val = value; idx >= 0 && val > 0; --idx) {
+                // CheckStyle:MagicNumber OFF
+                buf[offset + idx] = (byte) ((byte) '0' + (byte) (val & 7));
+                val = val >> 3;
+                // CheckStyle:MagicNumber ON
+            }
+        }
+
+        for (; idx >= 0; --idx) {
+            buf[offset + idx] = (byte) ' ';
+        }
+
+        return offset + length;
+    }
+
+    /**
+     * Parse an octal long integer from a header buffer.
+     *
+     * @param value The header value
+     * @param buf The buffer from which to parse.
+     * @param offset The offset into the buffer from which to parse.
+     * @param length The number of header bytes to parse.
+     * @return The long value of the octal bytes.
+     */
+    public static int getLongOctalBytes(long value, byte[] buf, int offset, int length) {
+        byte[] temp = new byte[length + 1];
+
+        getOctalBytes(value, temp, 0, length + 1);
+        System.arraycopy(temp, 0, buf, offset, length);
+
+        return offset + length;
+    }
+
+    /**
+     * Parse the checksum octal integer from a header buffer.
+     *
+     * @param value The header value
+     * @param buf The buffer from which to parse.
+     * @param offset The offset into the buffer from which to parse.
+     * @param length The number of header bytes to parse.
+     * @return The integer value of the entry's checksum.
+     */
+    public static int getCheckSumOctalBytes(long value, byte[] buf, int offset, int length) {
+        getOctalBytes(value, buf, offset, length);
+
+        buf[offset + length - 1] = (byte) ' ';
+        buf[offset + length - 2] = 0;
+
+        return offset + length;
+    }
+
+    /**
+     * Compute the checksum of a tar entry header.
+     *
+     * @param buf The tar entry's header buffer.
+     * @return The computed checksum.
+     */
+    public static long computeCheckSum(byte[] buf) {
+        long sum = 0;
+
+        for (int i = 0; i < buf.length; ++i) {
+            sum += BYTE_MASK & buf[i];
+        }
+
+        return sum;
+    }
+}

Added: activemq/trunk/activemq-console/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/proto/data.proto?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/proto/data.proto (added)
+++ activemq/trunk/activemq-console/src/main/proto/data.proto Fri Oct 26 20:28:49 2012
@@ -0,0 +1,60 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+package org.apache.activemq.console.command.store.proto;
+
+option java_multiple_files = true;
+
+///////////////////////////////////////////////////////////////
+// Message related operations.
+///////////////////////////////////////////////////////////////
+
+message MessagePB {
+  required int64 messageKey=1;
+  required bytes codec = 2 [java_override_type = "AsciiBuffer"];
+  optional int32 size = 3;
+  optional bytes value = 4;
+  optional sint64 expiration = 5;
+  optional int32 compression = 6;
+  
+  optional bytes direct_data = 10;
+  optional bytes direct_file = 12;
+  optional int64 direct_offset = 13;
+  optional int32 direct_size = 14;
+}
+
+message QueuePB {
+  required int64 key=1;
+  optional bytes binding_kind = 2 [java_override_type = "AsciiBuffer"];
+  optional bytes binding_data = 3;
+}
+
+message QueueEntryPB {
+  required int64 queueKey=1;
+  required int64 queueSeq=2;
+  required int64 messageKey=3;
+  optional int32 size=4;
+  optional bytes attachment=5;
+  optional int32 redeliveries = 6;
+  optional sint64 expiration=7;
+  optional bytes messageLocator=8;
+  repeated bytes sender=9;
+}
+
+message MapEntryPB {
+  required bytes key = 1;
+  optional bytes value = 2;
+}