You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/26 22:28:50 UTC

svn commit: r1402652 [1/2] - in /activemq/trunk/activemq-console: ./ src/main/java/org/apache/activemq/console/command/ src/main/java/org/apache/activemq/console/command/store/ src/main/java/org/apache/activemq/console/command/store/tar/ src/main/proto/

Author: chirino
Date: Fri Oct 26 20:28:49 2012
New Revision: 1402652

URL: http://svn.apache.org/viewvc?rev=1402652&view=rev
Log:
Implementing AMQ-4137 : Create a store import/export command line tool to convert between store types

Usage is as simple as "bin/activemq export --file=archive.tgz".  All data is stored in the tgz in the format defined by the Apollo store export/import utilities.  Only export is implemented at this time.  I have verified that Apollo can import queues exported from ActiveMQ.  Durable subs probably need a little more work.

Added:
    activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StoreExportCommand.java
    activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/ExportStreamManager.java
    activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java
    activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/
    activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarBuffer.java
    activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarConstants.java
    activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarEntry.java
    activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarInputStream.java
    activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarOutputStream.java
    activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarUtils.java
    activemq/trunk/activemq-console/src/main/proto/
    activemq/trunk/activemq-console/src/main/proto/data.proto
Modified:
    activemq/trunk/activemq-console/pom.xml
    activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/ShellCommand.java

Modified: activemq/trunk/activemq-console/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/pom.xml?rev=1402652&r1=1402651&r2=1402652&view=diff
==============================================================================
--- activemq/trunk/activemq-console/pom.xml (original)
+++ activemq/trunk/activemq-console/pom.xml Fri Oct 26 20:28:49 2012
@@ -77,8 +77,28 @@
       <artifactId>xbean-spring</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.fusesource.hawtbuf</groupId>
