You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/26 22:28:50 UTC
svn commit: r1402652 [1/2] - in /activemq/trunk/activemq-console: ./
src/main/java/org/apache/activemq/console/command/
src/main/java/org/apache/activemq/console/command/store/
src/main/java/org/apache/activemq/console/command/store/tar/ src/main/proto/
Author: chirino
Date: Fri Oct 26 20:28:49 2012
New Revision: 1402652
URL: http://svn.apache.org/viewvc?rev=1402652&view=rev
Log:
Implementing AMQ-4137 : Create a store import/export command line tool to convert between store types
Usage is as simple as "bin/activemq export --file=archive.tgz". All data is stored in the tgz in the format defined by the Apollo store export/import utilities. Only export is implemented at this time. I have verified that Apollo can import queues exported from ActiveMQ. Durable subs probably need a little more work.
Added:
activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StoreExportCommand.java
activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/ExportStreamManager.java
activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java
activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/
activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarBuffer.java
activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarConstants.java
activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarEntry.java
activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarInputStream.java
activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarOutputStream.java
activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarUtils.java
activemq/trunk/activemq-console/src/main/proto/
activemq/trunk/activemq-console/src/main/proto/data.proto
Modified:
activemq/trunk/activemq-console/pom.xml
activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/ShellCommand.java
Modified: activemq/trunk/activemq-console/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/pom.xml?rev=1402652&r1=1402651&r2=1402652&view=diff
==============================================================================
--- activemq/trunk/activemq-console/pom.xml (original)
+++ activemq/trunk/activemq-console/pom.xml Fri Oct 26 20:28:49 2012
@@ -77,8 +77,28 @@
<artifactId>xbean-spring</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.fusesource.hawtbuf</groupId>
+ <artifactId>hawtbuf-proto</artifactId>
+ <version>${hawtbuf-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ <version>${jackson-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>${jackson-version}</version>
+ </dependency>
- <!-- needed for TestPurgeCommand, but not for compile. -->
+
+ <!-- needed for TestPurgeCommand, but not for compile. -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
@@ -173,6 +193,22 @@
</includes>
</configuration>
</plugin>
+
+ <plugin>
+ <groupId>org.fusesource.hawtbuf</groupId>
+ <artifactId>hawtbuf-protoc</artifactId>
+ <version>${hawtbuf-version}</version>
+ <configuration>
+ <type>alt</type>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
Modified: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/ShellCommand.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/ShellCommand.java?rev=1402652&r1=1402651&r2=1402652&view=diff
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/ShellCommand.java (original)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/ShellCommand.java Fri Oct 26 20:28:49 2012
@@ -48,6 +48,7 @@ public class ShellCommand extends Abstra
" query - Display selected broker component's attributes and statistics.",
" browse - Display selected messages in a specified destination.",
" journal-audit - Allows you to view records stored in the persistent journal.",
+ " export - Exports a stopped brokers data files to an archive file",
" purge - Delete selected destination's messages that matches the message selector",
" encrypt - Encrypts given text",
" decrypt - Decrypts given text",
@@ -137,6 +138,8 @@ public class ShellCommand extends Abstra
command = new EncryptCommand();
} else if (taskToken.equals("decrypt")) {
command = new DecryptCommand();
+ } else if (taskToken.equals("export")) {
+ command = new StoreExportCommand();
} else if (taskToken.equals("help")) {
printHelp();
} else {
Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StoreExportCommand.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StoreExportCommand.java?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StoreExportCommand.java (added)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StoreExportCommand.java Fri Oct 26 20:28:49 2012
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.console.command;
+
+import org.apache.activemq.console.CommandContext;
+import org.apache.activemq.console.command.store.StoreExporter;
+import org.apache.activemq.console.command.store.amq.CommandLineSupport;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class StoreExportCommand implements Command {
+
+ private CommandContext context;
+
+ @Override
+ public void setCommandContext(CommandContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public void execute(List<String> tokens) throws Exception {
+ StoreExporter exporter = new StoreExporter();
+ String[] remaining = CommandLineSupport.setOptions(exporter, tokens.toArray(new String[tokens.size()]));
+ if (remaining.length > 0) {
+ throw new Exception("Unexpected arguments: " + Arrays.asList(remaining));
+ }
+ exporter.execute();
+ }
+}
Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/ExportStreamManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/ExportStreamManager.java?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/ExportStreamManager.java (added)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/ExportStreamManager.java Fri Oct 26 20:28:49 2012
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.console.command.store;
+
+import org.apache.activemq.console.command.store.proto.*;
+import org.apache.activemq.console.command.store.tar.TarEntry;
+import org.apache.activemq.console.command.store.tar.TarOutputStream;
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.proto.MessageBuffer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ExportStreamManager {
+
+ private final OutputStream target;
+ private final int version;
+ TarOutputStream stream;
+
+ ExportStreamManager(OutputStream target, int version) throws IOException {
+ this.target = target;
+ this.version = version;
+ stream = new TarOutputStream(new GZIPOutputStream(target));
+ store("ver", new AsciiBuffer(""+version));
+ }
+
+
+ long seq = 0;
+
+ public void finish() throws IOException {
+ stream.close();
+ }
+
+ private void store(String ext, Buffer value) throws IOException {
+ TarEntry entry = new TarEntry(seq + "." + ext);
+ seq += 1;
+ entry.setSize(value.length());
+ stream.putNextEntry(entry);
+ value.writeTo(stream);
+ stream.closeEntry();
+ }
+
+ private void store(String ext, MessageBuffer<?,?> value) throws IOException {
+ TarEntry entry = new TarEntry(seq + "." + ext);
+ seq += 1;
+ entry.setSize(value.serializedSizeFramed());
+ stream.putNextEntry(entry);
+ value.writeFramed(stream);
+ stream.closeEntry();
+ }
+
+
+ public void store_queue(QueuePB.Getter value) throws IOException {
+ store("que", value.freeze());
+ }
+ public void store_queue_entry(QueueEntryPB.Getter value) throws IOException {
+ store("qen", value.freeze());
+ }
+ public void store_message(MessagePB.Getter value) throws IOException {
+ store("msg", value.freeze());
+ }
+ public void store_map_entry(MapEntryPB.Getter value) throws IOException {
+ store("map", value.freeze());
+ }
+
+}
Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java (added)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java Fri Oct 26 20:28:49 2012
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.console.command.store;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.*;
+import org.apache.activemq.console.command.store.proto.MessagePB;
+import org.apache.activemq.console.command.store.proto.QueueEntryPB;
+import org.apache.activemq.console.command.store.proto.QueuePB;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.*;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.hawtbuf.UTF8Buffer;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class StoreExporter {
+
+ URI config;
+ File file;
+
+ public StoreExporter() throws URISyntaxException {
+ config = new URI("xbean:activemq.xml");
+ }
+
+ public void execute() throws Exception {
+ if (config == null) {
+ throw new Exception("required --config option missing");
+ }
+ if (file == null) {
+ throw new Exception("required --file option missing");
+ }
+ System.out.println("Loading: " + config);
+ XBeanBrokerFactory.setStartDefault(false); // to avoid the broker auto-starting..
+ BrokerService broker = BrokerFactory.createBroker(config);
+ XBeanBrokerFactory.resetStartDefault();
+ PersistenceAdapter store = broker.getPersistenceAdapter();
+ System.out.println("Starting: " + store);
+ store.start();
+ try {
+ BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(file));
+ try {
+ export(store, fos);
+ } finally {
+ fos.close();
+ }
+ } finally {
+ store.stop();
+ }
+ }
+
+ static final int OPENWIRE_VERSION = 8;
+ static final boolean TIGHT_ENCODING = false;
+
+ void export(PersistenceAdapter store, BufferedOutputStream fos) throws Exception {
+
+ ObjectMapper mapper = new ObjectMapper();
+ final AsciiBuffer ds_kind = new AsciiBuffer("ds");
+ final AsciiBuffer ptp_kind = new AsciiBuffer("ptp");
+ final AsciiBuffer codec_id = new AsciiBuffer("openwire");
+ final OpenWireFormat wireformat = new OpenWireFormat();
+ wireformat.setCacheEnabled(false);
+ wireformat.setTightEncodingEnabled(TIGHT_ENCODING);
+ wireformat.setVersion(OPENWIRE_VERSION);
+
+ final long[] messageKeyCounter = new long[]{0};
+ final long[] containerKeyCounter = new long[]{0};
+ final ExportStreamManager manager = new ExportStreamManager(fos, 1);
+
+
+ final int[] preparedTxs = new int[]{0};
+ store.createTransactionStore().recover(new TransactionRecoveryListener() {
+ public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
+ preparedTxs[0] += 1;
+ }
+ });
+
+ if (preparedTxs[0] > 0) {
+ throw new Exception("Cannot export a store with prepared XA transactions. Please commit or rollback those transactions before attempting to export.");
+ }
+
+ for (ActiveMQDestination odest : store.getDestinations()) {
+ containerKeyCounter[0]++;
+ if (odest instanceof ActiveMQQueue) {
+ ActiveMQQueue dest = (ActiveMQQueue) odest;
+ MessageStore queue = store.createQueueMessageStore(dest);
+
+ QueuePB.Bean destRecord = new QueuePB.Bean();
+ destRecord.setKey(containerKeyCounter[0]);
+ destRecord.setBindingKind(ptp_kind);
+
+ final long[] seqKeyCounter = new long[]{0};
+
+ HashMap<String, Object> jsonMap = new HashMap<String, Object>();
+ jsonMap.put("@class", "queue_destination");
+ jsonMap.put("name", dest.getQueueName());
+ String json = mapper.writeValueAsString(jsonMap);
+ System.out.println(json);
+ destRecord.setBindingData(new UTF8Buffer(json));
+ manager.store_queue(destRecord);
+
+ queue.recover(new MessageRecoveryListener() {
+ public boolean hasSpace() {
+ return true;
+ }
+
+ public boolean recoverMessageReference(MessageId ref) throws Exception {
+ return true;
+ }
+
+ public boolean isDuplicate(MessageId ref) {
+ return false;
+ }
+
+ public boolean recoverMessage(Message message) throws IOException {
+ messageKeyCounter[0]++;
+ seqKeyCounter[0]++;
+
+ DataByteArrayOutputStream mos = new DataByteArrayOutputStream();
+ mos.writeBoolean(TIGHT_ENCODING);
+ mos.writeVarInt(OPENWIRE_VERSION);
+ wireformat.marshal(message, mos);
+
+ MessagePB.Bean messageRecord = new MessagePB.Bean();
+ messageRecord.setCodec(codec_id);
+ messageRecord.setMessageKey(messageKeyCounter[0]);
+ messageRecord.setSize(message.getSize());
+ messageRecord.setValue(mos.toBuffer());
+ // record.setCompression()
+ manager.store_message(messageRecord);
+
+ QueueEntryPB.Bean entryRecord = new QueueEntryPB.Bean();
+ entryRecord.setQueueKey(containerKeyCounter[0]);
+ entryRecord.setQueueSeq(seqKeyCounter[0]);
+ entryRecord.setMessageKey(messageKeyCounter[0]);
+ entryRecord.setSize(message.getSize());
+ if (message.getExpiration() != 0) {
+ entryRecord.setExpiration(message.getExpiration());
+ }
+ if (message.getRedeliveryCounter() != 0) {
+ entryRecord.setRedeliveries(message.getRedeliveryCounter());
+ }
+ manager.store_queue_entry(entryRecord);
+ return true;
+ }
+ });
+
+ } else if (odest instanceof ActiveMQTopic) {
+ ActiveMQTopic dest = (ActiveMQTopic) odest;
+
+ TopicMessageStore topic = store.createTopicMessageStore(dest);
+ for (SubscriptionInfo sub : topic.getAllSubscriptions()) {
+
+ QueuePB.Bean destRecord = new QueuePB.Bean();
+ destRecord.setKey(containerKeyCounter[0]);
+ destRecord.setBindingKind(ds_kind);
+
+ // Binding data is built as a map and encoded to JSON with the Jackson ObjectMapper below.
+ HashMap<String, Object> jsonMap = new HashMap<String, Object>();
+ jsonMap.put("@class", "dsub_destination");
+ jsonMap.put("name", sub.getClientId() + ":" + sub.getSubcriptionName());
+ HashMap<String, Object> jsonTopic = new HashMap<String, Object>();
+ jsonTopic.put("name", dest.getTopicName());
+ jsonMap.put("topics", new Object[]{jsonTopic});
+ if (sub.getSelector() != null) {
+ jsonMap.put("selector", sub.getSelector());
+ }
+ String json = mapper.writeValueAsString(jsonMap);
+ System.out.println(json);
+
+ destRecord.setBindingData(new UTF8Buffer(json));
+ manager.store_queue(destRecord);
+
+ final long seqKeyCounter[] = new long[]{0};
+ topic.recoverSubscription(sub.getClientId(), sub.getSubcriptionName(), new MessageRecoveryListener() {
+ public boolean hasSpace() {
+ return true;
+ }
+
+ public boolean recoverMessageReference(MessageId ref) throws Exception {
+ return true;
+ }
+
+ public boolean isDuplicate(MessageId ref) {
+ return false;
+ }
+
+ public boolean recoverMessage(Message message) throws IOException { // called once per message recovered for this subscription
+ messageKeyCounter[0]++; // message key counter shared across all destinations in this export
+ seqKeyCounter[0]++; // sequence counter local to this subscription
+
+ DataByteArrayOutputStream mos = new DataByteArrayOutputStream();
+ mos.writeBoolean(TIGHT_ENCODING);
+ mos.writeVarInt(OPENWIRE_VERSION);
+ wireformat.marshal(message, mos); // write the message itself into mos, matching the queue branch; marshal(mos) never wrote it
+
+ MessagePB.Bean messageRecord = new MessagePB.Bean();
+ messageRecord.setCodec(codec_id);
+ messageRecord.setMessageKey(messageKeyCounter[0]);
+ messageRecord.setSize(message.getSize());
+ messageRecord.setValue(mos.toBuffer());
+ // record.setCompression()
+ manager.store_message(messageRecord);
+
+ QueueEntryPB.Bean entryRecord = new QueueEntryPB.Bean();
+ entryRecord.setQueueKey(containerKeyCounter[0]);
+ entryRecord.setQueueSeq(seqKeyCounter[0]);
+ entryRecord.setMessageKey(messageKeyCounter[0]);
+ entryRecord.setSize(message.getSize());
+ if (message.getExpiration() != 0) { // expiration is only recorded when non-zero
+ entryRecord.setExpiration(message.getExpiration());
+ }
+ if (message.getRedeliveryCounter() != 0) { // redeliveries are only recorded when non-zero
+ entryRecord.setRedeliveries(message.getRedeliveryCounter());
+ }
+ manager.store_queue_entry(entryRecord);
+ return true;
+ }
+ });
+
+ }
+ }
+ }
+ manager.finish();
+ }
+
+ public File getFile() {
+ return file;
+ }
+
+ public void setFile(String file) {
+ setFile(new File(file));
+ }
+
+ public void setFile(File file) {
+ this.file = file;
+ }
+
+ public URI getConfig() {
+ return config;
+ }
+
+ public void setConfig(URI config) {
+ this.config = config;
+ }
+}
Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarBuffer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarBuffer.java?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarBuffer.java (added)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarBuffer.java Fri Oct 26 20:28:49 2012
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/*
+ * This package is based on the work done by Timothy Gerard Endres
+ * (time@ice.com) to whom the Ant project is very grateful for his great code.
+ */
+
+package org.apache.activemq.console.command.store.tar;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * The TarBuffer class implements the tar archive concept
+ * of a buffered input stream. This concept goes back to the
+ * days of blocked tape drives and special io devices. In the
+ * Java universe, the only real function that this class
+ * performs is to ensure that files have the correct "block"
+ * size, or other tars will complain.
+ * <p>
+ * You should never have a need to access this class directly.
+ * TarBuffers are created by Tar IO Streams.
+ *
+ */
+
+public class TarBuffer {
+
+ /** Default record size */
+ public static final int DEFAULT_RCDSIZE = (512);
+
+ /** Default block size */
+ public static final int DEFAULT_BLKSIZE = (DEFAULT_RCDSIZE * 20);
+
+ private InputStream inStream;
+ private OutputStream outStream;
+ private byte[] blockBuffer;
+ private int currBlkIdx;
+ private int currRecIdx;
+ private int blockSize;
+ private int recordSize;
+ private int recsPerBlock;
+ private boolean debug;
+
+ /**
+ * Constructor for a TarBuffer on an input stream.
+ * @param inStream the input stream to use
+ */
+ public TarBuffer(InputStream inStream) {
+ this(inStream, TarBuffer.DEFAULT_BLKSIZE);
+ }
+
+ /**
+ * Constructor for a TarBuffer on an input stream.
+ * @param inStream the input stream to use
+ * @param blockSize the block size to use
+ */
+ public TarBuffer(InputStream inStream, int blockSize) {
+ this(inStream, blockSize, TarBuffer.DEFAULT_RCDSIZE);
+ }
+
+ /**
+ * Constructor for a TarBuffer on an input stream.
+ * @param inStream the input stream to use
+ * @param blockSize the block size to use
+ * @param recordSize the record size to use
+ */
+ public TarBuffer(InputStream inStream, int blockSize, int recordSize) {
+ this.inStream = inStream;
+ this.outStream = null;
+
+ this.initialize(blockSize, recordSize);
+ }
+
+ /**
+ * Constructor for a TarBuffer on an output stream.
+ * @param outStream the output stream to use
+ */
+ public TarBuffer(OutputStream outStream) {
+ this(outStream, TarBuffer.DEFAULT_BLKSIZE);
+ }
+
+ /**
+ * Constructor for a TarBuffer on an output stream.
+ * @param outStream the output stream to use
+ * @param blockSize the block size to use
+ */
+ public TarBuffer(OutputStream outStream, int blockSize) {
+ this(outStream, blockSize, TarBuffer.DEFAULT_RCDSIZE);
+ }
+
+ /**
+ * Constructor for a TarBuffer on an output stream.
+ * @param outStream the output stream to use
+ * @param blockSize the block size to use
+ * @param recordSize the record size to use
+ */
+ public TarBuffer(OutputStream outStream, int blockSize, int recordSize) {
+ this.inStream = null;
+ this.outStream = outStream;
+
+ this.initialize(blockSize, recordSize);
+ }
+
+ /**
+ * Initialization common to all constructors.
+ */
+ private void initialize(int blockSize, int recordSize) {
+ this.debug = false;
+ this.blockSize = blockSize;
+ this.recordSize = recordSize;
+ this.recsPerBlock = (this.blockSize / this.recordSize);
+ this.blockBuffer = new byte[this.blockSize];
+
+ if (this.inStream != null) {
+ this.currBlkIdx = -1;
+ this.currRecIdx = this.recsPerBlock;
+ } else {
+ this.currBlkIdx = 0;
+ this.currRecIdx = 0;
+ }
+ }
+
+ /**
+ * Get the TAR Buffer's block size. Blocks consist of multiple records.
+ * @return the block size
+ */
+ public int getBlockSize() {
+ return this.blockSize;
+ }
+
+ /**
+ * Get the TAR Buffer's record size.
+ * @return the record size
+ */
+ public int getRecordSize() {
+ return this.recordSize;
+ }
+
+ /**
+ * Set the debugging flag for the buffer.
+ *
+ * @param debug If true, print debugging output.
+ */
+ public void setDebug(boolean debug) {
+ this.debug = debug;
+ }
+
+ /**
+ * Determine if an archive record indicate End of Archive. End of
+ * archive is indicated by a record that consists entirely of null bytes.
+ *
+ * @param record The record data to check.
+ * @return true if the record data is an End of Archive
+ */
+ public boolean isEOFRecord(byte[] record) {
+ for (int i = 0, sz = getRecordSize(); i < sz; ++i) {
+ if (record[i] != 0) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Skip over a record on the input stream.
+ * @throws IOException on error
+ */
+ public void skipRecord() throws IOException {
+ if (debug) {
+ System.err.println("SkipRecord: recIdx = " + currRecIdx
+ + " blkIdx = " + currBlkIdx);
+ }
+
+ if (inStream == null) {
+ throw new IOException("reading (via skip) from an output buffer");
+ }
+
+ if (currRecIdx >= recsPerBlock) {
+ if (!readBlock()) {
+ return; // UNDONE
+ }
+ }
+
+ currRecIdx++;
+ }
+
+ /**
+ * Read a record from the input stream and return the data.
+ *
+ * @return The record data.
+ * @throws IOException on error
+ */
+ public byte[] readRecord() throws IOException {
+ if (debug) {
+ System.err.println("ReadRecord: recIdx = " + currRecIdx
+ + " blkIdx = " + currBlkIdx);
+ }
+
+ if (inStream == null) {
+ throw new IOException("reading from an output buffer");
+ }
+
+ if (currRecIdx >= recsPerBlock) {
+ if (!readBlock()) {
+ return null;
+ }
+ }
+
+ byte[] result = new byte[recordSize];
+
+ System.arraycopy(blockBuffer,
+ (currRecIdx * recordSize), result, 0,
+ recordSize);
+
+ currRecIdx++;
+
+ return result;
+ }
+
+ /**
+ * @return false if End-Of-File, else true
+ */
+ private boolean readBlock() throws IOException {
+ if (debug) {
+ System.err.println("ReadBlock: blkIdx = " + currBlkIdx);
+ }
+
+ if (inStream == null) {
+ throw new IOException("reading from an output buffer");
+ }
+
+ currRecIdx = 0;
+
+ int offset = 0;
+ int bytesNeeded = blockSize;
+
+ while (bytesNeeded > 0) {
+ long numBytes = inStream.read(blockBuffer, offset,
+ bytesNeeded);
+
+ //
+ // NOTE
+ // We have hit EOF, and the block is not full!
+ //
+ // This is a broken archive. It does not follow the standard
+ // blocking algorithm. However, because we are generous, and
+ // it requires little effort, we will simply ignore the error
+ // and continue as if the entire block were read. This does
+ // not appear to break anything upstream. We used to return
+ // false in this case.
+ //
+ // Thanks to 'Yohann.Roussel@alcatel.fr' for this fix.
+ //
+ if (numBytes == -1) {
+ if (offset == 0) {
+ // Ensure that we do not read gigabytes of zeros
+ // for a corrupt tar file.
+ // See http://issues.apache.org/bugzilla/show_bug.cgi?id=39924
+ return false;
+ }
+ // However, just leaving the unread portion of the buffer dirty does
+ // cause problems in some cases. This problem is described in
+ // http://issues.apache.org/bugzilla/show_bug.cgi?id=29877
+ //
+ // The solution is to fill the unused portion of the buffer with zeros.
+
+ Arrays.fill(blockBuffer, offset, offset + bytesNeeded, (byte) 0);
+
+ break;
+ }
+
+ offset += numBytes;
+ bytesNeeded -= numBytes;
+
+ if (numBytes != blockSize) {
+ if (debug) {
+ System.err.println("ReadBlock: INCOMPLETE READ "
+ + numBytes + " of " + blockSize
+ + " bytes read.");
+ }
+ }
+ }
+
+ currBlkIdx++;
+
+ return true;
+ }
+
+ /**
+ * Get the current block number, zero based.
+ *
+ * @return The current zero based block number.
+ */
+ public int getCurrentBlockNum() {
+ return currBlkIdx;
+ }
+
+ /**
+ * Get the current record number, within the current block, zero based.
+ * Thus, current offset = (currentBlockNum * recsPerBlk) + currentRecNum.
+ *
+ * @return The current zero based record number.
+ */
+ public int getCurrentRecordNum() {
+ return currRecIdx - 1;
+ }
+
+ /**
+ * Write an archive record to the archive.
+ *
+ * @param record The record data to write to the archive.
+ * @throws IOException on error
+ */
+ public void writeRecord(byte[] record) throws IOException {
+ if (debug) {
+ System.err.println("WriteRecord: recIdx = " + currRecIdx
+ + " blkIdx = " + currBlkIdx);
+ }
+
+ if (outStream == null) {
+ throw new IOException("writing to an input buffer");
+ }
+
+ if (record.length != recordSize) {
+ throw new IOException("record to write has length '"
+ + record.length
+ + "' which is not the record size of '"
+ + recordSize + "'");
+ }
+
+ if (currRecIdx >= recsPerBlock) {
+ writeBlock();
+ }
+
+ System.arraycopy(record, 0, blockBuffer,
+ (currRecIdx * recordSize),
+ recordSize);
+
+ currRecIdx++;
+ }
+
+ /**
+ * Write an archive record to the archive, where the record may be
+ * inside of a larger array buffer. The buffer must be "offset plus
+ * record size" long.
+ *
+ * @param buf The buffer containing the record data to write.
+ * @param offset The offset of the record data within buf.
+ * @throws IOException on error
+ */
+ public void writeRecord(byte[] buf, int offset) throws IOException {
+ if (debug) {
+ System.err.println("WriteRecord: recIdx = " + currRecIdx
+ + " blkIdx = " + currBlkIdx);
+ }
+
+ if (outStream == null) {
+ throw new IOException("writing to an input buffer");
+ }
+
+ if ((offset + recordSize) > buf.length) {
+ throw new IOException("record has length '" + buf.length
+ + "' with offset '" + offset
+ + "' which is less than the record size of '"
+ + recordSize + "'");
+ }
+
+ if (currRecIdx >= recsPerBlock) {
+ writeBlock();
+ }
+
+ System.arraycopy(buf, offset, blockBuffer,
+ (currRecIdx * recordSize),
+ recordSize);
+
+ currRecIdx++;
+ }
+
+ /**
+ * Write a TarBuffer block to the archive.
+ */
+ private void writeBlock() throws IOException {
+ if (debug) {
+ System.err.println("WriteBlock: blkIdx = " + currBlkIdx);
+ }
+
+ if (outStream == null) {
+ throw new IOException("writing to an input buffer");
+ }
+
+ outStream.write(blockBuffer, 0, blockSize);
+ outStream.flush();
+
+ currRecIdx = 0;
+ currBlkIdx++;
+ Arrays.fill(blockBuffer, (byte) 0);
+ }
+
+ /**
+ * Flush the current data block if it has any data in it.
+ */
+ void flushBlock() throws IOException {
+ if (debug) {
+ System.err.println("TarBuffer.flushBlock() called.");
+ }
+
+ if (outStream == null) {
+ throw new IOException("writing to an input buffer");
+ }
+
+ if (currRecIdx > 0) {
+ writeBlock();
+ }
+ }
+
+ /**
+ * Close the TarBuffer. If this is an output buffer, also flush the
+ * current block before closing.
+ * @throws IOException on error
+ */
+ public void close() throws IOException {
+ if (debug) {
+ System.err.println("TarBuffer.closeBuffer().");
+ }
+
+ if (outStream != null) {
+ flushBlock();
+
+ if (outStream != System.out
+ && outStream != System.err) {
+ outStream.close();
+
+ outStream = null;
+ }
+ } else if (inStream != null) {
+ if (inStream != System.in) {
+ inStream.close();
+
+ inStream = null;
+ }
+ }
+ }
+}
Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarConstants.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarConstants.java?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarConstants.java (added)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarConstants.java Fri Oct 26 20:28:49 2012
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/*
+ * This package is based on the work done by Timothy Gerard Endres
+ * (time@ice.com) to whom the Ant project is very grateful for his great code.
+ */
+
+package org.apache.activemq.console.command.store.tar;
+
+/**
+ * This interface contains all the definitions used in the package:
+ * the lengths of the fields of a tar entry header, the "link flag"
+ * (entry type) values, and the magic tags and names used to recognise
+ * POSIX and GNU tar entries.
+ *
+ */
+// CheckStyle:InterfaceIsTypeCheck OFF (bc)
+public interface TarConstants {
+
+ /**
+ * The length, in bytes, of the name field in a header buffer.
+ */
+ int NAMELEN = 100;
+
+ /**
+ * The length, in bytes, of the mode field in a header buffer.
+ */
+ int MODELEN = 8;
+
+ /**
+ * The length, in bytes, of the user id field in a header buffer.
+ */
+ int UIDLEN = 8;
+
+ /**
+ * The length, in bytes, of the group id field in a header buffer.
+ */
+ int GIDLEN = 8;
+
+ /**
+ * The length, in bytes, of the checksum field in a header buffer.
+ */
+ int CHKSUMLEN = 8;
+
+ /**
+ * The length, in bytes, of the size field in a header buffer.
+ */
+ int SIZELEN = 12;
+
+ /**
+ * The maximum size of a file in a tar archive (That's 11 sevens, octal,
+ * i.e. 8589934591 bytes).
+ */
+ long MAXSIZE = 077777777777L;
+
+ /**
+ * The length, in bytes, of the magic field in a header buffer.
+ */
+ int MAGICLEN = 8;
+
+ /**
+ * The length, in bytes, of the modification time field in a header buffer.
+ */
+ int MODTIMELEN = 12;
+
+ /**
+ * The length, in bytes, of the user name field in a header buffer.
+ */
+ int UNAMELEN = 32;
+
+ /**
+ * The length, in bytes, of the group name field in a header buffer.
+ */
+ int GNAMELEN = 32;
+
+ /**
+ * The length, in bytes, of each of the device (major and minor) fields
+ * in a header buffer.
+ */
+ int DEVLEN = 8;
+
+ /**
+ * LF_ constants represent the "link flag" of an entry, or more commonly,
+ * the "entry type". This is the "old way" of indicating a normal file:
+ * a zero byte rather than the character '0'.
+ */
+ byte LF_OLDNORM = 0;
+
+ /**
+ * Normal file type.
+ */
+ byte LF_NORMAL = (byte) '0';
+
+ /**
+ * Link file type.
+ */
+ byte LF_LINK = (byte) '1';
+
+ /**
+ * Symbolic link file type.
+ */
+ byte LF_SYMLINK = (byte) '2';
+
+ /**
+ * Character device file type.
+ */
+ byte LF_CHR = (byte) '3';
+
+ /**
+ * Block device file type.
+ */
+ byte LF_BLK = (byte) '4';
+
+ /**
+ * Directory file type.
+ */
+ byte LF_DIR = (byte) '5';
+
+ /**
+ * FIFO (pipe) file type.
+ */
+ byte LF_FIFO = (byte) '6';
+
+ /**
+ * Contiguous file type.
+ */
+ byte LF_CONTIG = (byte) '7';
+
+ /**
+ * The magic tag representing a POSIX tar archive.
+ */
+ String TMAGIC = "ustar";
+
+ /**
+ * The magic tag representing a GNU tar archive.
+ */
+ String GNU_TMAGIC = "ustar ";
+
+ /**
+ * The name of the GNU tar entry which contains a long name.
+ */
+ String GNU_LONGLINK = "././@LongLink";
+
+ /**
+ * Link flag that identifies the *next* file on the tape as having a
+ * long name.
+ */
+ byte LF_GNUTYPE_LONGNAME = (byte) 'L';
+}
Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarEntry.java?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarEntry.java (added)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarEntry.java Fri Oct 26 20:28:49 2012
@@ -0,0 +1,664 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/*
+ * This package is based on the work done by Timothy Gerard Endres
+ * (time@ice.com) to whom the Ant project is very grateful for his great code.
+ */
+
+package org.apache.activemq.console.command.store.tar;
+
+import java.io.File;
+import java.util.Date;
+import java.util.Locale;
+
+/**
+ * This class represents an entry in a Tar archive. It consists
+ * of the entry's header, as well as the entry's File. Entries
+ * can be instantiated in one of three ways, depending on how
+ * they are to be used.
+ * <p>
+ * TarEntries that are created from the header bytes read from
+ * an archive are instantiated with the TarEntry( byte[] )
+ * constructor. These entries will be used when extracting from
+ * or listing the contents of an archive. These entries have their
+ * header filled in using the header bytes. They also set the File
+ * to null, since they reference an archive entry not a file.
+ * <p>
+ * TarEntries that are created from Files that are to be written
+ * into an archive are instantiated with the TarEntry( File )
+ * constructor. These entries have their header filled in using
+ * the File's information. They also keep a reference to the File
+ * for convenience when writing entries.
+ * <p>
+ * Finally, TarEntries can be constructed from nothing but a name.
+ * This allows the programmer to construct the entry by hand, for
+ * instance when only an InputStream is available for writing to
+ * the archive, and the header information is constructed from
+ * other information. In this case the header fields are set to
+ * defaults and the File is set to null.
+ *
+ * <p>
+ * The C structure for a Tar Entry's header is:
+ * <pre>
+ * struct header {
+ * char name[NAMSIZ];
+ * char mode[8];
+ * char uid[8];
+ * char gid[8];
+ * char size[12];
+ * char mtime[12];
+ * char chksum[8];
+ * char linkflag;
+ * char linkname[NAMSIZ];
+ * char magic[8];
+ * char uname[TUNMLEN];
+ * char gname[TGNMLEN];
+ * char devmajor[8];
+ * char devminor[8];
+ * } header;
+ * </pre>
+ *
+ */
+
+public class TarEntry implements TarConstants {
+    /** The entry's name. */
+    private StringBuffer name;
+
+    /** The entry's permission mode. */
+    private int mode;
+
+    /** The entry's user id. */
+    private int userId;
+
+    /** The entry's group id. */
+    private int groupId;
+
+    /** The entry's size. */
+    private long size;
+
+    /** The entry's modification time, in seconds. */
+    private long modTime;
+
+    /** The entry's link flag. */
+    private byte linkFlag;
+
+    /** The entry's link name. */
+    private StringBuffer linkName;
+
+    /** The entry's magic tag. */
+    private StringBuffer magic;
+
+    /** The entry's user name. */
+    private StringBuffer userName;
+
+    /** The entry's group name. */
+    private StringBuffer groupName;
+
+    /** The entry's major device number. */
+    private int devMajor;
+
+    /** The entry's minor device number. */
+    private int devMinor;
+
+    /** The entry's file reference */
+    private File file;
+
+    /** Maximum length of a user's name in the tar file */
+    public static final int MAX_NAMELEN = 31;
+
+    /** Default permissions bits for directories */
+    public static final int DEFAULT_DIR_MODE = 040755;
+
+    /** Default permissions bits for files */
+    public static final int DEFAULT_FILE_MODE = 0100644;
+
+    /** Convert millis to seconds */
+    public static final int MILLIS_PER_SECOND = 1000;
+
+    /**
+     * Construct an empty entry and prepare the default header values:
+     * the POSIX magic tag, an empty name and link name, user and group
+     * ids of zero, the current user's name (taken from the
+     * <code>user.name</code> system property and truncated to
+     * {@link #MAX_NAMELEN} characters), an empty group name and no file.
+     */
+    private TarEntry() {
+        this.magic = new StringBuffer(TMAGIC);
+        this.name = new StringBuffer();
+        this.linkName = new StringBuffer();
+
+        String user = System.getProperty("user.name", "");
+
+        if (user.length() > MAX_NAMELEN) {
+            user = user.substring(0, MAX_NAMELEN);
+        }
+
+        this.userId = 0;
+        this.groupId = 0;
+        this.userName = new StringBuffer(user);
+        this.groupName = new StringBuffer("");
+        this.file = null;
+    }
+
+    /**
+     * Construct an entry with only a name. This allows the programmer
+     * to construct the entry's header "by hand". File is set to null.
+     * Leading slashes are stripped from the name.
+     *
+     * @param name the entry name
+     */
+    public TarEntry(String name) {
+        this(name, false);
+    }
+
+    /**
+     * Construct an entry with only a name. This allows the programmer
+     * to construct the entry's header "by hand". File is set to null.
+     * A name ending in "/" produces a directory entry; any other name
+     * produces a normal file entry. The modification time is set to the
+     * current time and the user name is left empty.
+     *
+     * @param name the entry name
+     * @param preserveLeadingSlashes whether to allow leading slashes
+     * in the name.
+     */
+    public TarEntry(String name, boolean preserveLeadingSlashes) {
+        // Establishes magic, empty link name / group name and zero ids.
+        this();
+
+        name = normalizeFileName(name, preserveLeadingSlashes);
+        boolean isDir = name.endsWith("/");
+
+        this.name = new StringBuffer(name);
+        this.mode = isDir ? DEFAULT_DIR_MODE : DEFAULT_FILE_MODE;
+        this.linkFlag = isDir ? LF_DIR : LF_NORMAL;
+        this.size = 0;
+        this.modTime = (new Date()).getTime() / MILLIS_PER_SECOND;
+        // A name-only entry deliberately does not record the current user.
+        this.userName = new StringBuffer("");
+        this.devMajor = 0;
+        this.devMinor = 0;
+    }
+
+    /**
+     * Construct an entry with a name and a link flag. When the link flag
+     * is {@link #LF_GNUTYPE_LONGNAME} the entry's magic tag is switched
+     * to the GNU one.
+     *
+     * @param name the entry name
+     * @param linkFlag the entry link flag.
+     */
+    public TarEntry(String name, byte linkFlag) {
+        this(name);
+        this.linkFlag = linkFlag;
+        if (linkFlag == LF_GNUTYPE_LONGNAME) {
+            magic = new StringBuffer(GNU_TMAGIC);
+        }
+    }
+
+    /**
+     * Construct an entry for a file. File is set to file, and the
+     * header is constructed from information from the file: its
+     * normalized path, whether it is a directory, its length and its
+     * last-modified time. Directory names always end in "/" and have
+     * a size of zero.
+     *
+     * @param file The file that the entry represents.
+     */
+    public TarEntry(File file) {
+        this();
+
+        this.file = file;
+
+        String fileName = normalizeFileName(file.getPath(), false);
+        this.linkName = new StringBuffer("");
+        this.name = new StringBuffer(fileName);
+
+        if (file.isDirectory()) {
+            this.mode = DEFAULT_DIR_MODE;
+            this.linkFlag = LF_DIR;
+
+            int nameLength = name.length();
+            if (nameLength == 0 || name.charAt(nameLength - 1) != '/') {
+                this.name.append("/");
+            }
+            this.size = 0;
+        } else {
+            this.mode = DEFAULT_FILE_MODE;
+            this.linkFlag = LF_NORMAL;
+            this.size = file.length();
+        }
+
+        this.modTime = file.lastModified() / MILLIS_PER_SECOND;
+        this.devMajor = 0;
+        this.devMinor = 0;
+    }
+
+    /**
+     * Construct an entry from an archive's header bytes. File is set
+     * to null.
+     *
+     * @param headerBuf The header bytes from a tar archive entry.
+     */
+    public TarEntry(byte[] headerBuf) {
+        this();
+        parseTarHeader(headerBuf);
+    }
+
+    /**
+     * Determine if the two entries are equal. Equality is determined
+     * by the header names being equal.
+     *
+     * @param it Entry to be checked for equality; may be null.
+     * @return True if the entries are equal, false if they differ or
+     * if <code>it</code> is null.
+     */
+    public boolean equals(TarEntry it) {
+        return it != null && getName().equals(it.getName());
+    }
+
+    /**
+     * Determine if the two entries are equal. Equality is determined
+     * by the header names being equal.
+     *
+     * @param it Entry to be checked for equality.
+     * @return True if the entries are equal; false if <code>it</code>
+     * is null or of a different class.
+     */
+    @Override
+    public boolean equals(Object it) {
+        if (it == null || getClass() != it.getClass()) {
+            return false;
+        }
+        return equals((TarEntry) it);
+    }
+
+    /**
+     * Hashcodes are based on entry names.
+     *
+     * @return the entry hashcode
+     */
+    @Override
+    public int hashCode() {
+        return getName().hashCode();
+    }
+
+    /**
+     * Determine if the given entry is a descendant of this entry.
+     * Descendancy is determined by the name of the descendant
+     * starting with this entry's name.
+     *
+     * @param desc Entry to be checked as a descendent of this.
+     * @return True if entry is a descendant of this.
+     */
+    public boolean isDescendent(TarEntry desc) {
+        return desc.getName().startsWith(getName());
+    }
+
+    /**
+     * Get this entry's name.
+     *
+     * @return This entry's name.
+     */
+    public String getName() {
+        return name.toString();
+    }
+
+    /**
+     * Set this entry's name. The name is normalized first, which also
+     * strips any leading slashes.
+     *
+     * @param name This entry's new name.
+     */
+    public void setName(String name) {
+        this.name = new StringBuffer(normalizeFileName(name, false));
+    }
+
+    /**
+     * Set the mode for this entry
+     *
+     * @param mode the mode for this entry
+     */
+    public void setMode(int mode) {
+        this.mode = mode;
+    }
+
+    /**
+     * Get this entry's link name.
+     *
+     * @return This entry's link name.
+     */
+    public String getLinkName() {
+        return linkName.toString();
+    }
+
+    /**
+     * Get this entry's user id.
+     *
+     * @return This entry's user id.
+     */
+    public int getUserId() {
+        return userId;
+    }
+
+    /**
+     * Set this entry's user id.
+     *
+     * @param userId This entry's new user id.
+     */
+    public void setUserId(int userId) {
+        this.userId = userId;
+    }
+
+    /**
+     * Get this entry's group id.
+     *
+     * @return This entry's group id.
+     */
+    public int getGroupId() {
+        return groupId;
+    }
+
+    /**
+     * Set this entry's group id.
+     *
+     * @param groupId This entry's new group id.
+     */
+    public void setGroupId(int groupId) {
+        this.groupId = groupId;
+    }
+
+    /**
+     * Get this entry's user name.
+     *
+     * @return This entry's user name.
+     */
+    public String getUserName() {
+        return userName.toString();
+    }
+
+    /**
+     * Set this entry's user name.
+     *
+     * @param userName This entry's new user name.
+     */
+    public void setUserName(String userName) {
+        this.userName = new StringBuffer(userName);
+    }
+
+    /**
+     * Get this entry's group name.
+     *
+     * @return This entry's group name.
+     */
+    public String getGroupName() {
+        return groupName.toString();
+    }
+
+    /**
+     * Set this entry's group name.
+     *
+     * @param groupName This entry's new group name.
+     */
+    public void setGroupName(String groupName) {
+        this.groupName = new StringBuffer(groupName);
+    }
+
+    /**
+     * Convenience method to set this entry's group and user ids.
+     *
+     * @param userId This entry's new user id.
+     * @param groupId This entry's new group id.
+     */
+    public void setIds(int userId, int groupId) {
+        setUserId(userId);
+        setGroupId(groupId);
+    }
+
+    /**
+     * Convenience method to set this entry's group and user names.
+     *
+     * @param userName This entry's new user name.
+     * @param groupName This entry's new group name.
+     */
+    public void setNames(String userName, String groupName) {
+        setUserName(userName);
+        setGroupName(groupName);
+    }
+
+    /**
+     * Set this entry's modification time. The parameter passed
+     * to this method is in "Java time" (milliseconds); it is stored
+     * with a resolution of one second.
+     *
+     * @param time This entry's new modification time.
+     */
+    public void setModTime(long time) {
+        modTime = time / MILLIS_PER_SECOND;
+    }
+
+    /**
+     * Set this entry's modification time. It is stored with a
+     * resolution of one second.
+     *
+     * @param time This entry's new modification time.
+     */
+    public void setModTime(Date time) {
+        modTime = time.getTime() / MILLIS_PER_SECOND;
+    }
+
+    /**
+     * Get this entry's modification time.
+     *
+     * @return This entry's modification time, as a new Date.
+     */
+    public Date getModTime() {
+        return new Date(modTime * MILLIS_PER_SECOND);
+    }
+
+    /**
+     * Get this entry's file.
+     *
+     * @return This entry's file, or null if the entry was not created
+     * from a file.
+     */
+    public File getFile() {
+        return file;
+    }
+
+    /**
+     * Get this entry's mode.
+     *
+     * @return This entry's mode.
+     */
+    public int getMode() {
+        return mode;
+    }
+
+    /**
+     * Get this entry's file size.
+     *
+     * @return This entry's file size.
+     */
+    public long getSize() {
+        return size;
+    }
+
+    /**
+     * Set this entry's file size.
+     *
+     * @param size This entry's new file size.
+     */
+    public void setSize(long size) {
+        this.size = size;
+    }
+
+
+    /**
+     * Indicate if this entry is a GNU long name block
+     *
+     * @return true if this is a long name extension provided by GNU tar
+     */
+    public boolean isGNULongNameEntry() {
+        return linkFlag == LF_GNUTYPE_LONGNAME
+                && name.toString().equals(GNU_LONGLINK);
+    }
+
+    /**
+     * Return whether or not this entry represents a directory. When the
+     * entry has a file, the file decides; otherwise the entry is a
+     * directory if its link flag says so or its name ends in "/".
+     *
+     * @return True if this entry is a directory.
+     */
+    public boolean isDirectory() {
+        if (file != null) {
+            return file.isDirectory();
+        }
+
+        return linkFlag == LF_DIR || getName().endsWith("/");
+    }
+
+    /**
+     * If this entry represents a file, and the file is a directory, return
+     * an array of TarEntries for this entry's children.
+     *
+     * @return An array of TarEntry's for this entry's children; an empty
+     * array if this entry has no file, the file is not a directory, or
+     * the directory could not be listed.
+     */
+    public TarEntry[] getDirectoryEntries() {
+        if (file == null || !file.isDirectory()) {
+            return new TarEntry[0];
+        }
+
+        String[] list = file.list();
+        // File.list() yields null when the directory cannot be read.
+        if (list == null) {
+            return new TarEntry[0];
+        }
+
+        TarEntry[] result = new TarEntry[list.length];
+
+        for (int i = 0; i < list.length; ++i) {
+            result[i] = new TarEntry(new File(file, list[i]));
+        }
+
+        return result;
+    }
+
+    /**
+     * Write an entry's header information to a header buffer. The fields
+     * are written in header order, the remainder of the buffer is
+     * zero-filled, and the checksum is computed last and stored in the
+     * checksum field.
+     *
+     * @param outbuf The tar entry header buffer to fill in.
+     */
+    public void writeEntryHeader(byte[] outbuf) {
+        int offset = 0;
+
+        offset = TarUtils.getNameBytes(name, outbuf, offset, NAMELEN);
+        offset = TarUtils.getOctalBytes(mode, outbuf, offset, MODELEN);
+        offset = TarUtils.getOctalBytes(userId, outbuf, offset, UIDLEN);
+        offset = TarUtils.getOctalBytes(groupId, outbuf, offset, GIDLEN);
+        offset = TarUtils.getLongOctalBytes(size, outbuf, offset, SIZELEN);
+        offset = TarUtils.getLongOctalBytes(modTime, outbuf, offset, MODTIMELEN);
+
+        // Remember where the checksum goes; it is filled in at the end.
+        int csOffset = offset;
+
+        // The checksum field holds spaces while the checksum is computed.
+        for (int c = 0; c < CHKSUMLEN; ++c) {
+            outbuf[offset++] = (byte) ' ';
+        }
+
+        outbuf[offset++] = linkFlag;
+        offset = TarUtils.getNameBytes(linkName, outbuf, offset, NAMELEN);
+        offset = TarUtils.getNameBytes(magic, outbuf, offset, MAGICLEN);
+        offset = TarUtils.getNameBytes(userName, outbuf, offset, UNAMELEN);
+        offset = TarUtils.getNameBytes(groupName, outbuf, offset, GNAMELEN);
+        offset = TarUtils.getOctalBytes(devMajor, outbuf, offset, DEVLEN);
+        offset = TarUtils.getOctalBytes(devMinor, outbuf, offset, DEVLEN);
+
+        // Zero-fill whatever is left of the header buffer.
+        while (offset < outbuf.length) {
+            outbuf[offset++] = 0;
+        }
+
+        long chk = TarUtils.computeCheckSum(outbuf);
+
+        TarUtils.getCheckSumOctalBytes(chk, outbuf, csOffset, CHKSUMLEN);
+    }
+
+    /**
+     * Parse an entry's header information from a header buffer. The
+     * fields are read in header order; the checksum field is skipped
+     * without being verified.
+     *
+     * @param header The tar entry header buffer to get information from.
+     */
+    public void parseTarHeader(byte[] header) {
+        int offset = 0;
+
+        name = TarUtils.parseName(header, offset, NAMELEN);
+        offset += NAMELEN;
+        mode = (int) TarUtils.parseOctal(header, offset, MODELEN);
+        offset += MODELEN;
+        userId = (int) TarUtils.parseOctal(header, offset, UIDLEN);
+        offset += UIDLEN;
+        groupId = (int) TarUtils.parseOctal(header, offset, GIDLEN);
+        offset += GIDLEN;
+        size = TarUtils.parseOctal(header, offset, SIZELEN);
+        offset += SIZELEN;
+        modTime = TarUtils.parseOctal(header, offset, MODTIMELEN);
+        offset += MODTIMELEN;
+        // The checksum is not read.
+        offset += CHKSUMLEN;
+        linkFlag = header[offset++];
+        linkName = TarUtils.parseName(header, offset, NAMELEN);
+        offset += NAMELEN;
+        magic = TarUtils.parseName(header, offset, MAGICLEN);
+        offset += MAGICLEN;
+        userName = TarUtils.parseName(header, offset, UNAMELEN);
+        offset += UNAMELEN;
+        groupName = TarUtils.parseName(header, offset, GNAMELEN);
+        offset += GNAMELEN;
+        devMajor = (int) TarUtils.parseOctal(header, offset, DEVLEN);
+        offset += DEVLEN;
+        devMinor = (int) TarUtils.parseOctal(header, offset, DEVLEN);
+    }
+
+    /**
+     * Strips Windows' drive letter (or, on NetWare, everything up to the
+     * first colon) as well as any leading slashes, and turns path
+     * separators into forward slashes.
+     *
+     * @param fileName the file name to normalize
+     * @param preserveLeadingSlashes whether leading slashes are kept
+     * @return the normalized file name
+     */
+    private static String normalizeFileName(String fileName,
+                                            boolean preserveLeadingSlashes) {
+        String osname = System.getProperty("os.name");
+
+        // Check for a missing property before using its value.
+        if (osname != null) {
+            osname = osname.toLowerCase(Locale.ENGLISH);
+
+            // Strip off drive letters!
+            // REVIEW Would a better check be "(File.separator == '\')"?
+
+            if (osname.startsWith("windows")) {
+                if (fileName.length() > 2) {
+                    char ch1 = fileName.charAt(0);
+                    char ch2 = fileName.charAt(1);
+
+                    if (ch2 == ':'
+                            && ((ch1 >= 'a' && ch1 <= 'z')
+                            || (ch1 >= 'A' && ch1 <= 'Z'))) {
+                        fileName = fileName.substring(2);
+                    }
+                }
+            } else if (osname.indexOf("netware") > -1) {
+                int colon = fileName.indexOf(':');
+                if (colon != -1) {
+                    fileName = fileName.substring(colon + 1);
+                }
+            }
+        }
+
+        fileName = fileName.replace(File.separatorChar, '/');
+
+        // No absolute pathnames
+        // Windows (and Posix?) paths can start with "\\NetworkDrive\",
+        // so we loop on starting /'s.
+        while (!preserveLeadingSlashes && fileName.startsWith("/")) {
+            fileName = fileName.substring(1);
+        }
+        return fileName;
+    }
+}
Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarInputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarInputStream.java?rev=1402652&view=auto
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarInputStream.java (added)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarInputStream.java Fri Oct 26 20:28:49 2012
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/*
+ * This package is based on the work done by Timothy Gerard Endres
+ * (time@ice.com) to whom the Ant project is very grateful for his great code.
+ */
+
+package org.apache.activemq.console.command.store.tar;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * The TarInputStream reads a UNIX tar archive as an InputStream.
+ * Methods are provided to position at each successive entry in
+ * the archive, and then read each entry as a normal input stream
+ * using read().
+ *
+ */
+public class TarInputStream extends FilterInputStream {
+    private static final int SMALL_BUFFER_SIZE = 256;
+    private static final int BUFFER_SIZE = 8 * 1024;
+    private static final int LARGE_BUFFER_SIZE = 32 * 1024;
+    private static final int BYTE_MASK = 0xFF;
+
+    // CheckStyle:VisibilityModifier OFF - bc
+    protected boolean debug;
+    protected boolean hasHitEOF;
+    protected long entrySize;
+    protected long entryOffset;
+    protected byte[] readBuf;
+    protected TarBuffer buffer;
+    protected TarEntry currEntry;
+
+    /**
+     * The contents of this array are not used at all in this class,
+     * it is only here to avoid repeated object creation during calls
+     * to the no-arg read method.
+     */
+    protected byte[] oneBuf;
+
+    // CheckStyle:VisibilityModifier ON
+
+    /**
+     * Constructor for TarInputStream using the default block and
+     * record sizes.
+     * @param is the input stream to use
+     */
+    public TarInputStream(InputStream is) {
+        this(is, TarBuffer.DEFAULT_BLKSIZE, TarBuffer.DEFAULT_RCDSIZE);
+    }
+
+    /**
+     * Constructor for TarInputStream using the default record size.
+     * @param is the input stream to use
+     * @param blockSize the block size to use
+     */
+    public TarInputStream(InputStream is, int blockSize) {
+        this(is, blockSize, TarBuffer.DEFAULT_RCDSIZE);
+    }
+
+    /**
+     * Constructor for TarInputStream.
+     * @param is the input stream to use
+     * @param blockSize the block size to use
+     * @param recordSize the record size to use
+     */
+    public TarInputStream(InputStream is, int blockSize, int recordSize) {
+        super(is);
+
+        this.buffer = new TarBuffer(is, blockSize, recordSize);
+        this.readBuf = null;
+        this.oneBuf = new byte[1];
+        this.debug = false;
+        this.hasHitEOF = false;
+    }
+
+    /**
+     * Sets the debugging flag, on this stream and on its TarBuffer.
+     *
+     * @param debug True to turn on debugging.
+     */
+    public void setDebug(boolean debug) {
+        this.debug = debug;
+        buffer.setDebug(debug);
+    }
+
+    /**
+     * Closes this stream. Calls the TarBuffer's close() method.
+     * @throws IOException on error
+     */
+    @Override
+    public void close() throws IOException {
+        buffer.close();
+    }
+
+    /**
+     * Get the record size being used by this stream's TarBuffer.
+     *
+     * @return The TarBuffer record size.
+     */
+    public int getRecordSize() {
+        return buffer.getRecordSize();
+    }
+
+    /**
+     * Get the available data that can be read from the current
+     * entry in the archive. This does not indicate how much data
+     * is left in the entire archive, only in the current entry.
+     * This value is determined from the entry's size header field
+     * and the amount of data already read from the current entry.
+     * Integer.MAX_VALUE is returned in case more than Integer.MAX_VALUE
+     * bytes are left in the current entry in the archive.
+     *
+     * @return The number of available bytes for the current entry.
+     * @throws IOException for signature
+     */
+    @Override
+    public int available() throws IOException {
+        if (entrySize - entryOffset > Integer.MAX_VALUE) {
+            return Integer.MAX_VALUE;
+        }
+        return (int) (entrySize - entryOffset);
+    }
+
+    /**
+     * Skip bytes in the input buffer. This skips bytes in the
+     * current entry's data, not the entire archive, and will
+     * stop at the end of the current entry's data if the number
+     * to skip extends beyond that point.
+     *
+     * @param numToSkip The number of bytes to skip.
+     * @return the number actually skipped
+     * @throws IOException on error
+     */
+    @Override
+    public long skip(long numToSkip) throws IOException {
+        // REVIEW
+        // This is horribly inefficient, but it ensures that we
+        // properly skip over bytes via the TarBuffer...
+        //
+        byte[] skipBuf = new byte[BUFFER_SIZE];
+        long skip = numToSkip;
+        while (skip > 0) {
+            int realSkip = (int) (skip > skipBuf.length ? skipBuf.length : skip);
+            int numRead = read(skipBuf, 0, realSkip);
+            if (numRead == -1) {
+                // End of the current entry: stop early.
+                break;
+            }
+            skip -= numRead;
+        }
+        return (numToSkip - skip);
+    }
+
+    /**
+     * Since we do not support marking just yet, we return false.
+     *
+     * @return False.
+     */
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    /**
+     * Since we do not support marking just yet, we do nothing.
+     *
+     * @param markLimit The limit to mark.
+     */
+    @Override
+    public void mark(int markLimit) {
+    }
+
+    /**
+     * Since we do not support marking just yet, we do nothing.
+     */
+    @Override
+    public void reset() {
+    }
+
+    /**
+     * Get the next entry in this tar archive. This will skip
+     * over any remaining data in the current entry, if there
+     * is one, and place the input stream at the header of the
+     * next entry, and read the header and instantiate a new
+     * TarEntry from the header bytes and return that entry.
+     * If there are no more entries in the archive, null will
+     * be returned to indicate that the end of the archive has
+     * been reached.
+     * <p>
+     * A GNU long name entry is consumed transparently: its data is
+     * read as the name of the entry that follows it, and that
+     * following entry is the one returned.
+     *
+     * @return The next TarEntry in the archive, or null.
+     * @throws IOException on error
+     */
+    public TarEntry getNextEntry() throws IOException {
+        if (hasHitEOF) {
+            return null;
+        }
+
+        if (currEntry != null) {
+            long numToSkip = entrySize - entryOffset;
+
+            if (debug) {
+                System.err.println("TarInputStream: SKIP currENTRY '"
+                        + currEntry.getName() + "' SZ "
+                        + entrySize + " OFF "
+                        + entryOffset + " skipping "
+                        + numToSkip + " bytes");
+            }
+
+            while (numToSkip > 0) {
+                long skipped = skip(numToSkip);
+                if (skipped <= 0) {
+                    throw new RuntimeException("failed to skip current tar"
+                            + " entry");
+                }
+                numToSkip -= skipped;
+            }
+
+            // Drop any leftover bytes of the last record of the old entry.
+            readBuf = null;
+        }
+
+        byte[] headerBuf = buffer.readRecord();
+
+        if (headerBuf == null) {
+            if (debug) {
+                System.err.println("READ NULL RECORD");
+            }
+            hasHitEOF = true;
+        } else if (buffer.isEOFRecord(headerBuf)) {
+            if (debug) {
+                System.err.println("READ EOF RECORD");
+            }
+            hasHitEOF = true;
+        }
+
+        if (hasHitEOF) {
+            currEntry = null;
+        } else {
+            currEntry = new TarEntry(headerBuf);
+
+            if (debug) {
+                System.err.println("TarInputStream: SET CURRENTRY '"
+                        + currEntry.getName()
+                        + "' size = "
+                        + currEntry.getSize());
+            }
+
+            entryOffset = 0;
+
+            entrySize = currEntry.getSize();
+        }
+
+        if (currEntry != null && currEntry.isGNULongNameEntry()) {
+            // Read in the name. The raw bytes are collected first and
+            // decoded in one go, so that a multi-byte character that
+            // straddles two reads is not corrupted.
+            java.io.ByteArrayOutputStream nameBytes =
+                    new java.io.ByteArrayOutputStream();
+            byte[] buf = new byte[SMALL_BUFFER_SIZE];
+            int length = 0;
+            while ((length = read(buf)) >= 0) {
+                nameBytes.write(buf, 0, length);
+            }
+            getNextEntry();
+            if (currEntry == null) {
+                // Bugzilla: 40334
+                // Malformed tar file - long entry name not followed by entry
+                return null;
+            }
+            // Decoded with the platform's default charset, as before.
+            StringBuffer longName =
+                    new StringBuffer(new String(nameBytes.toByteArray()));
+            // remove trailing null terminator
+            if (longName.length() > 0
+                    && longName.charAt(longName.length() - 1) == 0) {
+                longName.deleteCharAt(longName.length() - 1);
+            }
+            currEntry.setName(longName.toString());
+        }
+
+        return currEntry;
+    }
+
+    /**
+     * Reads a byte from the current tar archive entry.
+     *
+     * This method simply calls read( byte[], int, int ).
+     *
+     * @return The byte read, or -1 at EOF.
+     * @throws IOException on error
+     */
+    @Override
+    public int read() throws IOException {
+        int num = read(oneBuf, 0, 1);
+        return num == -1 ? -1 : ((int) oneBuf[0]) & BYTE_MASK;
+    }
+
+    /**
+     * Reads bytes from the current tar archive entry.
+     *
+     * This method is aware of the boundaries of the current
+     * entry in the archive and will deal with them as if they
+     * were this stream's start and EOF.
+     *
+     * @param buf The buffer into which to place bytes read.
+     * @param offset The offset at which to place bytes read.
+     * @param numToRead The number of bytes to read.
+     * @return The number of bytes read, or -1 at EOF.
+     * @throws IOException on error
+     */
+    @Override
+    public int read(byte[] buf, int offset, int numToRead) throws IOException {
+        int totalRead = 0;
+
+        if (entryOffset >= entrySize) {
+            return -1;
+        }
+
+        // Never read past the end of the current entry.
+        if ((numToRead + entryOffset) > entrySize) {
+            numToRead = (int) (entrySize - entryOffset);
+        }
+
+        // First hand out bytes left over from the previously read record.
+        if (readBuf != null) {
+            int sz = (numToRead > readBuf.length) ? readBuf.length
+                    : numToRead;
+
+            System.arraycopy(readBuf, 0, buf, offset, sz);
+
+            if (sz >= readBuf.length) {
+                readBuf = null;
+            } else {
+                // Keep the unconsumed tail for the next call.
+                int newLen = readBuf.length - sz;
+                byte[] newBuf = new byte[newLen];
+
+                System.arraycopy(readBuf, sz, newBuf, 0, newLen);
+
+                readBuf = newBuf;
+            }
+
+            totalRead += sz;
+            numToRead -= sz;
+            offset += sz;
+        }
+
+        // Then pull whole records from the TarBuffer until satisfied.
+        while (numToRead > 0) {
+            byte[] rec = buffer.readRecord();
+
+            if (rec == null) {
+                // Unexpected EOF!
+                throw new IOException("unexpected EOF with " + numToRead
+                        + " bytes unread");
+            }
+
+            int sz = numToRead;
+            int recLen = rec.length;
+
+            if (recLen > sz) {
+                // Only part of the record is needed; stash the rest.
+                System.arraycopy(rec, 0, buf, offset, sz);
+
+                readBuf = new byte[recLen - sz];
+
+                System.arraycopy(rec, sz, readBuf, 0, recLen - sz);
+            } else {
+                sz = recLen;
+
+                System.arraycopy(rec, 0, buf, offset, recLen);
+            }
+
+            totalRead += sz;
+            numToRead -= sz;
+            offset += sz;
+        }
+
+        entryOffset += totalRead;
+
+        return totalRead;
+    }
+
+    /**
+     * Copies the contents of the current tar archive entry directly into
+     * an output stream.
+     *
+     * @param out The OutputStream into which to write the entry's data.
+     * @throws IOException on error
+     */
+    public void copyEntryContents(OutputStream out) throws IOException {
+        byte[] buf = new byte[LARGE_BUFFER_SIZE];
+
+        while (true) {
+            int numRead = read(buf, 0, buf.length);
+
+            if (numRead == -1) {
+                break;
+            }
+
+            out.write(buf, 0, numRead);
+        }
+    }
+}