You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2015/05/04 17:30:38 UTC
[07/11] activemq-artemis git commit: Moving artemis-tools to
artemis-cli and improving the tooling
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea3370b3/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java
new file mode 100644
index 0000000..515323e
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.tools;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.airlift.command.Arguments;
+import io.airlift.command.Command;
+import io.airlift.command.Option;
+import org.apache.activemq.artemis.cli.commands.Action;
+import org.apache.activemq.artemis.cli.commands.ActionContext;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.journal.impl.JournalRecord;
+import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.utils.Base64;
+
+@Command(name = "decode", description = "Decode a journal's internal format into a new journal set of files")
+public class DecodeJournal implements Action
+{
+
+ @Option(name = "--directory", description = "The journal folder (default ../data/journal)")
+ public String directory = "../data/journal";
+
+ @Option(name = "--prefix", description = "The journal prefix (default activemq-datal)")
+ public String prefix = "activemq-data";
+
+ @Option(name = "--suffix", description = "The journal suffix (default amq)")
+ public String suffix = "amq";
+
+ @Option(name = "--file-size", description = "The journal size (default 10485760)")
+ public int size = 10485760;
+
+ @Arguments(description = "The input file name (default=exp.dmp)", required = true)
+ public String input;
+
+ public Object execute(ActionContext context) throws Exception
+ {
+ try
+ {
+ importJournal(directory, prefix, suffix, 2, size, input);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ return null;
+ }
+
+
+
+ public static void importJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final String fileInput) throws Exception
+ {
+ FileInputStream fileInputStream = new FileInputStream(new File(fileInput));
+ importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
+
+ }
+
+ public static void importJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final InputStream stream) throws Exception
+ {
+ Reader reader = new InputStreamReader(stream);
+ importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
+ }
+
+ public static void importJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final Reader reader) throws Exception
+ {
+
+ File journalDir = new File(directory);
+
+ if (!journalDir.exists())
+ {
+ if (!journalDir.mkdirs())
+ System.err.println("Could not create directory " + directory);
+ }
+
+ NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, null);
+
+ JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
+
+ if (journal.orderFiles().size() != 0)
+ {
+ throw new IllegalStateException("Import needs to create a brand new journal");
+ }
+
+ journal.start();
+
+ // The journal is empty, as we checked already. Calling load just to initialize the internal data
+ journal.loadInternalOnly();
+
+ BufferedReader buffReader = new BufferedReader(reader);
+
+ String line;
+
+ HashMap<Long, AtomicInteger> txCounters = new HashMap<Long, AtomicInteger>();
+
+ long lineNumber = 0;
+
+ Map<Long, JournalRecord> journalRecords = journal.getRecords();
+
+ while ((line = buffReader.readLine()) != null)
+ {
+ lineNumber++;
+ String[] splitLine = line.split(",");
+ if (splitLine[0].equals("#File"))
+ {
+ txCounters.clear();
+ continue;
+ }
+
+ Properties lineProperties = parseLine(splitLine);
+
+ String operation = null;
+ try
+ {
+ operation = lineProperties.getProperty("operation");
+
+ if (operation.equals("AddRecord"))
+ {
+ RecordInfo info = parseRecord(lineProperties);
+ journal.appendAddRecord(info.id, info.userRecordType, info.data, false);
+ }
+ else if (operation.equals("AddRecordTX"))
+ {
+ long txID = parseLong("txID", lineProperties);
+ AtomicInteger counter = getCounter(txID, txCounters);
+ counter.incrementAndGet();
+ RecordInfo info = parseRecord(lineProperties);
+ journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
+ }
+ else if (operation.equals("AddRecordTX"))
+ {
+ long txID = parseLong("txID", lineProperties);
+ AtomicInteger counter = getCounter(txID, txCounters);
+ counter.incrementAndGet();
+ RecordInfo info = parseRecord(lineProperties);
+ journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
+ }
+ else if (operation.equals("UpdateTX"))
+ {
+ long txID = parseLong("txID", lineProperties);
+ AtomicInteger counter = getCounter(txID, txCounters);
+ counter.incrementAndGet();
+ RecordInfo info = parseRecord(lineProperties);
+ journal.appendUpdateRecordTransactional(txID, info.id, info.userRecordType, info.data);
+ }
+ else if (operation.equals("Update"))
+ {
+ RecordInfo info = parseRecord(lineProperties);
+ journal.appendUpdateRecord(info.id, info.userRecordType, info.data, false);
+ }
+ else if (operation.equals("DeleteRecord"))
+ {
+ long id = parseLong("id", lineProperties);
+
+ // If not found it means the append/update records were reclaimed already
+ if (journalRecords.get(id) != null)
+ {
+ journal.appendDeleteRecord(id, false);
+ }
+ }
+ else if (operation.equals("DeleteRecordTX"))
+ {
+ long txID = parseLong("txID", lineProperties);
+ long id = parseLong("id", lineProperties);
+ AtomicInteger counter = getCounter(txID, txCounters);
+ counter.incrementAndGet();
+
+ // If not found it means the append/update records were reclaimed already
+ if (journalRecords.get(id) != null)
+ {
+ journal.appendDeleteRecordTransactional(txID, id);
+ }
+ }
+ else if (operation.equals("Prepare"))
+ {
+ long txID = parseLong("txID", lineProperties);
+ int numberOfRecords = parseInt("numberOfRecords", lineProperties);
+ AtomicInteger counter = getCounter(txID, txCounters);
+ byte[] data = parseEncoding("extraData", lineProperties);
+
+ if (counter.get() == numberOfRecords)
+ {
+ journal.appendPrepareRecord(txID, data, false);
+ }
+ else
+ {
+ System.err.println("Transaction " + txID +
+ " at line " +
+ lineNumber +
+ " is incomplete. The prepare record expected " +
+ numberOfRecords +
+ " while the import only had " +
+ counter);
+ }
+ }
+ else if (operation.equals("Commit"))
+ {
+ long txID = parseLong("txID", lineProperties);
+ int numberOfRecords = parseInt("numberOfRecords", lineProperties);
+ AtomicInteger counter = getCounter(txID, txCounters);
+ if (counter.get() == numberOfRecords)
+ {
+ journal.appendCommitRecord(txID, false);
+ }
+ else
+ {
+ System.err.println("Transaction " + txID +
+ " at line " +
+ lineNumber +
+ " is incomplete. The commit record expected " +
+ numberOfRecords +
+ " while the import only had " +
+ counter);
+ }
+ }
+ else if (operation.equals("Rollback"))
+ {
+ long txID = parseLong("txID", lineProperties);
+ journal.appendRollbackRecord(txID, false);
+ }
+ else
+ {
+ System.err.println("Invalid operation " + operation + " at line " + lineNumber);
+ }
+ }
+ catch (Exception ex)
+ {
+ System.err.println("Error at line " + lineNumber + ", operation=" + operation + " msg = " + ex.getMessage());
+ }
+ }
+
+ journal.stop();
+ }
+
+ protected static AtomicInteger getCounter(final Long txID, final Map<Long, AtomicInteger> txCounters)
+ {
+
+ AtomicInteger counter = txCounters.get(txID);
+ if (counter == null)
+ {
+ counter = new AtomicInteger(0);
+ txCounters.put(txID, counter);
+ }
+
+ return counter;
+ }
+
+ protected static RecordInfo parseRecord(final Properties properties) throws Exception
+ {
+ long id = parseLong("id", properties);
+ byte userRecordType = parseByte("userRecordType", properties);
+ boolean isUpdate = parseBoolean("isUpdate", properties);
+ byte[] data = parseEncoding("data", properties);
+ return new RecordInfo(id, userRecordType, data, isUpdate, (short)0);
+ }
+
+ private static byte[] parseEncoding(final String name, final Properties properties) throws Exception
+ {
+ String value = parseString(name, properties);
+
+ return decode(value);
+ }
+
+ /**
+ * @param properties
+ * @return
+ */
+ private static int parseInt(final String name, final Properties properties) throws Exception
+ {
+ String value = parseString(name, properties);
+
+ return Integer.parseInt(value);
+ }
+
+ private static long parseLong(final String name, final Properties properties) throws Exception
+ {
+ String value = parseString(name, properties);
+
+ return Long.parseLong(value);
+ }
+
+ private static boolean parseBoolean(final String name, final Properties properties) throws Exception
+ {
+ String value = parseString(name, properties);
+
+ return Boolean.parseBoolean(value);
+ }
+
+ private static byte parseByte(final String name, final Properties properties) throws Exception
+ {
+ String value = parseString(name, properties);
+
+ return Byte.parseByte(value);
+ }
+
+ /**
+ * @param name
+ * @param properties
+ * @return
+ * @throws Exception
+ */
+ private static String parseString(final String name, final Properties properties) throws Exception
+ {
+ String value = properties.getProperty(name);
+
+ if (value == null)
+ {
+ throw new Exception("property " + name + " not found");
+ }
+ return value;
+ }
+
+ protected static Properties parseLine(final String[] splitLine)
+ {
+ Properties properties = new Properties();
+
+ for (String el : splitLine)
+ {
+ String[] tuple = el.split("@");
+ if (tuple.length == 2)
+ {
+ properties.put(tuple[0], tuple[1]);
+ }
+ else
+ {
+ properties.put(tuple[0], tuple[0]);
+ }
+ }
+
+ return properties;
+ }
+
+ private static byte[] decode(final String data)
+ {
+ return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+ }
+
+
+ public void printUsage()
+ {
+ for (int i = 0; i < 10; i++)
+ {
+ System.err.println();
+ }
+ System.err.println("This method will export the journal at low level record.");
+ System.err.println();
+ System.err.println();
+ for (int i = 0; i < 10; i++)
+ {
+ System.err.println();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea3370b3/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java
new file mode 100644
index 0000000..01f7ac5
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.tools;
+
+import java.io.BufferedOutputStream;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.List;
+
+import io.airlift.command.Command;
+import io.airlift.command.Option;
+import org.apache.activemq.artemis.cli.commands.Action;
+import org.apache.activemq.artemis.cli.commands.ActionContext;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.impl.JournalFile;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
+import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.utils.Base64;
+
+@Command(name = "encode", description = "Encode a set of journal files into an internal encoded data format")
+public class EncodeJournal implements Action
+{
+
+ @Option(name = "--directory", description = "The journal folder (default ../data/journal)")
+ public String directory = "../data/journal";
+
+ @Option(name = "--prefix", description = "The journal prefix (default activemq-datal)")
+ public String prefix = "activemq-data";
+
+ @Option(name = "--suffix", description = "The journal suffix (default amq)")
+ public String suffix = "amq";
+
+ @Option(name = "--file-size", description = "The journal size (default 10485760)")
+ public int size = 10485760;
+
+
+ public Object execute(ActionContext context) throws Exception
+ {
+ try
+ {
+ exportJournal(directory, prefix, suffix, 2, size);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ return null;
+ }
+
+
+ public static void exportJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize) throws Exception
+ {
+
+
+ exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, System.out);
+ }
+ public static void exportJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final String fileName) throws Exception
+ {
+ FileOutputStream fileOutputStream = new FileOutputStream(fileName);
+ BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
+ PrintStream out = new PrintStream(bufferedOutputStream);
+ try
+ {
+ exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
+ }
+ finally
+ {
+ out.close();
+ fileOutputStream.close();
+ }
+
+
+ }
+
+ public static void exportJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final PrintStream out) throws Exception
+ {
+ NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, null);
+
+ JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
+
+ List<JournalFile> files = journal.orderFiles();
+
+ for (JournalFile file : files)
+ {
+ out.println("#File," + file);
+
+ exportJournalFile(out, nio, file);
+ }
+ }
+
+ /**
+ * @param out
+ * @param fileFactory
+ * @param file
+ * @throws Exception
+ */
+ public static void exportJournalFile(final PrintStream out,
+ final SequentialFileFactory fileFactory,
+ final JournalFile file) throws Exception
+ {
+ JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
+ {
+
+ public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+ {
+ out.println("operation@UpdateTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+ }
+
+ public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
+ {
+ out.println("operation@Update," + describeRecord(recordInfo));
+ }
+
+ public void onReadRollbackRecord(final long transactionID) throws Exception
+ {
+ out.println("operation@Rollback,txID@" + transactionID);
+ }
+
+ public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
+ {
+ out.println("operation@Prepare,txID@" + transactionID +
+ ",numberOfRecords@" +
+ numberOfRecords +
+ ",extraData@" +
+ encode(extraData));
+ }
+
+ public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+ {
+ out.println("operation@DeleteRecordTX,txID@" + transactionID +
+ "," +
+ describeRecord(recordInfo));
+ }
+
+ public void onReadDeleteRecord(final long recordID) throws Exception
+ {
+ out.println("operation@DeleteRecord,id@" + recordID);
+ }
+
+ public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
+ {
+ out.println("operation@Commit,txID@" + transactionID + ",numberOfRecords@" + numberOfRecords);
+ }
+
+ public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+ {
+ out.println("operation@AddRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+ }
+
+ public void onReadAddRecord(final RecordInfo recordInfo) throws Exception
+ {
+ out.println("operation@AddRecord," + describeRecord(recordInfo));
+ }
+
+ public void markAsDataFile(final JournalFile file)
+ {
+ }
+ });
+ }
+
+ private static String describeRecord(final RecordInfo recordInfo)
+ {
+ return "id@" + recordInfo.id +
+ ",userRecordType@" +
+ recordInfo.userRecordType +
+ ",length@" +
+ recordInfo.data.length +
+ ",isUpdate@" +
+ recordInfo.isUpdate +
+ ",compactCount@" +
+ recordInfo.compactCount +
+ ",data@" +
+ encode(recordInfo.data);
+ }
+
+ private static String encode(final byte[] data)
+ {
+ return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+ }
+
+
+ public void printUsage()
+ {
+ for (int i = 0; i < 10; i++)
+ {
+ System.err.println();
+ }
+ System.err.println("This method will export the journal at low level record.");
+ System.err.println();
+ System.err.println();
+ for (int i = 0; i < 10; i++)
+ {
+ System.err.println();
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea3370b3/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java
new file mode 100644
index 0000000..c739a01
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.cli.commands.tools;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.airlift.command.Help;
+import org.apache.activemq.artemis.cli.commands.Action;
+import org.apache.activemq.artemis.cli.commands.ActionContext;
+
+public class HelpData extends Help implements Action
+{
+
+ @Override
+ public Object execute(ActionContext context) throws Exception
+ {
+
+ List<String> commands = new ArrayList<>(1);
+ commands.add("data");
+ help(global, commands);
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea3370b3/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
new file mode 100644
index 0000000..a5e18c0
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.tools;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import io.airlift.command.Command;
+import io.airlift.command.Option;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.cli.commands.Action;
+import org.apache.activemq.artemis.cli.commands.ActionContext;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+
+@Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is running)")
+public class PrintData implements Action
+{
+ @Option(name = "--bindings", description = "The folder used for bindings (default ../data/bindings)")
+ public String binding = "../data/bindings";
+
+ @Option(name = "--journal", description = "The folder used for messages journal (default ../data/journal)")
+ public String journal = "../data/journal";
+
+ @Option(name = "--paging", description = "The folder used for paging (default ../data/paging)")
+ public String paging = "../data/paging";
+
+
+ @Override
+ public Object execute(ActionContext context) throws Exception
+ {
+ printData(binding, journal, paging);
+ return null;
+ }
+
+ public static void printData(String bindingsDirectory, String messagesDirectory, String pagingDirectory)
+ {
+ File serverLockFile = new File(messagesDirectory, "server.lock");
+
+ if (serverLockFile.isFile())
+ {
+ try
+ {
+ FileLockNodeManager fileLock = new FileLockNodeManager(messagesDirectory, false);
+ fileLock.start();
+ System.out.println("********************************************");
+ System.out.println("Server's ID=" + fileLock.getNodeId().toString());
+ System.out.println("********************************************");
+ fileLock.stop();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ System.out.println("********************************************");
+ System.out.println("B I N D I N G S J O U R N A L");
+ System.out.println("********************************************");
+
+ try
+ {
+ DescribeJournal.describeBindingsJournal(bindingsDirectory);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ System.out.println();
+ System.out.println("********************************************");
+ System.out.println("M E S S A G E S J O U R N A L");
+ System.out.println("********************************************");
+
+ DescribeJournal describeJournal = null;
+ try
+ {
+ describeJournal = DescribeJournal.describeMessagesJournal(messagesDirectory);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ return;
+ }
+
+
+ try
+ {
+ System.out.println();
+ System.out.println("********************************************");
+ System.out.println("P A G I N G");
+ System.out.println("********************************************");
+
+ printPages(pagingDirectory, describeJournal);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ return;
+ }
+
+
+ }
+
+
+ private static void printPages(String pageDirectory, DescribeJournal describeJournal)
+ {
+ try
+ {
+
+ PageCursorsInfo cursorACKs = calculateCursorsInfo(describeJournal.getRecords());
+
+ Set<Long> pgTXs = cursorACKs.getPgTXs();
+
+ ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
+ final ExecutorService executor = Executors.newFixedThreadPool(10);
+ ExecutorFactory execfactory = new ExecutorFactory()
+ {
+ @Override
+ public Executor getExecutor()
+ {
+ return executor;
+ }
+ };
+ final StorageManager sm = new NullStorageManager();
+ PagingStoreFactory pageStoreFactory =
+ new PagingStoreFactoryNIO(sm, pageDirectory, 1000L, scheduled, execfactory, false, null);
+ HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
+ addressSettingsRepository.setDefault(new AddressSettings());
+ PagingManager manager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository);
+
+ manager.start();
+
+ SimpleString[] stores = manager.getStoreNames();
+
+ for (SimpleString store : stores)
+ {
+ PagingStore pgStore = manager.getPageStore(store);
+ String folder = null;
+
+ if (pgStore != null)
+ {
+ folder = pgStore.getFolder();
+ }
+ System.out.println("####################################################################################################");
+ System.out.println("Exploring store " + store + " folder = " + folder);
+ int pgid = (int) pgStore.getFirstPage();
+ for (int pg = 0; pg < pgStore.getNumberOfPages(); pg++)
+ {
+ System.out.println("******* Page " + pgid);
+ Page page = pgStore.createPage(pgid);
+ page.open();
+ List<PagedMessage> msgs = page.read(sm);
+ page.close();
+
+ int msgID = 0;
+
+ for (PagedMessage msg : msgs)
+ {
+ msg.initMessage(sm);
+ System.out.print("pg=" + pgid + ", msg=" + msgID + ",pgTX=" + msg.getTransactionID() + ",userMessageID=" + (msg.getMessage().getUserID() != null ? msg.getMessage().getUserID() : "") + ", msg=" + msg.getMessage());
+ System.out.print(",Queues = ");
+ long[] q = msg.getQueueIDs();
+ for (int i = 0; i < q.length; i++)
+ {
+ System.out.print(q[i]);
+
+ PagePosition posCheck = new PagePositionImpl(pgid, msgID);
+
+ boolean acked = false;
+
+ Set<PagePosition> positions = cursorACKs.getCursorRecords().get(q[i]);
+ if (positions != null)
+ {
+ acked = positions.contains(posCheck);
+ }
+
+ if (acked)
+ {
+ System.out.print(" (ACK)");
+ }
+
+ if (cursorACKs.getCompletePages(q[i]).contains(Long.valueOf(pgid)))
+ {
+ System.out.println(" (PG-COMPLETE)");
+ }
+
+
+ if (i + 1 < q.length)
+ {
+ System.out.print(",");
+ }
+ }
+ if (msg.getTransactionID() >= 0 && !pgTXs.contains(msg.getTransactionID()))
+ {
+ System.out.print(", **PG_TX_NOT_FOUND**");
+ }
+ System.out.println();
+ msgID++;
+ }
+
+ pgid++;
+
+ }
+ }
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+
+ /** Calculate the acks on the page system */
+ protected static PageCursorsInfo calculateCursorsInfo(List<RecordInfo> records) throws Exception
+ {
+
+ PageCursorsInfo cursorInfo = new PageCursorsInfo();
+
+
+ for (RecordInfo record : records)
+ {
+ byte[] data = record.data;
+
+ ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
+
+ if (record.userRecordType == JournalRecordIds.ACKNOWLEDGE_CURSOR)
+ {
+ JournalStorageManager.CursorAckRecordEncoding encoding = new JournalStorageManager.CursorAckRecordEncoding();
+ encoding.decode(buff);
+
+ Set<PagePosition> set = cursorInfo.getCursorRecords().get(encoding.queueID);
+
+ if (set == null)
+ {
+ set = new HashSet<PagePosition>();
+ cursorInfo.getCursorRecords().put(encoding.queueID, set);
+ }
+
+ set.add(encoding.position);
+ }
+ else if (record.userRecordType == JournalRecordIds.PAGE_CURSOR_COMPLETE)
+ {
+ JournalStorageManager.CursorAckRecordEncoding encoding = new JournalStorageManager.CursorAckRecordEncoding();
+ encoding.decode(buff);
+
+ Long queueID = Long.valueOf(encoding.queueID);
+ Long pageNR = Long.valueOf(encoding.position.getPageNr());
+
+ if (!cursorInfo.getCompletePages(queueID).add(pageNR))
+ {
+ System.err.println("Page " + pageNR + " has been already set as complete on queue " + queueID);
+ }
+ }
+ else if (record.userRecordType == JournalRecordIds.PAGE_TRANSACTION)
+ {
+ if (record.isUpdate)
+ {
+ JournalStorageManager.PageUpdateTXEncoding pageUpdate = new JournalStorageManager.PageUpdateTXEncoding();
+
+ pageUpdate.decode(buff);
+ cursorInfo.getPgTXs().add(pageUpdate.pageTX);
+ }
+ else
+ {
+ PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+
+ pageTransactionInfo.decode(buff);
+
+ pageTransactionInfo.setRecordID(record.id);
+ cursorInfo.getPgTXs().add(pageTransactionInfo.getTransactionID());
+ }
+ }
+ }
+
+ return cursorInfo;
+ }
+
+
+ private static class PageCursorsInfo
+ {
+ private final Map<Long, Set<PagePosition>> cursorRecords = new HashMap<Long, Set<PagePosition>>();
+
+ private final Set<Long> pgTXs = new HashSet<Long>();
+
+ private final Map<Long, Set<Long>> completePages = new HashMap<Long, Set<Long>>();
+
+ public PageCursorsInfo()
+ {
+ }
+
+
+ /**
+ * @return the pgTXs
+ */
+ public Set<Long> getPgTXs()
+ {
+ return pgTXs;
+ }
+
+
+ /**
+ * @return the cursorRecords
+ */
+ public Map<Long, Set<PagePosition>> getCursorRecords()
+ {
+ return cursorRecords;
+ }
+
+
+ /**
+ * @return the completePages
+ */
+ public Map<Long, Set<Long>> getCompletePages()
+ {
+ return completePages;
+ }
+
+ public Set<Long> getCompletePages(Long queueID)
+ {
+ Set<Long> completePagesSet = completePages.get(queueID);
+
+ if (completePagesSet == null)
+ {
+ completePagesSet = new HashSet<Long>();
+ completePages.put(queueID, completePagesSet);
+ }
+
+ return completePagesSet;
+ }
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea3370b3/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java
new file mode 100644
index 0000000..2923760
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.tools;
+
+/**
+ * The constants shared by <code>org.apache.activemq.tools.XmlDataImporter</code> and
+ * <code>org.apache.activemq.tools.XmlDataExporter</code>.
+ */
+public final class XmlDataConstants
+{
+ private XmlDataConstants()
+ {
+ // Utility
+ }
+ static final String XML_VERSION = "1.0";
+ static final String DOCUMENT_PARENT = "activemq-journal";
+ static final String BINDINGS_PARENT = "bindings";
+ static final String BINDINGS_CHILD = "binding";
+ static final String BINDING_ADDRESS = "address";
+ static final String BINDING_FILTER_STRING = "filter-string";
+ static final String BINDING_QUEUE_NAME = "queue-name";
+ static final String BINDING_ID = "id";
+ static final String JMS_CONNECTION_FACTORY = "jms-connection-factory";
+ static final String JMS_CONNECTION_FACTORIES = "jms-connection-factories";
+ static final String MESSAGES_PARENT = "messages";
+ static final String MESSAGES_CHILD = "message";
+ static final String MESSAGE_ID = "id";
+ static final String MESSAGE_PRIORITY = "priority";
+ static final String MESSAGE_EXPIRATION = "expiration";
+ static final String MESSAGE_TIMESTAMP = "timestamp";
+ static final String DEFAULT_TYPE_PRETTY = "default";
+ static final String BYTES_TYPE_PRETTY = "bytes";
+ static final String MAP_TYPE_PRETTY = "map";
+ static final String OBJECT_TYPE_PRETTY = "object";
+ static final String STREAM_TYPE_PRETTY = "stream";
+ static final String TEXT_TYPE_PRETTY = "text";
+ static final String MESSAGE_TYPE = "type";
+ static final String MESSAGE_IS_LARGE = "isLarge";
+ static final String MESSAGE_USER_ID = "user-id";
+ static final String MESSAGE_BODY = "body";
+ static final String PROPERTIES_PARENT = "properties";
+ static final String PROPERTIES_CHILD = "property";
+ static final String PROPERTY_NAME = "name";
+ static final String PROPERTY_VALUE = "value";
+ static final String PROPERTY_TYPE = "type";
+ static final String QUEUES_PARENT = "queues";
+ static final String QUEUES_CHILD = "queue";
+ static final String QUEUE_NAME = "name";
+ static final String PROPERTY_TYPE_BOOLEAN = "boolean";
+ static final String PROPERTY_TYPE_BYTE = "byte";
+ static final String PROPERTY_TYPE_BYTES = "bytes";
+ static final String PROPERTY_TYPE_SHORT = "short";
+ static final String PROPERTY_TYPE_INTEGER = "integer";
+ static final String PROPERTY_TYPE_LONG = "long";
+ static final String PROPERTY_TYPE_FLOAT = "float";
+ static final String PROPERTY_TYPE_DOUBLE = "double";
+ static final String PROPERTY_TYPE_STRING = "string";
+ static final String PROPERTY_TYPE_SIMPLE_STRING = "simple-string";
+
+ static final String JMS_CONNECTION_FACTORY_NAME = "name";
+ static final String JMS_CONNECTION_FACTORY_CLIENT_ID = "client-id";
+ static final String JMS_CONNECTION_FACTORY_CALL_FAILOVER_TIMEOUT = "call-failover-timeout";
+ static final String JMS_CONNECTION_FACTORY_CALL_TIMEOUT = "call-timeout";
+ static final String JMS_CONNECTION_FACTORY_CLIENT_FAILURE_CHECK_PERIOD = "client-failure-check-period";
+ static final String JMS_CONNECTION_FACTORY_CONFIRMATION_WINDOW_SIZE = "confirmation-window-size";
+ static final String JMS_CONNECTION_FACTORY_CONNECTION_TTL = "connection-ttl";
+ static final String JMS_CONNECTION_FACTORY_CONSUMER_MAX_RATE = "consumer-max-rate";
+ static final String JMS_CONNECTION_FACTORY_CONSUMER_WINDOW_SIZE = "consumer-window-size";
+ static final String JMS_CONNECTION_FACTORY_DISCOVERY_GROUP_NAME = "discovery-group-name";
+ static final String JMS_CONNECTION_FACTORY_DUPS_OK_BATCH_SIZE = "dups-ok-batch-size";
+ static final String JMS_CONNECTION_FACTORY_TYPE = "type";
+ static final String JMS_CONNECTION_FACTORY_GROUP_ID = "group-id";
+ static final String JMS_CONNECTION_FACTORY_LOAD_BALANCING_POLICY_CLASS_NAME = "load-balancing-policy-class-name";
+ static final String JMS_CONNECTION_FACTORY_MAX_RETRY_INTERVAL = "max-retry-interval";
+ static final String JMS_CONNECTION_FACTORY_MIN_LARGE_MESSAGE_SIZE = "min-large-message-size";
+ static final String JMS_CONNECTION_FACTORY_PRODUCER_MAX_RATE = "producer-max-rate";
+ static final String JMS_CONNECTION_FACTORY_PRODUCER_WINDOW_SIZE = "producer-window-size";
+ static final String JMS_CONNECTION_FACTORY_RECONNECT_ATTEMPTS = "reconnect-attempts";
+ static final String JMS_CONNECTION_FACTORY_RETRY_INTERVAL = "retry-interval";
+ static final String JMS_CONNECTION_FACTORY_RETRY_INTERVAL_MULTIPLIER = "retry-interval-multiplier";
+ static final String JMS_CONNECTION_FACTORY_SCHEDULED_THREAD_POOL_MAX_SIZE = "scheduled-thread-pool-max-size";
+ static final String JMS_CONNECTION_FACTORY_THREAD_POOL_MAX_SIZE = "thread-pool-max-size";
+ static final String JMS_CONNECTION_FACTORY_TRANSACTION_BATCH_SIZE = "transaction-batch-size";
+ static final String JMS_CONNECTION_FACTORY_CONNECTORS = "connectors";
+ static final String JMS_CONNECTION_FACTORY_CONNECTOR = "connector";
+ static final String JMS_CONNECTION_FACTORY_AUTO_GROUP = "auto-group";
+ static final String JMS_CONNECTION_FACTORY_BLOCK_ON_ACKNOWLEDGE = "block-on-acknowledge";
+ static final String JMS_CONNECTION_FACTORY_BLOCK_ON_DURABLE_SEND = "block-on-durable-send";
+ static final String JMS_CONNECTION_FACTORY_BLOCK_ON_NON_DURABLE_SEND = "block-on-non-durable-send";
+ static final String JMS_CONNECTION_FACTORY_CACHE_LARGE_MESSAGES_CLIENT = "cache-large-messages-client";
+ static final String JMS_CONNECTION_FACTORY_COMPRESS_LARGE_MESSAGES = "compress-large-messages";
+ static final String JMS_CONNECTION_FACTORY_FAILOVER_ON_INITIAL_CONNECTION = "failover-on-initial-connection";
+ static final String JMS_CONNECTION_FACTORY_HA = "ha";
+ static final String JMS_CONNECTION_FACTORY_PREACKNOWLEDGE = "preacknowledge";
+ static final String JMS_CONNECTION_FACTORY_USE_GLOBAL_POOLS = "use-global-pools";
+
+ static final String JMS_DESTINATIONS = "jms-destinations";
+ static final String JMS_DESTINATION = "jms-destination";
+ static final String JMS_DESTINATION_NAME = "name";
+ static final String JMS_DESTINATION_SELECTOR = "selector";
+ static final String JMS_DESTINATION_TYPE = "type";
+
+ static final String JMS_JNDI_ENTRIES = "entries";
+ static final String JMS_JNDI_ENTRY = "entry";
+
+ public static final String JNDI_COMPATIBILITY_PREFIX = "java:jboss/exported/";
+
+ static final String NULL = "_AMQ_NULL";
+}
\ No newline at end of file