+      <artifactId>hawtbuf-proto</artifactId>
+      <version>${hawtbuf-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-core-asl</artifactId>
+      <version>${jackson-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>${jackson-version}</version>
+    </dependency>
 
-	<!-- needed for TestPurgeCommand, but not for compile. -->
+
+    <!-- needed for TestPurgeCommand, but not for compile. -->
     <dependency>
       <groupId>org.springframework</groupId>
       <artifactId>spring-context</artifactId>
@@ -173,6 +193,22 @@
           </includes>
         </configuration>
       </plugin>
+      
+      <plugin>
+        <groupId>org.fusesource.hawtbuf</groupId>
+        <artifactId>hawtbuf-protoc</artifactId>
+        <version>${hawtbuf-version}</version>
+        <configuration>
+          <type>alt</type>
+        </configuration>
+         <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
 
     </plugins>
   </build>

Modified: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/ShellCommand.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/ShellCommand.java?rev=1402652&r1=1402651&r2=1402652&view=diff
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/ShellCommand.java (original)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/ShellCommand.java Fri Oct 26 20:28:49 2012
@@ -48,6 +48,7 @@ public class ShellCommand extends Abstra
             "    query           - Display selected broker component's attributes and statistics.",
             "    browse          - Display selected messages in a specified destination.",
             "    journal-audit   - Allows you to view records stored in the persistent journal.",
+            "    export          - Exports a stopped brokers data files to an archive file",
             "    purge           - Delete selected destination's messages that matches the message selector",
             "    encrypt         - Encrypts given text",
             "    decrypt         - Decrypts given text",
@@ -137,6 +138,8 @@ public class ShellCommand extends Abstra
                 command = new EncryptCommand();
             } else if (taskToken.equals("decrypt")) {
                 command = new DecryptCommand();
+            } else if (taskToken.equals("export")) {
+                command = new StoreExportCommand();
             } else if (taskToken.equals("help")) {
                 printHelp();
             } else {

Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StoreExportCommand.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StoreExportCommand.java?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StoreExportCommand.java (added)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StoreExportCommand.java Fri Oct 26 20:28:49 2012
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.console.command;
+
+import org.apache.activemq.console.CommandContext;
+import org.apache.activemq.console.command.store.StoreExporter;
+import org.apache.activemq.console.command.store.amq.CommandLineSupport;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class StoreExportCommand implements Command {
+
+    private CommandContext context;
+
+    @Override
+    public void setCommandContext(CommandContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public void execute(List<String> tokens) throws Exception {
+        StoreExporter exporter = new StoreExporter();
+        String[] remaining = CommandLineSupport.setOptions(exporter, tokens.toArray(new String[tokens.size()]));
+        if (remaining.length > 0) {
+          throw new Exception("Unexpected arguments: " + Arrays.asList(remaining));
+        }
+        exporter.execute();
+    }
+}

Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/ExportStreamManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/ExportStreamManager.java?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/ExportStreamManager.java (added)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/ExportStreamManager.java Fri Oct 26 20:28:49 2012
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.console.command.store;
+
+import org.apache.activemq.console.command.store.proto.*;
+import org.apache.activemq.console.command.store.tar.TarEntry;
+import org.apache.activemq.console.command.store.tar.TarOutputStream;
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.proto.MessageBuffer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ExportStreamManager {
+
+    private final OutputStream target;
+    private final int version;
+    TarOutputStream stream;
+
+    ExportStreamManager(OutputStream target, int version) throws IOException {
+        this.target = target;
+        this.version = version;
+        stream = new TarOutputStream(new GZIPOutputStream(target));
+        store("ver", new AsciiBuffer(""+version));
+    }
+
+
+    long seq = 0;
+
+    public void finish() throws IOException {
+        stream.close();
+    }
+
+    private void store(String ext, Buffer value) throws IOException {
+        TarEntry entry = new TarEntry(seq + "." + ext);
+        seq += 1;
+        entry.setSize(value.length());
+        stream.putNextEntry(entry);
+        value.writeTo(stream);
+        stream.closeEntry();
+    }
+
+    private void store(String ext, MessageBuffer<?,?> value) throws IOException {
+        TarEntry entry = new TarEntry(seq + "." + ext);
+      seq += 1;
+      entry.setSize(value.serializedSizeFramed());
+      stream.putNextEntry(entry);
+      value.writeFramed(stream);
+      stream.closeEntry();
+    }
+
+
+    public void store_queue(QueuePB.Getter value) throws IOException {
+      store("que", value.freeze());
+    }
+    public void store_queue_entry(QueueEntryPB.Getter value) throws IOException {
+      store("qen", value.freeze());
+    }
+    public void store_message(MessagePB.Getter value) throws IOException {
+      store("msg", value.freeze());
+    }
+    public void store_map_entry(MapEntryPB.Getter value) throws IOException {
+      store("map", value.freeze());
+    }
+
+}

Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java (added)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java Fri Oct 26 20:28:49 2012
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.console.command.store;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.*;
+import org.apache.activemq.console.command.store.proto.MessagePB;
+import org.apache.activemq.console.command.store.proto.QueueEntryPB;
+import org.apache.activemq.console.command.store.proto.QueuePB;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.*;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.hawtbuf.UTF8Buffer;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class StoreExporter {
+
+    URI config;
+    File file;
+
+    public StoreExporter() throws URISyntaxException {
+        config = new URI("xbean:activemq.xml");
+    }
+
+    public void execute() throws Exception {
+        if (config == null) {
+            throw new Exception("required --config option missing");
+        }
+        if (file == null) {
+            throw new Exception("required --file option missing");
+        }
+        System.out.println("Loading: " + config);
+        XBeanBrokerFactory.setStartDefault(false); // to avoid the broker auto-starting..
+        BrokerService broker = BrokerFactory.createBroker(config);
+        XBeanBrokerFactory.resetStartDefault();
+        PersistenceAdapter store = broker.getPersistenceAdapter();
+        System.out.println("Starting: " + store);
+        store.start();
+        try {
+            BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(file));
+            try {
+                export(store, fos);
+            } finally {
+                fos.close();
+            }
+        } finally {
+            store.stop();
+        }
+    }
+
+    static final int OPENWIRE_VERSION = 8;
+    static final boolean TIGHT_ENCODING = false;
+
+    void export(PersistenceAdapter store, BufferedOutputStream fos) throws Exception {
+
+        ObjectMapper mapper = new ObjectMapper();
+        final AsciiBuffer ds_kind = new AsciiBuffer("ds");
+        final AsciiBuffer ptp_kind = new AsciiBuffer("ptp");
+        final AsciiBuffer codec_id = new AsciiBuffer("openwire");
+        final OpenWireFormat wireformat = new OpenWireFormat();
+        wireformat.setCacheEnabled(false);
+        wireformat.setTightEncodingEnabled(TIGHT_ENCODING);
+        wireformat.setVersion(OPENWIRE_VERSION);
+
+        final long[] messageKeyCounter = new long[]{0};
+        final long[] containerKeyCounter = new long[]{0};
+        final ExportStreamManager manager = new ExportStreamManager(fos, 1);
+
+
+        final int[] preparedTxs = new int[]{0};
+        store.createTransactionStore().recover(new TransactionRecoveryListener() {
+            public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
+                preparedTxs[0] += 1;
+            }
+        });
+
+        if (preparedTxs[0] > 0) {
+            throw new Exception("Cannot export a store with prepared XA transactions.  Please commit or rollback those transactions before attempting to export.");
+        }
+
+        for (ActiveMQDestination odest : store.getDestinations()) {
+            containerKeyCounter[0]++;
+            if (odest instanceof ActiveMQQueue) {
+                ActiveMQQueue dest = (ActiveMQQueue) odest;
+                MessageStore queue = store.createQueueMessageStore(dest);
+
+                QueuePB.Bean destRecord = new QueuePB.Bean();
+                destRecord.setKey(containerKeyCounter[0]);
+                destRecord.setBindingKind(ptp_kind);
+
+                final long[] seqKeyCounter = new long[]{0};
+
+                HashMap<String, Object> jsonMap = new HashMap<String, Object>();
+                jsonMap.put("@class", "queue_destination");
+                jsonMap.put("name", dest.getQueueName());
+                String json = mapper.writeValueAsString(jsonMap);
+                System.out.println(json);
+                destRecord.setBindingData(new UTF8Buffer(json));
+                manager.store_queue(destRecord);
+
+                queue.recover(new MessageRecoveryListener() {
+                    public boolean hasSpace() {
+                        return true;
+                    }
+
+                    public boolean recoverMessageReference(MessageId ref) throws Exception {
+                        return true;
+                    }
+
+                    public boolean isDuplicate(MessageId ref) {
+                        return false;
+                    }
+
+                    public boolean recoverMessage(Message message) throws IOException {
+                        messageKeyCounter[0]++;
+                        seqKeyCounter[0]++;
+
+                        DataByteArrayOutputStream mos = new DataByteArrayOutputStream();
+                        mos.writeBoolean(TIGHT_ENCODING);
+                        mos.writeVarInt(OPENWIRE_VERSION);
+                        wireformat.marshal(message, mos);
+
+                        MessagePB.Bean messageRecord = new MessagePB.Bean();
+                        messageRecord.setCodec(codec_id);
+                        messageRecord.setMessageKey(messageKeyCounter[0]);
+                        messageRecord.setSize(message.getSize());
+                        messageRecord.setValue(mos.toBuffer());
+                        //                record.setCompression()
+                        manager.store_message(messageRecord);
+
+                        QueueEntryPB.Bean entryRecord = new QueueEntryPB.Bean();
+                        entryRecord.setQueueKey(containerKeyCounter[0]);
+                        entryRecord.setQueueSeq(seqKeyCounter[0]);
+                        entryRecord.setMessageKey(messageKeyCounter[0]);
+                        entryRecord.setSize(message.getSize());
+                        if (message.getExpiration() != 0) {
+                            entryRecord.setExpiration(message.getExpiration());
+                        }
+                        if (message.getRedeliveryCounter() != 0) {
+                            entryRecord.setRedeliveries(message.getRedeliveryCounter());
+                        }
+                        manager.store_queue_entry(entryRecord);
+                        return true;
+                    }
+                });
+
+            } else if (odest instanceof ActiveMQTopic) {
+                ActiveMQTopic dest = (ActiveMQTopic) odest;
+
+                TopicMessageStore topic = store.createTopicMessageStore(dest);
+                for (SubscriptionInfo sub : topic.getAllSubscriptions()) {
+
+                    QueuePB.Bean destRecord = new QueuePB.Bean();
+                    destRecord.setKey(containerKeyCounter[0]);
+                    destRecord.setBindingKind(ds_kind);
+
+                    // The binding data is JSON; it is serialized below with the Jackson ObjectMapper.
+                    HashMap<String, Object> jsonMap = new HashMap<String, Object>();
+                    jsonMap.put("@class", "dsub_destination");
+                    jsonMap.put("name", sub.getClientId() + ":" + sub.getSubcriptionName());
+                    HashMap<String, Object> jsonTopic = new HashMap<String, Object>();
+                    jsonTopic.put("name", dest.getTopicName());
+                    jsonMap.put("topics", new Object[]{jsonTopic});
+                    if (sub.getSelector() != null) {
+                        jsonMap.put("selector", sub.getSelector());
+                    }
+                    String json = mapper.writeValueAsString(jsonMap);
+                    System.out.println(json);
+
+                    destRecord.setBindingData(new UTF8Buffer(json));
+                    manager.store_queue(destRecord);
+
+                    final long seqKeyCounter[] = new long[]{0};
+                    topic.recoverSubscription(sub.getClientId(), sub.getSubcriptionName(), new MessageRecoveryListener() {
+                        public boolean hasSpace() {
+                            return true;
+                        }
+
+                        public boolean recoverMessageReference(MessageId ref) throws Exception {
+                            return true;
+                        }
+
+                        public boolean isDuplicate(MessageId ref) {
+                            return false;
+                        }
+                        // Writes one recovered subscription message as a MessagePB record plus its QueueEntryPB; always returns true.
+                        public boolean recoverMessage(Message message) throws IOException {
+                            messageKeyCounter[0]++;
+                            seqKeyCounter[0]++;
+                            // Payload layout: tight-encoding flag, OpenWire version, then the marshalled message itself.
+                            DataByteArrayOutputStream mos = new DataByteArrayOutputStream();
+                            mos.writeBoolean(TIGHT_ENCODING);
+                            mos.writeVarInt(OPENWIRE_VERSION);
+                            wireformat.marshal(message, mos);
+
+                            MessagePB.Bean messageRecord = new MessagePB.Bean();
+                            messageRecord.setCodec(codec_id);
+                            messageRecord.setMessageKey(messageKeyCounter[0]);
+                            messageRecord.setSize(message.getSize());
+                            messageRecord.setValue(mos.toBuffer());
+                            // Compression is not set on exported message records.
+                            manager.store_message(messageRecord);
+
+                            QueueEntryPB.Bean entryRecord = new QueueEntryPB.Bean();
+                            entryRecord.setQueueKey(containerKeyCounter[0]);
+                            entryRecord.setQueueSeq(seqKeyCounter[0]);
+                            entryRecord.setMessageKey(messageKeyCounter[0]);
+                            entryRecord.setSize(message.getSize());
+                            if (message.getExpiration() != 0) {
+                                entryRecord.setExpiration(message.getExpiration());
+                            }
+                            if (message.getRedeliveryCounter() != 0) {
+                                entryRecord.setRedeliveries(message.getRedeliveryCounter());
+                            }
+                            manager.store_queue_entry(entryRecord);
+                            return true;
+                        }
+                    });
+
+                }
+            }
+        }
+        manager.finish();
+    }
+
+    public File getFile() {
+        return file;
+    }
+
+    public void setFile(String file) {
+        setFile(new File(file));
+    }
+
+    public void setFile(File file) {
+        this.file = file;
+    }
+
+    public URI getConfig() {
+        return config;
+    }
+
+    public void setConfig(URI config) {
+        this.config = config;
+    }
+}

Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarBuffer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarBuffer.java?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarBuffer.java (added)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarBuffer.java Fri Oct 26 20:28:49 2012
@@ -0,0 +1,462 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+/*
+ * This package is based on the work done by Timothy Gerard Endres
+ * (time@ice.com) to whom the Ant project is very grateful for his great code.
+ */
+
+package org.apache.activemq.console.command.store.tar;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * The TarBuffer class implements the tar archive concept
+ * of a buffered input stream. This concept goes back to the
+ * days of blocked tape drives and special io devices. In the
+ * Java universe, the only real function that this class
+ * performs is to ensure that files have the correct "block"
+ * size, or other tars will complain.
+ * <p>
+ * You should never have a need to access this class directly.
+ * TarBuffers are created by Tar IO Streams.
+ *
+ */
+
+public class TarBuffer {
+
+    /** Default record size */
+    public static final int DEFAULT_RCDSIZE = (512);
+
+    /** Default block size */
+    public static final int DEFAULT_BLKSIZE = (DEFAULT_RCDSIZE * 20);
+
+    private InputStream     inStream;
+    private OutputStream    outStream;
+    private byte[]          blockBuffer;
+    private int             currBlkIdx;
+    private int             currRecIdx;
+    private int             blockSize;
+    private int             recordSize;
+    private int             recsPerBlock;
+    private boolean         debug;
+
+    /**
+     * Constructor for a TarBuffer on an input stream.
+     * @param inStream the input stream to use
+     */
+    public TarBuffer(InputStream inStream) {
+        this(inStream, TarBuffer.DEFAULT_BLKSIZE);
+    }
+
+    /**
+     * Constructor for a TarBuffer on an input stream.
+     * @param inStream the input stream to use
+     * @param blockSize the block size to use
+     */
+    public TarBuffer(InputStream inStream, int blockSize) {
+        this(inStream, blockSize, TarBuffer.DEFAULT_RCDSIZE);
+    }
+
+    /**
+     * Constructor for a TarBuffer on an input stream.
+     * @param inStream the input stream to use
+     * @param blockSize the block size to use
+     * @param recordSize the record size to use
+     */
+    public TarBuffer(InputStream inStream, int blockSize, int recordSize) {
+        this.inStream = inStream;
+        this.outStream = null;
+
+        this.initialize(blockSize, recordSize);
+    }
+
+    /**
+     * Constructor for a TarBuffer on an output stream.
+     * @param outStream the output stream to use
+     */
+    public TarBuffer(OutputStream outStream) {
+        this(outStream, TarBuffer.DEFAULT_BLKSIZE);
+    }
+
+    /**
+     * Constructor for a TarBuffer on an output stream.
+     * @param outStream the output stream to use
+     * @param blockSize the block size to use
+     */
+    public TarBuffer(OutputStream outStream, int blockSize) {
+        this(outStream, blockSize, TarBuffer.DEFAULT_RCDSIZE);
+    }
+
+    /**
+     * Constructor for a TarBuffer on an output stream.
+     * @param outStream the output stream to use
+     * @param blockSize the block size to use
+     * @param recordSize the record size to use
+     */
+    public TarBuffer(OutputStream outStream, int blockSize, int recordSize) {
+        this.inStream = null;
+        this.outStream = outStream;
+
+        this.initialize(blockSize, recordSize);
+    }
+
+    /**
+     * Initialization common to all constructors.
+     */
+    private void initialize(int blockSize, int recordSize) {
+        this.debug = false;
+        this.blockSize = blockSize;
+        this.recordSize = recordSize;
+        this.recsPerBlock = (this.blockSize / this.recordSize);
+        this.blockBuffer = new byte[this.blockSize];
+
+        if (this.inStream != null) {
+            this.currBlkIdx = -1;
+            this.currRecIdx = this.recsPerBlock;
+        } else {
+            this.currBlkIdx = 0;
+            this.currRecIdx = 0;
+        }
+    }
+
+    /**
+     * Get the TAR Buffer's block size. Blocks consist of multiple records.
+     * @return the block size
+     */
+    public int getBlockSize() {
+        return this.blockSize;
+    }
+
+    /**
+     * Get the TAR Buffer's record size.
+     * @return the record size
+     */
+    public int getRecordSize() {
+        return this.recordSize;
+    }
+
+    /**
+     * Set the debugging flag for the buffer.
+     *
+     * @param debug If true, print debugging output.
+     */
+    public void setDebug(boolean debug) {
+        this.debug = debug;
+    }
+
+    /**
+     * Determine if an archive record indicate End of Archive. End of
+     * archive is indicated by a record that consists entirely of null bytes.
+     *
+     * @param record The record data to check.
+     * @return true if the record data is an End of Archive
+     */
+    public boolean isEOFRecord(byte[] record) {
+        for (int i = 0, sz = getRecordSize(); i < sz; ++i) {
+            if (record[i] != 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Skip over a record on the input stream.
+     * @throws IOException on error
+     */
+    public void skipRecord() throws IOException {
+        if (debug) {
+            System.err.println("SkipRecord: recIdx = " + currRecIdx
+                               + " blkIdx = " + currBlkIdx);
+        }
+
+        if (inStream == null) {
+            throw new IOException("reading (via skip) from an output buffer");
+        }
+
+        if (currRecIdx >= recsPerBlock) {
+            if (!readBlock()) {
+                return;    // UNDONE
+            }
+        }
+
+        currRecIdx++;
+    }
+
+    /**
+     * Read the next record from the input stream and return a copy of
+     * its data.
+     *
+     * @return A newly allocated array holding the record data, or null
+     *         if no further block could be read.
+     * @throws IOException on error
+     */
+    public byte[] readRecord() throws IOException {
+        if (debug) {
+            System.err.println("ReadRecord: recIdx = " + currRecIdx
+                               + " blkIdx = " + currBlkIdx);
+        }
+
+        if (inStream == null) {
+            throw new IOException("reading from an output buffer");
+        }
+
+        // Refill the block buffer once every record in it has been consumed.
+        boolean blockExhausted = currRecIdx >= recsPerBlock;
+        if (blockExhausted && !readBlock()) {
+            return null;
+        }
+
+        int start = currRecIdx * recordSize;
+        byte[] record = new byte[recordSize];
+        System.arraycopy(blockBuffer, start, record, 0, recordSize);
+        currRecIdx++;
+
+        return record;
+    }
+
+    /**
+     * Fill the block buffer with the next block from the input stream
+     * and reset the record index to the start of the block. A block cut
+     * short by end of stream is padded with zero bytes.
+     *
+     * @return false if End-Of-File was hit before any byte of the block
+     *         could be read, else true
+     * @throws IOException if this buffer has no input stream or the
+     *         underlying read fails
+     */
+    private boolean readBlock() throws IOException {
+        if (debug) {
+            System.err.println("ReadBlock: blkIdx = " + currBlkIdx);
+        }
+
+        if (inStream == null) {
+            throw new IOException("reading from an output buffer");
+        }
+
+        currRecIdx = 0;
+
+        int offset = 0;
+        int bytesNeeded = blockSize;
+
+        while (bytesNeeded > 0) {
+            long numBytes = inStream.read(blockBuffer, offset,
+                                               bytesNeeded);
+
+            //
+            // NOTE
+            // We have hit EOF, and the block is not full!
+            //
+            // This is a broken archive. It does not follow the standard
+            // blocking algorithm. However, because we are generous, and
+            // it requires little effort, we will simply ignore the error
+            // and continue as if the entire block were read. This does
+            // not appear to break anything upstream. We used to return
+            // false in this case.
+            //
+            // Thanks to 'Yohann.Roussel@alcatel.fr' for this fix.
+            //
+            if (numBytes == -1) {
+                if (offset == 0) {
+                    // Ensure that we do not read gigabytes of zeros
+                    // for a corrupt tar file.
+                    // See http://issues.apache.org/bugzilla/show_bug.cgi?id=39924
+                    return false;
+                }
+                // However, just leaving the unread portion of the buffer dirty does
+                // cause problems in some cases.  This problem is described in
+                // http://issues.apache.org/bugzilla/show_bug.cgi?id=29877
+                //
+                // The solution is to fill the unused portion of the buffer with zeros.
+
+                Arrays.fill(blockBuffer, offset, offset + bytesNeeded, (byte) 0);
+
+                break;
+            }
+
+            offset += numBytes;
+            bytesNeeded -= numBytes;
+
+            if (numBytes != blockSize) {
+                if (debug) {
+                    System.err.println("ReadBlock: INCOMPLETE READ "
+                                       + numBytes + " of " + blockSize
+                                       + " bytes read.");
+                }
+            }
+        }
+
+        currBlkIdx++;
+
+        return true;
+    }
+
+    /**
+     * Get the current block number, zero based. The underlying counter
+     * is advanced by one each time a block is read from or written to
+     * the stream.
+     *
+     * @return The current zero based block number.
+     */
+    public int getCurrentBlockNum() {
+        return currBlkIdx;
+    }
+
+    /**
+     * Get the current record number, within the current block, zero based.
+     * Thus, current offset = (currentBlockNum * recsPerBlk) + currentRecNum.
+     * The value is one less than the index of the next record to be read
+     * or written, so it is -1 right after a block has been loaded or
+     * written out and before any record of the following block has been
+     * processed.
+     *
+     * @return The current zero based record number.
+     */
+    public int getCurrentRecordNum() {
+        return currRecIdx - 1;
+    }
+
+    /**
+     * Write an archive record to the archive.
+     *
+     * @param record The record data to write to the archive.
+     * @throws IOException on error
+     */
+    public void writeRecord(byte[] record) throws IOException {
+        if (debug) {
+            System.err.println("WriteRecord: recIdx = " + currRecIdx
+                               + " blkIdx = " + currBlkIdx);
+        }
+
+        if (outStream == null) {
+            throw new IOException("writing to an input buffer");
+        }
+
+        if (record.length != recordSize) {
+            throw new IOException("record to write has length '"
+                                  + record.length
+                                  + "' which is not the record size of '"
+                                  + recordSize + "'");
+        }
+
+        if (currRecIdx >= recsPerBlock) {
+            writeBlock();
+        }
+
+        System.arraycopy(record, 0, blockBuffer,
+                         (currRecIdx * recordSize),
+                         recordSize);
+
+        currRecIdx++;
+    }
+
+    /**
+     * Write an archive record to the archive, where the record may be
+     * inside of a larger array buffer. The buffer must be "offset plus
+     * record size" long.
+     *
+     * @param buf The buffer containing the record data to write.
+     * @param offset The offset of the record data within buf.
+     * @throws IOException on error
+     */
+    public void writeRecord(byte[] buf, int offset) throws IOException {
+        if (debug) {
+            System.err.println("WriteRecord: recIdx = " + currRecIdx
+                               + " blkIdx = " + currBlkIdx);
+        }
+
+        if (outStream == null) {
+            throw new IOException("writing to an input buffer");
+        }
+
+        if ((offset + recordSize) > buf.length) {
+            throw new IOException("record has length '" + buf.length
+                                  + "' with offset '" + offset
+                                  + "' which is less than the record size of '"
+                                  + recordSize + "'");
+        }
+
+        if (currRecIdx >= recsPerBlock) {
+            writeBlock();
+        }
+
+        System.arraycopy(buf, offset, blockBuffer,
+                         (currRecIdx * recordSize),
+                         recordSize);
+
+        currRecIdx++;
+    }
+
+    /**
+     * Write a TarBuffer block to the archive. The whole block buffer is
+     * written and the stream flushed; afterwards the record index is
+     * reset, the block index advanced and the block buffer cleared to
+     * zero bytes, so records not filled in before the next write go out
+     * as zeros.
+     *
+     * @throws IOException if this is an input buffer or the write fails
+     */
+    private void writeBlock() throws IOException {
+        if (debug) {
+            System.err.println("WriteBlock: blkIdx = " + currBlkIdx);
+        }
+
+        if (outStream == null) {
+            throw new IOException("writing to an input buffer");
+        }
+
+        outStream.write(blockBuffer, 0, blockSize);
+        outStream.flush();
+
+        currRecIdx = 0;
+        currBlkIdx++;
+        Arrays.fill(blockBuffer, (byte) 0);
+    }
+
+    /**
+     * Flush the current data block if it has any data in it, i.e. if at
+     * least one record has been placed in the block buffer since the
+     * last block was written. The complete block is written, not just
+     * the records filled in so far.
+     *
+     * @throws IOException if this is an input buffer or the write fails
+     */
+    void flushBlock() throws IOException {
+        if (debug) {
+            System.err.println("TarBuffer.flushBlock() called.");
+        }
+
+        if (outStream == null) {
+            throw new IOException("writing to an input buffer");
+        }
+
+        if (currRecIdx > 0) {
+            writeBlock();
+        }
+    }
+
+    /**
+     * Close the TarBuffer. If this is an output buffer, also flush the
+     * current block before closing. The standard streams
+     * (<code>System.out</code>, <code>System.err</code> and
+     * <code>System.in</code>) are never closed.
+     * <p>
+     * The output stream is closed even when the final flush fails, so a
+     * failed flush does not leak the underlying stream.
+     *
+     * @throws IOException on error
+     */
+    public void close() throws IOException {
+        if (debug) {
+            System.err.println("TarBuffer.closeBuffer().");
+        }
+
+        if (outStream != null) {
+            try {
+                flushBlock();
+            } finally {
+                // Release the stream whether or not the flush succeeded.
+                if (outStream != System.out
+                        && outStream != System.err) {
+                    outStream.close();
+
+                    outStream = null;
+                }
+            }
+        } else if (inStream != null) {
+            if (inStream != System.in) {
+                inStream.close();
+
+                inStream = null;
+            }
+        }
+    }
+}

Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarConstants.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarConstants.java?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarConstants.java (added)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarConstants.java Fri Oct 26 20:28:49 2012
@@ -0,0 +1,158 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+/*
+ * This package is based on the work done by Timothy Gerard Endres
+ * (time@ice.com) to whom the Ant project is very grateful for his great code.
+ */
+
+package org.apache.activemq.console.command.store.tar;
+
+/**
+ * This interface contains all the definitions used in the package.
+ *
+ */
+// CheckStyle:InterfaceIsTypeCheck OFF (bc)
+public interface TarConstants {
+
+    /**
+     * The length of the name field in a header buffer.
+     */
+    int    NAMELEN = 100;
+
+    /**
+     * The length of the mode field in a header buffer.
+     */
+    int    MODELEN = 8;
+
+    /**
+     * The length of the user id field in a header buffer.
+     */
+    int    UIDLEN = 8;
+
+    /**
+     * The length of the group id field in a header buffer.
+     */
+    int    GIDLEN = 8;
+
+    /**
+     * The length of the checksum field in a header buffer.
+     */
+    int    CHKSUMLEN = 8;
+
+    /**
+     * The length of the size field in a header buffer.
+     */
+    int    SIZELEN = 12;
+
+    /**
+     * The maximum size of a file in a tar archive (That's 11 sevens, octal).
+     */
+    long   MAXSIZE = 077777777777L;
+
+    /**
+     * The length of the magic field in a header buffer.
+     */
+    int    MAGICLEN = 8;
+
+    /**
+     * The length of the modification time field in a header buffer.
+     */
+    int    MODTIMELEN = 12;
+
+    /**
+     * The length of the user name field in a header buffer.
+     */
+    int    UNAMELEN = 32;
+
+    /**
+     * The length of the group name field in a header buffer.
+     */
+    int    GNAMELEN = 32;
+
+    /**
+     * The length of each device number (major or minor) field in a header buffer.
+     */
+    int    DEVLEN = 8;
+
+    /**
+     * LF_ constants represent the "link flag" of an entry, or more commonly,
+     * the "entry type". This is the "old way" of indicating a normal file.
+     */
+    byte   LF_OLDNORM = 0;
+
+    /**
+     * Normal file type.
+     */
+    byte   LF_NORMAL = (byte) '0';
+
+    /**
+     * Link file type.
+     */
+    byte   LF_LINK = (byte) '1';
+
+    /**
+     * Symbolic link file type.
+     */
+    byte   LF_SYMLINK = (byte) '2';
+
+    /**
+     * Character device file type.
+     */
+    byte   LF_CHR = (byte) '3';
+
+    /**
+     * Block device file type.
+     */
+    byte   LF_BLK = (byte) '4';
+
+    /**
+     * Directory file type.
+     */
+    byte   LF_DIR = (byte) '5';
+
+    /**
+     * FIFO (pipe) file type.
+     */
+    byte   LF_FIFO = (byte) '6';
+
+    /**
+     * Contiguous file type.
+     */
+    byte   LF_CONTIG = (byte) '7';
+
+    /**
+     * The magic tag representing a POSIX tar archive.
+     */
+    String TMAGIC = "ustar";
+
+    /**
+     * The magic tag representing a GNU tar archive ("ustar" plus two spaces).
+     */
+    String GNU_TMAGIC = "ustar  ";
+
+    /**
+     * The name of the GNU tar entry which contains a long name.
+     */
+    String GNU_LONGLINK = "././@LongLink";
+
+    /**
+     * Identifies the *next* file on the tape as having a long name.
+     */
+    byte LF_GNUTYPE_LONGNAME = (byte) 'L';
+}

Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarEntry.java?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarEntry.java (added)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarEntry.java Fri Oct 26 20:28:49 2012
@@ -0,0 +1,664 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+/*
+ * This package is based on the work done by Timothy Gerard Endres
+ * (time@ice.com) to whom the Ant project is very grateful for his great code.
+ */
+
+package org.apache.activemq.console.command.store.tar;
+
+import java.io.File;
+import java.util.Date;
+import java.util.Locale;
+
+/**
+ * This class represents an entry in a Tar archive. It consists
+ * of the entry's header, as well as the entry's File. Entries
+ * can be instantiated in one of three ways, depending on how
+ * they are to be used.
+ * <p>
+ * TarEntries that are created from the header bytes read from
+ * an archive are instantiated with the TarEntry( byte[] )
+ * constructor. These entries will be used when extracting from
+ * or listing the contents of an archive. These entries have their
+ * header filled in using the header bytes. They also set the File
+ * to null, since they reference an archive entry not a file.
+ * <p>
+ * TarEntries that are created from Files that are to be written
+ * into an archive are instantiated with the TarEntry( File )
+ * constructor. These entries have their header filled in using
+ * the File's information. They also keep a reference to the File
+ * for convenience when writing entries.
+ * <p>
+ * Finally, TarEntries can be constructed from nothing but a name.
+ * This allows the programmer to construct the entry by hand, for
+ * instance when only an InputStream is available for writing to
+ * the archive, and the header information is constructed from
+ * other information. In this case the header fields are set to
+ * defaults and the File is set to null.
+ *
+ * <p>
+ * The C structure for a Tar Entry's header is:
+ * <pre>
+ * struct header {
+ * char name[NAMSIZ];
+ * char mode[8];
+ * char uid[8];
+ * char gid[8];
+ * char size[12];
+ * char mtime[12];
+ * char chksum[8];
+ * char linkflag;
+ * char linkname[NAMSIZ];
+ * char magic[8];
+ * char uname[TUNMLEN];
+ * char gname[TGNMLEN];
+ * char devmajor[8];
+ * char devminor[8];
+ * } header;
+ * </pre>
+ *
+ */
+
+public class TarEntry implements TarConstants {
+    /** The entry's name. */
+    private StringBuffer name;
+
+    /** The entry's permission mode. */
+    private int mode;
+
+    /** The entry's user id. */
+    private int userId;
+
+    /** The entry's group id. */
+    private int groupId;
+
+    /** The entry's size. */
+    private long size;
+
+    /** The entry's modification time, in seconds. */
+    private long modTime;
+
+    /** The entry's link flag. */
+    private byte linkFlag;
+
+    /** The entry's link name. */
+    private StringBuffer linkName;
+
+    /** The entry's magic tag. */
+    private StringBuffer magic;
+
+    /** The entry's user name. */
+    private StringBuffer userName;
+
+    /** The entry's group name. */
+    private StringBuffer groupName;
+
+    /** The entry's major device number. */
+    private int devMajor;
+
+    /** The entry's minor device number. */
+    private int devMinor;
+
+    /** The entry's file reference */
+    private File file;
+
+    /** Maximum length of a user's name in the tar file */
+    public static final int MAX_NAMELEN = 31;
+
+    /** Default permissions bits for directories */
+    public static final int DEFAULT_DIR_MODE = 040755;
+
+    /** Default permissions bits for files */
+    public static final int DEFAULT_FILE_MODE = 0100644;
+
+    /** Convert millis to seconds */
+    public static final int MILLIS_PER_SECOND = 1000;
+
+    /**
+     * Construct an empty entry and prepares the header values. The user
+     * name defaults to the <code>user.name</code> system property,
+     * truncated to {@link #MAX_NAMELEN} characters.
+     */
+    private TarEntry () {
+        this.magic = new StringBuffer(TMAGIC);
+        this.name = new StringBuffer();
+        this.linkName = new StringBuffer();
+
+        String user = System.getProperty("user.name", "");
+
+        if (user.length() > MAX_NAMELEN) {
+            user = user.substring(0, MAX_NAMELEN);
+        }
+
+        this.userId = 0;
+        this.groupId = 0;
+        this.userName = new StringBuffer(user);
+        this.groupName = new StringBuffer("");
+        this.file = null;
+    }
+
+    /**
+     * Construct an entry with only a name. This allows the programmer
+     * to construct the entry's header "by hand". File is set to null.
+     *
+     * @param name the entry name
+     */
+    public TarEntry(String name) {
+        this(name, false);
+    }
+
+    /**
+     * Construct an entry with only a name. This allows the programmer
+     * to construct the entry's header "by hand". File is set to null.
+     * A name ending in "/" yields a directory entry; the user and group
+     * names are left empty and the modification time is set to now.
+     *
+     * @param name the entry name
+     * @param preserveLeadingSlashes whether to allow leading slashes
+     * in the name.
+     */
+    public TarEntry(String name, boolean preserveLeadingSlashes) {
+        this();
+
+        name = normalizeFileName(name, preserveLeadingSlashes);
+        boolean isDir = name.endsWith("/");
+
+        this.devMajor = 0;
+        this.devMinor = 0;
+        this.name = new StringBuffer(name);
+        this.mode = isDir ? DEFAULT_DIR_MODE : DEFAULT_FILE_MODE;
+        this.linkFlag = isDir ? LF_DIR : LF_NORMAL;
+        this.userId = 0;
+        this.groupId = 0;
+        this.size = 0;
+        this.modTime = (new Date()).getTime() / MILLIS_PER_SECOND;
+        this.linkName = new StringBuffer("");
+        // Deliberately overrides the user.name default set by this().
+        this.userName = new StringBuffer("");
+        this.groupName = new StringBuffer("");
+    }
+
+    /**
+     * Construct an entry with a name and a link flag.
+     *
+     * @param name the entry name
+     * @param linkFlag the entry link flag.
+     */
+    public TarEntry(String name, byte linkFlag) {
+        this(name);
+        this.linkFlag = linkFlag;
+        if (linkFlag == LF_GNUTYPE_LONGNAME) {
+            magic = new StringBuffer(GNU_TMAGIC);
+        }
+    }
+
+    /**
+     * Construct an entry for a file. File is set to file, and the
+     * header is constructed from information from the file.
+     *
+     * @param file The file that the entry represents.
+     */
+    public TarEntry(File file) {
+        this();
+
+        this.file = file;
+
+        String fileName = normalizeFileName(file.getPath(), false);
+        this.linkName = new StringBuffer("");
+        this.name = new StringBuffer(fileName);
+
+        if (file.isDirectory()) {
+            this.mode = DEFAULT_DIR_MODE;
+            this.linkFlag = LF_DIR;
+
+            // Directory names always carry a trailing slash.
+            int nameLength = name.length();
+            if (nameLength == 0 || name.charAt(nameLength - 1) != '/') {
+                this.name.append("/");
+            }
+            this.size = 0;
+        } else {
+            this.mode = DEFAULT_FILE_MODE;
+            this.linkFlag = LF_NORMAL;
+            this.size = file.length();
+        }
+
+        this.modTime = file.lastModified() / MILLIS_PER_SECOND;
+        this.devMajor = 0;
+        this.devMinor = 0;
+    }
+
+    /**
+     * Construct an entry from an archive's header bytes. File is set
+     * to null.
+     *
+     * @param headerBuf The header bytes from a tar archive entry.
+     */
+    public TarEntry(byte[] headerBuf) {
+        this();
+        parseTarHeader(headerBuf);
+    }
+
+    /**
+     * Determine if the two entries are equal. Equality is determined
+     * by the header names being equal.
+     *
+     * @param it Entry to be checked for equality; may be null.
+     * @return True if the entries are equal, false if they differ or
+     *         if <code>it</code> is null.
+     */
+    public boolean equals(TarEntry it) {
+        // A literal equals(null) call binds to this overload, so it must
+        // honour the Object.equals contract and return false.
+        return it != null && getName().equals(it.getName());
+    }
+
+    /**
+     * Determine if the two entries are equal. Equality is determined
+     * by the header names being equal.
+     *
+     * @param it Entry to be checked for equality.
+     * @return True if the entries are equal.
+     */
+    @Override
+    public boolean equals(Object it) {
+        if (it == null || getClass() != it.getClass()) {
+            return false;
+        }
+        return equals((TarEntry) it);
+    }
+
+    /**
+     * Hashcodes are based on entry names.
+     *
+     * @return the entry hashcode
+     */
+    @Override
+    public int hashCode() {
+        return getName().hashCode();
+    }
+
+    /**
+     * Determine if the given entry is a descendant of this entry.
+     * Descendancy is determined by the name of the descendant
+     * starting with this entry's name.
+     *
+     * @param desc Entry to be checked as a descendent of this.
+     * @return True if entry is a descendant of this.
+     */
+    public boolean isDescendent(TarEntry desc) {
+        return desc.getName().startsWith(getName());
+    }
+
+    /**
+     * Get this entry's name.
+     *
+     * @return This entry's name.
+     */
+    public String getName() {
+        return name.toString();
+    }
+
+    /**
+     * Set this entry's name.
+     *
+     * @param name This entry's new name.
+     */
+    public void setName(String name) {
+        this.name = new StringBuffer(normalizeFileName(name, false));
+    }
+
+    /**
+     * Set the mode for this entry
+     *
+     * @param mode the mode for this entry
+     */
+    public void setMode(int mode) {
+        this.mode = mode;
+    }
+
+    /**
+     * Get this entry's link name.
+     *
+     * @return This entry's link name.
+     */
+    public String getLinkName() {
+        return linkName.toString();
+    }
+
+    /**
+     * Get this entry's user id.
+     *
+     * @return This entry's user id.
+     */
+    public int getUserId() {
+        return userId;
+    }
+
+    /**
+     * Set this entry's user id.
+     *
+     * @param userId This entry's new user id.
+     */
+    public void setUserId(int userId) {
+        this.userId = userId;
+    }
+
+    /**
+     * Get this entry's group id.
+     *
+     * @return This entry's group id.
+     */
+    public int getGroupId() {
+        return groupId;
+    }
+
+    /**
+     * Set this entry's group id.
+     *
+     * @param groupId This entry's new group id.
+     */
+    public void setGroupId(int groupId) {
+        this.groupId = groupId;
+    }
+
+    /**
+     * Get this entry's user name.
+     *
+     * @return This entry's user name.
+     */
+    public String getUserName() {
+        return userName.toString();
+    }
+
+    /**
+     * Set this entry's user name.
+     *
+     * @param userName This entry's new user name.
+     */
+    public void setUserName(String userName) {
+        this.userName = new StringBuffer(userName);
+    }
+
+    /**
+     * Get this entry's group name.
+     *
+     * @return This entry's group name.
+     */
+    public String getGroupName() {
+        return groupName.toString();
+    }
+
+    /**
+     * Set this entry's group name.
+     *
+     * @param groupName This entry's new group name.
+     */
+    public void setGroupName(String groupName) {
+        this.groupName = new StringBuffer(groupName);
+    }
+
+    /**
+     * Convenience method to set this entry's group and user ids.
+     *
+     * @param userId This entry's new user id.
+     * @param groupId This entry's new group id.
+     */
+    public void setIds(int userId, int groupId) {
+        setUserId(userId);
+        setGroupId(groupId);
+    }
+
+    /**
+     * Convenience method to set this entry's group and user names.
+     *
+     * @param userName This entry's new user name.
+     * @param groupName This entry's new group name.
+     */
+    public void setNames(String userName, String groupName) {
+        setUserName(userName);
+        setGroupName(groupName);
+    }
+
+    /**
+     * Set this entry's modification time. The parameter passed
+     * to this method is in "Java time" (milliseconds); it is stored
+     * with a resolution of one second.
+     *
+     * @param time This entry's new modification time.
+     */
+    public void setModTime(long time) {
+        modTime = time / MILLIS_PER_SECOND;
+    }
+
+    /**
+     * Set this entry's modification time.
+     *
+     * @param time This entry's new modification time.
+     */
+    public void setModTime(Date time) {
+        modTime = time.getTime() / MILLIS_PER_SECOND;
+    }
+
+    /**
+     * Get this entry's modification time.
+     *
+     * @return This entry's modification time, with a resolution of one
+     *         second.
+     */
+    public Date getModTime() {
+        return new Date(modTime * MILLIS_PER_SECOND);
+    }
+
+    /**
+     * Get this entry's file.
+     *
+     * @return This entry's file, or null if the entry was not created
+     *         from a file.
+     */
+    public File getFile() {
+        return file;
+    }
+
+    /**
+     * Get this entry's mode.
+     *
+     * @return This entry's mode.
+     */
+    public int getMode() {
+        return mode;
+    }
+
+    /**
+     * Get this entry's file size.
+     *
+     * @return This entry's file size.
+     */
+    public long getSize() {
+        return size;
+    }
+
+    /**
+     * Set this entry's file size.
+     *
+     * @param size This entry's new file size.
+     */
+    public void setSize(long size) {
+        this.size = size;
+    }
+
+
+    /**
+     * Indicate if this entry is a GNU long name block
+     *
+     * @return true if this is a long name extension provided by GNU tar
+     */
+    public boolean isGNULongNameEntry() {
+        return linkFlag == LF_GNUTYPE_LONGNAME
+                           && name.toString().equals(GNU_LONGLINK);
+    }
+
+    /**
+     * Return whether or not this entry represents a directory. When the
+     * entry has a file, the file decides; otherwise the link flag and a
+     * trailing "/" in the name are consulted.
+     *
+     * @return True if this entry is a directory.
+     */
+    public boolean isDirectory() {
+        if (file != null) {
+            return file.isDirectory();
+        }
+
+        if (linkFlag == LF_DIR) {
+            return true;
+        }
+
+        if (getName().endsWith("/")) {
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * If this entry represents a file, and the file is a directory, return
+     * an array of TarEntries for this entry's children.
+     *
+     * @return An array of TarEntry's for this entry's children; empty if
+     *         this entry has no file, the file is not a directory, or
+     *         the directory could not be listed.
+     */
+    public TarEntry[] getDirectoryEntries() {
+        if (file == null || !file.isDirectory()) {
+            return new TarEntry[0];
+        }
+
+        String[]   list = file.list();
+        if (list == null) {
+            // File.list() returns null when the directory cannot be read.
+            return new TarEntry[0];
+        }
+
+        TarEntry[] result = new TarEntry[list.length];
+
+        for (int i = 0; i < list.length; ++i) {
+            result[i] = new TarEntry(new File(file, list[i]));
+        }
+
+        return result;
+    }
+
+    /**
+     * Write an entry's header information to a header buffer. Fields are
+     * written in header order, the remainder of the buffer is zeroed and
+     * the checksum is computed last and stored in its field.
+     *
+     * @param outbuf The tar entry header buffer to fill in.
+     */
+    public void writeEntryHeader(byte[] outbuf) {
+        int offset = 0;
+
+        offset = TarUtils.getNameBytes(name, outbuf, offset, NAMELEN);
+        offset = TarUtils.getOctalBytes(mode, outbuf, offset, MODELEN);
+        offset = TarUtils.getOctalBytes(userId, outbuf, offset, UIDLEN);
+        offset = TarUtils.getOctalBytes(groupId, outbuf, offset, GIDLEN);
+        offset = TarUtils.getLongOctalBytes(size, outbuf, offset, SIZELEN);
+        offset = TarUtils.getLongOctalBytes(modTime, outbuf, offset, MODTIMELEN);
+
+        int csOffset = offset;
+
+        // The checksum field is filled with spaces while the checksum
+        // itself is being computed.
+        for (int c = 0; c < CHKSUMLEN; ++c) {
+            outbuf[offset++] = (byte) ' ';
+        }
+
+        outbuf[offset++] = linkFlag;
+        offset = TarUtils.getNameBytes(linkName, outbuf, offset, NAMELEN);
+        offset = TarUtils.getNameBytes(magic, outbuf, offset, MAGICLEN);
+        offset = TarUtils.getNameBytes(userName, outbuf, offset, UNAMELEN);
+        offset = TarUtils.getNameBytes(groupName, outbuf, offset, GNAMELEN);
+        offset = TarUtils.getOctalBytes(devMajor, outbuf, offset, DEVLEN);
+        offset = TarUtils.getOctalBytes(devMinor, outbuf, offset, DEVLEN);
+
+        while (offset < outbuf.length) {
+            outbuf[offset++] = 0;
+        }
+
+        long chk = TarUtils.computeCheckSum(outbuf);
+
+        TarUtils.getCheckSumOctalBytes(chk, outbuf, csOffset, CHKSUMLEN);
+    }
+
+    /**
+     * Parse an entry's header information from a header buffer. The
+     * checksum field is skipped, not verified.
+     *
+     * @param header The tar entry header buffer to get information from.
+     */
+    public void parseTarHeader(byte[] header) {
+        int offset = 0;
+
+        name = TarUtils.parseName(header, offset, NAMELEN);
+        offset += NAMELEN;
+        mode = (int) TarUtils.parseOctal(header, offset, MODELEN);
+        offset += MODELEN;
+        userId = (int) TarUtils.parseOctal(header, offset, UIDLEN);
+        offset += UIDLEN;
+        groupId = (int) TarUtils.parseOctal(header, offset, GIDLEN);
+        offset += GIDLEN;
+        size = TarUtils.parseOctal(header, offset, SIZELEN);
+        offset += SIZELEN;
+        modTime = TarUtils.parseOctal(header, offset, MODTIMELEN);
+        offset += MODTIMELEN;
+        offset += CHKSUMLEN;
+        linkFlag = header[offset++];
+        linkName = TarUtils.parseName(header, offset, NAMELEN);
+        offset += NAMELEN;
+        magic = TarUtils.parseName(header, offset, MAGICLEN);
+        offset += MAGICLEN;
+        userName = TarUtils.parseName(header, offset, UNAMELEN);
+        offset += UNAMELEN;
+        groupName = TarUtils.parseName(header, offset, GNAMELEN);
+        offset += GNAMELEN;
+        devMajor = (int) TarUtils.parseOctal(header, offset, DEVLEN);
+        offset += DEVLEN;
+        devMinor = (int) TarUtils.parseOctal(header, offset, DEVLEN);
+    }
+
+    /**
+     * Strips Windows' drive letter as well as any leading slashes,
+     * turns path separators into forward slashes.
+     */
+    private static String normalizeFileName(String fileName,
+                                            boolean preserveLeadingSlashes) {
+        String osname = System.getProperty("os.name");
+
+        if (osname != null) {
+            // Lower-case only after the null check; the property lookup
+            // returns null when os.name is not defined.
+            osname = osname.toLowerCase(Locale.ENGLISH);
+
+            // Strip off drive letters!
+            // REVIEW Would a better check be "(File.separator == '\')"?
+
+            if (osname.startsWith("windows")) {
+                if (fileName.length() > 2) {
+                    char ch1 = fileName.charAt(0);
+                    char ch2 = fileName.charAt(1);
+
+                    if (ch2 == ':'
+                        && ((ch1 >= 'a' && ch1 <= 'z')
+                            || (ch1 >= 'A' && ch1 <= 'Z'))) {
+                        fileName = fileName.substring(2);
+                    }
+                }
+            } else if (osname.indexOf("netware") > -1) {
+                int colon = fileName.indexOf(':');
+                if (colon != -1) {
+                    fileName = fileName.substring(colon + 1);
+                }
+            }
+        }
+
+        fileName = fileName.replace(File.separatorChar, '/');
+
+        // No absolute pathnames
+        // Windows (and Posix?) paths can start with "\\NetworkDrive\",
+        // so we loop on starting /'s.
+        while (!preserveLeadingSlashes && fileName.startsWith("/")) {
+            fileName = fileName.substring(1);
+        }
+        return fileName;
+    }
+}

Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarInputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarInputStream.java?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarInputStream.java (added)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarInputStream.java Fri Oct 26 20:28:49 2012
@@ -0,0 +1,402 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+/*
+ * This package is based on the work done by Timothy Gerard Endres
+ * (time@ice.com) to whom the Ant project is very grateful for his great code.
+ */
+
+package org.apache.activemq.console.command.store.tar;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * The TarInputStream reads a UNIX tar archive as an InputStream.
+ * Methods are provided to position the stream at each successive entry in
+ * the archive, and then read each entry as a normal input stream
+ * using read().
+ *
+ * <p>All reads go through a {@link TarBuffer}, which hands out the archive
+ * one record at a time; this class tracks the boundaries of the current
+ * entry on top of that.</p>
+ *
+ */
+public class TarInputStream extends FilterInputStream {
+    private static final int SMALL_BUFFER_SIZE = 256;
+    private static final int BUFFER_SIZE = 8 * 1024;
+    private static final int LARGE_BUFFER_SIZE = 32 * 1024;
+    private static final int BYTE_MASK = 0xFF;
+
+    // CheckStyle:VisibilityModifier OFF - bc
+    /** When true, progress messages are written to System.err. */
+    protected boolean debug;
+    /** Set once a null or EOF record has been read; no further entries are returned. */
+    protected boolean hasHitEOF;
+    /** Size in bytes of the current entry, taken from its header. */
+    protected long entrySize;
+    /** Number of bytes of the current entry that have been consumed so far. */
+    protected long entryOffset;
+    /** Unconsumed tail of the last record read, or null if there is none. */
+    protected byte[] readBuf;
+    /** Record-oriented reader wrapped around the underlying stream. */
+    protected TarBuffer buffer;
+    /** Entry most recently returned by getNextEntry(), or null. */
+    protected TarEntry currEntry;
+
+    /**
+     * The contents of this array are not used at all in this class;
+     * it is only here to avoid repeated object creation during calls
+     * to the no-arg read method.
+     */
+    protected byte[] oneBuf;
+
+    // CheckStyle:VisibilityModifier ON
+
+    /**
+     * Constructor for TarInputStream using the default block and record sizes.
+     * @param is the input stream to use
+     */
+    public TarInputStream(InputStream is) {
+        this(is, TarBuffer.DEFAULT_BLKSIZE, TarBuffer.DEFAULT_RCDSIZE);
+    }
+
+    /**
+     * Constructor for TarInputStream using the default record size.
+     * @param is the input stream to use
+     * @param blockSize the block size to use
+     */
+    public TarInputStream(InputStream is, int blockSize) {
+        this(is, blockSize, TarBuffer.DEFAULT_RCDSIZE);
+    }
+
+    /**
+     * Constructor for TarInputStream.
+     * @param is the input stream to use
+     * @param blockSize the block size to use
+     * @param recordSize the record size to use
+     */
+    public TarInputStream(InputStream is, int blockSize, int recordSize) {
+        super(is);
+
+        this.buffer = new TarBuffer(is, blockSize, recordSize);
+        this.readBuf = null;
+        this.oneBuf = new byte[1];
+        this.debug = false;
+        this.hasHitEOF = false;
+    }
+
+    /**
+     * Sets the debugging flag on this stream and on its TarBuffer.
+     *
+     * @param debug True to turn on debugging.
+     */
+    public void setDebug(boolean debug) {
+        this.debug = debug;
+        buffer.setDebug(debug);
+    }
+
+    /**
+     * Closes this stream. Calls the TarBuffer's close() method.
+     * @throws IOException on error
+     */
+    @Override
+    public void close() throws IOException {
+        buffer.close();
+    }
+
+    /**
+     * Get the record size being used by this stream's TarBuffer.
+     *
+     * @return The TarBuffer record size.
+     */
+    public int getRecordSize() {
+        return buffer.getRecordSize();
+    }
+
+    /**
+     * Get the available data that can be read from the current
+     * entry in the archive. This does not indicate how much data
+     * is left in the entire archive, only in the current entry.
+     * This value is determined from the entry's size header field
+     * and the amount of data already read from the current entry.
+     * Integer.MAX_VALUE is returned in case more than Integer.MAX_VALUE
+     * bytes are left in the current entry in the archive.
+     *
+     * @return The number of available bytes for the current entry.
+     * @throws IOException never thrown here; declared to match the
+     *         overridden signature
+     */
+    @Override
+    public int available() throws IOException {
+        long remaining = entrySize - entryOffset;
+        if (remaining > Integer.MAX_VALUE) {
+            return Integer.MAX_VALUE;
+        }
+        return (int) remaining;
+    }
+
+    /**
+     * Skip bytes in the input buffer. This skips bytes in the
+     * current entry's data, not the entire archive, and will
+     * stop at the end of the current entry's data if the number
+     * to skip extends beyond that point.
+     *
+     * @param numToSkip The number of bytes to skip.
+     * @return the number actually skipped
+     * @throws IOException on error
+     */
+    @Override
+    public long skip(long numToSkip) throws IOException {
+        // REVIEW
+        // This is horribly inefficient, but it ensures that we
+        // properly skip over bytes via the TarBuffer...
+        //
+        byte[] skipBuf = new byte[BUFFER_SIZE];
+        long skip = numToSkip;
+        while (skip > 0) {
+            int realSkip = (int) Math.min(skip, skipBuf.length);
+            int numRead = read(skipBuf, 0, realSkip);
+            if (numRead == -1) {
+                // end of the current entry reached before numToSkip bytes
+                break;
+            }
+            skip -= numRead;
+        }
+        return (numToSkip - skip);
+    }
+
+    /**
+     * Since we do not support marking just yet, we return false.
+     *
+     * @return False.
+     */
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    /**
+     * Since we do not support marking just yet, we do nothing.
+     *
+     * @param markLimit The limit to mark.
+     */
+    @Override
+    public void mark(int markLimit) {
+    }
+
+    /**
+     * Since we do not support marking just yet, we do nothing.
+     * Note that this silently leaves the stream position unchanged
+     * rather than throwing.
+     */
+    @Override
+    public void reset() {
+    }
+
+    /**
+     * Get the next entry in this tar archive. This will skip
+     * over any remaining data in the current entry, if there
+     * is one, and place the input stream at the header of the
+     * next entry, and read the header and instantiate a new
+     * TarEntry from the header bytes and return that entry.
+     * If there are no more entries in the archive, null will
+     * be returned to indicate that the end of the archive has
+     * been reached.
+     *
+     * <p>A GNU long-name entry is consumed transparently: its data is
+     * read as the name of the entry that follows it, and that following
+     * entry is the one returned.</p>
+     *
+     * @return The next TarEntry in the archive, or null.
+     * @throws IOException on error
+     * @throws RuntimeException if the remainder of the current entry
+     *         cannot be skipped
+     */
+    public TarEntry getNextEntry() throws IOException {
+        if (hasHitEOF) {
+            return null;
+        }
+
+        if (currEntry != null) {
+            long numToSkip = entrySize - entryOffset;
+
+            if (debug) {
+                System.err.println("TarInputStream: SKIP currENTRY '"
+                        + currEntry.getName() + "' SZ "
+                        + entrySize + " OFF "
+                        + entryOffset + "  skipping "
+                        + numToSkip + " bytes");
+            }
+
+            while (numToSkip > 0) {
+                long skipped = skip(numToSkip);
+                if (skipped <= 0) {
+                    throw new RuntimeException("failed to skip current tar"
+                                               + " entry");
+                }
+                numToSkip -= skipped;
+            }
+
+            // whatever is left of the last record is padding; drop it so
+            // the next read starts on a fresh record
+            readBuf = null;
+        }
+
+        byte[] headerBuf = buffer.readRecord();
+
+        if (headerBuf == null) {
+            if (debug) {
+                System.err.println("READ NULL RECORD");
+            }
+            hasHitEOF = true;
+        } else if (buffer.isEOFRecord(headerBuf)) {
+            if (debug) {
+                System.err.println("READ EOF RECORD");
+            }
+            hasHitEOF = true;
+        }
+
+        if (hasHitEOF) {
+            currEntry = null;
+        } else {
+            currEntry = new TarEntry(headerBuf);
+
+            if (debug) {
+                System.err.println("TarInputStream: SET CURRENTRY '"
+                        + currEntry.getName()
+                        + "' size = "
+                        + currEntry.getSize());
+            }
+
+            entryOffset = 0;
+
+            entrySize = currEntry.getSize();
+        }
+
+        if (currEntry != null && currEntry.isGNULongNameEntry()) {
+            // Read the whole name as raw bytes first and decode it once.
+            // Decoding each chunk separately would corrupt any multi-byte
+            // character that happens to straddle a chunk boundary.
+            ByteArrayOutputStream nameBytes = new ByteArrayOutputStream();
+            byte[] buf = new byte[SMALL_BUFFER_SIZE];
+            int length;
+            while ((length = read(buf)) >= 0) {
+                nameBytes.write(buf, 0, length);
+            }
+            getNextEntry();
+            if (currEntry == null) {
+                // Bugzilla: 40334
+                // Malformed tar file - long entry name not followed by entry
+                return null;
+            }
+            // decoded with the platform default charset
+            String longName = nameBytes.toString();
+            // remove trailing null terminator
+            if (longName.length() > 0
+                && longName.charAt(longName.length() - 1) == 0) {
+                longName = longName.substring(0, longName.length() - 1);
+            }
+            currEntry.setName(longName);
+        }
+
+        return currEntry;
+    }
+
+    /**
+     * Reads a byte from the current tar archive entry.
+     *
+     * This method simply calls read( byte[], int, int ).
+     *
+     * @return The byte read, or -1 at EOF.
+     * @throws IOException on error
+     */
+    @Override
+    public int read() throws IOException {
+        int num = read(oneBuf, 0, 1);
+        // mask so that bytes >= 0x80 are returned as 128..255, not negative
+        return num == -1 ? -1 : ((int) oneBuf[0]) & BYTE_MASK;
+    }
+
+    /**
+     * Reads bytes from the current tar archive entry.
+     *
+     * This method is aware of the boundaries of the current
+     * entry in the archive and will deal with them as if they
+     * were this stream's start and EOF.
+     *
+     * @param buf The buffer into which to place bytes read.
+     * @param offset The offset at which to place bytes read.
+     * @param numToRead The number of bytes to read.
+     * @return The number of bytes read, or -1 at EOF.
+     * @throws IOException on error, including the archive ending before
+     *         the current entry's data is complete
+     */
+    @Override
+    public int read(byte[] buf, int offset, int numToRead) throws IOException {
+        int totalRead = 0;
+
+        if (entryOffset >= entrySize) {
+            return -1;
+        }
+
+        // never read past the end of the current entry
+        if ((numToRead + entryOffset) > entrySize) {
+            numToRead = (int) (entrySize - entryOffset);
+        }
+
+        // first serve bytes left over from the previously read record
+        if (readBuf != null) {
+            int sz = (numToRead > readBuf.length) ? readBuf.length
+                    : numToRead;
+
+            System.arraycopy(readBuf, 0, buf, offset, sz);
+
+            if (sz >= readBuf.length) {
+                readBuf = null;
+            } else {
+                int newLen = readBuf.length - sz;
+                byte[] newBuf = new byte[newLen];
+
+                System.arraycopy(readBuf, sz, newBuf, 0, newLen);
+
+                readBuf = newBuf;
+            }
+
+            totalRead += sz;
+            numToRead -= sz;
+            offset += sz;
+        }
+
+        // then pull whole records until the request is satisfied
+        while (numToRead > 0) {
+            byte[] rec = buffer.readRecord();
+
+            if (rec == null) {
+                // Unexpected EOF!
+                throw new IOException("unexpected EOF with " + numToRead
+                        + " bytes unread");
+            }
+
+            int sz = numToRead;
+            int recLen = rec.length;
+
+            if (recLen > sz) {
+                System.arraycopy(rec, 0, buf, offset, sz);
+
+                // keep the unread tail of this record for the next call
+                readBuf = new byte[recLen - sz];
+
+                System.arraycopy(rec, sz, readBuf, 0, recLen - sz);
+            } else {
+                sz = recLen;
+
+                System.arraycopy(rec, 0, buf, offset, recLen);
+            }
+
+            totalRead += sz;
+            numToRead -= sz;
+            offset += sz;
+        }
+
+        entryOffset += totalRead;
+
+        return totalRead;
+    }
+
+    /**
+     * Copies the contents of the current tar archive entry directly into
+     * an output stream.
+     *
+     * @param out The OutputStream into which to write the entry's data.
+     * @throws IOException on error
+     */
+    public void copyEntryContents(OutputStream out) throws IOException {
+        byte[] buf = new byte[LARGE_BUFFER_SIZE];
+
+        int numRead;
+        while ((numRead = read(buf, 0, buf.length)) != -1) {
+            out.write(buf, 0, numRead);
+        }
+    }
+}