You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/05/04 16:05:51 UTC
[4/6] activemq-artemis git commit: ARTEMIS-904 Remove cyclic dependencies from artemis-cli
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java
deleted file mode 100644
index e5fd80c..0000000
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.cli.commands.tools;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.PrintStream;
-import java.util.List;
-
-import io.airlift.airline.Command;
-import io.airlift.airline.Option;
-import org.apache.activemq.artemis.cli.commands.ActionContext;
-import org.apache.activemq.artemis.core.io.SequentialFileFactory;
-import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
-import org.apache.activemq.artemis.core.journal.RecordInfo;
-import org.apache.activemq.artemis.core.journal.impl.JournalFile;
-import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
-import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
-import org.apache.activemq.artemis.utils.Base64;
-
-@Command(name = "encode", description = "Encode a set of journal files into an internal encoded data format")
-public class EncodeJournal extends LockAbstract {
-
- @Option(name = "--directory", description = "The journal folder (default the journal folder from broker.xml)")
- public String directory;
-
- @Option(name = "--prefix", description = "The journal prefix (default activemq-data)")
- public String prefix = "activemq-data";
-
- @Option(name = "--suffix", description = "The journal suffix (default amq)")
- public String suffix = "amq";
-
- @Option(name = "--file-size", description = "The journal size (default 10485760)")
- public int size = 10485760;
-
- @Override
- public Object execute(ActionContext context) throws Exception {
- super.execute(context);
- try {
- if (directory == null) {
- directory = getFileConfiguration().getJournalDirectory();
- }
-
- exportJournal(directory, prefix, suffix, 2, size);
- } catch (Exception e) {
- treatError(e, "data", "encode");
- }
-
- return null;
- }
-
- public static void exportJournal(final String directory,
- final String journalPrefix,
- final String journalSuffix,
- final int minFiles,
- final int fileSize) throws Exception {
-
- exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, System.out);
- }
-
- public static void exportJournal(final String directory,
- final String journalPrefix,
- final String journalSuffix,
- final int minFiles,
- final int fileSize,
- final String fileName) throws Exception {
- try (FileOutputStream fileOutputStream = new FileOutputStream(fileName);
- BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
- PrintStream out = new PrintStream(bufferedOutputStream)) {
- exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
- }
- }
-
- public static void exportJournal(final String directory,
- final String journalPrefix,
- final String journalSuffix,
- final int minFiles,
- final int fileSize,
- final PrintStream out) throws Exception {
- NIOSequentialFileFactory nio = new NIOSequentialFileFactory(new File(directory), null, 1);
-
- JournalImpl journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
-
- List<JournalFile> files = journal.orderFiles();
-
- for (JournalFile file : files) {
- out.println("#File," + file);
-
- exportJournalFile(out, nio, file);
- }
- }
-
- /**
- * @param out
- * @param fileFactory
- * @param file
- * @throws Exception
- */
- public static void exportJournalFile(final PrintStream out,
- final SequentialFileFactory fileFactory,
- final JournalFile file) throws Exception {
- JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback() {
-
- @Override
- public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception {
- out.println("operation@UpdateTX,txID@" + transactionID + "," + describeRecord(recordInfo));
- }
-
- @Override
- public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception {
- out.println("operation@Update," + describeRecord(recordInfo));
- }
-
- @Override
- public void onReadRollbackRecord(final long transactionID) throws Exception {
- out.println("operation@Rollback,txID@" + transactionID);
- }
-
- @Override
- public void onReadPrepareRecord(final long transactionID,
- final byte[] extraData,
- final int numberOfRecords) throws Exception {
- out.println("operation@Prepare,txID@" + transactionID +
- ",numberOfRecords@" +
- numberOfRecords +
- ",extraData@" +
- encode(extraData));
- }
-
- @Override
- public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception {
- out.println("operation@DeleteRecordTX,txID@" + transactionID +
- "," +
- describeRecord(recordInfo));
- }
-
- @Override
- public void onReadDeleteRecord(final long recordID) throws Exception {
- out.println("operation@DeleteRecord,id@" + recordID);
- }
-
- @Override
- public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception {
- out.println("operation@Commit,txID@" + transactionID + ",numberOfRecords@" + numberOfRecords);
- }
-
- @Override
- public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception {
- out.println("operation@AddRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
- }
-
- @Override
- public void onReadAddRecord(final RecordInfo recordInfo) throws Exception {
- out.println("operation@AddRecord," + describeRecord(recordInfo));
- }
-
- @Override
- public void markAsDataFile(final JournalFile file) {
- }
- });
- }
-
- private static String describeRecord(final RecordInfo recordInfo) {
- return "id@" + recordInfo.id +
- ",userRecordType@" +
- recordInfo.userRecordType +
- ",length@" +
- recordInfo.data.length +
- ",isUpdate@" +
- recordInfo.isUpdate +
- ",compactCount@" +
- recordInfo.compactCount +
- ",data@" +
- encode(recordInfo.data);
- }
-
- private static String encode(final byte[] data) {
- return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
- }
-
- public void printUsage() {
- for (int i = 0; i < 10; i++) {
- System.err.println();
- }
- System.err.println("This method will export the journal at low level record.");
- System.err.println();
- System.err.println();
- for (int i = 0; i < 10; i++) {
- System.err.println();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java
index a7a27a6..86b9a60 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java
@@ -25,7 +25,7 @@ import io.airlift.airline.Help;
import org.apache.activemq.artemis.cli.commands.Action;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.InvalidOptionsError;
-import org.apache.activemq.artemis.util.OptionsUtil;
+import org.apache.activemq.artemis.cli.commands.OptionsUtil;
public class HelpData extends Help implements Action {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/LockAbstract.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/LockAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/LockAbstract.java
index 5bffb36..cbc5234 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/LockAbstract.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/LockAbstract.java
@@ -32,15 +32,6 @@ public abstract class LockAbstract extends DataAbstract {
private static RandomAccessFile serverLockFile = null;
private static FileLock serverLockLock = null;
- protected File getLockPlace() throws Exception {
- String brokerInstance = getBrokerInstance();
- if (brokerInstance != null) {
- return new File(new File(brokerInstance), "lock");
- } else {
- return null;
- }
- }
-
public static void unlock() {
try {
if (serverLockFile != null) {
@@ -70,7 +61,7 @@ public abstract class LockAbstract extends DataAbstract {
return null;
}
- protected void lockCLI(File lockPlace) throws Exception {
+ void lockCLI(File lockPlace) throws Exception {
if (lockPlace != null) {
lockPlace.mkdirs();
if (serverLockFile == null) {
@@ -89,4 +80,12 @@ public abstract class LockAbstract extends DataAbstract {
}
}
+ private File getLockPlace() throws Exception {
+ String brokerInstance = getBrokerInstance();
+ if (brokerInstance != null) {
+ return new File(new File(brokerInstance), "lock");
+ } else {
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PerfJournal.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PerfJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PerfJournal.java
deleted file mode 100644
index f7d89ec..0000000
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PerfJournal.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.cli.commands.tools;
-
-import java.text.DecimalFormat;
-
-import io.airlift.airline.Command;
-import io.airlift.airline.Option;
-import org.apache.activemq.artemis.cli.commands.ActionContext;
-import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
-import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
-import org.apache.activemq.artemis.core.server.JournalType;
-
-@Command(name = "perf-journal", description = "Calculates the journal-buffer-timeout you should use with the current data folder")
-public class PerfJournal extends LockAbstract {
-
-
- @Option(name = "--block-size", description = "The block size for each write (default 4096)")
- public int size = 4 * 1024;
-
-
- @Option(name = "--writes", description = "The number of writes to be performed (default 250)")
- public int writes = 250;
-
- @Option(name = "--tries", description = "The number of tries for the test (default 5)")
- public int tries = 5;
-
- @Option(name = "--no-sync", description = "Disable sync")
- public boolean nosyncs = false;
-
- @Option(name = "--sync", description = "Enable syncs")
- public boolean syncs = false;
-
- @Option(name = "--journal-type", description = "Journal Type to be used (default from broker.xml)")
- public String journalType = null;
-
-
- @Override
- public Object execute(ActionContext context) throws Exception {
- super.execute(context);
-
- FileConfiguration fileConfiguration = getFileConfiguration();
-
- if (nosyncs) {
- fileConfiguration.setJournalDatasync(false);
- } else if (syncs) {
- fileConfiguration.setJournalDatasync(true);
- }
-
-
- if (journalType != null) {
- fileConfiguration.setJournalType(JournalType.getType(journalType));
- }
-
- System.out.println("");
- System.out.println("Auto tuning journal ...");
-
- System.out.println("Performing " + tries + " tests writing " + writes + " blocks of " + size + " on each test, sync=" + fileConfiguration.isJournalDatasync() + " with journalType = " + fileConfiguration.getJournalType());
-
- fileConfiguration.getJournalLocation().mkdirs();
-
- long time = SyncCalculation.syncTest(fileConfiguration.getJournalLocation(), size, writes, tries, verbose, fileConfiguration.isJournalDatasync(), fileConfiguration.getJournalType());
-
- long nanosecondsWait = SyncCalculation.toNanos(time, writes, verbose);
- double writesPerMillisecond = (double) writes / (double) time;
-
- String writesPerMillisecondStr = new DecimalFormat("###.##").format(writesPerMillisecond);
-
- context.out.println("Your system can execute " + writesPerMillisecondStr + " syncs per millisecond");
- context.out.println("Your journal-buffer-timeout should be:" + nanosecondsWait);
- context.out.println("You should use this following configuration:");
- context.out.println();
- context.out.println("<journal-buffer-timeout>" + nanosecondsWait + "</journal-buffer-timeout>");
-
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
index 2816aaf..d5e895d 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
@@ -17,6 +17,8 @@
package org.apache.activemq.artemis.cli.commands.tools;
import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -31,7 +33,6 @@ import io.airlift.airline.Command;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.cli.Artemis;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
@@ -81,7 +82,7 @@ public class PrintData extends OptionalLocking {
public static void printData(File bindingsDirectory, File messagesDirectory, File pagingDirectory) throws Exception {
// Having the version on the data report is an information very useful to understand what happened
// When debugging stuff
- Artemis.printBanner();
+ printBanner();
File serverLockFile = new File(messagesDirectory, "server.lock");
@@ -135,6 +136,20 @@ public class PrintData extends OptionalLocking {
}
+ public static void printBanner() throws Exception {
+ copy(PrintData.class.getResourceAsStream("banner.txt"), System.out);
+ }
+
+ private static long copy(InputStream in, OutputStream out) throws Exception {
+ byte[] buffer = new byte[1024];
+ int len = in.read(buffer);
+ while (len != -1) {
+ out.write(buffer, 0, len);
+ len = in.read(buffer);
+ }
+ return len;
+ }
+
private static void printPages(File pageDirectory, DescribeJournal describeJournal) {
try {
@@ -214,12 +229,9 @@ public class PrintData extends OptionalLocking {
System.out.println();
msgID++;
}
-
pgid++;
-
}
}
-
} catch (Exception e) {
e.printStackTrace();
}
@@ -228,7 +240,7 @@ public class PrintData extends OptionalLocking {
/**
* Calculate the acks on the page system
*/
- protected static PageCursorsInfo calculateCursorsInfo(List<RecordInfo> records) throws Exception {
+ private static PageCursorsInfo calculateCursorsInfo(List<RecordInfo> records) throws Exception {
PageCursorsInfo cursorInfo = new PageCursorsInfo();
@@ -293,25 +305,18 @@ public class PrintData extends OptionalLocking {
/**
* @return the pgTXs
*/
- public Set<Long> getPgTXs() {
+ Set<Long> getPgTXs() {
return pgTXs;
}
/**
* @return the cursorRecords
*/
- public Map<Long, Set<PagePosition>> getCursorRecords() {
+ Map<Long, Set<PagePosition>> getCursorRecords() {
return cursorRecords;
}
- /**
- * @return the completePages
- */
- public Map<Long, Set<Long>> getCompletePages() {
- return completePages;
- }
-
- public Set<Long> getCompletePages(Long queueID) {
+ Set<Long> getCompletePages(Long queueID) {
Set<Long> completePagesSet = completePages.get(queueID);
if (completePagesSet == null) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java
deleted file mode 100644
index be7e84e..0000000
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.cli.commands.tools;
-
-/**
- * The constants shared by <code>org.apache.activemq.tools.XmlDataImporter</code> and
- * <code>org.apache.activemq.tools.XmlDataExporter</code>.
- */
-public final class XmlDataConstants {
-
- private XmlDataConstants() {
- // Utility
- }
-
- public static final String XML_VERSION = "1.0";
- public static final String DOCUMENT_PARENT = "activemq-journal";
- public static final String BINDINGS_PARENT = "bindings";
-
- public static final String QUEUE_BINDINGS_CHILD = "queue-binding";
- public static final String QUEUE_BINDING_ADDRESS = "address";
- public static final String QUEUE_BINDING_FILTER_STRING = "filter-string";
- public static final String QUEUE_BINDING_NAME = "name";
- public static final String QUEUE_BINDING_ID = "id";
- public static final String QUEUE_BINDING_ROUTING_TYPE = "routing-type";
-
- public static final String ADDRESS_BINDINGS_CHILD = "address-binding";
- public static final String ADDRESS_BINDING_NAME = "name";
- public static final String ADDRESS_BINDING_ID = "id";
- public static final String ADDRESS_BINDING_ROUTING_TYPE = "routing-types";
-
- public static final String MESSAGES_PARENT = "messages";
- public static final String MESSAGES_CHILD = "message";
- public static final String MESSAGE_ID = "id";
- public static final String MESSAGE_PRIORITY = "priority";
- public static final String MESSAGE_EXPIRATION = "expiration";
- public static final String MESSAGE_TIMESTAMP = "timestamp";
- public static final String DEFAULT_TYPE_PRETTY = "default";
- public static final String BYTES_TYPE_PRETTY = "bytes";
- public static final String MAP_TYPE_PRETTY = "map";
- public static final String OBJECT_TYPE_PRETTY = "object";
- public static final String STREAM_TYPE_PRETTY = "stream";
- public static final String TEXT_TYPE_PRETTY = "text";
- public static final String MESSAGE_TYPE = "type";
- public static final String MESSAGE_IS_LARGE = "isLarge";
- public static final String MESSAGE_USER_ID = "user-id";
- public static final String MESSAGE_BODY = "body";
- public static final String PROPERTIES_PARENT = "properties";
- public static final String PROPERTIES_CHILD = "property";
- public static final String PROPERTY_NAME = "name";
- public static final String PROPERTY_VALUE = "value";
- public static final String PROPERTY_TYPE = "type";
- public static final String QUEUES_PARENT = "queues";
- public static final String QUEUES_CHILD = "queue";
- public static final String QUEUE_NAME = "name";
- public static final String PROPERTY_TYPE_BOOLEAN = "boolean";
- public static final String PROPERTY_TYPE_BYTE = "byte";
- public static final String PROPERTY_TYPE_BYTES = "bytes";
- public static final String PROPERTY_TYPE_SHORT = "short";
- public static final String PROPERTY_TYPE_INTEGER = "integer";
- public static final String PROPERTY_TYPE_LONG = "long";
- public static final String PROPERTY_TYPE_FLOAT = "float";
- public static final String PROPERTY_TYPE_DOUBLE = "double";
- public static final String PROPERTY_TYPE_STRING = "string";
- public static final String PROPERTY_TYPE_SIMPLE_STRING = "simple-string";
-
- static final String JMS_CONNECTION_FACTORY_NAME = "name";
- static final String JMS_CONNECTION_FACTORY_CLIENT_ID = "client-id";
- static final String JMS_CONNECTION_FACTORY_CALL_FAILOVER_TIMEOUT = "call-failover-timeout";
- static final String JMS_CONNECTION_FACTORY_CALL_TIMEOUT = "call-timeout";
- static final String JMS_CONNECTION_FACTORY_CLIENT_FAILURE_CHECK_PERIOD = "client-failure-check-period";
- static final String JMS_CONNECTION_FACTORY_CONFIRMATION_WINDOW_SIZE = "confirmation-window-size";
- static final String JMS_CONNECTION_FACTORY_CONNECTION_TTL = "connection-ttl";
- static final String JMS_CONNECTION_FACTORY_CONSUMER_MAX_RATE = "consumer-max-rate";
- static final String JMS_CONNECTION_FACTORY_CONSUMER_WINDOW_SIZE = "consumer-window-size";
- static final String JMS_CONNECTION_FACTORY_DISCOVERY_GROUP_NAME = "discovery-group-name";
- static final String JMS_CONNECTION_FACTORY_DUPS_OK_BATCH_SIZE = "dups-ok-batch-size";
- static final String JMS_CONNECTION_FACTORY_TYPE = "type";
- static final String JMS_CONNECTION_FACTORY_GROUP_ID = "group-id";
- static final String JMS_CONNECTION_FACTORY_LOAD_BALANCING_POLICY_CLASS_NAME = "load-balancing-policy-class-name";
- static final String JMS_CONNECTION_FACTORY_MAX_RETRY_INTERVAL = "max-retry-interval";
- static final String JMS_CONNECTION_FACTORY_MIN_LARGE_MESSAGE_SIZE = "min-large-message-size";
- static final String JMS_CONNECTION_FACTORY_PRODUCER_MAX_RATE = "producer-max-rate";
- static final String JMS_CONNECTION_FACTORY_PRODUCER_WINDOW_SIZE = "producer-window-size";
- static final String JMS_CONNECTION_FACTORY_RECONNECT_ATTEMPTS = "reconnect-attempts";
- static final String JMS_CONNECTION_FACTORY_RETRY_INTERVAL = "retry-interval";
- static final String JMS_CONNECTION_FACTORY_RETRY_INTERVAL_MULTIPLIER = "retry-interval-multiplier";
- static final String JMS_CONNECTION_FACTORY_SCHEDULED_THREAD_POOL_MAX_SIZE = "scheduled-thread-pool-max-size";
- static final String JMS_CONNECTION_FACTORY_THREAD_POOL_MAX_SIZE = "thread-pool-max-size";
- static final String JMS_CONNECTION_FACTORY_TRANSACTION_BATCH_SIZE = "transaction-batch-size";
- static final String JMS_CONNECTION_FACTORY_CONNECTORS = "connectors";
- static final String JMS_CONNECTION_FACTORY_CONNECTOR = "connector";
- static final String JMS_CONNECTION_FACTORY_AUTO_GROUP = "auto-group";
- static final String JMS_CONNECTION_FACTORY_BLOCK_ON_ACKNOWLEDGE = "block-on-acknowledge";
- static final String JMS_CONNECTION_FACTORY_BLOCK_ON_DURABLE_SEND = "block-on-durable-send";
- static final String JMS_CONNECTION_FACTORY_BLOCK_ON_NON_DURABLE_SEND = "block-on-non-durable-send";
- static final String JMS_CONNECTION_FACTORY_CACHE_LARGE_MESSAGES_CLIENT = "cache-large-messages-client";
- static final String JMS_CONNECTION_FACTORY_COMPRESS_LARGE_MESSAGES = "compress-large-messages";
- static final String JMS_CONNECTION_FACTORY_FAILOVER_ON_INITIAL_CONNECTION = "failover-on-initial-connection";
- static final String JMS_CONNECTION_FACTORY_HA = "ha";
- static final String JMS_CONNECTION_FACTORY_PREACKNOWLEDGE = "preacknowledge";
- static final String JMS_CONNECTION_FACTORY_USE_GLOBAL_POOLS = "use-global-pools";
-
- static final String JMS_DESTINATIONS = "jms-destinations";
- static final String JMS_DESTINATION = "jms-destination";
- static final String JMS_DESTINATION_NAME = "name";
- static final String JMS_DESTINATION_SELECTOR = "selector";
- static final String JMS_DESTINATION_TYPE = "type";
-
- static final String JMS_JNDI_ENTRIES = "entries";
- static final String JMS_JNDI_ENTRY = "entry";
-
- public static final String JNDI_COMPATIBILITY_PREFIX = "java:jboss/exported/";
-
- static final String NULL = "_AMQ_NULL";
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
deleted file mode 100644
index d2f6204..0000000
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
+++ /dev/null
@@ -1,626 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.cli.commands.tools;
-
-import javax.xml.stream.XMLOutputFactory;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamWriter;
-import java.io.File;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import io.airlift.airline.Command;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ICoreMessage;
-import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.cli.commands.ActionContext;
-import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
-import org.apache.activemq.artemis.core.journal.Journal;
-import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
-import org.apache.activemq.artemis.core.journal.RecordInfo;
-import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
-import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
-import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
-import org.apache.activemq.artemis.core.paging.PagedMessage;
-import org.apache.activemq.artemis.core.paging.PagingManager;
-import org.apache.activemq.artemis.core.paging.PagingStore;
-import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
-import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
-import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
-import org.apache.activemq.artemis.core.paging.impl.Page;
-import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
-import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
-import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
-import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe;
-import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
-import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.MessageDescribe;
-import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.ReferenceDescribe;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
-import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentAddressBindingEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.JournalType;
-import org.apache.activemq.artemis.core.server.LargeServerMessage;
-import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-import org.apache.activemq.artemis.utils.ExecutorFactory;
-import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
-
-@Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.")
-public final class XmlDataExporter extends OptionalLocking {
-
- private static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L;
-
- private JournalStorageManager storageManager;
-
- private Configuration config;
-
- private XMLStreamWriter xmlWriter;
-
- // an inner map of message refs hashed by the queue ID to which they belong and then hashed by their record ID
- private final Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs = new HashMap<>();
-
- // map of all message records hashed by their record ID (which will match the record ID of the message refs)
- private final HashMap<Long, Message> messages = new HashMap<>();
-
- private final Map<Long, Set<PagePosition>> cursorRecords = new HashMap<>();
-
- private final Set<Long> pgTXs = new HashSet<>();
-
- private final HashMap<Long, PersistentQueueBindingEncoding> queueBindings = new HashMap<>();
-
- private final HashMap<Long, PersistentAddressBindingEncoding> addressBindings = new HashMap<>();
-
- long messagesPrinted = 0L;
-
- long bindingsPrinted = 0L;
-
- @Override
- public Object execute(ActionContext context) throws Exception {
- super.execute(context);
-
- try {
- process(context.out, getBinding(), getJournal(), getPaging(), getLargeMessages());
- } catch (Exception e) {
- treatError(e, "data", "exp");
- }
- return null;
- }
-
- public void process(OutputStream out,
- String bindingsDir,
- String journalDir,
- String pagingDir,
- String largeMessagesDir) throws Exception {
- config = new ConfigurationImpl().setBindingsDirectory(bindingsDir).setJournalDirectory(journalDir).setPagingDirectory(pagingDir).setLargeMessagesDirectory(largeMessagesDir).setJournalType(JournalType.NIO);
- final ExecutorService executor = Executors.newFixedThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory());
- ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
-
- storageManager = new JournalStorageManager(config, executorFactory, executorFactory);
-
- XMLOutputFactory factory = XMLOutputFactory.newInstance();
- XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out, "UTF-8");
- PrettyPrintHandler handler = new PrettyPrintHandler(rawXmlWriter);
- xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class[]{XMLStreamWriter.class}, handler);
-
- writeXMLData();
-
- executor.shutdown();
- }
-
- private void writeXMLData() throws Exception {
- long start = System.currentTimeMillis();
- getBindings();
- processMessageJournal();
- printDataAsXML();
- ActiveMQServerLogger.LOGGER.debug("\n\nProcessing took: " + (System.currentTimeMillis() - start) + "ms");
- ActiveMQServerLogger.LOGGER.debug("Output " + messagesPrinted + " messages and " + bindingsPrinted + " bindings.");
- }
-
- /**
- * Read through the message journal and stuff all the events/data we care about into local data structures. We'll
- * use this data later to print all the right information.
- *
- * @throws Exception will be thrown if anything goes wrong reading the journal
- */
- private void processMessageJournal() throws Exception {
- ArrayList<RecordInfo> acks = new ArrayList<>();
-
- List<RecordInfo> records = new LinkedList<>();
-
- // We load these, but don't use them.
- List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>();
-
- Journal messageJournal = storageManager.getMessageJournal();
-
- ActiveMQServerLogger.LOGGER.debug("Reading journal from " + config.getJournalDirectory());
-
- messageJournal.start();
-
- // Just logging these, no action necessary
- TransactionFailureCallback transactionFailureCallback = new TransactionFailureCallback() {
- @Override
- public void failedTransaction(long transactionID,
- List<RecordInfo> records1,
- List<RecordInfo> recordsToDelete) {
- StringBuilder message = new StringBuilder();
- message.append("Encountered failed journal transaction: ").append(transactionID);
- for (int i = 0; i < records1.size(); i++) {
- if (i == 0) {
- message.append("; Records: ");
- }
- message.append(records1.get(i));
- if (i != (records1.size() - 1)) {
- message.append(", ");
- }
- }
-
- for (int i = 0; i < recordsToDelete.size(); i++) {
- if (i == 0) {
- message.append("; RecordsToDelete: ");
- }
- message.append(recordsToDelete.get(i));
- if (i != (recordsToDelete.size() - 1)) {
- message.append(", ");
- }
- }
-
- ActiveMQServerLogger.LOGGER.debug(message.toString());
- }
- };
-
- ((JournalImpl) messageJournal).load(records, preparedTransactions, transactionFailureCallback, false);
-
- // Since we don't use these nullify the reference so that the garbage collector can clean them up
- preparedTransactions = null;
-
- for (RecordInfo info : records) {
- byte[] data = info.data;
-
- ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
-
- Object o = DescribeJournal.newObjectEncoding(info, storageManager);
- if (info.getUserRecordType() == JournalRecordIds.ADD_MESSAGE) {
- messages.put(info.id, ((MessageDescribe) o).getMsg().toCore());
- } else if (info.getUserRecordType() == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
- messages.put(info.id, ((MessageDescribe) o).getMsg().toCore());
- } else if (info.getUserRecordType() == JournalRecordIds.ADD_LARGE_MESSAGE) {
- messages.put(info.id, ((MessageDescribe) o).getMsg());
- } else if (info.getUserRecordType() == JournalRecordIds.ADD_REF) {
- ReferenceDescribe ref = (ReferenceDescribe) o;
- HashMap<Long, ReferenceDescribe> map = messageRefs.get(info.id);
- if (map == null) {
- HashMap<Long, ReferenceDescribe> newMap = new HashMap<>();
- newMap.put(ref.refEncoding.queueID, ref);
- messageRefs.put(info.id, newMap);
- } else {
- map.put(ref.refEncoding.queueID, ref);
- }
- } else if (info.getUserRecordType() == JournalRecordIds.ACKNOWLEDGE_REF) {
- acks.add(info);
- } else if (info.userRecordType == JournalRecordIds.ACKNOWLEDGE_CURSOR) {
- CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
- encoding.decode(buff);
-
- Set<PagePosition> set = cursorRecords.get(encoding.queueID);
-
- if (set == null) {
- set = new HashSet<>();
- cursorRecords.put(encoding.queueID, set);
- }
-
- set.add(encoding.position);
- } else if (info.userRecordType == JournalRecordIds.PAGE_TRANSACTION) {
- if (info.isUpdate) {
- PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
-
- pageUpdate.decode(buff);
- pgTXs.add(pageUpdate.pageTX);
- } else {
- PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
-
- pageTransactionInfo.decode(buff);
-
- pageTransactionInfo.setRecordID(info.id);
- pgTXs.add(pageTransactionInfo.getTransactionID());
- }
- }
- }
-
- messageJournal.stop();
-
- removeAcked(acks);
- }
-
- /**
- * Go back through the messages and message refs we found in the journal and remove the ones that have been acked.
- *
- * @param acks the list of ack records we got from the journal
- */
- private void removeAcked(ArrayList<RecordInfo> acks) {
- for (RecordInfo info : acks) {
- AckDescribe ack = (AckDescribe) DescribeJournal.newObjectEncoding(info, null);
- HashMap<Long, ReferenceDescribe> referenceDescribeHashMap = messageRefs.get(info.id);
- referenceDescribeHashMap.remove(ack.refEncoding.queueID);
- if (referenceDescribeHashMap.size() == 0) {
- messages.remove(info.id);
- messageRefs.remove(info.id);
- }
- }
- }
-
- /**
- * Open the bindings journal and extract all bindings data.
- *
- * @throws Exception will be thrown if anything goes wrong reading the bindings journal
- */
- private void getBindings() throws Exception {
- List<RecordInfo> records = new LinkedList<>();
-
- Journal bindingsJournal = storageManager.getBindingsJournal();
-
- bindingsJournal.start();
-
- ActiveMQServerLogger.LOGGER.debug("Reading bindings journal from " + config.getBindingsDirectory());
-
- ((JournalImpl) bindingsJournal).load(records, null, null, false);
-
- for (RecordInfo info : records) {
- if (info.getUserRecordType() == JournalRecordIds.QUEUE_BINDING_RECORD) {
- PersistentQueueBindingEncoding bindingEncoding = (PersistentQueueBindingEncoding) DescribeJournal.newObjectEncoding(info, null);
- queueBindings.put(bindingEncoding.getId(), bindingEncoding);
- } else if (info.getUserRecordType() == JournalRecordIds.ADDRESS_BINDING_RECORD) {
- PersistentAddressBindingEncoding bindingEncoding = (PersistentAddressBindingEncoding) DescribeJournal.newObjectEncoding(info, null);
- addressBindings.put(bindingEncoding.getId(), bindingEncoding);
- }
- }
-
- bindingsJournal.stop();
- }
-
- private void printDataAsXML() {
- try {
- xmlWriter.writeStartDocument(XmlDataConstants.XML_VERSION);
- xmlWriter.writeStartElement(XmlDataConstants.DOCUMENT_PARENT);
- printBindingsAsXML();
- printAllMessagesAsXML();
- xmlWriter.writeEndElement(); // end DOCUMENT_PARENT
- xmlWriter.writeEndDocument();
- xmlWriter.flush();
- xmlWriter.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private void printBindingsAsXML() throws XMLStreamException {
- xmlWriter.writeStartElement(XmlDataConstants.BINDINGS_PARENT);
- for (Map.Entry<Long, PersistentAddressBindingEncoding> addressBindingEncodingEntry : addressBindings.entrySet()) {
- PersistentAddressBindingEncoding bindingEncoding = addressBindings.get(addressBindingEncodingEntry.getKey());
- xmlWriter.writeEmptyElement(XmlDataConstants.ADDRESS_BINDINGS_CHILD);
- StringBuilder routingTypes = new StringBuilder();
- for (RoutingType routingType : bindingEncoding.getRoutingTypes()) {
- routingTypes.append(routingType.toString()).append(", ");
- }
- xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_ROUTING_TYPE, routingTypes.toString().substring(0, routingTypes.length() - 2));
- xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_NAME, bindingEncoding.getName().toString());
- xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_ID, Long.toString(bindingEncoding.getId()));
- bindingsPrinted++;
- }
- for (Map.Entry<Long, PersistentQueueBindingEncoding> queueBindingEncodingEntry : queueBindings.entrySet()) {
- PersistentQueueBindingEncoding bindingEncoding = queueBindings.get(queueBindingEncodingEntry.getKey());
- xmlWriter.writeEmptyElement(XmlDataConstants.QUEUE_BINDINGS_CHILD);
- xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ADDRESS, bindingEncoding.getAddress().toString());
- String filter = "";
- if (bindingEncoding.getFilterString() != null) {
- filter = bindingEncoding.getFilterString().toString();
- }
- xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_FILTER_STRING, filter);
- xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_NAME, bindingEncoding.getQueueName().toString());
- xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ID, Long.toString(bindingEncoding.getId()));
- xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ROUTING_TYPE, RoutingType.getType(bindingEncoding.getRoutingType()).toString());
- bindingsPrinted++;
- }
- xmlWriter.writeEndElement(); // end BINDINGS_PARENT
- }
-
- private void printAllMessagesAsXML() throws Exception {
- xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_PARENT);
-
- // Order here is important. We must process the messages from the journal before we process those from the page
- // files in order to get the messages in the right order.
- for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet()) {
- printSingleMessageAsXML(messageMapEntry.getValue().toCore(), extractQueueNames(messageRefs.get(messageMapEntry.getKey())));
- }
-
- printPagedMessagesAsXML();
-
- xmlWriter.writeEndElement(); // end "messages"
- }
-
- /**
- * Reads from the page files and prints messages as it finds them (making sure to check acks and transactions
- * from the journal).
- */
- private void printPagedMessagesAsXML() {
- try {
- ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
- final ExecutorService executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory());
- ExecutorFactory executorFactory = new ExecutorFactory() {
- @Override
- public Executor getExecutor() {
- return executor;
- }
- };
- PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), 1000L, scheduled, executorFactory, true, null);
- HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<>();
- addressSettingsRepository.setDefault(new AddressSettings());
- PagingManager manager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository);
-
- manager.start();
-
- SimpleString[] stores = manager.getStoreNames();
-
- for (SimpleString store : stores) {
- PagingStore pageStore = manager.getPageStore(store);
-
- if (pageStore != null) {
- File folder = pageStore.getFolder();
- ActiveMQServerLogger.LOGGER.debug("Reading page store " + store + " folder = " + folder);
-
- int pageId = (int) pageStore.getFirstPage();
- for (int i = 0; i < pageStore.getNumberOfPages(); i++) {
- ActiveMQServerLogger.LOGGER.debug("Reading page " + pageId);
- Page page = pageStore.createPage(pageId);
- page.open();
- List<PagedMessage> messages = page.read(storageManager);
- page.close();
-
- int messageId = 0;
-
- for (PagedMessage message : messages) {
- message.initMessage(storageManager);
- long[] queueIDs = message.getQueueIDs();
- List<String> queueNames = new ArrayList<>();
- for (long queueID : queueIDs) {
- PagePosition posCheck = new PagePositionImpl(pageId, messageId);
-
- boolean acked = false;
-
- Set<PagePosition> positions = cursorRecords.get(queueID);
- if (positions != null) {
- acked = positions.contains(posCheck);
- }
-
- if (!acked) {
- PersistentQueueBindingEncoding queueBinding = queueBindings.get(queueID);
- if (queueBinding != null) {
- SimpleString queueName = queueBinding.getQueueName();
- queueNames.add(queueName.toString());
- }
- }
- }
-
- if (queueNames.size() > 0 && (message.getTransactionID() == -1 || pgTXs.contains(message.getTransactionID()))) {
- printSingleMessageAsXML(message.getMessage().toCore(), queueNames);
- }
-
- messageId++;
- }
-
- pageId++;
- }
- } else {
- ActiveMQServerLogger.LOGGER.debug("Page store was null");
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private void printSingleMessageAsXML(ICoreMessage message, List<String> queues) throws Exception {
- xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD);
- printMessageAttributes(message);
- printMessageProperties(message);
- printMessageQueues(queues);
- printMessageBody(message.toCore());
- xmlWriter.writeEndElement(); // end MESSAGES_CHILD
- messagesPrinted++;
- }
-
- private void printMessageBody(Message message) throws Exception {
- xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);
-
- if (message.toCore().isLargeMessage()) {
- printLargeMessageBody((LargeServerMessage) message);
- } else {
- xmlWriter.writeCData(XmlDataExporterUtil.encodeMessageBody(message));
- }
- xmlWriter.writeEndElement(); // end MESSAGE_BODY
- }
-
- private void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException {
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
- LargeBodyEncoder encoder = null;
-
- try {
- encoder = message.toCore().getBodyEncoder();
- encoder.open();
- long totalBytesWritten = 0;
- Long bufferSize;
- long bodySize = encoder.getLargeBodySize();
- for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE) {
- Long remainder = bodySize - totalBytesWritten;
- if (remainder >= LARGE_MESSAGE_CHUNK_SIZE) {
- bufferSize = LARGE_MESSAGE_CHUNK_SIZE;
- } else {
- bufferSize = remainder;
- }
- ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bufferSize.intValue());
- encoder.encode(buffer, bufferSize.intValue());
- xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.toByteBuffer().array()));
- totalBytesWritten += bufferSize;
- }
- encoder.close();
- } catch (ActiveMQException e) {
- e.printStackTrace();
- } finally {
- if (encoder != null) {
- try {
- encoder.close();
- } catch (ActiveMQException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- private void printMessageQueues(List<String> queues) throws XMLStreamException {
- xmlWriter.writeStartElement(XmlDataConstants.QUEUES_PARENT);
- for (String queueName : queues) {
- xmlWriter.writeEmptyElement(XmlDataConstants.QUEUES_CHILD);
- xmlWriter.writeAttribute(XmlDataConstants.QUEUE_NAME, queueName);
- }
- xmlWriter.writeEndElement(); // end QUEUES_PARENT
- }
-
- private void printMessageProperties(Message message) throws XMLStreamException {
- xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT);
- for (SimpleString key : message.getPropertyNames()) {
- Object value = message.getObjectProperty(key);
- xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD);
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString());
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, XmlDataExporterUtil.convertProperty(value));
-
- // Write the property type as an attribute
- String propertyType = XmlDataExporterUtil.getPropertyType(value);
- if (propertyType != null) {
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, propertyType);
- }
- }
- xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
- }
-
- private void printMessageAttributes(ICoreMessage message) throws XMLStreamException {
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID()));
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority()));
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration()));
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TIMESTAMP, Long.toString(message.getTimestamp()));
- String prettyType = XmlDataExporterUtil.getMessagePrettyType(message.getType());
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TYPE, prettyType);
- if (message.getUserID() != null) {
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_USER_ID, message.getUserID().toString());
- }
- }
-
- private List<String> extractQueueNames(HashMap<Long, ReferenceDescribe> refMap) {
- List<String> queues = new ArrayList<>();
- for (ReferenceDescribe ref : refMap.values()) {
- queues.add(queueBindings.get(ref.refEncoding.queueID).getQueueName().toString());
- }
- return queues;
- }
-
- // Inner classes -------------------------------------------------
-
- /**
- * Proxy to handle indenting the XML since <code>javax.xml.stream.XMLStreamWriter</code> doesn't support that.
- */
- static class PrettyPrintHandler implements InvocationHandler {
-
- private final XMLStreamWriter target;
-
- private int depth = 0;
-
- private static final char INDENT_CHAR = ' ';
-
- private static final String LINE_SEPARATOR = System.getProperty("line.separator");
-
- boolean wrap = true;
-
- PrettyPrintHandler(XMLStreamWriter target) {
- this.target = target;
- }
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- String m = method.getName();
-
- switch (m) {
- case "writeStartElement":
- target.writeCharacters(LINE_SEPARATOR);
- target.writeCharacters(indent(depth));
-
- depth++;
- break;
- case "writeEndElement":
- depth--;
- if (wrap) {
- target.writeCharacters(LINE_SEPARATOR);
- target.writeCharacters(indent(depth));
- }
- wrap = true;
- break;
- case "writeEmptyElement":
- case "writeCData":
- target.writeCharacters(LINE_SEPARATOR);
- target.writeCharacters(indent(depth));
- break;
- case "writeCharacters":
- wrap = false;
- break;
- }
-
- method.invoke(target, args);
-
- return null;
- }
-
- private String indent(int depth) {
- depth *= 3; // level of indentation
- char[] output = new char[depth];
- while (depth-- > 0) {
- output[depth] = INDENT_CHAR;
- }
- return new String(output);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java
deleted file mode 100644
index 7711648..0000000
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.cli.commands.tools;
-
-import com.google.common.base.Preconditions;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.utils.Base64;
-
-/**
- * Common utility methods to help with XML message conversion
- */
-public class XmlDataExporterUtil {
-
- public static String convertProperty(final Object value) {
- if (value instanceof byte[]) {
- return encode((byte[]) value);
- } else {
- return value == null ? XmlDataConstants.NULL : value.toString();
- }
- }
-
- public static String getPropertyType(final Object value) {
- String stringValue = null;
-
- // if the value is null then we can't really know what it is so just set
- // the type to the most generic thing
- if (value == null) {
- stringValue = XmlDataConstants.PROPERTY_TYPE_BYTES;
- } else if (value instanceof Boolean) {
- stringValue = XmlDataConstants.PROPERTY_TYPE_BOOLEAN;
- } else if (value instanceof Byte) {
- stringValue = XmlDataConstants.PROPERTY_TYPE_BYTE;
- } else if (value instanceof Short) {
- stringValue = XmlDataConstants.PROPERTY_TYPE_SHORT;
- } else if (value instanceof Integer) {
- stringValue = XmlDataConstants.PROPERTY_TYPE_INTEGER;
- } else if (value instanceof Long) {
- stringValue = XmlDataConstants.PROPERTY_TYPE_LONG;
- } else if (value instanceof Float) {
- stringValue = XmlDataConstants.PROPERTY_TYPE_FLOAT;
- } else if (value instanceof Double) {
- stringValue = XmlDataConstants.PROPERTY_TYPE_DOUBLE;
- } else if (value instanceof String) {
- stringValue = XmlDataConstants.PROPERTY_TYPE_STRING;
- } else if (value instanceof SimpleString) {
- stringValue = XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING;
- } else if (value instanceof byte[]) {
- stringValue = XmlDataConstants.PROPERTY_TYPE_BYTES;
- }
-
- return stringValue;
- }
-
- public static String getMessagePrettyType(byte rawType) {
- String prettyType = XmlDataConstants.DEFAULT_TYPE_PRETTY;
-
- if (rawType == Message.BYTES_TYPE) {
- prettyType = XmlDataConstants.BYTES_TYPE_PRETTY;
- } else if (rawType == Message.MAP_TYPE) {
- prettyType = XmlDataConstants.MAP_TYPE_PRETTY;
- } else if (rawType == Message.OBJECT_TYPE) {
- prettyType = XmlDataConstants.OBJECT_TYPE_PRETTY;
- } else if (rawType == Message.STREAM_TYPE) {
- prettyType = XmlDataConstants.STREAM_TYPE_PRETTY;
- } else if (rawType == Message.TEXT_TYPE) {
- prettyType = XmlDataConstants.TEXT_TYPE_PRETTY;
- }
-
- return prettyType;
- }
-
- /**
- * Base64 encode a ServerMessage body into the proper XML format
- *
- * @param message
- * @return
- */
- public static String encodeMessageBody(final Message message) throws Exception {
- Preconditions.checkNotNull(message, "ServerMessage can not be null");
-
- ActiveMQBuffer byteBuffer = message.toCore().getReadOnlyBodyBuffer();
- byte[] buffer = new byte[byteBuffer.writerIndex()];
- byteBuffer.readBytes(buffer);
-
- return XmlDataExporterUtil.encode(buffer);
- }
-
- protected static String encode(final byte[] data) {
- return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
- }
-}