You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/05/04 16:05:50 UTC
[3/6] activemq-artemis git commit: ARTEMIS-904 Remove cyclic
dependencies from artemis-cli
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
deleted file mode 100644
index 518d231..0000000
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
+++ /dev/null
@@ -1,690 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.cli.commands.tools;
-
-import javax.xml.XMLConstants;
-import javax.xml.stream.XMLInputFactory;
-import javax.xml.stream.XMLStreamConstants;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamReader;
-import javax.xml.transform.stax.StAXSource;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-import javax.xml.validation.Validator;
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import io.airlift.airline.Command;
-import io.airlift.airline.Option;
-import org.apache.activemq.artemis.api.core.ICoreMessage;
-import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
-import org.apache.activemq.artemis.api.core.client.ClientMessage;
-import org.apache.activemq.artemis.api.core.client.ClientProducer;
-import org.apache.activemq.artemis.api.core.client.ClientRequestor;
-import org.apache.activemq.artemis.api.core.client.ClientSession;
-import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
-import org.apache.activemq.artemis.api.core.client.ServerLocator;
-import org.apache.activemq.artemis.api.core.management.ManagementHelper;
-import org.apache.activemq.artemis.api.core.management.ResourceNames;
-import org.apache.activemq.artemis.cli.commands.ActionAbstract;
-import org.apache.activemq.artemis.cli.commands.ActionContext;
-import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
-import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.utils.Base64;
-import org.apache.activemq.artemis.utils.ClassloadingUtil;
-import org.apache.activemq.artemis.utils.ListUtil;
-import org.apache.activemq.artemis.utils.UUIDGenerator;
-import org.jboss.logging.Logger;
-
-/**
- * Read XML output from <code>org.apache.activemq.artemis.core.persistence.impl.journal.XmlDataExporter</code>, create a core session, and
- * send the messages to a running instance of ActiveMQ Artemis. It uses the StAX <code>javax.xml.stream.XMLStreamReader</code>
- * for speed and simplicity.
- */
-@Command(name = "imp", description = "Import all message-data using an XML that could be interpreted by any system.")
-public final class XmlDataImporter extends ActionAbstract {
- // Constants -----------------------------------------------------
-
- private static final Logger logger = Logger.getLogger(XmlDataImporter.class);
-
- // Attributes ----------------------------------------------------
-
- private XMLStreamReader reader;
-
- // this session is really only needed if the "session" variable does not auto-commit sends
- ClientSession managementSession;
-
- boolean localSession = false;
-
- final Map<String, String> addressMap = new HashMap<>();
-
- final Map<String, Long> queueIDs = new HashMap<>();
-
- String tempFileName = "";
-
- private ClientSession session;
-
- @Option(name = "--host", description = "The host used to import the data (default localhost)")
- public String host = "localhost";
-
- @Option(name = "--port", description = "The port used to import the data (default 61616)")
- public int port = 61616;
-
- @Option(name = "--transaction", description = "If this is set to true you will need a whole transaction to commit at the end. (default false)")
- public boolean transactional;
-
- @Option(name = "--user", description = "User name used to import the data. (default null)")
- public String user = null;
-
- // Password handed to createSession(...) together with the user; credentials are used when either one is non-null.
- @Option(name = "--password", description = "Password used to import the data. (default null)")
- public String password = null;
-
- @Option(name = "--input", description = "The input file name (default=exp.dmp)", required = true)
- public String input = "exp.dmp";
-
- /**
-  * @return the password used when the import creates its own sessions, or {@code null} if none was set
-  */
- public String getPassword() {
-    return this.password;
- }
-
- /**
-  * @param password the password to use when the import creates its own sessions; may be {@code null}
-  */
- public void setPassword(final String password) {
-    this.password = password;
- }
-
- /**
-  * @return the user name used when the import creates its own sessions, or {@code null} if none was set
-  */
- public String getUser() {
-    return this.user;
- }
-
- /**
-  * @param user the user name to use when the import creates its own sessions; may be {@code null}
-  */
- public void setUser(final String user) {
-    this.user = user;
- }
-
- /**
-  * Runs the import with the values bound to the command-line options
-  * ({@code --input}, {@code --host}, {@code --port}, {@code --transaction}).
-  *
-  * @param context the CLI action context; not read by this implementation
-  * @return always {@code null}
-  * @throws Exception if the import fails
-  */
- @Override
- public Object execute(ActionContext context) throws Exception {
-    this.process(this.input, this.host, this.port, this.transactional);
-    return null;
- }
-
- /**
-  * Imports the XML file at {@code inputFile} into the broker listening on {@code host:port}.
-  *
-  * @param inputFile     path of the XML file to import
-  * @param host          broker host to connect to
-  * @param port          broker port to connect to
-  * @param transactional if {@code true} the sending session does not auto-commit sends and a single
-  *                      commit is issued once the whole document has been processed
-  * @throws Exception if the file cannot be opened or the import fails
-  */
- public void process(String inputFile, String host, int port, boolean transactional) throws Exception {
-    // this method opens the stream, so it is also responsible for closing it (on success and on failure)
-    try (InputStream inputStream = new FileInputStream(inputFile)) {
-       this.process(inputStream, host, port, transactional);
-    }
- }
-
- /**
-  * Imports the XML read from {@code inputStream} using a single, caller-supplied session for both
-  * sending messages and management requests.
-  * <br>
-  * Use the overload taking two sessions if the sending session must be transactional.
-  *
-  * @param inputStream the stream from which to read the XML for import
-  * @param session     used for sending messages and for management requests, must use auto-commit for sends
-  * @throws Exception if reading the XML or talking to the broker fails
-  */
- public void process(InputStream inputStream, ClientSession session) throws Exception {
-    // no dedicated management session: the 3-argument overload falls back to "session"
-    final ClientSession noManagementSession = null;
-    process(inputStream, session, noManagementSession);
- }
-
- /**
-  * Imports the XML read from {@code inputStream}, sending messages on one session and issuing
-  * management requests on another. This is the overload to use for a transactional import.
-  * <br>
-  * Pass in a session which doesn't use auto-commit for sends, and one that does (for the management
-  * operations necessary during import).
-  *
-  * @param inputStream       the stream from which to read the XML for import
-  * @param session           used for sending messages, doesn't need to auto-commit sends
-  * @param managementSession used for management queries, must use auto-commit for sends; when
-  *                          {@code null}, {@code session} is used for management requests as well
-  * @throws Exception if reading the XML or talking to the broker fails
-  */
- public void process(InputStream inputStream,
-                     ClientSession session,
-                     ClientSession managementSession) throws Exception {
-    this.reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
-    this.session = session;
-    this.managementSession = managementSession == null ? session : managementSession;
-
-    processXml();
- }
-
- public void process(InputStream inputStream, String host, int port, boolean transactional) throws Exception {
- HashMap<String, Object> connectionParams = new HashMap<>();
- connectionParams.put(TransportConstants.HOST_PROP_NAME, host);
- connectionParams.put(TransportConstants.PORT_PROP_NAME, Integer.toString(port));
- ServerLocator serverLocator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
- ClientSessionFactory sf = serverLocator.createSessionFactory();
-
- ClientSession session;
- ClientSession managementSession;
-
- if (user != null || password != null) {
- session = sf.createSession(user, password, false, !transactional, true, false, 0);
- managementSession = sf.createSession(user, password, false, true, true, false, 0);
- } else {
- session = sf.createSession(false, !transactional, true);
- managementSession = sf.createSession(false, true, true);
- }
- localSession = true;
-
- process(inputStream, session, managementSession);
- }
-
- /**
-  * Validates the XML file at the given path against the import/export schema.
-  *
-  * @param file path of the XML file to validate
-  * @throws Exception if the file cannot be opened or does not validate
-  */
- public void validate(String file) throws Exception {
-    // this method opens the stream, so it is also responsible for closing it
-    try (InputStream inputStream = new FileInputStream(file)) {
-       validate(inputStream);
-    }
- }
-
- /**
-  * Validates the XML read from {@code inputStream} against {@code schema/artemis-import-export.xsd}.
-  *
-  * @param inputStream the stream from which to read the XML; not closed by this method
-  * @throws Exception if the schema cannot be loaded or the document does not validate
-  */
- public void validate(InputStream inputStream) throws Exception {
-    XMLStreamReader xmlReader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
-    try {
-       SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
-       Schema schema = factory.newSchema(XmlDataImporter.findResource("schema/artemis-import-export.xsd"));
-
-       Validator validator = schema.newValidator();
-       validator.validate(new StAXSource(xmlReader));
-    } finally {
-       // release the parser even when validation fails
-       xmlReader.close();
-    }
- }
-
- private static URL findResource(final String resourceName) {
- return AccessController.doPrivileged(new PrivilegedAction<URL>() {
- @Override
- public URL run() {
- return ClassloadingUtil.findResource(resourceName);
- }
- });
- }
-
- private void processXml() throws Exception {
- try {
- while (reader.hasNext()) {
- if (logger.isDebugEnabled()) {
- logger.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
- }
- if (reader.getEventType() == XMLStreamConstants.START_ELEMENT) {
- if (XmlDataConstants.QUEUE_BINDINGS_CHILD.equals(reader.getLocalName())) {
- bindQueue();
- } else if (XmlDataConstants.ADDRESS_BINDINGS_CHILD.equals(reader.getLocalName())) {
- bindAddress();
- } else if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) {
- processMessage();
- }
- }
- reader.next();
- }
-
- if (!session.isAutoCommitSends()) {
- session.commit();
- }
- } finally {
- // if the session was created in our constructor then close it (otherwise the caller will close it)
- if (localSession) {
- session.close();
- managementSession.close();
- }
- }
- }
-
- private void processMessage() throws Exception {
- Byte type = 0;
- Byte priority = 0;
- Long expiration = 0L;
- Long timestamp = 0L;
- org.apache.activemq.artemis.utils.UUID userId = null;
- ArrayList<String> queues = new ArrayList<>();
-
- // get message's attributes
- for (int i = 0; i < reader.getAttributeCount(); i++) {
- String attributeName = reader.getAttributeLocalName(i);
- switch (attributeName) {
- case XmlDataConstants.MESSAGE_TYPE:
- type = getMessageType(reader.getAttributeValue(i));
- break;
- case XmlDataConstants.MESSAGE_PRIORITY:
- priority = Byte.parseByte(reader.getAttributeValue(i));
- break;
- case XmlDataConstants.MESSAGE_EXPIRATION:
- expiration = Long.parseLong(reader.getAttributeValue(i));
- break;
- case XmlDataConstants.MESSAGE_TIMESTAMP:
- timestamp = Long.parseLong(reader.getAttributeValue(i));
- break;
- case XmlDataConstants.MESSAGE_USER_ID:
- userId = UUIDGenerator.getInstance().generateUUID();
- break;
- }
- }
-
- Message message = session.createMessage(type, true, expiration, timestamp, priority);
- message.setUserID(userId);
-
- boolean endLoop = false;
-
- // loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.)
- while (reader.hasNext()) {
- int eventType = reader.getEventType();
- switch (eventType) {
- case XMLStreamConstants.START_ELEMENT:
- if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) {
- processMessageBody(message.toCore());
- } else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) {
- processMessageProperties(message);
- } else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) {
- processMessageQueues(queues);
- }
- break;
- case XMLStreamConstants.END_ELEMENT:
- if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) {
- endLoop = true;
- }
- break;
- }
- if (endLoop) {
- break;
- }
- reader.next();
- }
-
- sendMessage(queues, message);
- }
-
- /**
-  * Maps the textual message type found in the XML to the corresponding {@code Message} type constant.
-  *
-  * @param value the type name read from the XML
-  * @return the matching type constant, or {@code Message.DEFAULT_TYPE} when the name is not recognised
-  */
- private Byte getMessageType(String value) {
-    switch (value) {
-       case XmlDataConstants.BYTES_TYPE_PRETTY:
-          return Message.BYTES_TYPE;
-       case XmlDataConstants.MAP_TYPE_PRETTY:
-          return Message.MAP_TYPE;
-       case XmlDataConstants.OBJECT_TYPE_PRETTY:
-          return Message.OBJECT_TYPE;
-       case XmlDataConstants.STREAM_TYPE_PRETTY:
-          return Message.STREAM_TYPE;
-       case XmlDataConstants.TEXT_TYPE_PRETTY:
-          return Message.TEXT_TYPE;
-       case XmlDataConstants.DEFAULT_TYPE_PRETTY:
-       default:
-          return Message.DEFAULT_TYPE;
-    }
- }
-
- /**
-  * Sends {@code message} to the address bound to the first listed queue, tagging it with the IDs of
-  * all listed queues (via {@code Message.HDR_ROUTE_TO_IDS}) so the broker routes it only to them.
-  * <br>
-  * Any temporary file created for a large message body is deleted afterwards, whether or not the
-  * send succeeded.
-  *
-  * @param queues  names of the queues the message must be routed to; must not be empty
-  * @param message the message to send
-  * @throws IllegalStateException if {@code queues} is empty
-  * @throws Exception             if a queue ID cannot be obtained or the send fails
-  */
- private void sendMessage(ArrayList<String> queues, Message message) throws Exception {
-    try {
-       if (queues.isEmpty()) {
-          // without a queue there is no address to send to; fail with a message that says so
-          throw new IllegalStateException("Cannot send " + message + ": no queues were listed for it");
-       }
-
-       StringBuilder logMessage = new StringBuilder();
-       String destination = addressMap.get(queues.get(0));
-
-       logMessage.append("Sending ").append(message).append(" to address: ").append(destination).append("; routed to queues: ");
-       // one 8-byte long per queue ID
-       ByteBuffer buffer = ByteBuffer.allocate(queues.size() * 8);
-
-       for (String queue : queues) {
-          logMessage.append(queue).append(", ");
-          buffer.putLong(lookupQueueID(queue));
-       }
-
-       logMessage.delete(logMessage.length() - 2, logMessage.length()); // take off the trailing comma
-       if (logger.isDebugEnabled()) {
-          logger.debug(logMessage);
-       }
-
-       message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
-       try (ClientProducer producer = session.createProducer(destination)) {
-          producer.send(message);
-       }
-    } finally {
-       deleteTempFile();
-    }
- }
-
- /**
-  * Returns the broker-side ID of {@code queue}, asking the management address on first use and
-  * caching the answer in {@code queueIDs}.
-  * <br>
-  * The ID is needed because messages cannot be sent directly to a queue; they are sent to an address
-  * and not every queue bound to that address may need the message.
-  */
- private long lookupQueueID(String queue) throws Exception {
-    Long cachedID = queueIDs.get(queue);
-    if (cachedID != null) {
-       return cachedID;
-    }
-
-    long queueID;
-    try (ClientRequestor requestor = new ClientRequestor(managementSession, "activemq.management")) {
-       ClientMessage managementMessage = managementSession.createMessage(false);
-       ManagementHelper.putAttribute(managementMessage, ResourceNames.QUEUE + queue, "ID");
-       managementSession.start();
-       if (logger.isDebugEnabled()) {
-          logger.debug("Requesting ID for: " + queue);
-       }
-       ClientMessage reply = requestor.request(managementMessage);
-       Number idObject = (Number) ManagementHelper.getResult(reply);
-       queueID = idObject.longValue();
-    }
-    if (logger.isDebugEnabled()) {
-       logger.debug("ID for " + queue + " is: " + queueID);
-    }
-    queueIDs.put(queue, queueID); // store it so we don't have to look it up every time
-    return queueID;
- }
-
- /**
-  * Deletes the temporary large-message file, if one is pending, and clears {@code tempFileName}.
-  */
- private void deleteTempFile() {
-    if (tempFileName.length() > 0) {
-       File tempFile = new File(tempFileName);
-       if (!tempFile.delete()) {
-          ActiveMQServerLogger.LOGGER.couldNotDeleteTempFile(tempFileName);
-       }
-       tempFileName = "";
-    }
- }
-
- private void processMessageQueues(ArrayList<String> queues) {
- for (int i = 0; i < reader.getAttributeCount(); i++) {
- if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i))) {
- queues.add(reader.getAttributeValue(i));
- }
- }
- }
-
- /**
-  * Reads name, value and type from the current property element and stores the property on
-  * {@code message} using the setter that matches the declared type.
-  * <br>
-  * A value equal to {@code XmlDataConstants.NULL} is treated as a {@code null} value. Elements with
-  * an unrecognised (or missing) type are ignored.
-  *
-  * @param message the message receiving the property
-  */
- private void processMessageProperties(Message message) {
-    String key = "";
-    String value = "";
-    String propertyType = "";
-
-    for (int i = 0; i < reader.getAttributeCount(); i++) {
-       String attributeName = reader.getAttributeLocalName(i);
-       switch (attributeName) {
-          case XmlDataConstants.PROPERTY_NAME:
-             key = reader.getAttributeValue(i);
-             break;
-          case XmlDataConstants.PROPERTY_VALUE:
-             value = reader.getAttributeValue(i);
-             break;
-          case XmlDataConstants.PROPERTY_TYPE:
-             propertyType = reader.getAttributeValue(i);
-             break;
-       }
-    }
-
-    if (value.equals(XmlDataConstants.NULL)) {
-       value = null;
-    }
-
-    // only the bytes, simple-string and string cases check for a null value before use
-    switch (propertyType) {
-       case XmlDataConstants.PROPERTY_TYPE_SHORT:
-          message.putShortProperty(key, Short.parseShort(value));
-          break;
-       case XmlDataConstants.PROPERTY_TYPE_BOOLEAN:
-          message.putBooleanProperty(key, Boolean.parseBoolean(value));
-          break;
-       case XmlDataConstants.PROPERTY_TYPE_BYTE:
-          message.putByteProperty(key, Byte.parseByte(value));
-          break;
-       case XmlDataConstants.PROPERTY_TYPE_BYTES:
-          message.putBytesProperty(key, value == null ? null : decode(value));
-          break;
-       case XmlDataConstants.PROPERTY_TYPE_DOUBLE:
-          message.putDoubleProperty(key, Double.parseDouble(value));
-          break;
-       case XmlDataConstants.PROPERTY_TYPE_FLOAT:
-          message.putFloatProperty(key, Float.parseFloat(value));
-          break;
-       case XmlDataConstants.PROPERTY_TYPE_INTEGER:
-          message.putIntProperty(key, Integer.parseInt(value));
-          break;
-       case XmlDataConstants.PROPERTY_TYPE_LONG:
-          message.putLongProperty(key, Long.parseLong(value));
-          break;
-       case XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING:
-          message.putStringProperty(new SimpleString(key), value == null ? null : SimpleString.toSimpleString(value));
-          break;
-       case XmlDataConstants.PROPERTY_TYPE_STRING:
-          message.putStringProperty(key, value);
-          break;
-    }
- }
-
- /**
-  * Decodes the body of the current message element into {@code message}.
-  * <br>
-  * A regular body is written straight into the message's body buffer. A body flagged as large is
-  * first decoded into a temporary file (its name is kept in {@code tempFileName}) and then attached
-  * to the message as an input stream over that file.
-  *
-  * @param message the message receiving the body
-  * @throws XMLStreamException if reading the XML fails
-  * @throws IOException        if writing or re-opening the temporary file fails
-  */
- private void processMessageBody(final ICoreMessage message) throws XMLStreamException, IOException {
-    boolean largeBody = false;
-
-    for (int index = 0; index < reader.getAttributeCount(); index++) {
-       if (XmlDataConstants.MESSAGE_IS_LARGE.equals(reader.getAttributeLocalName(index))) {
-          largeBody = Boolean.parseBoolean(reader.getAttributeValue(index));
-       }
-    }
-
-    // step off the body's start tag onto its content
-    reader.next();
-    if (logger.isDebugEnabled()) {
-       logger.debug("XMLStreamReader impl: " + reader);
-    }
-
-    if (!largeBody) {
-       getMessageBodyBytes(new MessageBodyBytesProcessor() {
-          @Override
-          public void processBodyBytes(byte[] bytes) throws IOException {
-             message.getBodyBuffer().writeBytes(bytes);
-          }
-       });
-       return;
-    }
-
-    tempFileName = UUID.randomUUID().toString() + ".tmp";
-    if (logger.isDebugEnabled()) {
-       logger.debug("Creating temp file " + tempFileName + " for large message.");
-    }
-    try (OutputStream out = new FileOutputStream(tempFileName)) {
-       getMessageBodyBytes(new MessageBodyBytesProcessor() {
-          @Override
-          public void processBodyBytes(byte[] bytes) throws IOException {
-             out.write(bytes);
-          }
-       });
-    }
-    BufferedInputStream bodyStream = new BufferedInputStream(new FileInputStream(tempFileName));
-    ((ClientMessage) message).setBodyInputStream(bodyStream);
- }
-
- /**
- * Message bodies are written to XML as one or more Base64 encoded CDATA elements. Some parser implementations won't
- * read an entire CDATA element at once (e.g. Woodstox) so it's possible that multiple CDATA/CHARACTERS events need
- * to be combined to reconstruct the Base64 encoded string. You can't decode bits and pieces of each CDATA. Each
- * CDATA has to be decoded in its entirety.
- *
- * @param processor used to deal with the decoded CDATA elements
- * @throws IOException
- * @throws XMLStreamException
- */
- private void getMessageBodyBytes(MessageBodyBytesProcessor processor) throws IOException, XMLStreamException {
- int currentEventType;
- StringBuilder cdata = new StringBuilder();
- while (reader.hasNext()) {
- currentEventType = reader.getEventType();
- if (currentEventType == XMLStreamConstants.END_ELEMENT) {
- break;
- } else if (currentEventType == XMLStreamConstants.CHARACTERS && reader.isWhiteSpace() && cdata.length() > 0) {
- /* when we hit a whitespace CHARACTERS event we know that the entire CDATA is complete so decode, pass back to
- * the processor, and reset the cdata for the next event(s)
- */
- processor.processBodyBytes(decode(cdata.toString()));
- cdata.setLength(0);
- } else {
- cdata.append(new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength()).trim());
- }
- reader.next();
- }
- }
-
- private void bindQueue() throws Exception {
- String queueName = "";
- String address = "";
- String filter = "";
- String routingType = "";
-
- for (int i = 0; i < reader.getAttributeCount(); i++) {
- String attributeName = reader.getAttributeLocalName(i);
- switch (attributeName) {
- case XmlDataConstants.QUEUE_BINDING_ADDRESS:
- address = reader.getAttributeValue(i);
- break;
- case XmlDataConstants.QUEUE_BINDING_NAME:
- queueName = reader.getAttributeValue(i);
- break;
- case XmlDataConstants.QUEUE_BINDING_FILTER_STRING:
- filter = reader.getAttributeValue(i);
- break;
- case XmlDataConstants.QUEUE_BINDING_ROUTING_TYPE:
- routingType = reader.getAttributeValue(i);
- break;
- }
- }
-
- ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName));
-
- if (!queueQuery.isExists()) {
- session.createQueue(address, RoutingType.valueOf(routingType), queueName, filter, true);
- if (logger.isDebugEnabled()) {
- logger.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
- }
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug("Binding " + queueName + " already exists so won't re-bind.");
- }
- }
-
- addressMap.put(queueName, address);
- }
-
- private void bindAddress() throws Exception {
- String addressName = "";
- String routingTypes = "";
-
- for (int i = 0; i < reader.getAttributeCount(); i++) {
- String attributeName = reader.getAttributeLocalName(i);
- switch (attributeName) {
- case XmlDataConstants.ADDRESS_BINDING_NAME:
- addressName = reader.getAttributeValue(i);
- break;
- case XmlDataConstants.ADDRESS_BINDING_ROUTING_TYPE:
- routingTypes = reader.getAttributeValue(i);
- break;
- }
- }
-
- ClientSession.AddressQuery addressQuery = session.addressQuery(new SimpleString(addressName));
-
- if (!addressQuery.isExists()) {
- Set<RoutingType> set = new HashSet<>();
- for (String routingType : ListUtil.toList(routingTypes)) {
- set.add(RoutingType.valueOf(routingType));
- }
- session.createAddress(SimpleString.toSimpleString(addressName), set, false);
- if (logger.isDebugEnabled()) {
- logger.debug("Binding address(name=" + addressName + ", routingTypes=" + routingTypes + ")");
- }
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug("Binding " + addressName + " already exists so won't re-bind.");
- }
- }
- }
-
- /**
-  * Collects the text of every JNDI entry element up to the closing JNDI-entries tag.
-  *
-  * @return the entries joined with {@code ", "}, or an empty string when no entry was found
-  * @throws Exception if reading the XML fails
-  */
- private String getEntries() throws Exception {
-    StringBuilder entry = new StringBuilder();
-    boolean endLoop = false;
-
-    while (reader.hasNext()) {
-       int eventType = reader.getEventType();
-       switch (eventType) {
-          case XMLStreamConstants.START_ELEMENT:
-             if (XmlDataConstants.JMS_JNDI_ENTRY.equals(reader.getLocalName())) {
-                String elementText = reader.getElementText();
-                entry.append(elementText).append(", ");
-                if (logger.isDebugEnabled()) {
-                   logger.debug("JMS admin object JNDI entry: " + entry.toString());
-                }
-             }
-             break;
-          case XMLStreamConstants.END_ELEMENT:
-             if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName())) {
-                endLoop = true;
-             }
-             break;
-       }
-       if (endLoop) {
-          break;
-       }
-       reader.next();
-    }
-
-    // strip the trailing ", " only if something was appended; an empty builder would make delete() throw
-    if (entry.length() >= 2) {
-       entry.setLength(entry.length() - 2);
-    }
-    return entry.toString();
- }
-
- /**
-  * Collects the text of every connection-factory connector element up to the closing connectors tag.
-  *
-  * @return the connectors joined with {@code ", "}, or an empty string when no connector was found
-  * @throws Exception if reading the XML fails
-  */
- private String getConnectors() throws Exception {
-    StringBuilder entry = new StringBuilder();
-    boolean endLoop = false;
-
-    while (reader.hasNext()) {
-       int eventType = reader.getEventType();
-       switch (eventType) {
-          case XMLStreamConstants.START_ELEMENT:
-             if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTOR.equals(reader.getLocalName())) {
-                entry.append(reader.getElementText()).append(", ");
-             }
-             break;
-          case XMLStreamConstants.END_ELEMENT:
-             if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTORS.equals(reader.getLocalName())) {
-                endLoop = true;
-             }
-             break;
-       }
-       if (endLoop) {
-          break;
-       }
-       reader.next();
-    }
-
-    // strip the trailing ", " only if something was appended; an empty builder would make delete() throw
-    if (entry.length() >= 2) {
-       entry.setLength(entry.length() - 2);
-    }
-    return entry.toString();
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- private static byte[] decode(String data) {
- return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
- }
-
- /**
-  * Callback receiving each decoded chunk of a message body as it is read from the XML.
-  */
- private interface MessageBodyBytesProcessor {
-
-    /**
-     * @param bytes one decoded chunk of the message body
-     * @throws IOException if the chunk cannot be stored
-     */
-    void processBodyBytes(byte[] bytes) throws IOException;
- }
-
- // Inner classes -------------------------------------------------
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java
new file mode 100644
index 0000000..347bd4b
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.tools.journal;
+
+import java.io.File;
+
+import io.airlift.airline.Command;
+import org.apache.activemq.artemis.cli.commands.ActionContext;
+import org.apache.activemq.artemis.cli.commands.tools.LockAbstract;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+
+@Command(name = "compact", description = "Compacts the journal of a non running server")
+public final class CompactJournal extends LockAbstract {
+
+   /**
+    * Compacts the message journal and then the bindings journal, printing a line for each one that
+    * succeeds. Any failure is handed to {@code treatError} instead of being rethrown.
+    *
+    * @param context the CLI action context, forwarded to the superclass
+    * @return always {@code null}
+    * @throws Exception if {@code super.execute} or {@code treatError} throws
+    */
+   @Override
+   public Object execute(ActionContext context) throws Exception {
+      super.execute(context);
+      try {
+         Configuration configuration = getFileConfiguration();
+         // message journal: min-files and file size come from the broker configuration
+         compactJournal(new File(getJournal()), "activemq-data", "amq", configuration.getJournalMinFiles(), configuration.getJournalFileSize(), null);
+         System.out.println("Compactation succeeded for " + getJournal());
+         // bindings journal: fixed at 2 files of 1048576 bytes (1 MiB) each
+         compactJournal(new File(getBinding()), "activemq-bindings", "bindings", 2, 1048576, null);
+         System.out.println("Compactation succeeded for " + getBinding());
+
+      } catch (Exception e) {
+         treatError(e, "data", "compact");
+      }
+      return null;
+   }
+
+   /**
+    * Opens the journal stored in {@code directory} through an NIO file factory, loads it, compacts it
+    * and stops it again. The journal is stopped even when loading or compacting fails.
+    *
+    * @param directory     folder holding the journal files
+    * @param journalPrefix file-name prefix of the journal files
+    * @param journalSuffix file-name extension of the journal files
+    * @param minFiles      passed to {@code JournalImpl} as both its second and third constructor argument
+    * @param fileSize      size of a single journal file
+    * @param listener      critical-error listener handed to the file factory; may be {@code null}
+    * @throws Exception if the journal cannot be started, loaded, compacted or stopped
+    */
+   private void compactJournal(final File directory,
+                               final String journalPrefix,
+                               final String journalSuffix,
+                               final int minFiles,
+                               final int fileSize,
+                               final IOCriticalErrorListener listener) throws Exception {
+      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1);
+
+      JournalImpl journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
+
+      journal.start();
+      try {
+         journal.loadInternalOnly();
+
+         journal.compact();
+      } finally {
+         // never leave a started journal behind, even if load or compact blew up
+         journal.stop();
+      }
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/DecodeJournal.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/DecodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/DecodeJournal.java
new file mode 100644
index 0000000..35a4ae2
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/DecodeJournal.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.tools.journal;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import org.apache.activemq.artemis.cli.commands.ActionContext;
+import org.apache.activemq.artemis.cli.commands.tools.LockAbstract;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.utils.Base64;
+
+/**
+ * CLI command that reads a text dump of journal records and replays it into a brand new journal.
+ * <p>
+ * Each input line is a comma separated list of {@code key@value} tokens; lines whose first token is
+ * {@code #File} mark the start of a new source file and reset the per-transaction record counters.
+ */
+@Command(name = "decode", description = "Decode a journal's internal format into a new journal set of files")
+public class DecodeJournal extends LockAbstract {
+
+   @Option(name = "--directory", description = "The journal folder (default journal folder from broker.xml)")
+   public String directory;
+
+   @Option(name = "--prefix", description = "The journal prefix (default activemq-data)")
+   public String prefix = "activemq-data";
+
+   @Option(name = "--suffix", description = "The journal suffix (default amq)")
+   public String suffix = "amq";
+
+   @Option(name = "--file-size", description = "The journal size (default 10485760)")
+   public int size = 10485760;
+
+   // NOTE(review): the option is declared required although a default value is supplied; confirm which is intended.
+   @Option(name = "--input", description = "The input file name (default=exp.dmp)", required = true)
+   public String input = "exp.dmp";
+
+   /**
+    * Imports the dump named by {@code input} into {@code directory}, falling back to the configured
+    * journal directory when no directory option was given. Failures are handed to {@code treatError}.
+    *
+    * @param context the action context, forwarded to the superclass
+    * @return always {@code null}
+    * @throws Exception if the superclass {@code execute} call fails
+    */
+   @Override
+   public Object execute(ActionContext context) throws Exception {
+      super.execute(context);
+      try {
+         if (directory == null) {
+            directory = getFileConfiguration().getJournalDirectory();
+         }
+         importJournal(directory, prefix, suffix, 2, size, input);
+      } catch (Exception e) {
+         treatError(e, "data", "decode");
+      }
+
+      return null;
+   }
+
+   /**
+    * Imports the dump stored in the file {@code fileInput}. The file is closed when the import ends,
+    * whether it succeeded or not.
+    *
+    * @param directory     folder in which the new journal is created
+    * @param journalPrefix file name prefix of the journal files
+    * @param journalSuffix file name suffix (extension) of the journal files
+    * @param minFiles      value passed to the journal as both of its file-count arguments
+    * @param fileSize      size of each journal file in bytes
+    * @param fileInput     path of the dump file to read
+    * @throws Exception if the file cannot be opened or the import fails
+    */
+   public static void importJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize,
+                                    final String fileInput) throws Exception {
+      try (FileInputStream fileInputStream = new FileInputStream(new File(fileInput))) {
+         importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
+      }
+   }
+
+   /**
+    * Wraps {@code stream} in a reader (platform default charset) and imports from it.
+    * The stream is left open; closing it is the caller's job.
+    */
+   private static void importJournal(final String directory,
+                                     final String journalPrefix,
+                                     final String journalSuffix,
+                                     final int minFiles,
+                                     final int fileSize,
+                                     final InputStream stream) throws Exception {
+      Reader reader = new InputStreamReader(stream);
+      importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
+   }
+
+   /**
+    * Replays every line read from {@code reader} into a new journal created in {@code directory}.
+    * <p>
+    * A line that cannot be applied is reported on standard error and skipped; the import then continues
+    * with the next line. The reader is not closed by this method.
+    *
+    * @param directory     folder in which the new journal is created; created if it does not exist
+    * @param journalPrefix file name prefix of the journal files
+    * @param journalSuffix file name suffix (extension) of the journal files
+    * @param minFiles      value passed to the journal as both of its file-count arguments
+    * @param fileSize      size of each journal file in bytes
+    * @param reader        source of the dump lines
+    * @throws IllegalStateException if the target directory already contains journal files
+    * @throws Exception             if the journal cannot be started, loaded or stopped, or reading fails
+    */
+   public static void importJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize,
+                                    final Reader reader) throws Exception {
+
+      File journalDir = new File(directory);
+
+      if (!journalDir.exists()) {
+         if (!journalDir.mkdirs()) {
+            System.err.println("Could not create directory " + directory);
+         }
+      }
+
+      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(journalDir, null, 1);
+
+      JournalImpl journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
+
+      if (journal.orderFiles().size() != 0) {
+         throw new IllegalStateException("Import needs to create a brand new journal");
+      }
+
+      journal.start();
+      try {
+         // The journal is empty, as we checked already. Calling load just to initialize the internal data
+         journal.loadInternalOnly();
+
+         BufferedReader buffReader = new BufferedReader(reader);
+
+         Map<Long, AtomicInteger> txCounters = new HashMap<>();
+
+         long lineNumber = 0;
+
+         String line;
+
+         while ((line = buffReader.readLine()) != null) {
+            lineNumber++;
+            String[] splitLine = line.split(",");
+            // A line made only of commas splits into an empty array, so check the length before indexing.
+            if (splitLine.length > 0 && splitLine[0].equals("#File")) {
+               txCounters.clear();
+               continue;
+            }
+
+            Properties lineProperties = parseLine(splitLine);
+
+            String operation = null;
+            try {
+               // parseString reports a missing key with a descriptive message instead of a bare NullPointerException.
+               operation = parseString("operation", lineProperties);
+               applyOperation(journal, operation, lineProperties, txCounters, lineNumber);
+            } catch (Exception ex) {
+               System.err.println("Error at line " + lineNumber + ", operation=" + operation + " msg = " + ex.getMessage());
+            }
+         }
+      } finally {
+         // Always stop a started journal, even when reading the dump failed.
+         journal.stop();
+      }
+   }
+
+   /**
+    * Appends to {@code journal} the record described by one parsed dump line.
+    * Prepare and commit lines are only applied when the number of transactional records seen so far
+    * matches the count stored on the line; otherwise a message is printed on standard error.
+    */
+   private static void applyOperation(final JournalImpl journal,
+                                      final String operation,
+                                      final Properties lineProperties,
+                                      final Map<Long, AtomicInteger> txCounters,
+                                      final long lineNumber) throws Exception {
+      switch (operation) {
+         case "AddRecord": {
+            RecordInfo info = parseRecord(lineProperties);
+            journal.appendAddRecord(info.id, info.userRecordType, info.data, false);
+            break;
+         }
+         case "AddRecordTX": {
+            long txID = parseLong("txID", lineProperties);
+            AtomicInteger counter = getCounter(txID, txCounters);
+            counter.incrementAndGet();
+            RecordInfo info = parseRecord(lineProperties);
+            journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
+            break;
+         }
+         case "UpdateTX": {
+            long txID = parseLong("txID", lineProperties);
+            AtomicInteger counter = getCounter(txID, txCounters);
+            counter.incrementAndGet();
+            RecordInfo info = parseRecord(lineProperties);
+            journal.appendUpdateRecordTransactional(txID, info.id, info.userRecordType, info.data);
+            break;
+         }
+         case "Update": {
+            RecordInfo info = parseRecord(lineProperties);
+            journal.appendUpdateRecord(info.id, info.userRecordType, info.data, false);
+            break;
+         }
+         case "DeleteRecord": {
+            long id = parseLong("id", lineProperties);
+
+            try {
+               journal.appendDeleteRecord(id, false);
+            } catch (IllegalStateException ignored) {
+               // If not found it means the append/update records were reclaimed already
+            }
+            break;
+         }
+         case "DeleteRecordTX": {
+            long txID = parseLong("txID", lineProperties);
+            long id = parseLong("id", lineProperties);
+            AtomicInteger counter = getCounter(txID, txCounters);
+            counter.incrementAndGet();
+            journal.appendDeleteRecordTransactional(txID, id);
+            break;
+         }
+         case "Prepare": {
+            long txID = parseLong("txID", lineProperties);
+            int numberOfRecords = parseInt("numberOfRecords", lineProperties);
+            AtomicInteger counter = getCounter(txID, txCounters);
+            byte[] data = parseEncoding("extraData", lineProperties);
+
+            if (counter.get() == numberOfRecords) {
+               journal.appendPrepareRecord(txID, data, false);
+            } else {
+               System.err.println("Transaction " + txID +
+                                     " at line " +
+                                     lineNumber +
+                                     " is incomplete. The prepare record expected " +
+                                     numberOfRecords +
+                                     " while the import only had " +
+                                     counter);
+            }
+            break;
+         }
+         case "Commit": {
+            long txID = parseLong("txID", lineProperties);
+            int numberOfRecords = parseInt("numberOfRecords", lineProperties);
+            AtomicInteger counter = getCounter(txID, txCounters);
+            if (counter.get() == numberOfRecords) {
+               journal.appendCommitRecord(txID, false);
+            } else {
+               System.err.println("Transaction " + txID +
+                                     " at line " +
+                                     lineNumber +
+                                     " is incomplete. The commit record expected " +
+                                     numberOfRecords +
+                                     " while the import only had " +
+                                     counter);
+            }
+            break;
+         }
+         case "Rollback": {
+            long txID = parseLong("txID", lineProperties);
+            journal.appendRollbackRecord(txID, false);
+            break;
+         }
+         default:
+            System.err.println("Invalid operation " + operation + " at line " + lineNumber);
+            break;
+      }
+   }
+
+   /**
+    * Returns the record counter registered for {@code txID}, creating it at zero on first use.
+    */
+   private static AtomicInteger getCounter(final Long txID, final Map<Long, AtomicInteger> txCounters) {
+      AtomicInteger counter = txCounters.get(txID);
+      if (counter == null) {
+         counter = new AtomicInteger(0);
+         txCounters.put(txID, counter);
+      }
+
+      return counter;
+   }
+
+   /**
+    * Builds a {@link RecordInfo} from the {@code id}, {@code userRecordType}, {@code isUpdate} and
+    * {@code data} properties of a line. The compact count is always set to zero.
+    */
+   private static RecordInfo parseRecord(final Properties properties) throws Exception {
+      long id = parseLong("id", properties);
+      byte userRecordType = parseByte("userRecordType", properties);
+      boolean isUpdate = parseBoolean("isUpdate", properties);
+      byte[] data = parseEncoding("data", properties);
+      return new RecordInfo(id, userRecordType, data, isUpdate, (short) 0);
+   }
+
+   /** Reads the mandatory property {@code name} and Base64-decodes it. */
+   private static byte[] parseEncoding(final String name, final Properties properties) throws Exception {
+      String value = parseString(name, properties);
+
+      return decode(value);
+   }
+
+   /** Reads the mandatory property {@code name} as an {@code int}. */
+   private static int parseInt(final String name, final Properties properties) throws Exception {
+      String value = parseString(name, properties);
+
+      return Integer.parseInt(value);
+   }
+
+   /** Reads the mandatory property {@code name} as a {@code long}. */
+   private static long parseLong(final String name, final Properties properties) throws Exception {
+      String value = parseString(name, properties);
+
+      return Long.parseLong(value);
+   }
+
+   /** Reads the mandatory property {@code name} as a {@code boolean}. */
+   private static boolean parseBoolean(final String name, final Properties properties) throws Exception {
+      String value = parseString(name, properties);
+
+      return Boolean.parseBoolean(value);
+   }
+
+   /** Reads the mandatory property {@code name} as a {@code byte}. */
+   private static byte parseByte(final String name, final Properties properties) throws Exception {
+      String value = parseString(name, properties);
+
+      return Byte.parseByte(value);
+   }
+
+   /**
+    * Returns the value of property {@code name}.
+    *
+    * @throws Exception if the property is absent
+    */
+   private static String parseString(final String name, final Properties properties) throws Exception {
+      String value = properties.getProperty(name);
+
+      if (value == null) {
+         throw new Exception("property " + name + " not found");
+      }
+      return value;
+   }
+
+   /**
+    * Turns the {@code key@value} tokens of one line into properties.
+    * <p>
+    * A token with exactly one {@code @} becomes {@code key -> value}, where the value may be empty
+    * (for example {@code data@} for a zero-length payload). Any other token is stored under its first
+    * segment with that same segment as value.
+    */
+   private static Properties parseLine(final String[] splitLine) {
+      Properties properties = new Properties();
+
+      for (String el : splitLine) {
+         // A negative limit keeps trailing empty strings: "data@" yields {"data", ""} instead of {"data"},
+         // and a bare "@" yields {"", ""} instead of an empty array that would make tuple[0] fail.
+         // NOTE(review): assumes Base64.decode("") returns an empty array - confirm against the Base64 utility.
+         String[] tuple = el.split("@", -1);
+         if (tuple.length == 2) {
+            properties.put(tuple[0], tuple[1]);
+         } else {
+            properties.put(tuple[0], tuple[0]);
+         }
+      }
+
+      return properties;
+   }
+
+   /** Decodes single-line, URL-safe Base64 text. */
+   private static byte[] decode(final String data) {
+      return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+   }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/EncodeJournal.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/EncodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/EncodeJournal.java
new file mode 100644
index 0000000..ec47837
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/EncodeJournal.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.tools.journal;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.List;
+
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import org.apache.activemq.artemis.cli.commands.ActionContext;
+import org.apache.activemq.artemis.cli.commands.tools.LockAbstract;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.impl.JournalFile;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
+import org.apache.activemq.artemis.utils.Base64;
+
+/**
+ * CLI command that dumps the records of a set of journal files as text, one record per line.
+ * <p>
+ * Every line is a comma separated list of {@code key@value} tokens starting with {@code operation@...};
+ * a {@code #File,...} line is written before the records of each journal file.
+ */
+@Command(name = "encode", description = "Encode a set of journal files into an internal encoded data format")
+public class EncodeJournal extends LockAbstract {
+
+   @Option(name = "--directory", description = "The journal folder (default the journal folder from broker.xml)")
+   public String directory;
+
+   @Option(name = "--prefix", description = "The journal prefix (default activemq-data)")
+   public String prefix = "activemq-data";
+
+   @Option(name = "--suffix", description = "The journal suffix (default amq)")
+   public String suffix = "amq";
+
+   @Option(name = "--file-size", description = "The journal size (default 10485760)")
+   public int size = 10485760;
+
+   /**
+    * Dumps the journal in {@code directory} to standard output, using the configured journal directory
+    * when the option was not set. Failures are handed to {@code treatError}.
+    *
+    * @param context the action context, forwarded to the superclass
+    * @return always {@code null}
+    */
+   @Override
+   public Object execute(ActionContext context) throws Exception {
+      super.execute(context);
+      try {
+         if (directory == null) {
+            directory = getFileConfiguration().getJournalDirectory();
+         }
+
+         exportJournal(directory, prefix, suffix, 2, size);
+      } catch (Exception e) {
+         treatError(e, "data", "encode");
+      }
+
+      return null;
+   }
+
+   /** Convenience overload that writes the dump to {@code System.out}. */
+   private static void exportJournal(final String directory,
+                                     final String journalPrefix,
+                                     final String journalSuffix,
+                                     final int minFiles,
+                                     final int fileSize) throws Exception {
+
+      exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, System.out);
+   }
+
+   /**
+    * Writes the dump into the file {@code fileName} through a buffered print stream that is closed afterwards.
+    *
+    * @param directory     folder holding the journal files
+    * @param journalPrefix file name prefix of the journal files
+    * @param journalSuffix file name suffix (extension) of the journal files
+    * @param minFiles      value passed to the journal as both of its file-count arguments
+    * @param fileSize      size of each journal file in bytes
+    * @param fileName      path of the file to write
+    */
+   public static void exportJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize,
+                                    final String fileName) throws Exception {
+      try (FileOutputStream rawFile = new FileOutputStream(fileName);
+           BufferedOutputStream buffered = new BufferedOutputStream(rawFile);
+           PrintStream target = new PrintStream(buffered)) {
+         exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, target);
+      }
+   }
+
+   /**
+    * Writes the dump of every journal file found in {@code directory} to {@code out}.
+    * The stream is neither flushed nor closed here.
+    *
+    * @param directory     folder holding the journal files
+    * @param journalPrefix file name prefix of the journal files
+    * @param journalSuffix file name suffix (extension) of the journal files
+    * @param minFiles      value passed to the journal as both of its file-count arguments
+    * @param fileSize      size of each journal file in bytes
+    * @param out           destination of the text dump
+    */
+   public static void exportJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize,
+                                    final PrintStream out) throws Exception {
+      NIOSequentialFileFactory fileFactory = new NIOSequentialFileFactory(new File(directory), null, 1);
+
+      JournalImpl journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, fileFactory, journalPrefix, journalSuffix, 1);
+
+      for (JournalFile journalFile : journal.orderFiles()) {
+         // Marker line announcing the journal file the following records come from.
+         out.println("#File," + journalFile);
+
+         exportJournalFile(out, fileFactory, journalFile);
+      }
+   }
+
+   /** Reads one journal file and prints a text line for every record callback received. */
+   private static void exportJournalFile(final PrintStream out,
+                                         final SequentialFileFactory fileFactory,
+                                         final JournalFile file) throws Exception {
+      final JournalReaderCallback printer = new JournalReaderCallback() {
+
+         @Override
+         public void onReadAddRecord(final RecordInfo recordInfo) throws Exception {
+            out.println("operation@AddRecord," + describeRecord(recordInfo));
+         }
+
+         @Override
+         public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception {
+            out.println("operation@AddRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+         }
+
+         @Override
+         public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception {
+            out.println("operation@Update," + describeRecord(recordInfo));
+         }
+
+         @Override
+         public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception {
+            out.println("operation@UpdateTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+         }
+
+         @Override
+         public void onReadDeleteRecord(final long recordID) throws Exception {
+            out.println("operation@DeleteRecord,id@" + recordID);
+         }
+
+         @Override
+         public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception {
+            out.println("operation@DeleteRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+         }
+
+         @Override
+         public void onReadPrepareRecord(final long transactionID,
+                                         final byte[] extraData,
+                                         final int numberOfRecords) throws Exception {
+            out.println("operation@Prepare,txID@" + transactionID + ",numberOfRecords@" + numberOfRecords + ",extraData@" + encode(extraData));
+         }
+
+         @Override
+         public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception {
+            out.println("operation@Commit,txID@" + transactionID + ",numberOfRecords@" + numberOfRecords);
+         }
+
+         @Override
+         public void onReadRollbackRecord(final long transactionID) throws Exception {
+            out.println("operation@Rollback,txID@" + transactionID);
+         }
+
+         @Override
+         public void markAsDataFile(final JournalFile file) {
+            // Nothing is printed for this callback.
+         }
+      };
+
+      JournalImpl.readJournalFile(fileFactory, file, printer);
+   }
+
+   /** Renders the fields of a record as {@code key@value} tokens, with the payload Base64-encoded last. */
+   private static String describeRecord(final RecordInfo recordInfo) {
+      String identity = "id@" + recordInfo.id + ",userRecordType@" + recordInfo.userRecordType;
+      String details = ",length@" + recordInfo.data.length + ",isUpdate@" + recordInfo.isUpdate + ",compactCount@" + recordInfo.compactCount;
+      return identity + details + ",data@" + encode(recordInfo.data);
+   }
+
+   /** Encodes {@code data} as single-line, URL-safe Base64 text. */
+   private static String encode(final byte[] data) {
+      return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+   }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/PerfJournal.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/PerfJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/PerfJournal.java
new file mode 100644
index 0000000..3805de6
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/PerfJournal.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.cli.commands.tools.journal;
+
+import java.text.DecimalFormat;
+
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import org.apache.activemq.artemis.cli.commands.ActionContext;
+import org.apache.activemq.artemis.cli.commands.tools.LockAbstract;
+import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
+import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
+import org.apache.activemq.artemis.core.server.JournalType;
+
+/**
+ * CLI command that runs a write test against the journal location and prints the
+ * {@code journal-buffer-timeout} value derived from the measured time.
+ */
+@Command(name = "perf-journal", description = "Calculates the journal-buffer-timeout you should use with the current data folder")
+public class PerfJournal extends LockAbstract {
+
+
+   @Option(name = "--block-size", description = "The block size for each write (default 4096)")
+   public int size = 4 * 1024;
+
+   @Option(name = "--writes", description = "The number of writes to be performed (default 250)")
+   public int writes = 250;
+
+   @Option(name = "--tries", description = "The number of tries for the test (default 5)")
+   public int tries = 5;
+
+   @Option(name = "--no-sync", description = "Disable sync")
+   public boolean nosyncs = false;
+
+   @Option(name = "--sync", description = "Enable syncs")
+   public boolean syncs = false;
+
+   @Option(name = "--journal-type", description = "Journal Type to be used (default from broker.xml)")
+   public String journalType = null;
+
+
+   /**
+    * Applies the command line overrides to the file configuration, runs the write test and prints the result.
+    * All report lines are written to {@code context.out} so that a caller supplying its own stream
+    * receives the whole report.
+    *
+    * @param context the action context; its {@code out} stream receives the report
+    * @return always {@code null}
+    * @throws Exception if the configuration cannot be read or the test fails
+    */
+   @Override
+   public Object execute(ActionContext context) throws Exception {
+      super.execute(context);
+
+      FileConfiguration fileConfiguration = getFileConfiguration();
+
+      // --no-sync wins when both flags are given; with neither flag the configured value is kept.
+      if (nosyncs) {
+         fileConfiguration.setJournalDatasync(false);
+      } else if (syncs) {
+         fileConfiguration.setJournalDatasync(true);
+      }
+
+
+      if (journalType != null) {
+         fileConfiguration.setJournalType(JournalType.getType(journalType));
+      }
+
+      context.out.println("");
+      context.out.println("Auto tuning journal ...");
+
+      context.out.println("Performing " + tries + " tests writing " + writes + " blocks of " + size + " on each test, sync=" + fileConfiguration.isJournalDatasync() + " with journalType = " + fileConfiguration.getJournalType());
+
+      // Make sure the journal folder exists before writing test data into it.
+      fileConfiguration.getJournalLocation().mkdirs();
+
+      long time = SyncCalculation.syncTest(fileConfiguration.getJournalLocation(), size, writes, tries, verbose, fileConfiguration.isJournalDatasync(), fileConfiguration.getJournalType());
+
+      long nanosecondsWait = SyncCalculation.toNanos(time, writes, verbose);
+      // NOTE(review): the division below treats time as milliseconds - confirm the unit returned by syncTest.
+      double writesPerMillisecond = (double) writes / (double) time;
+
+      String writesPerMillisecondStr = new DecimalFormat("###.##").format(writesPerMillisecond);
+
+      context.out.println("Your system can execute " + writesPerMillisecondStr + " syncs per millisecond");
+      context.out.println("Your journal-buffer-timeout should be:" + nanosecondsWait);
+      context.out.println("You should use this following configuration:");
+      context.out.println();
+      context.out.println("<journal-buffer-timeout>" + nanosecondsWait + "</journal-buffer-timeout>");
+
+      return null;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java
new file mode 100644
index 0000000..61c6d6b
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.tools.xml;
+
+/**
+ * String constants shared by the XML data importer and the XML data exporter.
+ * <p>
+ * All constants are package-private except {@link #QUEUE_NAME}, which is public.
+ */
+public final class XmlDataConstants {
+
+   // ---- document level ----
+   static final String XML_VERSION = "1.0";
+   static final String DOCUMENT_PARENT = "activemq-journal";
+   static final String NULL = "_AMQ_NULL";
+
+   // ---- bindings ----
+   static final String BINDINGS_PARENT = "bindings";
+
+   static final String QUEUE_BINDINGS_CHILD = "queue-binding";
+   static final String QUEUE_BINDING_ADDRESS = "address";
+   static final String QUEUE_BINDING_FILTER_STRING = "filter-string";
+   static final String QUEUE_BINDING_NAME = "name";
+   static final String QUEUE_BINDING_ID = "id";
+   static final String QUEUE_BINDING_ROUTING_TYPE = "routing-type";
+
+   static final String ADDRESS_BINDINGS_CHILD = "address-binding";
+   static final String ADDRESS_BINDING_NAME = "name";
+   static final String ADDRESS_BINDING_ID = "id";
+   static final String ADDRESS_BINDING_ROUTING_TYPE = "routing-types";
+
+   // ---- messages ----
+   static final String MESSAGES_PARENT = "messages";
+   static final String MESSAGES_CHILD = "message";
+   static final String MESSAGE_ID = "id";
+   static final String MESSAGE_PRIORITY = "priority";
+   static final String MESSAGE_EXPIRATION = "expiration";
+   static final String MESSAGE_TIMESTAMP = "timestamp";
+   static final String MESSAGE_TYPE = "type";
+   static final String MESSAGE_IS_LARGE = "isLarge";
+   static final String MESSAGE_USER_ID = "user-id";
+   static final String MESSAGE_BODY = "body";
+
+   // ---- *_TYPE_PRETTY values ----
+   static final String DEFAULT_TYPE_PRETTY = "default";
+   static final String BYTES_TYPE_PRETTY = "bytes";
+   static final String MAP_TYPE_PRETTY = "map";
+   static final String OBJECT_TYPE_PRETTY = "object";
+   static final String STREAM_TYPE_PRETTY = "stream";
+   static final String TEXT_TYPE_PRETTY = "text";
+
+   // ---- properties ----
+   static final String PROPERTIES_PARENT = "properties";
+   static final String PROPERTIES_CHILD = "property";
+   static final String PROPERTY_NAME = "name";
+   static final String PROPERTY_VALUE = "value";
+   static final String PROPERTY_TYPE = "type";
+
+   // ---- PROPERTY_TYPE_* values ----
+   static final String PROPERTY_TYPE_BOOLEAN = "boolean";
+   static final String PROPERTY_TYPE_BYTE = "byte";
+   static final String PROPERTY_TYPE_BYTES = "bytes";
+   static final String PROPERTY_TYPE_SHORT = "short";
+   static final String PROPERTY_TYPE_INTEGER = "integer";
+   static final String PROPERTY_TYPE_LONG = "long";
+   static final String PROPERTY_TYPE_FLOAT = "float";
+   static final String PROPERTY_TYPE_DOUBLE = "double";
+   static final String PROPERTY_TYPE_STRING = "string";
+   static final String PROPERTY_TYPE_SIMPLE_STRING = "simple-string";
+
+   // ---- queues ----
+   static final String QUEUES_PARENT = "queues";
+   static final String QUEUES_CHILD = "queue";
+   public static final String QUEUE_NAME = "name";
+
+   private XmlDataConstants() {
+      // Constants holder; never instantiated.
+   }
+}
\ No newline at end of file