You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/05/04 16:05:49 UTC
[2/6] activemq-artemis git commit: ARTEMIS-904 Remove cyclic
dependencies from artemis-cli
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
new file mode 100644
index 0000000..607f92b
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
@@ -0,0 +1,627 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.tools.xml;
+
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
+import java.io.File;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import io.airlift.airline.Command;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.cli.commands.ActionContext;
+import org.apache.activemq.artemis.cli.commands.tools.OptionalLocking;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
+import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe;
+import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
+import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.MessageDescribe;
+import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.ReferenceDescribe;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentAddressBindingEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
+import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
+
+@Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.")
+public final class XmlDataExporter extends OptionalLocking {
+
+ private static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L;
+
+ private JournalStorageManager storageManager;
+
+ private Configuration config;
+
+ private XMLStreamWriter xmlWriter;
+
+ // message refs keyed first by the record ID of the message they refer to and then, in the inner map, by the ID of the queue to which they belong
+ private final Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs = new HashMap<>();
+
+ // map of all message records hashed by their record ID (which will match the record ID of the message refs)
+ private final HashMap<Long, Message> messages = new HashMap<>();
+
+ private final Map<Long, Set<PagePosition>> cursorRecords = new HashMap<>();
+
+ private final Set<Long> pgTXs = new HashSet<>();
+
+ private final HashMap<Long, PersistentQueueBindingEncoding> queueBindings = new HashMap<>();
+
+ private final HashMap<Long, PersistentAddressBindingEncoding> addressBindings = new HashMap<>();
+
+ long messagesPrinted = 0L;
+
+ long bindingsPrinted = 0L;
+
+   /**
+    * Entry point of the command: resolves the data directories and writes the XML export to
+    * {@code context.out}.
+    * <p>
+    * Any exception raised while resolving the directories or exporting is handed to
+    * {@code treatError} instead of being propagated directly.
+    *
+    * @param context the action context supplying the output stream
+    * @return always {@code null}
+    * @throws Exception if thrown by {@code super.execute} or by {@code treatError}
+    */
+   @Override
+   public Object execute(ActionContext context) throws Exception {
+      super.execute(context);
+
+      try {
+         final String bindingsDir = getBinding();
+         final String journalDir = getJournal();
+         final String pagingDir = getPaging();
+         final String largeMessagesDir = getLargeMessages();
+         process(context.out, bindingsDir, journalDir, pagingDir, largeMessagesDir);
+      } catch (Exception failure) {
+         treatError(failure, "data", "exp");
+      }
+      return null;
+   }
+
+   /**
+    * Exports the data found in the given directories as pretty-printed XML.
+    * <p>
+    * Builds an NIO-journal configuration and a {@link JournalStorageManager} over the directories,
+    * wraps a UTF-8 {@link XMLStreamWriter} in an indenting proxy and then writes the export.
+    *
+    * @param out              stream the XML document is written to
+    * @param bindingsDir      directory of the bindings journal
+    * @param journalDir       directory of the message journal
+    * @param pagingDir        directory of the page files
+    * @param largeMessagesDir directory of the large-message files
+    * @throws Exception if the storage manager or XML writer cannot be created, or reading a journal fails
+    */
+   public void process(OutputStream out,
+                       String bindingsDir,
+                       String journalDir,
+                       String pagingDir,
+                       String largeMessagesDir) throws Exception {
+      config = new ConfigurationImpl().setBindingsDirectory(bindingsDir).setJournalDirectory(journalDir).setPagingDirectory(pagingDir).setLargeMessagesDirectory(largeMessagesDir).setJournalType(JournalType.NIO);
+      final ExecutorService executor = Executors.newFixedThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory());
+      try {
+         ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
+
+         storageManager = new JournalStorageManager(config, executorFactory, executorFactory);
+
+         XMLOutputFactory factory = XMLOutputFactory.newInstance();
+         XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out, "UTF-8");
+         PrettyPrintHandler handler = new PrettyPrintHandler(rawXmlWriter);
+         xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class[]{XMLStreamWriter.class}, handler);
+
+         writeXMLData();
+      } finally {
+         // release the pool even when the export fails, so its threads are not leaked
+         executor.shutdown();
+      }
+   }
+
+ private void writeXMLData() throws Exception {
+ long start = System.currentTimeMillis();
+ getBindings();
+ processMessageJournal();
+ printDataAsXML();
+ ActiveMQServerLogger.LOGGER.debug("\n\nProcessing took: " + (System.currentTimeMillis() - start) + "ms");
+ ActiveMQServerLogger.LOGGER.debug("Output " + messagesPrinted + " messages and " + bindingsPrinted + " bindings.");
+ }
+
+ /**
+ * Read through the message journal and stuff all the events/data we care about into local data structures. We'll
+ * use this data later to print all the right information.
+ * <p>
+ * Records are dispatched on their user record type:
+ * <ul>
+ * <li>ADD_MESSAGE / ADD_MESSAGE_PROTOCOL: the message, converted to core, is stored in <code>messages</code></li>
+ * <li>ADD_LARGE_MESSAGE: the message is stored in <code>messages</code> without conversion</li>
+ * <li>ADD_REF: the reference is stored in <code>messageRefs</code> under the record ID and then the queue ID</li>
+ * <li>ACKNOWLEDGE_REF: the record is collected and applied by <code>removeAcked</code> after the journal is stopped</li>
+ * <li>ACKNOWLEDGE_CURSOR: the acked page position is added to <code>cursorRecords</code> for its queue ID</li>
+ * <li>PAGE_TRANSACTION: the page transaction ID is added to <code>pgTXs</code></li>
+ * </ul>
+ * Records of any other type are decoded but otherwise ignored.
+ *
+ * @throws Exception will be thrown if anything goes wrong reading the journal
+ */
+ private void processMessageJournal() throws Exception {
+ ArrayList<RecordInfo> acks = new ArrayList<>();
+
+ List<RecordInfo> records = new LinkedList<>();
+
+ // We load these, but don't use them.
+ List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>();
+
+ Journal messageJournal = storageManager.getMessageJournal();
+
+ ActiveMQServerLogger.LOGGER.debug("Reading journal from " + config.getJournalDirectory());
+
+ messageJournal.start();
+
+ // Just logging these, no action necessary
+ TransactionFailureCallback transactionFailureCallback = new TransactionFailureCallback() {
+ @Override
+ public void failedTransaction(long transactionID,
+ List<RecordInfo> records1,
+ List<RecordInfo> recordsToDelete) {
+ StringBuilder message = new StringBuilder();
+ message.append("Encountered failed journal transaction: ").append(transactionID);
+ for (int i = 0; i < records1.size(); i++) {
+ if (i == 0) {
+ message.append("; Records: ");
+ }
+ message.append(records1.get(i));
+ if (i != (records1.size() - 1)) {
+ message.append(", ");
+ }
+ }
+
+ for (int i = 0; i < recordsToDelete.size(); i++) {
+ if (i == 0) {
+ message.append("; RecordsToDelete: ");
+ }
+ message.append(recordsToDelete.get(i));
+ if (i != (recordsToDelete.size() - 1)) {
+ message.append(", ");
+ }
+ }
+
+ ActiveMQServerLogger.LOGGER.debug(message.toString());
+ }
+ };
+
+ ((JournalImpl) messageJournal).load(records, preparedTransactions, transactionFailureCallback, false);
+
+ // Since we don't use these nullify the reference so that the garbage collector can clean them up
+ preparedTransactions = null;
+
+ for (RecordInfo info : records) {
+ byte[] data = info.data;
+
+ ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data); // only read by the cursor-ack and page-transaction branches
+
+ Object o = DescribeJournal.newObjectEncoding(info, storageManager); // decoded for every record, before its type is inspected
+ if (info.getUserRecordType() == JournalRecordIds.ADD_MESSAGE) {
+ messages.put(info.id, ((MessageDescribe) o).getMsg().toCore());
+ } else if (info.getUserRecordType() == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
+ messages.put(info.id, ((MessageDescribe) o).getMsg().toCore());
+ } else if (info.getUserRecordType() == JournalRecordIds.ADD_LARGE_MESSAGE) {
+ messages.put(info.id, ((MessageDescribe) o).getMsg());
+ } else if (info.getUserRecordType() == JournalRecordIds.ADD_REF) {
+ ReferenceDescribe ref = (ReferenceDescribe) o;
+ HashMap<Long, ReferenceDescribe> map = messageRefs.get(info.id); // outer key: record ID, inner key: queue ID
+ if (map == null) {
+ HashMap<Long, ReferenceDescribe> newMap = new HashMap<>();
+ newMap.put(ref.refEncoding.queueID, ref);
+ messageRefs.put(info.id, newMap);
+ } else {
+ map.put(ref.refEncoding.queueID, ref);
+ }
+ } else if (info.getUserRecordType() == JournalRecordIds.ACKNOWLEDGE_REF) {
+ acks.add(info); // applied by removeAcked() once every record has been read
+ } else if (info.userRecordType == JournalRecordIds.ACKNOWLEDGE_CURSOR) {
+ CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
+ encoding.decode(buff);
+
+ Set<PagePosition> set = cursorRecords.get(encoding.queueID);
+
+ if (set == null) {
+ set = new HashSet<>();
+ cursorRecords.put(encoding.queueID, set);
+ }
+
+ set.add(encoding.position);
+ } else if (info.userRecordType == JournalRecordIds.PAGE_TRANSACTION) {
+ if (info.isUpdate) {
+ PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
+
+ pageUpdate.decode(buff);
+ pgTXs.add(pageUpdate.pageTX);
+ } else {
+ PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+
+ pageTransactionInfo.decode(buff);
+
+ pageTransactionInfo.setRecordID(info.id);
+ pgTXs.add(pageTransactionInfo.getTransactionID());
+ }
+ }
+ }
+
+ messageJournal.stop();
+
+ removeAcked(acks);
+ }
+
+   /**
+    * Go back through the messages and message refs we found in the journal and remove the ones that have been acked.
+    * <p>
+    * When the last reference of a message is removed the message itself is dropped as well. An ack whose
+    * record ID has no reference map (for example because an earlier ack already removed it) is skipped
+    * instead of failing with a <code>NullPointerException</code>.
+    *
+    * @param acks the list of ack records we got from the journal
+    */
+   private void removeAcked(ArrayList<RecordInfo> acks) {
+      for (RecordInfo info : acks) {
+         AckDescribe ack = (AckDescribe) DescribeJournal.newObjectEncoding(info, null);
+         HashMap<Long, ReferenceDescribe> refsForMessage = messageRefs.get(info.id);
+         if (refsForMessage == null) {
+            // dangling or duplicate ack: there is nothing left to remove for this record
+            continue;
+         }
+         refsForMessage.remove(ack.refEncoding.queueID);
+         if (refsForMessage.isEmpty()) {
+            messages.remove(info.id);
+            messageRefs.remove(info.id);
+         }
+      }
+   }
+
+ /**
+ * Open the bindings journal and extract all bindings data.
+ *
+ * @throws Exception will be thrown if anything goes wrong reading the bindings journal
+ */
+ private void getBindings() throws Exception {
+ List<RecordInfo> records = new LinkedList<>();
+
+ Journal bindingsJournal = storageManager.getBindingsJournal();
+
+ bindingsJournal.start();
+
+ ActiveMQServerLogger.LOGGER.debug("Reading bindings journal from " + config.getBindingsDirectory());
+
+ ((JournalImpl) bindingsJournal).load(records, null, null, false);
+
+ for (RecordInfo info : records) {
+ if (info.getUserRecordType() == JournalRecordIds.QUEUE_BINDING_RECORD) {
+ PersistentQueueBindingEncoding bindingEncoding = (PersistentQueueBindingEncoding) DescribeJournal.newObjectEncoding(info, null);
+ queueBindings.put(bindingEncoding.getId(), bindingEncoding);
+ } else if (info.getUserRecordType() == JournalRecordIds.ADDRESS_BINDING_RECORD) {
+ PersistentAddressBindingEncoding bindingEncoding = (PersistentAddressBindingEncoding) DescribeJournal.newObjectEncoding(info, null);
+ addressBindings.put(bindingEncoding.getId(), bindingEncoding);
+ }
+ }
+
+ bindingsJournal.stop();
+ }
+
+   /**
+    * Writes the whole XML document: the document parent element containing the bindings followed by
+    * the messages.
+    * <p>
+    * Failures are reported on standard error and not propagated (best effort, as before). The writer
+    * is now closed in a <code>finally</code> block so that it is released even when writing fails
+    * part-way through.
+    */
+   private void printDataAsXML() {
+      try {
+         xmlWriter.writeStartDocument(XmlDataConstants.XML_VERSION);
+         xmlWriter.writeStartElement(XmlDataConstants.DOCUMENT_PARENT);
+         printBindingsAsXML();
+         printAllMessagesAsXML();
+         xmlWriter.writeEndElement(); // end DOCUMENT_PARENT
+         xmlWriter.writeEndDocument();
+         xmlWriter.flush();
+      } catch (Exception e) {
+         e.printStackTrace();
+      } finally {
+         try {
+            xmlWriter.close();
+         } catch (Exception e) {
+            // closing is best effort as well; report and carry on
+            e.printStackTrace();
+         }
+      }
+   }
+
+   /**
+    * Writes the bindings parent element with one empty child element per address binding followed by
+    * one per queue binding, and counts each in <code>bindingsPrinted</code>.
+    * <p>
+    * The routing types of an address are written as a single ", "-separated attribute. An address
+    * without routing types yields an empty attribute value; previously it failed with a
+    * <code>StringIndexOutOfBoundsException</code> when trimming the trailing separator.
+    *
+    * @throws XMLStreamException if the underlying writer fails
+    */
+   private void printBindingsAsXML() throws XMLStreamException {
+      xmlWriter.writeStartElement(XmlDataConstants.BINDINGS_PARENT);
+      for (PersistentAddressBindingEncoding bindingEncoding : addressBindings.values()) {
+         xmlWriter.writeEmptyElement(XmlDataConstants.ADDRESS_BINDINGS_CHILD);
+         StringBuilder routingTypes = new StringBuilder();
+         for (RoutingType routingType : bindingEncoding.getRoutingTypes()) {
+            // separator goes between entries only, so there is nothing to trim afterwards
+            if (routingTypes.length() > 0) {
+               routingTypes.append(", ");
+            }
+            routingTypes.append(routingType.toString());
+         }
+         xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_ROUTING_TYPE, routingTypes.toString());
+         xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_NAME, bindingEncoding.getName().toString());
+         xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_ID, Long.toString(bindingEncoding.getId()));
+         bindingsPrinted++;
+      }
+      for (PersistentQueueBindingEncoding bindingEncoding : queueBindings.values()) {
+         xmlWriter.writeEmptyElement(XmlDataConstants.QUEUE_BINDINGS_CHILD);
+         xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ADDRESS, bindingEncoding.getAddress().toString());
+         // a queue without a filter is exported with an empty filter string
+         String filter = "";
+         if (bindingEncoding.getFilterString() != null) {
+            filter = bindingEncoding.getFilterString().toString();
+         }
+         xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_FILTER_STRING, filter);
+         xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_NAME, bindingEncoding.getQueueName().toString());
+         xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ID, Long.toString(bindingEncoding.getId()));
+         xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ROUTING_TYPE, RoutingType.getType(bindingEncoding.getRoutingType()).toString());
+         bindingsPrinted++;
+      }
+      xmlWriter.writeEndElement(); // end BINDINGS_PARENT
+   }
+
+ private void printAllMessagesAsXML() throws Exception {
+ xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_PARENT);
+
+ // Order here is important. We must process the messages from the journal before we process those from the page
+ // files in order to get the messages in the right order.
+ for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet()) {
+ printSingleMessageAsXML(messageMapEntry.getValue().toCore(), extractQueueNames(messageRefs.get(messageMapEntry.getKey())));
+ }
+
+ printPagedMessagesAsXML();
+
+ xmlWriter.writeEndElement(); // end "messages"
+ }
+
+   /**
+    * Reads from the page files and prints messages as it finds them (making sure to check acks and transactions
+    * from the journal).
+    * <p>
+    * A paged message is printed for the queues whose cursor has not acked its position and that still have
+    * a binding, provided it is non-transactional (transaction ID -1) or its page transaction was found in
+    * the journal. Failures are reported on standard error and not propagated.
+    * <p>
+    * The executors created here are shut down when the method finishes, and each page is closed even when
+    * reading it fails.
+    */
+   private void printPagedMessagesAsXML() {
+      ScheduledExecutorService scheduled = null;
+      ExecutorService pagingExecutor = null;
+      try {
+         scheduled = Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
+         pagingExecutor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory());
+         final ExecutorService executor = pagingExecutor;
+         ExecutorFactory executorFactory = new ExecutorFactory() {
+            @Override
+            public Executor getExecutor() {
+               return executor;
+            }
+         };
+         PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), 1000L, scheduled, executorFactory, true, null);
+         HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<>();
+         addressSettingsRepository.setDefault(new AddressSettings());
+         PagingManager manager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository);
+
+         manager.start();
+
+         SimpleString[] stores = manager.getStoreNames();
+
+         for (SimpleString store : stores) {
+            PagingStore pageStore = manager.getPageStore(store);
+
+            if (pageStore == null) {
+               ActiveMQServerLogger.LOGGER.debug("Page store was null");
+               continue;
+            }
+
+            File folder = pageStore.getFolder();
+            ActiveMQServerLogger.LOGGER.debug("Reading page store " + store + " folder = " + folder);
+
+            int pageId = (int) pageStore.getFirstPage();
+            for (int i = 0; i < pageStore.getNumberOfPages(); i++) {
+               ActiveMQServerLogger.LOGGER.debug("Reading page " + pageId);
+               Page page = pageStore.createPage(pageId);
+               page.open();
+               List<PagedMessage> pagedMessages;
+               try {
+                  pagedMessages = page.read(storageManager);
+               } finally {
+                  // do not leave the page file open when reading it fails
+                  page.close();
+               }
+
+               // position of the message within the page; together with pageId it identifies a cursor ack
+               int messageId = 0;
+
+               for (PagedMessage message : pagedMessages) {
+                  message.initMessage(storageManager);
+                  long[] queueIDs = message.getQueueIDs();
+                  List<String> queueNames = new ArrayList<>();
+                  for (long queueID : queueIDs) {
+                     PagePosition posCheck = new PagePositionImpl(pageId, messageId);
+
+                     boolean acked = false;
+
+                     Set<PagePosition> positions = cursorRecords.get(queueID);
+                     if (positions != null) {
+                        acked = positions.contains(posCheck);
+                     }
+
+                     if (!acked) {
+                        PersistentQueueBindingEncoding queueBinding = queueBindings.get(queueID);
+                        if (queueBinding != null) {
+                           SimpleString queueName = queueBinding.getQueueName();
+                           queueNames.add(queueName.toString());
+                        }
+                     }
+                  }
+
+                  if (queueNames.size() > 0 && (message.getTransactionID() == -1 || pgTXs.contains(message.getTransactionID()))) {
+                     printSingleMessageAsXML(message.getMessage().toCore(), queueNames);
+                  }
+
+                  messageId++;
+               }
+
+               pageId++;
+            }
+         }
+      } catch (Exception e) {
+         e.printStackTrace();
+      } finally {
+         // these pools exist only for this method; without a shutdown their threads are leaked
+         if (pagingExecutor != null) {
+            pagingExecutor.shutdown();
+         }
+         if (scheduled != null) {
+            scheduled.shutdown();
+         }
+      }
+   }
+
+ private void printSingleMessageAsXML(ICoreMessage message, List<String> queues) throws Exception {
+ xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD);
+ printMessageAttributes(message);
+ printMessageProperties(message);
+ printMessageQueues(queues);
+ printMessageBody(message.toCore());
+ xmlWriter.writeEndElement(); // end MESSAGES_CHILD
+ messagesPrinted++;
+ }
+
+ private void printMessageBody(Message message) throws Exception {
+ xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);
+
+ if (message.toCore().isLargeMessage()) {
+ printLargeMessageBody((LargeServerMessage) message);
+ } else {
+ xmlWriter.writeCData(XmlDataExporterUtil.encodeMessageBody(message));
+ }
+ xmlWriter.writeEndElement(); // end MESSAGE_BODY
+ }
+
+ private void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException {
+ xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
+ LargeBodyEncoder encoder = null;
+
+ try {
+ encoder = message.toCore().getBodyEncoder();
+ encoder.open();
+ long totalBytesWritten = 0;
+ Long bufferSize;
+ long bodySize = encoder.getLargeBodySize();
+ for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE) {
+ Long remainder = bodySize - totalBytesWritten;
+ if (remainder >= LARGE_MESSAGE_CHUNK_SIZE) {
+ bufferSize = LARGE_MESSAGE_CHUNK_SIZE;
+ } else {
+ bufferSize = remainder;
+ }
+ ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bufferSize.intValue());
+ encoder.encode(buffer, bufferSize.intValue());
+ xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.toByteBuffer().array()));
+ totalBytesWritten += bufferSize;
+ }
+ encoder.close();
+ } catch (ActiveMQException e) {
+ e.printStackTrace();
+ } finally {
+ if (encoder != null) {
+ try {
+ encoder.close();
+ } catch (ActiveMQException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ private void printMessageQueues(List<String> queues) throws XMLStreamException {
+ xmlWriter.writeStartElement(XmlDataConstants.QUEUES_PARENT);
+ for (String queueName : queues) {
+ xmlWriter.writeEmptyElement(XmlDataConstants.QUEUES_CHILD);
+ xmlWriter.writeAttribute(XmlDataConstants.QUEUE_NAME, queueName);
+ }
+ xmlWriter.writeEndElement(); // end QUEUES_PARENT
+ }
+
+ private void printMessageProperties(Message message) throws XMLStreamException {
+ xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT);
+ for (SimpleString key : message.getPropertyNames()) {
+ Object value = message.getObjectProperty(key);
+ xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD);
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString());
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, XmlDataExporterUtil.convertProperty(value));
+
+ // Write the property type as an attribute
+ String propertyType = XmlDataExporterUtil.getPropertyType(value);
+ if (propertyType != null) {
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, propertyType);
+ }
+ }
+ xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
+ }
+
+ private void printMessageAttributes(ICoreMessage message) throws XMLStreamException {
+ xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID()));
+ xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority()));
+ xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration()));
+ xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TIMESTAMP, Long.toString(message.getTimestamp()));
+ String prettyType = XmlDataExporterUtil.getMessagePrettyType(message.getType());
+ xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TYPE, prettyType);
+ if (message.getUserID() != null) {
+ xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_USER_ID, message.getUserID().toString());
+ }
+ }
+
+   /**
+    * Resolves the queue names for the references of one message.
+    * <p>
+    * A message without any reference (<code>refMap</code> is <code>null</code>) yields an empty list,
+    * and a reference whose queue has no binding is skipped; both cases previously failed with a
+    * <code>NullPointerException</code>.
+    *
+    * @param refMap the references of a message keyed by queue ID, may be <code>null</code>
+    * @return the names of the bound queues referenced by the map, never <code>null</code>
+    */
+   private List<String> extractQueueNames(HashMap<Long, ReferenceDescribe> refMap) {
+      List<String> queues = new ArrayList<>();
+      if (refMap == null) {
+         return queues;
+      }
+      for (ReferenceDescribe ref : refMap.values()) {
+         PersistentQueueBindingEncoding binding = queueBindings.get(ref.refEncoding.queueID);
+         if (binding != null) {
+            queues.add(binding.getQueueName().toString());
+         }
+      }
+      return queues;
+   }
+
+ // Inner classes -------------------------------------------------
+
+   /**
+    * Proxy to handle indenting the XML since <code>javax.xml.stream.XMLStreamWriter</code> doesn't support that.
+    * <p>
+    * Before delegating a call it writes a line separator and indentation for start, empty and CDATA
+    * elements, and for end elements unless character data was written since the previous end element.
+    * The delegate's return value is passed back to the caller, and an exception thrown by the delegate
+    * is rethrown as is rather than wrapped in an <code>InvocationTargetException</code>.
+    */
+   static class PrettyPrintHandler implements InvocationHandler {
+
+      private final XMLStreamWriter target;
+
+      // current nesting level of open elements
+      private int depth = 0;
+
+      private static final char INDENT_CHAR = ' ';
+
+      private static final String LINE_SEPARATOR = System.getProperty("line.separator");
+
+      // false after character data was written, so the following end tag stays on the same line
+      boolean wrap = true;
+
+      PrettyPrintHandler(XMLStreamWriter target) {
+         this.target = target;
+      }
+
+      /**
+       * Emits the indentation required for the intercepted call, then forwards the call to the
+       * wrapped writer.
+       *
+       * @return whatever the wrapped writer returns (<code>null</code> for void methods)
+       * @throws Throwable the exception thrown by the wrapped writer, unwrapped
+       */
+      @Override
+      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+         String m = method.getName();
+
+         switch (m) {
+            case "writeStartElement":
+               target.writeCharacters(LINE_SEPARATOR);
+               target.writeCharacters(indent(depth));
+
+               depth++;
+               break;
+            case "writeEndElement":
+               depth--;
+               if (wrap) {
+                  target.writeCharacters(LINE_SEPARATOR);
+                  target.writeCharacters(indent(depth));
+               }
+               wrap = true;
+               break;
+            case "writeEmptyElement":
+            case "writeCData":
+               target.writeCharacters(LINE_SEPARATOR);
+               target.writeCharacters(indent(depth));
+               break;
+            case "writeCharacters":
+               wrap = false;
+               break;
+         }
+
+         try {
+            return method.invoke(target, args);
+         } catch (java.lang.reflect.InvocationTargetException e) {
+            // surface the writer's own exception; otherwise the proxy reports an UndeclaredThrowableException
+            Throwable cause = e.getCause();
+            throw cause != null ? cause : e;
+         }
+      }
+
+      /**
+       * Builds the indentation string for a nesting level: three indent characters per level, and
+       * an empty string for a level of zero or less.
+       */
+      private String indent(int depth) {
+         depth = Math.max(0, depth) * 3; // level of indentation
+         char[] output = new char[depth];
+         while (depth-- > 0) {
+            output[depth] = INDENT_CHAR;
+         }
+         return new String(output);
+      }
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java
new file mode 100644
index 0000000..df48dcf
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.tools.xml;
+
+import com.google.common.base.Preconditions;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.utils.Base64;
+
+/**
+ * Static helpers shared by the XML export code: property conversion, type naming and Base64 encoding.
+ */
+public class XmlDataExporterUtil {
+
+   /**
+    * Converts a property value to the text written to the XML: the NULL constant for
+    * <code>null</code>, the encoded form for a byte array and <code>toString()</code> otherwise.
+    */
+   static String convertProperty(final Object value) {
+      if (value == null) {
+         return XmlDataConstants.NULL;
+      }
+      if (value instanceof byte[]) {
+         return encode((byte[]) value);
+      }
+      return value.toString();
+   }
+
+   /**
+    * Maps a property value to the name of its type.
+    *
+    * @return the type name, or <code>null</code> when the value is of none of the handled types
+    */
+   static String getPropertyType(final Object value) {
+      // if the value is null then we can't really know what it is so just use
+      // the most generic type
+      if (value == null) {
+         return XmlDataConstants.PROPERTY_TYPE_BYTES;
+      }
+      if (value instanceof Boolean) {
+         return XmlDataConstants.PROPERTY_TYPE_BOOLEAN;
+      }
+      if (value instanceof Byte) {
+         return XmlDataConstants.PROPERTY_TYPE_BYTE;
+      }
+      if (value instanceof Short) {
+         return XmlDataConstants.PROPERTY_TYPE_SHORT;
+      }
+      if (value instanceof Integer) {
+         return XmlDataConstants.PROPERTY_TYPE_INTEGER;
+      }
+      if (value instanceof Long) {
+         return XmlDataConstants.PROPERTY_TYPE_LONG;
+      }
+      if (value instanceof Float) {
+         return XmlDataConstants.PROPERTY_TYPE_FLOAT;
+      }
+      if (value instanceof Double) {
+         return XmlDataConstants.PROPERTY_TYPE_DOUBLE;
+      }
+      if (value instanceof String) {
+         return XmlDataConstants.PROPERTY_TYPE_STRING;
+      }
+      if (value instanceof SimpleString) {
+         return XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING;
+      }
+      if (value instanceof byte[]) {
+         return XmlDataConstants.PROPERTY_TYPE_BYTES;
+      }
+      return null;
+   }
+
+   /**
+    * Maps a raw message type to its readable name, falling back to the default name for any type
+    * that is not bytes, map, object, stream or text.
+    */
+   public static String getMessagePrettyType(byte rawType) {
+      if (rawType == Message.BYTES_TYPE) {
+         return XmlDataConstants.BYTES_TYPE_PRETTY;
+      }
+      if (rawType == Message.MAP_TYPE) {
+         return XmlDataConstants.MAP_TYPE_PRETTY;
+      }
+      if (rawType == Message.OBJECT_TYPE) {
+         return XmlDataConstants.OBJECT_TYPE_PRETTY;
+      }
+      if (rawType == Message.STREAM_TYPE) {
+         return XmlDataConstants.STREAM_TYPE_PRETTY;
+      }
+      if (rawType == Message.TEXT_TYPE) {
+         return XmlDataConstants.TEXT_TYPE_PRETTY;
+      }
+      return XmlDataConstants.DEFAULT_TYPE_PRETTY;
+   }
+
+   /**
+    * Base64 encode a message body into the proper XML format.
+    *
+    * @param message the message whose body is encoded, must not be <code>null</code>
+    * @return the encoded body
+    */
+   static String encodeMessageBody(final Message message) throws Exception {
+      Preconditions.checkNotNull(message, "ServerMessage can not be null");
+
+      final ActiveMQBuffer body = message.toCore().getReadOnlyBodyBuffer();
+      final int length = body.writerIndex();
+      final byte[] bytes = new byte[length];
+      body.readBytes(bytes);
+
+      return encode(bytes);
+   }
+
+   /**
+    * Base64 encodes the whole array with the DONT_BREAK_LINES and URL_SAFE options.
+    */
+   protected static String encode(final byte[] data) {
+      final int options = Base64.DONT_BREAK_LINES | Base64.URL_SAFE;
+      return Base64.encodeBytes(data, 0, data.length, options);
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java
new file mode 100644
index 0000000..a824177
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.tools.xml;
+
+import javax.xml.XMLConstants;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamConstants;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.stax.StAXSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import javax.xml.validation.Validator;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientRequestor;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.ManagementHelper;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.cli.commands.ActionAbstract;
+import org.apache.activemq.artemis.cli.commands.ActionContext;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.utils.Base64;
+import org.apache.activemq.artemis.utils.ClassloadingUtil;
+import org.apache.activemq.artemis.utils.ListUtil;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.jboss.logging.Logger;
+
+/**
+ * Read XML output from <code>org.apache.activemq.artemis.core.persistence.impl.journal.XmlDataExporter</code>, create a core session, and
+ * send the messages to a running instance of ActiveMQ Artemis. It uses the StAX <code>javax.xml.stream.XMLStreamReader</code>
+ * for speed and simplicity.
+ */
+@Command(name = "imp", description = "Import all message-data using an XML that could be interpreted by any system.")
+public final class XmlDataImporter extends ActionAbstract {
+
+ private static final Logger logger = Logger.getLogger(XmlDataImporter.class);
+
+ private XMLStreamReader reader;
+
+ // this session is really only needed if the "session" variable does not auto-commit sends
+ ClientSession managementSession;
+
+ boolean localSession = false;
+
+ final Map<String, String> addressMap = new HashMap<>();
+
+ final Map<String, Long> queueIDs = new HashMap<>();
+
+ String tempFileName = "";
+
+ private ClientSession session;
+
+ @Option(name = "--host", description = "The host used to import the data (default localhost)")
+ public String host = "localhost";
+
+ @Option(name = "--port", description = "The port used to import the data (default 61616)")
+ public int port = 61616;
+
+ @Option(name = "--transaction", description = "If this is set to true you will need a whole transaction to commit at the end. (default false)")
+ public boolean transactional;
+
+ @Option(name = "--user", description = "User name used to import the data. (default null)")
+ public String user = null;
+
+   // Credential handed to createSession together with "user"; the description previously repeated the --user text.
+   @Option(name = "--password", description = "Password used to import the data. (default null)")
+   public String password = null;
+
+ @Option(name = "--input", description = "The input file name (default=exp.dmp)", required = true)
+ public String input = "exp.dmp";
+
+   /**
+    * @return the password used when creating the import sessions, possibly null
+    */
+   public String getPassword() {
+      return this.password;
+   }
+
+   /**
+    * @param password the password to use when creating the import sessions
+    */
+   public void setPassword(String password) {
+      this.password = password;
+   }
+
+   /**
+    * @return the user name used when creating the import sessions, possibly null
+    */
+   public String getUser() {
+      return this.user;
+   }
+
+   /**
+    * @param user the user name to use when creating the import sessions
+    */
+   public void setUser(String user) {
+      this.user = user;
+   }
+
+   /**
+    * Command entry point: imports the configured input file into the configured host and port.
+    *
+    * @param context the action context; not read by this method
+    * @return always null
+    */
+   @Override
+   public Object execute(ActionContext context) throws Exception {
+      this.process(this.input, this.host, this.port, this.transactional);
+      return null;
+   }
+
+   /**
+    * Imports the XML stored in a file by delegating to the stream based overload.
+    *
+    * @param inputFile     path of the XML file to import
+    * @param host          host to connect to
+    * @param port          port to connect to
+    * @param transactional passed through unchanged to the stream based overload
+    * @throws Exception if the file cannot be opened or the import fails
+    */
+   public void process(String inputFile, String host, int port, boolean transactional) throws Exception {
+      // the stream is opened here, so it is closed here whether the import succeeds or fails
+      try (InputStream inputStream = new FileInputStream(inputFile)) {
+         this.process(inputStream, host, port, transactional);
+      }
+   }
+
+   /**
+    * Imports the XML using a single session for both sending and management requests.
+    * <br>
+    * Delegates to the three argument overload with no separate management session, so the given
+    * session is used for management requests as well. Use the overload taking two sessions if the
+    * sending session needs to be transactional.
+    *
+    * @param inputStream the stream from which to read the XML for import
+    * @param session     used for sending messages, must use auto-commit for sends
+    */
+   public void process(InputStream inputStream, ClientSession session) throws Exception {
+      process(inputStream, session, null);
+   }
+
+   /**
+    * Imports the XML using one session for sending messages and another for management requests.
+    * <br>
+    * Use this overload to import all messages transactionally: pass a sending session which does
+    * not auto-commit sends and a management session which does.
+    *
+    * @param inputStream       the stream from which to read the XML for import
+    * @param session           used for sending messages, doesn't need to auto-commit sends
+    * @param managementSession used for management queries, must use auto-commit for sends; when
+    *                          null the sending session is used instead
+    */
+   public void process(InputStream inputStream,
+                       ClientSession session,
+                       ClientSession managementSession) throws Exception {
+      reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
+      this.session = session;
+      // fall back to the sending session when no dedicated management session was supplied
+      this.managementSession = managementSession == null ? session : managementSession;
+
+      processXml();
+   }
+
+ public void process(InputStream inputStream, String host, int port, boolean transactional) throws Exception {
+ HashMap<String, Object> connectionParams = new HashMap<>();
+ connectionParams.put(TransportConstants.HOST_PROP_NAME, host);
+ connectionParams.put(TransportConstants.PORT_PROP_NAME, Integer.toString(port));
+ ServerLocator serverLocator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
+ ClientSessionFactory sf = serverLocator.createSessionFactory();
+
+ ClientSession session;
+ ClientSession managementSession;
+
+ if (user != null || password != null) {
+ session = sf.createSession(user, password, false, !transactional, true, false, 0);
+ managementSession = sf.createSession(user, password, false, true, true, false, 0);
+ } else {
+ session = sf.createSession(false, !transactional, true);
+ managementSession = sf.createSession(false, true, true);
+ }
+ localSession = true;
+
+ process(inputStream, session, managementSession);
+ }
+
+   /**
+    * Validates the XML stored in a file by delegating to the stream based overload.
+    *
+    * @param file path of the XML file to validate
+    * @throws Exception if the file cannot be opened or validation fails
+    */
+   public void validate(String file) throws Exception {
+      // the stream is opened here, so it is closed here once validation finishes or fails
+      try (InputStream inputStream = new FileInputStream(file)) {
+         validate(inputStream);
+      }
+   }
+
+   /**
+    * Validates the XML read from the stream against the schema/artemis-import-export.xsd resource.
+    *
+    * @param inputStream the stream containing the XML to validate; not closed by this method
+    * @throws Exception if the schema cannot be loaded or the XML does not validate
+    */
+   public void validate(InputStream inputStream) throws Exception {
+      XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
+      try {
+         SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+         Schema schema = factory.newSchema(XmlDataImporter.findResource("schema/artemis-import-export.xsd"));
+
+         Validator validator = schema.newValidator();
+         validator.validate(new StAXSource(reader));
+      } finally {
+         // release the reader even when schema loading or validation throws
+         reader.close();
+      }
+   }
+
+ private static URL findResource(final String resourceName) {
+ return AccessController.doPrivileged(new PrivilegedAction<URL>() {
+ @Override
+ public URL run() {
+ return ClassloadingUtil.findResource(resourceName);
+ }
+ });
+ }
+
+   /**
+    * Walks the XML event stream and dispatches queue bindings, address bindings and messages to
+    * their handlers, then commits the sending session if it does not auto-commit sends.
+    * <br>
+    * When the sessions were created by this class (localSession is set) both of them are closed
+    * afterwards, even if processing failed or closing the first one throws.
+    *
+    * @throws Exception if reading the XML or talking to the broker fails
+    */
+   private void processXml() throws Exception {
+      try {
+         while (reader.hasNext()) {
+            if (logger.isDebugEnabled()) {
+               logger.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
+            }
+            if (reader.getEventType() == XMLStreamConstants.START_ELEMENT) {
+               if (XmlDataConstants.QUEUE_BINDINGS_CHILD.equals(reader.getLocalName())) {
+                  bindQueue();
+               } else if (XmlDataConstants.ADDRESS_BINDINGS_CHILD.equals(reader.getLocalName())) {
+                  bindAddress();
+               } else if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) {
+                  processMessage();
+               }
+            }
+            reader.next();
+         }
+
+         if (!session.isAutoCommitSends()) {
+            session.commit();
+         }
+      } finally {
+         // if the sessions were created by this class then close them (otherwise the caller will close them)
+         if (localSession) {
+            try {
+               session.close();
+            } finally {
+               // must run even when closing the sending session throws
+               managementSession.close();
+            }
+         }
+      }
+   }
+
+ private void processMessage() throws Exception {
+ Byte type = 0;
+ Byte priority = 0;
+ Long expiration = 0L;
+ Long timestamp = 0L;
+ org.apache.activemq.artemis.utils.UUID userId = null;
+ ArrayList<String> queues = new ArrayList<>();
+
+ // get message's attributes
+ for (int i = 0; i < reader.getAttributeCount(); i++) {
+ String attributeName = reader.getAttributeLocalName(i);
+ switch (attributeName) {
+ case XmlDataConstants.MESSAGE_TYPE:
+ type = getMessageType(reader.getAttributeValue(i));
+ break;
+ case XmlDataConstants.MESSAGE_PRIORITY:
+ priority = Byte.parseByte(reader.getAttributeValue(i));
+ break;
+ case XmlDataConstants.MESSAGE_EXPIRATION:
+ expiration = Long.parseLong(reader.getAttributeValue(i));
+ break;
+ case XmlDataConstants.MESSAGE_TIMESTAMP:
+ timestamp = Long.parseLong(reader.getAttributeValue(i));
+ break;
+ case XmlDataConstants.MESSAGE_USER_ID:
+ userId = UUIDGenerator.getInstance().generateUUID();
+ break;
+ }
+ }
+
+ Message message = session.createMessage(type, true, expiration, timestamp, priority);
+ message.setUserID(userId);
+
+ boolean endLoop = false;
+
+ // loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.)
+ while (reader.hasNext()) {
+ int eventType = reader.getEventType();
+ switch (eventType) {
+ case XMLStreamConstants.START_ELEMENT:
+ if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) {
+ processMessageBody(message.toCore());
+ } else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) {
+ processMessageProperties(message);
+ } else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) {
+ processMessageQueues(queues);
+ }
+ break;
+ case XMLStreamConstants.END_ELEMENT:
+ if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) {
+ endLoop = true;
+ }
+ break;
+ }
+ if (endLoop) {
+ break;
+ }
+ reader.next();
+ }
+
+ sendMessage(queues, message);
+ }
+
+   /**
+    * Translates a pretty type name from the XML back into the message type code.
+    *
+    * @param value the pretty type name
+    * @return the matching type code, or the default type when the name is not recognised
+    */
+   private Byte getMessageType(String value) {
+      switch (value) {
+         case XmlDataConstants.BYTES_TYPE_PRETTY:
+            return Message.BYTES_TYPE;
+         case XmlDataConstants.MAP_TYPE_PRETTY:
+            return Message.MAP_TYPE;
+         case XmlDataConstants.OBJECT_TYPE_PRETTY:
+            return Message.OBJECT_TYPE;
+         case XmlDataConstants.STREAM_TYPE_PRETTY:
+            return Message.STREAM_TYPE;
+         case XmlDataConstants.TEXT_TYPE_PRETTY:
+            return Message.TEXT_TYPE;
+         case XmlDataConstants.DEFAULT_TYPE_PRETTY:
+         default:
+            return Message.DEFAULT_TYPE;
+      }
+   }
+
+   /**
+    * Sends the message to the address bound to the first queue in the list, routed to exactly the
+    * given queues via the HDR_ROUTE_TO_IDS property.
+    * <br>
+    * Any temp file created for a large message body is removed afterwards, whether or not the
+    * send succeeded.
+    *
+    * @param queues  names of the queues which should receive the message
+    * @param message the message to send
+    * @throws Exception if a queue ID cannot be resolved or the send fails
+    */
+   private void sendMessage(ArrayList<String> queues, Message message) throws Exception {
+      try {
+         StringBuilder logMessage = new StringBuilder();
+         String destination = addressMap.get(queues.get(0));
+
+         logMessage.append("Sending ").append(message).append(" to address: ").append(destination).append("; routed to queues: ");
+         // one long (8 bytes) per target queue
+         ByteBuffer buffer = ByteBuffer.allocate(queues.size() * 8);
+
+         for (String queue : queues) {
+            long queueID = lookupQueueID(queue);
+            logMessage.append(queue).append(", ");
+            buffer.putLong(queueID);
+         }
+
+         logMessage.delete(logMessage.length() - 2, logMessage.length()); // take off the trailing comma
+         if (logger.isDebugEnabled()) {
+            logger.debug(logMessage);
+         }
+
+         message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
+         try (ClientProducer producer = session.createProducer(destination)) {
+            producer.send(message);
+         }
+      } finally {
+         // previously the temp file was only removed after a successful send
+         deleteTempFile();
+      }
+   }
+
+   /**
+    * Returns the ID of the named queue, asking the broker through the management session the
+    * first time and caching the answer in queueIDs afterwards.
+    */
+   private long lookupQueueID(String queue) throws Exception {
+      if (queueIDs.containsKey(queue)) {
+         return queueIDs.get(queue);
+      }
+
+      long queueID;
+      // Get the ID of the queues involved so the message can be routed properly. This is done because we cannot
+      // send directly to a queue, we have to send to an address instead but not all the queues related to the
+      // address may need the message
+      try (ClientRequestor requestor = new ClientRequestor(managementSession, "activemq.management")) {
+         ClientMessage managementMessage = managementSession.createMessage(false);
+         ManagementHelper.putAttribute(managementMessage, ResourceNames.QUEUE + queue, "ID");
+         managementSession.start();
+         if (logger.isDebugEnabled()) {
+            logger.debug("Requesting ID for: " + queue);
+         }
+         ClientMessage reply = requestor.request(managementMessage);
+         Number idObject = (Number) ManagementHelper.getResult(reply);
+         queueID = idObject.longValue();
+      }
+      if (logger.isDebugEnabled()) {
+         logger.debug("ID for " + queue + " is: " + queueID);
+      }
+      queueIDs.put(queue, queueID); // store it so we don't have to look it up every time
+      return queueID;
+   }
+
+   /**
+    * Deletes the temp file recorded in tempFileName, if any, and clears the name.
+    */
+   private void deleteTempFile() {
+      if (tempFileName.length() > 0) {
+         File tempFile = new File(tempFileName);
+         if (!tempFile.delete()) {
+            ActiveMQServerLogger.LOGGER.couldNotDeleteTempFile(tempFileName);
+         }
+         tempFileName = "";
+      }
+   }
+
+ private void processMessageQueues(ArrayList<String> queues) {
+ for (int i = 0; i < reader.getAttributeCount(); i++) {
+ if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i))) {
+ queues.add(reader.getAttributeValue(i));
+ }
+ }
+ }
+
+ private void processMessageProperties(Message message) {
+ String key = "";
+ String value = "";
+ String propertyType = "";
+
+ for (int i = 0; i < reader.getAttributeCount(); i++) {
+ String attributeName = reader.getAttributeLocalName(i);
+ switch (attributeName) {
+ case XmlDataConstants.PROPERTY_NAME:
+ key = reader.getAttributeValue(i);
+ break;
+ case XmlDataConstants.PROPERTY_VALUE:
+ value = reader.getAttributeValue(i);
+ break;
+ case XmlDataConstants.PROPERTY_TYPE:
+ propertyType = reader.getAttributeValue(i);
+ break;
+ }
+ }
+
+ if (value.equals(XmlDataConstants.NULL)) {
+ value = null;
+ }
+
+ switch (propertyType) {
+ case XmlDataConstants.PROPERTY_TYPE_SHORT:
+ message.putShortProperty(key, Short.parseShort(value));
+ break;
+ case XmlDataConstants.PROPERTY_TYPE_BOOLEAN:
+ message.putBooleanProperty(key, Boolean.parseBoolean(value));
+ break;
+ case XmlDataConstants.PROPERTY_TYPE_BYTE:
+ message.putByteProperty(key, Byte.parseByte(value));
+ break;
+ case XmlDataConstants.PROPERTY_TYPE_BYTES:
+ message.putBytesProperty(key, value == null ? null : decode(value));
+ break;
+ case XmlDataConstants.PROPERTY_TYPE_DOUBLE:
+ message.putDoubleProperty(key, Double.parseDouble(value));
+ break;
+ case XmlDataConstants.PROPERTY_TYPE_FLOAT:
+ message.putFloatProperty(key, Float.parseFloat(value));
+ break;
+ case XmlDataConstants.PROPERTY_TYPE_INTEGER:
+ message.putIntProperty(key, Integer.parseInt(value));
+ break;
+ case XmlDataConstants.PROPERTY_TYPE_LONG:
+ message.putLongProperty(key, Long.parseLong(value));
+ break;
+ case XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING:
+ message.putStringProperty(new SimpleString(key), value == null ? null : SimpleString.toSimpleString(value));
+ break;
+ case XmlDataConstants.PROPERTY_TYPE_STRING:
+ message.putStringProperty(key, value);
+ break;
+ }
+ }
+
+   /**
+    * Reads the body element the reader is positioned on and attaches the decoded bytes to the
+    * message. A body flagged as large is first written to a temp file (whose name is kept in
+    * tempFileName) and then attached as a buffered input stream; any other body is written
+    * straight into the message's body buffer.
+    *
+    * @param message the message receiving the body
+    */
+   private void processMessageBody(final ICoreMessage message) throws XMLStreamException, IOException {
+      boolean isLarge = false;
+
+      final int attributeCount = reader.getAttributeCount();
+      for (int index = 0; index < attributeCount; index++) {
+         if (XmlDataConstants.MESSAGE_IS_LARGE.equals(reader.getAttributeLocalName(index))) {
+            isLarge = Boolean.parseBoolean(reader.getAttributeValue(index));
+         }
+      }
+
+      // step off the body's start element before collecting its content
+      reader.next();
+      if (logger.isDebugEnabled()) {
+         logger.debug("XMLStreamReader impl: " + reader);
+      }
+
+      if (!isLarge) {
+         getMessageBodyBytes(new MessageBodyBytesProcessor() {
+            @Override
+            public void processBodyBytes(byte[] bytes) throws IOException {
+               message.getBodyBuffer().writeBytes(bytes);
+            }
+         });
+         return;
+      }
+
+      tempFileName = UUID.randomUUID().toString() + ".tmp";
+      if (logger.isDebugEnabled()) {
+         logger.debug("Creating temp file " + tempFileName + " for large message.");
+      }
+      try (OutputStream tempOut = new FileOutputStream(tempFileName)) {
+         getMessageBodyBytes(new MessageBodyBytesProcessor() {
+            @Override
+            public void processBodyBytes(byte[] bytes) throws IOException {
+               tempOut.write(bytes);
+            }
+         });
+      }
+      // the message reads its body back from the temp file
+      BufferedInputStream bufferedInput = new BufferedInputStream(new FileInputStream(tempFileName));
+      ((ClientMessage) message).setBodyInputStream(bufferedInput);
+   }
+
+   /**
+    * Message bodies are written to XML as one or more Base64 encoded CDATA elements. Some parser implementations won't
+    * read an entire CDATA element at once (e.g. Woodstox) so it's possible that multiple CDATA/CHARACTERS events need
+    * to be combined to reconstruct the Base64 encoded string. You can't decode bits and pieces of each CDATA. Each
+    * CDATA has to be decoded in its entirety.
+    * <br>
+    * Reading stops at the first END_ELEMENT event. Text still pending at that point (i.e. a CDATA which is not
+    * followed by whitespace before the closing tag) is decoded and handed to the processor as well, rather than
+    * being dropped.
+    *
+    * @param processor used to deal with the decoded CDATA elements
+    * @throws IOException        if the processor fails to handle the decoded bytes
+    * @throws XMLStreamException if the XML cannot be read
+    */
+   private void getMessageBodyBytes(MessageBodyBytesProcessor processor) throws IOException, XMLStreamException {
+      int currentEventType;
+      StringBuilder cdata = new StringBuilder();
+      while (reader.hasNext()) {
+         currentEventType = reader.getEventType();
+         if (currentEventType == XMLStreamConstants.END_ELEMENT) {
+            break;
+         } else if (currentEventType == XMLStreamConstants.CHARACTERS && reader.isWhiteSpace() && cdata.length() > 0) {
+            /* when we hit a whitespace CHARACTERS event we know that the entire CDATA is complete so decode, pass back to
+             * the processor, and reset the cdata for the next event(s)
+             */
+            processor.processBodyBytes(decode(cdata.toString()));
+            cdata.setLength(0);
+         } else {
+            cdata.append(new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength()).trim());
+         }
+         reader.next();
+      }
+
+      // flush whatever was collected after the last whitespace event so no body data is lost
+      if (cdata.length() > 0) {
+         processor.processBodyBytes(decode(cdata.toString()));
+      }
+   }
+
+ private void bindQueue() throws Exception {
+ String queueName = "";
+ String address = "";
+ String filter = "";
+ String routingType = "";
+
+ for (int i = 0; i < reader.getAttributeCount(); i++) {
+ String attributeName = reader.getAttributeLocalName(i);
+ switch (attributeName) {
+ case XmlDataConstants.QUEUE_BINDING_ADDRESS:
+ address = reader.getAttributeValue(i);
+ break;
+ case XmlDataConstants.QUEUE_BINDING_NAME:
+ queueName = reader.getAttributeValue(i);
+ break;
+ case XmlDataConstants.QUEUE_BINDING_FILTER_STRING:
+ filter = reader.getAttributeValue(i);
+ break;
+ case XmlDataConstants.QUEUE_BINDING_ROUTING_TYPE:
+ routingType = reader.getAttributeValue(i);
+ break;
+ }
+ }
+
+ ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName));
+
+ if (!queueQuery.isExists()) {
+ session.createQueue(address, RoutingType.valueOf(routingType), queueName, filter, true);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
+ }
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Binding " + queueName + " already exists so won't re-bind.");
+ }
+ }
+
+ addressMap.put(queueName, address);
+ }
+
+ private void bindAddress() throws Exception {
+ String addressName = "";
+ String routingTypes = "";
+
+ for (int i = 0; i < reader.getAttributeCount(); i++) {
+ String attributeName = reader.getAttributeLocalName(i);
+ switch (attributeName) {
+ case XmlDataConstants.ADDRESS_BINDING_NAME:
+ addressName = reader.getAttributeValue(i);
+ break;
+ case XmlDataConstants.ADDRESS_BINDING_ROUTING_TYPE:
+ routingTypes = reader.getAttributeValue(i);
+ break;
+ }
+ }
+
+ ClientSession.AddressQuery addressQuery = session.addressQuery(new SimpleString(addressName));
+
+ if (!addressQuery.isExists()) {
+ Set<RoutingType> set = new HashSet<>();
+ for (String routingType : ListUtil.toList(routingTypes)) {
+ set.add(RoutingType.valueOf(routingType));
+ }
+ session.createAddress(SimpleString.toSimpleString(addressName), set, false);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Binding address(name=" + addressName + ", routingTypes=" + routingTypes + ")");
+ }
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Binding " + addressName + " already exists so won't re-bind.");
+ }
+ }
+ }
+
+ private static byte[] decode(String data) {
+ return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+ }
+
+ /**
+ * Callback handed to getMessageBodyBytes; it receives each decoded chunk of a message body.
+ */
+ private interface MessageBodyBytesProcessor {
+ // called once per decoded chunk; implementations in this class write the bytes to a stream or body buffer
+ void processBodyBytes(byte[] bytes) throws IOException;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/AddUser.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/AddUser.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/AddUser.java
index caa32a7..37bd676 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/AddUser.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/AddUser.java
@@ -20,7 +20,6 @@ import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.util.HashUtil;
-import org.apache.activemq.artemis.util.FileBasedSecStoreConfig;
import org.apache.commons.lang3.StringUtils;
/**
@@ -53,7 +52,7 @@ public class AddUser extends PasswordAction {
* @param role the role
* @throws IllegalArgumentException if user exists
*/
- protected void add(String hash, String... role) throws Exception {
+ private void add(String hash, String... role) throws Exception {
FileBasedSecStoreConfig config = getConfiguration();
config.addNewUser(username, hash, role);
config.save();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/FileBasedSecStoreConfig.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/FileBasedSecStoreConfig.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/FileBasedSecStoreConfig.java
new file mode 100644
index 0000000..1f8e297
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/FileBasedSecStoreConfig.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.user;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.utils.StringUtil;
+import org.apache.commons.configuration2.PropertiesConfiguration;
+import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
+import org.apache.commons.configuration2.builder.fluent.Configurations;
+
+class FileBasedSecStoreConfig {
+
+ private static final String LICENSE_HEADER =
+ "## ---------------------------------------------------------------------------\n" +
+ "## Licensed to the Apache Software Foundation (ASF) under one or more\n" +
+ "## contributor license agreements. See the NOTICE file distributed with\n" +
+ "## this work for additional information regarding copyright ownership.\n" +
+ "## The ASF licenses this file to You under the Apache License, Version 2.0\n" +
+ "## (the \"License\"); you may not use this file except in compliance with\n" +
+ "## the License. You may obtain a copy of the License at\n" +
+ "##\n" +
+ "## http://www.apache.org/licenses/LICENSE-2.0\n" +
+ "##\n" +
+ "## Unless required by applicable law or agreed to in writing, software\n" +
+ "## distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
+ "## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
+ "## See the License for the specific language governing permissions and\n" +
+ "## limitations under the License.\n" +
+ "## ---------------------------------------------------------------------------\n";
+ private FileBasedConfigurationBuilder<PropertiesConfiguration> userBuilder;
+ private FileBasedConfigurationBuilder<PropertiesConfiguration> roleBuilder;
+ private PropertiesConfiguration userConfig;
+ private PropertiesConfiguration roleConfig;
+
+ FileBasedSecStoreConfig(File userFile, File roleFile) throws Exception {
+ Configurations configs = new Configurations();
+ userBuilder = configs.propertiesBuilder(userFile);
+ roleBuilder = configs.propertiesBuilder(roleFile);
+ userConfig = userBuilder.getConfiguration();
+ roleConfig = roleBuilder.getConfiguration();
+
+ String roleHeader = roleConfig.getLayout().getHeaderComment();
+ String userHeader = userConfig.getLayout().getHeaderComment();
+
+ if (userHeader == null) {
+ if (userConfig.isEmpty()) {
+ //clean and reset header
+ userConfig.clear();
+ userConfig.setHeader(LICENSE_HEADER);
+ }
+ }
+
+ if (roleHeader == null) {
+ if (roleConfig.isEmpty()) {
+ //clean and reset header
+ roleConfig.clear();
+ roleConfig.setHeader(LICENSE_HEADER);
+ }
+ }
+ }
+
+ void addNewUser(String username, String hash, String... roles) throws Exception {
+ if (userConfig.getString(username) != null) {
+ throw new IllegalArgumentException("User already exist: " + username);
+ }
+ userConfig.addProperty(username, hash);
+ addRoles(username, roles);
+ }
+
+ /**
+ * Saves both configurations through their builders, users first and then roles.
+ */
+ void save() throws Exception {
+ userBuilder.save();
+ roleBuilder.save();
+ }
+
+ void removeUser(String username) throws Exception {
+ if (userConfig.getProperty(username) == null) {
+ throw new IllegalArgumentException("user " + username + " doesn't exist.");
+ }
+ userConfig.clearProperty(username);
+ removeRoles(username);
+ }
+
+ List<String> listUser(String username) {
+ List<String> result = new ArrayList<>();
+ result.add("--- \"user\"(roles) ---\n");
+
+ int totalUsers = 0;
+ if (username != null) {
+ String roles = findRoles(username);
+ result.add("\"" + username + "\"(" + roles + ")");
+ totalUsers++;
+ } else {
+ Iterator<String> iter = userConfig.getKeys();
+ while (iter.hasNext()) {
+ String keyUser = iter.next();
+ String roles = findRoles(keyUser);
+ result.add("\"" + keyUser + "\"(" + roles + ")");
+ totalUsers++;
+ }
+ }
+ result.add("\n Total: " + totalUsers);
+ return result;
+ }
+
+ void updateUser(String username, String password, String[] roles) {
+ String oldPassword = (String) userConfig.getProperty(username);
+ if (oldPassword == null) {
+ throw new IllegalArgumentException("user " + username + " doesn't exist.");
+ }
+
+ if (password != null) {
+ userConfig.setProperty(username, password);
+ }
+
+ if (roles != null && roles.length > 0) {
+
+ removeRoles(username);
+ addRoles(username, roles);
+ }
+ }
+
+ private String findRoles(String uname) {
+ Iterator<String> iter = roleConfig.getKeys();
+ StringBuilder builder = new StringBuilder();
+ boolean first = true;
+ while (iter.hasNext()) {
+ String role = iter.next();
+ List<String> names = roleConfig.getList(String.class, role);
+ for (String value : names) {
+ //each value may be a comma separated list
+ String[] items = value.split(",");
+ for (String item : items) {
+ if (item.equals(uname)) {
+ if (!first) {
+ builder.append(",");
+ }
+ builder.append(role);
+ first = false;
+ }
+ }
+ }
+ }
+
+ return builder.toString();
+ }
+
+ private void addRoles(String username, String[] roles) {
+ for (String role : roles) {
+ List<String> users = roleConfig.getList(String.class, role);
+ if (users == null) {
+ users = new ArrayList<>();
+ }
+ users.add(username);
+ roleConfig.setProperty(role, StringUtil.joinStringList(users, ","));
+ }
+ }
+
+ private void removeRoles(String username) {
+
+ Iterator<String> iterKeys = roleConfig.getKeys();
+
+ List<Pair<String, List<String>>> updateMap = new ArrayList<>();
+ while (iterKeys.hasNext()) {
+ String theRole = iterKeys.next();
+
+ List<String> userList = roleConfig.getList(String.class, theRole);
+ List<String> newList = new ArrayList<>();
+
+ boolean roleChaned = false;
+ for (String value : userList) {
+ //each value may be comma separated.
+ List<String> update = new ArrayList<>();
+ String[] items = value.split(",");
+ boolean found = false;
+ for (String item : items) {
+ if (!item.equals(username)) {
+ update.add(item);
+ } else {
+ found = true;
+ roleChaned = true;
+ }
+ }
+ if (found) {
+ if (update.size() > 0) {
+ newList.add(StringUtil.joinStringList(update, ","));
+ }
+ }
+ }
+ if (roleChaned) {
+ updateMap.add(new Pair(theRole, newList));
+ }
+ }
+ //do update
+ Iterator<Pair<String, List<String>>> iterUpdate = updateMap.iterator();
+ while (iterUpdate.hasNext()) {
+ Pair<String, List<String>> entry = iterUpdate.next();
+ roleConfig.clearProperty(entry.getA());
+ if (entry.getB().size() > 0) {
+ roleConfig.addProperty(entry.getA(), entry.getB());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/HelpUser.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/HelpUser.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/HelpUser.java
index 36c0348..2e0ce2b 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/HelpUser.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/HelpUser.java
@@ -16,15 +16,15 @@
*/
package org.apache.activemq.artemis.cli.commands.user;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
import io.airlift.airline.Help;
import org.apache.activemq.artemis.cli.commands.Action;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.InvalidOptionsError;
-import org.apache.activemq.artemis.util.OptionsUtil;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.activemq.artemis.cli.commands.OptionsUtil;
public class HelpUser extends Help implements Action {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ListUser.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ListUser.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ListUser.java
index 136a417..c0fb979 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ListUser.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ListUser.java
@@ -20,7 +20,6 @@ import java.util.List;
import io.airlift.airline.Command;
import org.apache.activemq.artemis.cli.commands.ActionContext;
-import org.apache.activemq.artemis.util.FileBasedSecStoreConfig;
/**
* list existing users, example:
@@ -42,7 +41,7 @@ public class ListUser extends UserAction {
* list a single user or all users
* if username is not specified
*/
- protected void list() throws Exception {
+ private void list() throws Exception {
FileBasedSecStoreConfig config = getConfiguration();
List<String> result = config.listUser(username);
for (String str : result) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/PasswordAction.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/PasswordAction.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/PasswordAction.java
index 2260488..aeba55a 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/PasswordAction.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/PasswordAction.java
@@ -24,7 +24,7 @@ public class PasswordAction extends UserAction {
@Option(name = "--password", description = "the password (Default: input)")
String password;
- protected void checkInputPassword() {
+ void checkInputPassword() {
if (password == null) {
password = inputPassword("--password", "Please provide the password:", null);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/RemoveUser.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/RemoveUser.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/RemoveUser.java
index 70167da..a9dce8d 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/RemoveUser.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/RemoveUser.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.cli.commands.user;
import io.airlift.airline.Command;
import org.apache.activemq.artemis.cli.commands.ActionContext;
-import org.apache.activemq.artemis.util.FileBasedSecStoreConfig;
/**
* Remove a user, example:
@@ -35,7 +34,7 @@ public class RemoveUser extends UserAction {
return null;
}
- protected void remove() throws Exception {
+ private void remove() throws Exception {
FileBasedSecStoreConfig config = getConfiguration();
config.removeUser(username);
config.save();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ResetUser.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ResetUser.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ResetUser.java
index c219ef5..2e3e725 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ResetUser.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ResetUser.java
@@ -20,7 +20,6 @@ import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.util.HashUtil;
-import org.apache.activemq.artemis.util.FileBasedSecStoreConfig;
import org.apache.commons.lang3.StringUtils;
/**
@@ -53,7 +52,7 @@ public class ResetUser extends PasswordAction {
return null;
}
- protected void reset(String password, String[] roles) throws Exception {
+ private void reset(String password, String[] roles) throws Exception {
if (password == null && roles == null) {
context.err.println("Nothing to update.");
return;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/UserAction.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/UserAction.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/UserAction.java
index 2f7c77f..2a23fa6 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/UserAction.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/UserAction.java
@@ -16,15 +16,14 @@
*/
package org.apache.activemq.artemis.cli.commands.user;
-import io.airlift.airline.Option;
-import org.apache.activemq.artemis.cli.commands.InputAbstract;
-import org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoginModule;
-import org.apache.activemq.artemis.util.FileBasedSecStoreConfig;
-
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import java.io.File;
+import io.airlift.airline.Option;
+import org.apache.activemq.artemis.cli.commands.InputAbstract;
+import org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoginModule;
+
import static org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoginModule.ROLE_FILE_PROP_NAME;
import static org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoginModule.USER_FILE_PROP_NAME;
@@ -39,23 +38,19 @@ public abstract class UserAction extends InputAbstract {
@Option(name = "--entry", description = "The appConfigurationEntry (default: activemq)")
String entry = "activemq";
- protected void checkInputUser() {
+ void checkInputUser() {
if (username == null) {
username = input("--user", "Please provider the userName:", null);
}
}
- public void setRole(String role) {
- this.role = role;
- }
-
- public void checkInputRole() {
+ void checkInputRole() {
if (role == null) {
role = input("--role", "type a comma separated list of roles", null);
}
}
- protected FileBasedSecStoreConfig getConfiguration() throws Exception {
+ FileBasedSecStoreConfig getConfiguration() throws Exception {
Configuration securityConfig = Configuration.getConfiguration();
AppConfigurationEntry[] entries = securityConfig.getAppConfigurationEntry(entry);
@@ -82,4 +77,8 @@ public abstract class UserAction extends InputAbstract {
public void setUsername(String username) {
this.username = username;
}
+
+ public void setRole(String role) {
+ this.role = role;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java
deleted file mode 100644
index fc63518..0000000
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.cli.commands.util;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import java.util.Enumeration;
-import java.util.concurrent.CountDownLatch;
-
-public class ConsumerThread extends Thread {
-
- int messageCount = 1000;
- int receiveTimeOut = 3000;
- Destination destination;
- Session session;
- boolean durable;
- boolean breakOnNull = true;
- int sleep;
- int batchSize;
- boolean verbose;
- boolean browse;
-
- String filter;
-
- int received = 0;
- int transactions = 0;
- boolean running = false;
- CountDownLatch finished;
- boolean bytesAsText;
-
- public ConsumerThread(Session session, Destination destination, int threadNr) {
- super("Consumer " + destination.toString() + ", thread=" + threadNr);
- this.destination = destination;
- this.session = session;
- }
-
- @Override
- public void run() {
- if (browse) {
- browse();
- } else {
- consume();
- }
- }
-
- public void browse() {
- running = true;
- QueueBrowser consumer = null;
- String threadName = Thread.currentThread().getName();
- System.out.println(threadName + " wait until " + messageCount + " messages are consumed");
- try {
- if (filter != null) {
- consumer = session.createBrowser((Queue) destination, filter);
- } else {
- consumer = session.createBrowser((Queue) destination);
- }
- Enumeration<Message> enumBrowse = consumer.getEnumeration();
-
- while (enumBrowse.hasMoreElements()) {
- Message msg = enumBrowse.nextElement();
- if (msg != null) {
- System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
-
- if (verbose) {
- System.out.println("..." + msg);
- }
- if (bytesAsText && (msg instanceof BytesMessage)) {
- long length = ((BytesMessage) msg).getBodyLength();
- byte[] bytes = new byte[(int) length];
- ((BytesMessage) msg).readBytes(bytes);
- System.out.println("Message:" + msg);
- }
- received++;
-
- if (received >= messageCount) {
- break;
- }
- } else {
- break;
- }
-
- if (sleep > 0) {
- Thread.sleep(sleep);
- }
-
- }
-
- consumer.close();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (finished != null) {
- finished.countDown();
- }
- if (consumer != null) {
- System.out.println(threadName + " Consumed: " + this.getReceived() + " messages");
- try {
- consumer.close();
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- }
-
- System.out.println(threadName + " Consumer thread finished");
- }
-
- public void consume() {
- running = true;
- MessageConsumer consumer = null;
- String threadName = Thread.currentThread().getName();
- System.out.println(threadName + " wait until " + messageCount + " messages are consumed");
- try {
- if (durable && destination instanceof Topic) {
- if (filter != null) {
- consumer = session.createDurableSubscriber((Topic) destination, getName(), filter, false);
- } else {
- consumer = session.createDurableSubscriber((Topic) destination, getName());
- }
- } else {
- if (filter != null) {
- consumer = session.createConsumer(destination, filter);
- } else {
- consumer = session.createConsumer(destination);
- }
- }
- while (running && received < messageCount) {
- Message msg = consumer.receive(receiveTimeOut);
- if (msg != null) {
- System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
- if (verbose) {
- System.out.println("..." + msg);
- }
- if (bytesAsText && (msg instanceof BytesMessage)) {
- long length = ((BytesMessage) msg).getBodyLength();
- byte[] bytes = new byte[(int) length];
- ((BytesMessage) msg).readBytes(bytes);
- System.out.println("Message:" + msg);
- }
- received++;
- } else {
- if (breakOnNull) {
- break;
- }
- }
-
- if (session.getTransacted()) {
- if (batchSize > 0 && received > 0 && received % batchSize == 0) {
- System.out.println(threadName + " Committing transaction: " + transactions++);
- session.commit();
- }
- } else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
- if (batchSize > 0 && received > 0 && received % batchSize == 0) {
- System.out.println("Acknowledging last " + batchSize + " messages; messages so far = " + received);
- msg.acknowledge();
- }
- }
- if (sleep > 0) {
- Thread.sleep(sleep);
- }
-
- }
-
- try {
- session.commit();
- } catch (Throwable ignored) {
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (finished != null) {
- finished.countDown();
- }
- if (consumer != null) {
- System.out.println(threadName + " Consumed: " + this.getReceived() + " messages");
- try {
- consumer.close();
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- }
-
- System.out.println(threadName + " Consumer thread finished");
- }
-
- public int getReceived() {
- return received;
- }
-
- public boolean isDurable() {
- return durable;
- }
-
- public ConsumerThread setDurable(boolean durable) {
- this.durable = durable;
- return this;
- }
-
- public ConsumerThread setMessageCount(int messageCount) {
- this.messageCount = messageCount;
- return this;
- }
-
- public ConsumerThread setBreakOnNull(boolean breakOnNull) {
- this.breakOnNull = breakOnNull;
- return this;
- }
-
- public int getBatchSize() {
- return batchSize;
- }
-
- public ConsumerThread setBatchSize(int batchSize) {
- this.batchSize = batchSize;
- return this;
- }
-
- public int getMessageCount() {
- return messageCount;
- }
-
- public boolean isBreakOnNull() {
- return breakOnNull;
- }
-
- public int getReceiveTimeOut() {
- return receiveTimeOut;
- }
-
- public ConsumerThread setReceiveTimeOut(int receiveTimeOut) {
- this.receiveTimeOut = receiveTimeOut;
- return this;
- }
-
- public boolean isRunning() {
- return running;
- }
-
- public ConsumerThread setRunning(boolean running) {
- this.running = running;
- return this;
- }
-
- public int getSleep() {
- return sleep;
- }
-
- public ConsumerThread setSleep(int sleep) {
- this.sleep = sleep;
- return this;
- }
-
- public CountDownLatch getFinished() {
- return finished;
- }
-
- public ConsumerThread setFinished(CountDownLatch finished) {
- this.finished = finished;
- return this;
- }
-
- public boolean isBytesAsText() {
- return bytesAsText;
- }
-
- public boolean isVerbose() {
- return verbose;
- }
-
- public ConsumerThread setVerbose(boolean verbose) {
- this.verbose = verbose;
- return this;
- }
-
- public ConsumerThread setBytesAsText(boolean bytesAsText) {
- this.bytesAsText = bytesAsText;
- return this;
- }
-
- public String getFilter() {
- return filter;
- }
-
- public ConsumerThread setFilter(String filter) {
- this.filter = filter;
- return this;
- }
-
- public boolean isBrowse() {
- return browse;
- }
-
- public ConsumerThread setBrowse(boolean browse) {
- this.browse = browse;
- return this;
- }
-}