You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/08/10 17:13:47 UTC
[48/53] [abbrv] [partial] activemq-artemis git commit: automatic checkstyle change
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
index 33aa5f3..854ca98 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.cli.commands.tools;
-
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
@@ -63,8 +62,7 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
* for speed and simplicity.
*/
@Command(name = "imp", description = "Import all message-data using an XML that could be interpreted by any system.")
-public final class XmlDataImporter extends ActionAbstract
-{
+public final class XmlDataImporter extends ActionAbstract {
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@@ -102,37 +100,29 @@ public final class XmlDataImporter extends ActionAbstract
@Option(name = "--input", description = "The input file name (default=exp.dmp)", required = true)
public String input = "exp.dmp";
- public String getPassword()
- {
+ public String getPassword() {
return password;
}
- public void setPassword(String password)
- {
+ public void setPassword(String password) {
this.password = password;
}
-
- public String getUser()
- {
+ public String getUser() {
return user;
}
- public void setUser(String user)
- {
+ public void setUser(String user) {
this.user = user;
}
-
@Override
- public Object execute(ActionContext context) throws Exception
- {
+ public Object execute(ActionContext context) throws Exception {
process(input, host, port, transactional);
return null;
}
- public void process(String inputFile, String host, int port, boolean transactional) throws Exception
- {
+ public void process(String inputFile, String host, int port, boolean transactional) throws Exception {
this.process(new FileInputStream(inputFile), host, port, transactional);
}
@@ -147,8 +137,7 @@ public final class XmlDataImporter extends ActionAbstract
* @param session used for sending messages, must use auto-commit for sends
* @throws Exception
*/
- public void process(InputStream inputStream, ClientSession session) throws Exception
- {
+ public void process(InputStream inputStream, ClientSession session) throws Exception {
this.process(inputStream, session, null);
}
@@ -162,44 +151,36 @@ public final class XmlDataImporter extends ActionAbstract
* @param session used for sending messages, doesn't need to auto-commit sends
* @param managementSession used for management queries, must use auto-commit for sends
*/
- public void process(InputStream inputStream, ClientSession session, ClientSession managementSession) throws Exception
- {
+ public void process(InputStream inputStream,
+ ClientSession session,
+ ClientSession managementSession) throws Exception {
reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
this.session = session;
- if (managementSession != null)
- {
+ if (managementSession != null) {
this.managementSession = managementSession;
}
- else
- {
+ else {
this.managementSession = session;
}
localSession = false;
-
processXml();
}
- public void process(InputStream inputStream, String host, int port, boolean transactional) throws Exception
- {
+ public void process(InputStream inputStream, String host, int port, boolean transactional) throws Exception {
reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
HashMap<String, Object> connectionParams = new HashMap<>();
connectionParams.put(TransportConstants.HOST_PROP_NAME, host);
connectionParams.put(TransportConstants.PORT_PROP_NAME, Integer.toString(port));
- ServerLocator serverLocator =
- ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(
- NettyConnectorFactory.class.getName(),
- connectionParams));
+ ServerLocator serverLocator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
ClientSessionFactory sf = serverLocator.createSessionFactory();
- if (user != null || password != null)
- {
+ if (user != null || password != null) {
session = sf.createSession(user, password, false, !transactional, true, false, 0);
managementSession = sf.createSession(user, password, false, true, true, false, 0);
}
- else
- {
+ else {
session = sf.createSession(false, !transactional, true);
managementSession = sf.createSession(false, true, true);
}
@@ -208,53 +189,41 @@ public final class XmlDataImporter extends ActionAbstract
processXml();
}
- private void processXml() throws Exception
- {
- try
- {
- while (reader.hasNext())
- {
+ private void processXml() throws Exception {
+ try {
+ while (reader.hasNext()) {
ActiveMQServerLogger.LOGGER.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
- if (reader.getEventType() == XMLStreamConstants.START_ELEMENT)
- {
- if (XmlDataConstants.BINDINGS_CHILD.equals(reader.getLocalName()))
- {
+ if (reader.getEventType() == XMLStreamConstants.START_ELEMENT) {
+ if (XmlDataConstants.BINDINGS_CHILD.equals(reader.getLocalName())) {
bindQueue();
}
- else if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) {
processMessage();
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORIES.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORIES.equals(reader.getLocalName())) {
createJmsConnectionFactories();
}
- else if (XmlDataConstants.JMS_DESTINATIONS.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_DESTINATIONS.equals(reader.getLocalName())) {
createJmsDestinations();
}
}
reader.next();
}
- if (!session.isAutoCommitSends())
- {
+ if (!session.isAutoCommitSends()) {
session.commit();
}
}
- finally
- {
+ finally {
// if the session was created in our constructor then close it (otherwise the caller will close it)
- if (localSession)
- {
+ if (localSession) {
session.close();
managementSession.close();
}
}
}
- private void processMessage() throws Exception
- {
+ private void processMessage() throws Exception {
Byte type = 0;
Byte priority = 0;
Long expiration = 0L;
@@ -263,11 +232,9 @@ public final class XmlDataImporter extends ActionAbstract
ArrayList<String> queues = new ArrayList<>();
// get message's attributes
- for (int i = 0; i < reader.getAttributeCount(); i++)
- {
+ for (int i = 0; i < reader.getAttributeCount(); i++) {
String attributeName = reader.getAttributeLocalName(i);
- switch (attributeName)
- {
+ switch (attributeName) {
case XmlDataConstants.MESSAGE_TYPE:
type = getMessageType(reader.getAttributeValue(i));
break;
@@ -292,34 +259,27 @@ public final class XmlDataImporter extends ActionAbstract
boolean endLoop = false;
// loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.)
- while (reader.hasNext())
- {
+ while (reader.hasNext()) {
int eventType = reader.getEventType();
- switch (eventType)
- {
+ switch (eventType) {
case XMLStreamConstants.START_ELEMENT:
- if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName()))
- {
+ if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) {
processMessageBody(message);
}
- else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) {
processMessageProperties(message);
}
- else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) {
processMessageQueues(queues);
}
break;
case XMLStreamConstants.END_ELEMENT:
- if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName()))
- {
+ if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) {
endLoop = true;
}
break;
}
- if (endLoop)
- {
+ if (endLoop) {
break;
}
reader.next();
@@ -328,11 +288,9 @@ public final class XmlDataImporter extends ActionAbstract
sendMessage(queues, message);
}
- private Byte getMessageType(String value)
- {
+ private Byte getMessageType(String value) {
Byte type = Message.DEFAULT_TYPE;
- switch (value)
- {
+ switch (value) {
case XmlDataConstants.DEFAULT_TYPE_PRETTY:
type = Message.DEFAULT_TYPE;
break;
@@ -355,24 +313,20 @@ public final class XmlDataImporter extends ActionAbstract
return type;
}
- private void sendMessage(ArrayList<String> queues, Message message) throws Exception
- {
+ private void sendMessage(ArrayList<String> queues, Message message) throws Exception {
StringBuilder logMessage = new StringBuilder();
String destination = addressMap.get(queues.get(0));
logMessage.append("Sending ").append(message).append(" to address: ").append(destination).append("; routed to queues: ");
ByteBuffer buffer = ByteBuffer.allocate(queues.size() * 8);
- for (String queue : queues)
- {
+ for (String queue : queues) {
long queueID;
- if (queueIDs.containsKey(queue))
- {
+ if (queueIDs.containsKey(queue)) {
queueID = queueIDs.get(queue);
}
- else
- {
+ else {
// Get the ID of the queues involved so the message can be routed properly. This is done because we cannot
// send directly to a queue, we have to send to an address instead but not all the queues related to the
// address may need the message
@@ -400,40 +354,32 @@ public final class XmlDataImporter extends ActionAbstract
producer.send(message);
producer.close();
- if (tempFileName.length() > 0)
- {
+ if (tempFileName.length() > 0) {
File tempFile = new File(tempFileName);
- if (!tempFile.delete())
- {
+ if (!tempFile.delete()) {
ActiveMQServerLogger.LOGGER.couldNotDeleteTempFile(tempFileName);
}
tempFileName = "";
}
}
- private void processMessageQueues(ArrayList<String> queues)
- {
- for (int i = 0; i < reader.getAttributeCount(); i++)
- {
- if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i)))
- {
+ private void processMessageQueues(ArrayList<String> queues) {
+ for (int i = 0; i < reader.getAttributeCount(); i++) {
+ if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i))) {
queues.add(reader.getAttributeValue(i));
}
}
}
- private void processMessageProperties(Message message)
- {
+ private void processMessageProperties(Message message) {
String key = "";
String value = "";
String propertyType = "";
String realValue = null;
- for (int i = 0; i < reader.getAttributeCount(); i++)
- {
+ for (int i = 0; i < reader.getAttributeCount(); i++) {
String attributeName = reader.getAttributeLocalName(i);
- switch (attributeName)
- {
+ switch (attributeName) {
case XmlDataConstants.PROPERTY_NAME:
key = reader.getAttributeValue(i);
break;
@@ -446,8 +392,7 @@ public final class XmlDataImporter extends ActionAbstract
}
}
- switch (propertyType)
- {
+ switch (propertyType) {
case XmlDataConstants.PROPERTY_TYPE_SHORT:
message.putShortProperty(key, Short.parseShort(value));
break;
@@ -473,15 +418,13 @@ public final class XmlDataImporter extends ActionAbstract
message.putLongProperty(key, Long.parseLong(value));
break;
case XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING:
- if (!value.equals(XmlDataConstants.NULL))
- {
+ if (!value.equals(XmlDataConstants.NULL)) {
realValue = value;
}
message.putStringProperty(new SimpleString(key), new SimpleString(realValue));
break;
case XmlDataConstants.PROPERTY_TYPE_STRING:
- if (!value.equals(XmlDataConstants.NULL))
- {
+ if (!value.equals(XmlDataConstants.NULL)) {
realValue = value;
}
message.putStringProperty(key, realValue);
@@ -489,33 +432,25 @@ public final class XmlDataImporter extends ActionAbstract
}
}
- private void processMessageBody(Message message) throws XMLStreamException, IOException
- {
+ private void processMessageBody(Message message) throws XMLStreamException, IOException {
boolean isLarge = false;
- for (int i = 0; i < reader.getAttributeCount(); i++)
- {
+ for (int i = 0; i < reader.getAttributeCount(); i++) {
String attributeName = reader.getAttributeLocalName(i);
- if (XmlDataConstants.MESSAGE_IS_LARGE.equals(attributeName))
- {
+ if (XmlDataConstants.MESSAGE_IS_LARGE.equals(attributeName)) {
isLarge = Boolean.parseBoolean(reader.getAttributeValue(i));
}
}
reader.next();
- if (isLarge)
- {
+ if (isLarge) {
tempFileName = UUID.randomUUID().toString() + ".tmp";
ActiveMQServerLogger.LOGGER.debug("Creating temp file " + tempFileName + " for large message.");
- try (OutputStream out = new FileOutputStream(tempFileName))
- {
- while (reader.hasNext())
- {
- if (reader.getEventType() == XMLStreamConstants.END_ELEMENT)
- {
+ try (OutputStream out = new FileOutputStream(tempFileName)) {
+ while (reader.hasNext()) {
+ if (reader.getEventType() == XMLStreamConstants.END_ELEMENT) {
break;
}
- else
- {
+ else {
String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
String trimmedCharacters = characters.trim();
if (trimmedCharacters.length() > 0) // this will skip "indentation" characters
@@ -531,25 +466,21 @@ public final class XmlDataImporter extends ActionAbstract
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
((ClientMessage) message).setBodyInputStream(bufferedInput);
}
- else
- {
+ else {
reader.next(); // step past the "indentation" characters to get to the CDATA with the message body
String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
message.getBodyBuffer().writeBytes(decode(characters.trim()));
}
}
- private void bindQueue() throws Exception
- {
+ private void bindQueue() throws Exception {
String queueName = "";
String address = "";
String filter = "";
- for (int i = 0; i < reader.getAttributeCount(); i++)
- {
+ for (int i = 0; i < reader.getAttributeCount(); i++) {
String attributeName = reader.getAttributeLocalName(i);
- switch (attributeName)
- {
+ switch (attributeName) {
case XmlDataConstants.BINDING_ADDRESS:
address = reader.getAttributeValue(i);
break;
@@ -564,81 +495,66 @@ public final class XmlDataImporter extends ActionAbstract
ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName));
- if (!queueQuery.isExists())
- {
+ if (!queueQuery.isExists()) {
session.createQueue(address, queueName, filter, true);
ActiveMQServerLogger.LOGGER.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
}
- else
- {
+ else {
ActiveMQServerLogger.LOGGER.debug("Binding " + queueName + " already exists so won't re-bind.");
}
addressMap.put(queueName, address);
}
- private void createJmsConnectionFactories() throws Exception
- {
+ private void createJmsConnectionFactories() throws Exception {
boolean endLoop = false;
- while (reader.hasNext())
- {
+ while (reader.hasNext()) {
int eventType = reader.getEventType();
- switch (eventType)
- {
+ switch (eventType) {
case XMLStreamConstants.START_ELEMENT:
- if (XmlDataConstants.JMS_CONNECTION_FACTORY.equals(reader.getLocalName()))
- {
+ if (XmlDataConstants.JMS_CONNECTION_FACTORY.equals(reader.getLocalName())) {
createJmsConnectionFactory();
}
break;
case XMLStreamConstants.END_ELEMENT:
- if (XmlDataConstants.JMS_CONNECTION_FACTORIES.equals(reader.getLocalName()))
- {
+ if (XmlDataConstants.JMS_CONNECTION_FACTORIES.equals(reader.getLocalName())) {
endLoop = true;
}
break;
}
- if (endLoop)
- {
+ if (endLoop) {
break;
}
reader.next();
}
}
- private void createJmsDestinations() throws Exception
- {
+ private void createJmsDestinations() throws Exception {
boolean endLoop = false;
- while (reader.hasNext())
- {
+ while (reader.hasNext()) {
int eventType = reader.getEventType();
- switch (eventType)
- {
+ switch (eventType) {
case XMLStreamConstants.START_ELEMENT:
- if (XmlDataConstants.JMS_DESTINATION.equals(reader.getLocalName()))
- {
+ if (XmlDataConstants.JMS_DESTINATION.equals(reader.getLocalName())) {
createJmsDestination();
}
break;
case XMLStreamConstants.END_ELEMENT:
- if (XmlDataConstants.JMS_DESTINATIONS.equals(reader.getLocalName()))
- {
+ if (XmlDataConstants.JMS_DESTINATIONS.equals(reader.getLocalName())) {
endLoop = true;
}
break;
}
- if (endLoop)
- {
+ if (endLoop) {
break;
}
reader.next();
}
}
- private void createJmsConnectionFactory() throws Exception
- {
+ private void createJmsConnectionFactory() throws Exception {
String name = "";
String callFailoverTimeout = "";
String callTimeout = "";
@@ -678,202 +594,162 @@ public final class XmlDataImporter extends ActionAbstract
boolean endLoop = false;
- while (reader.hasNext())
- {
+ while (reader.hasNext()) {
int eventType = reader.getEventType();
- switch (eventType)
- {
+ switch (eventType) {
case XMLStreamConstants.START_ELEMENT:
- if (XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_FAILOVER_TIMEOUT.equals(reader.getLocalName()))
- {
+ if (XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_FAILOVER_TIMEOUT.equals(reader.getLocalName())) {
callFailoverTimeout = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory callFailoverTimeout: " + callFailoverTimeout);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_TIMEOUT.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_TIMEOUT.equals(reader.getLocalName())) {
callTimeout = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory callTimeout: " + callTimeout);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_FAILURE_CHECK_PERIOD.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_FAILURE_CHECK_PERIOD.equals(reader.getLocalName())) {
clientFailureCheckPeriod = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory clientFailureCheckPeriod: " + clientFailureCheckPeriod);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_ID.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_ID.equals(reader.getLocalName())) {
clientId = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory clientId: " + clientId);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONFIRMATION_WINDOW_SIZE.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONFIRMATION_WINDOW_SIZE.equals(reader.getLocalName())) {
confirmationWindowSize = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory confirmationWindowSize: " + confirmationWindowSize);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTION_TTL.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTION_TTL.equals(reader.getLocalName())) {
connectionTtl = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory connectionTtl: " + connectionTtl);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTOR.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTOR.equals(reader.getLocalName())) {
connectors = getConnectors();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory getLocalName: " + connectors);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_MAX_RATE.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_MAX_RATE.equals(reader.getLocalName())) {
consumerMaxRate = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory consumerMaxRate: " + consumerMaxRate);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_WINDOW_SIZE.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_WINDOW_SIZE.equals(reader.getLocalName())) {
consumerWindowSize = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory consumerWindowSize: " + consumerWindowSize);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_DISCOVERY_GROUP_NAME.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_DISCOVERY_GROUP_NAME.equals(reader.getLocalName())) {
discoveryGroupName = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory discoveryGroupName: " + discoveryGroupName);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_DUPS_OK_BATCH_SIZE.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_DUPS_OK_BATCH_SIZE.equals(reader.getLocalName())) {
dupsOkBatchSize = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory dupsOkBatchSize: " + dupsOkBatchSize);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_GROUP_ID.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_GROUP_ID.equals(reader.getLocalName())) {
groupId = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory groupId: " + groupId);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_LOAD_BALANCING_POLICY_CLASS_NAME.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_LOAD_BALANCING_POLICY_CLASS_NAME.equals(reader.getLocalName())) {
loadBalancingPolicyClassName = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory loadBalancingPolicyClassName: " + loadBalancingPolicyClassName);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_MAX_RETRY_INTERVAL.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_MAX_RETRY_INTERVAL.equals(reader.getLocalName())) {
maxRetryInterval = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory maxRetryInterval: " + maxRetryInterval);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_MIN_LARGE_MESSAGE_SIZE.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_MIN_LARGE_MESSAGE_SIZE.equals(reader.getLocalName())) {
minLargeMessageSize = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory minLargeMessageSize: " + minLargeMessageSize);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_NAME.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_NAME.equals(reader.getLocalName())) {
name = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory name: " + name);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_MAX_RATE.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_MAX_RATE.equals(reader.getLocalName())) {
producerMaxRate = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory producerMaxRate: " + producerMaxRate);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_WINDOW_SIZE.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_WINDOW_SIZE.equals(reader.getLocalName())) {
producerWindowSize = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory producerWindowSize: " + producerWindowSize);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RECONNECT_ATTEMPTS.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RECONNECT_ATTEMPTS.equals(reader.getLocalName())) {
reconnectAttempts = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory reconnectAttempts: " + reconnectAttempts);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL.equals(reader.getLocalName())) {
retryInterval = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory retryInterval: " + retryInterval);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL_MULTIPLIER.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL_MULTIPLIER.equals(reader.getLocalName())) {
retryIntervalMultiplier = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory retryIntervalMultiplier: " + retryIntervalMultiplier);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_SCHEDULED_THREAD_POOL_MAX_SIZE.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_SCHEDULED_THREAD_POOL_MAX_SIZE.equals(reader.getLocalName())) {
scheduledThreadMaxPoolSize = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory scheduledThreadMaxPoolSize: " + scheduledThreadMaxPoolSize);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_THREAD_POOL_MAX_SIZE.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_THREAD_POOL_MAX_SIZE.equals(reader.getLocalName())) {
threadMaxPoolSize = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory threadMaxPoolSize: " + threadMaxPoolSize);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_TRANSACTION_BATCH_SIZE.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_TRANSACTION_BATCH_SIZE.equals(reader.getLocalName())) {
transactionBatchSize = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory transactionBatchSize: " + transactionBatchSize);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_TYPE.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_TYPE.equals(reader.getLocalName())) {
type = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory type: " + type);
}
- else if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName())) {
entries = getEntries();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory entries: " + entries);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_AUTO_GROUP.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_AUTO_GROUP.equals(reader.getLocalName())) {
autoGroup = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory autoGroup: " + autoGroup);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_ACKNOWLEDGE.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_ACKNOWLEDGE.equals(reader.getLocalName())) {
blockOnAcknowledge = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory blockOnAcknowledge: " + blockOnAcknowledge);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_DURABLE_SEND.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_DURABLE_SEND.equals(reader.getLocalName())) {
blockOnDurableSend = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory blockOnDurableSend: " + blockOnDurableSend);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_NON_DURABLE_SEND.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_NON_DURABLE_SEND.equals(reader.getLocalName())) {
blockOnNonDurableSend = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory blockOnNonDurableSend: " + blockOnNonDurableSend);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CACHE_LARGE_MESSAGES_CLIENT.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CACHE_LARGE_MESSAGES_CLIENT.equals(reader.getLocalName())) {
cacheLargeMessagesClient = reader.getElementText();
ActiveMQServerLogger.LOGGER.info("JMS connection factory " + name + " cacheLargeMessagesClient: " + cacheLargeMessagesClient);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_COMPRESS_LARGE_MESSAGES.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_COMPRESS_LARGE_MESSAGES.equals(reader.getLocalName())) {
compressLargeMessages = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory compressLargeMessages: " + compressLargeMessages);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_FAILOVER_ON_INITIAL_CONNECTION.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_FAILOVER_ON_INITIAL_CONNECTION.equals(reader.getLocalName())) {
failoverOnInitialConnection = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory failoverOnInitialConnection: " + failoverOnInitialConnection);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_HA.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_HA.equals(reader.getLocalName())) {
ha = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory ha: " + ha);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PREACKNOWLEDGE.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PREACKNOWLEDGE.equals(reader.getLocalName())) {
preacknowledge = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory preacknowledge: " + preacknowledge);
}
- else if (XmlDataConstants.JMS_CONNECTION_FACTORY_USE_GLOBAL_POOLS.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_CONNECTION_FACTORY_USE_GLOBAL_POOLS.equals(reader.getLocalName())) {
useGlobalPools = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory useGlobalPools: " + useGlobalPools);
}
break;
case XMLStreamConstants.END_ELEMENT:
- if (XmlDataConstants.JMS_CONNECTION_FACTORY.equals(reader.getLocalName()))
- {
+ if (XmlDataConstants.JMS_CONNECTION_FACTORY.equals(reader.getLocalName())) {
endLoop = true;
}
break;
}
- if (endLoop)
- {
+ if (endLoop) {
break;
}
reader.next();
@@ -881,102 +757,54 @@ public final class XmlDataImporter extends ActionAbstract
ClientRequestor requestor = new ClientRequestor(managementSession, "jms.queue.activemq.management");
ClientMessage managementMessage = managementSession.createMessage(false);
- ManagementHelper.putOperationInvocation(managementMessage,
- ResourceNames.JMS_SERVER,
- "createConnectionFactory",
- name,
- Boolean.parseBoolean(ha),
- discoveryGroupName.length() > 0,
- Integer.parseInt(type),
- connectors,
- entries,
- clientId,
- Long.parseLong(clientFailureCheckPeriod),
- Long.parseLong(connectionTtl),
- Long.parseLong(callTimeout),
- Long.parseLong(callFailoverTimeout),
- Integer.parseInt(minLargeMessageSize),
- Boolean.parseBoolean(compressLargeMessages),
- Integer.parseInt(consumerWindowSize),
- Integer.parseInt(consumerMaxRate),
- Integer.parseInt(confirmationWindowSize),
- Integer.parseInt(producerWindowSize),
- Integer.parseInt(producerMaxRate),
- Boolean.parseBoolean(blockOnAcknowledge),
- Boolean.parseBoolean(blockOnDurableSend),
- Boolean.parseBoolean(blockOnNonDurableSend),
- Boolean.parseBoolean(autoGroup),
- Boolean.parseBoolean(preacknowledge),
- loadBalancingPolicyClassName,
- Integer.parseInt(transactionBatchSize),
- Integer.parseInt(dupsOkBatchSize),
- Boolean.parseBoolean(useGlobalPools),
- Integer.parseInt(scheduledThreadMaxPoolSize),
- Integer.parseInt(threadMaxPoolSize),
- Long.parseLong(retryInterval),
- Double.parseDouble(retryIntervalMultiplier),
- Long.parseLong(maxRetryInterval),
- Integer.parseInt(reconnectAttempts),
- Boolean.parseBoolean(failoverOnInitialConnection),
- groupId);
+ ManagementHelper.putOperationInvocation(managementMessage, ResourceNames.JMS_SERVER, "createConnectionFactory", name, Boolean.parseBoolean(ha), discoveryGroupName.length() > 0, Integer.parseInt(type), connectors, entries, clientId, Long.parseLong(clientFailureCheckPeriod), Long.parseLong(connectionTtl), Long.parseLong(callTimeout), Long.parseLong(callFailoverTimeout), Integer.parseInt(minLargeMessageSize), Boolean.parseBoolean(compressLargeMessages), Integer.parseInt(consumerWindowSize), Integer.parseInt(consumerMaxRate), Integer.parseInt(confirmationWindowSize), Integer.parseInt(producerWindowSize), Integer.parseInt(producerMaxRate), Boolean.parseBoolean(blockOnAcknowledge), Boolean.parseBoolean(blockOnDurableSend), Boolean.parseBoolean(blockOnNonDurableSend), Boolean.parseBoolean(autoGroup), Boolean.parseBoolean(preacknowledge), loadBalancingPolicyClassName, Integer.parseInt(transactionBatchSize), Integer.parseInt(dupsOkBatchSize), Boolean.parseBoolean(useGlobalPools), Integer.parseInt(scheduledThreadMaxPoolSize), Integer.parseInt(threadMaxPoolSize), Long.parseLong(retryInterval), Double.parseDouble(retryIntervalMultiplier), Long.parseLong(maxRetryInterval), Integer.parseInt(reconnectAttempts), Boolean.parseBoolean(failoverOnInitialConnection), groupId);
//Boolean.parseBoolean(cacheLargeMessagesClient));
managementSession.start();
ClientMessage reply = requestor.request(managementMessage);
- if (ManagementHelper.hasOperationSucceeded(reply))
- {
+ if (ManagementHelper.hasOperationSucceeded(reply)) {
ActiveMQServerLogger.LOGGER.debug("Created connection factory " + name);
}
- else
- {
+ else {
ActiveMQServerLogger.LOGGER.error("Problem creating " + name);
}
requestor.close();
}
- private void createJmsDestination() throws Exception
- {
+ private void createJmsDestination() throws Exception {
String name = "";
String selector = "";
String entries = "";
String type = "";
boolean endLoop = false;
- while (reader.hasNext())
- {
+ while (reader.hasNext()) {
int eventType = reader.getEventType();
- switch (eventType)
- {
+ switch (eventType) {
case XMLStreamConstants.START_ELEMENT:
- if (XmlDataConstants.JMS_DESTINATION_NAME.equals(reader.getLocalName()))
- {
+ if (XmlDataConstants.JMS_DESTINATION_NAME.equals(reader.getLocalName())) {
name = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS destination name: " + name);
}
- else if (XmlDataConstants.JMS_DESTINATION_SELECTOR.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_DESTINATION_SELECTOR.equals(reader.getLocalName())) {
selector = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS destination selector: " + selector);
}
- else if (XmlDataConstants.JMS_DESTINATION_TYPE.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_DESTINATION_TYPE.equals(reader.getLocalName())) {
type = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS destination type: " + type);
}
- else if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName()))
- {
+ else if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName())) {
entries = getEntries();
}
break;
case XMLStreamConstants.END_ELEMENT:
- if (XmlDataConstants.JMS_DESTINATION.equals(reader.getLocalName()))
- {
+ if (XmlDataConstants.JMS_DESTINATION.equals(reader.getLocalName())) {
endLoop = true;
}
break;
}
- if (endLoop)
- {
+ if (endLoop) {
break;
}
reader.next();
@@ -984,55 +812,45 @@ public final class XmlDataImporter extends ActionAbstract
ClientRequestor requestor = new ClientRequestor(managementSession, "jms.queue.activemq.management");
ClientMessage managementMessage = managementSession.createMessage(false);
- if ("Queue".equals(type))
- {
+ if ("Queue".equals(type)) {
ManagementHelper.putOperationInvocation(managementMessage, ResourceNames.JMS_SERVER, "createQueue", name, entries, selector);
}
- else if ("Topic".equals(type))
- {
+ else if ("Topic".equals(type)) {
ManagementHelper.putOperationInvocation(managementMessage, ResourceNames.JMS_SERVER, "createTopic", name, entries);
}
managementSession.start();
ClientMessage reply = requestor.request(managementMessage);
- if (ManagementHelper.hasOperationSucceeded(reply))
- {
+ if (ManagementHelper.hasOperationSucceeded(reply)) {
ActiveMQServerLogger.LOGGER.debug("Created " + type.toLowerCase() + " " + name);
}
- else
- {
+ else {
ActiveMQServerLogger.LOGGER.error("Problem creating " + name);
}
requestor.close();
}
- private String getEntries() throws Exception
- {
+ private String getEntries() throws Exception {
StringBuilder entry = new StringBuilder();
boolean endLoop = false;
- while (reader.hasNext())
- {
+ while (reader.hasNext()) {
int eventType = reader.getEventType();
- switch (eventType)
- {
+ switch (eventType) {
case XMLStreamConstants.START_ELEMENT:
- if (XmlDataConstants.JMS_JNDI_ENTRY.equals(reader.getLocalName()))
- {
+ if (XmlDataConstants.JMS_JNDI_ENTRY.equals(reader.getLocalName())) {
String elementText = reader.getElementText();
entry.append(elementText).append(", ");
ActiveMQServerLogger.LOGGER.debug("JMS admin object JNDI entry: " + entry.toString());
}
break;
case XMLStreamConstants.END_ELEMENT:
- if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName()))
- {
+ if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName())) {
endLoop = true;
}
break;
}
- if (endLoop)
- {
+ if (endLoop) {
break;
}
reader.next();
@@ -1041,31 +859,25 @@ public final class XmlDataImporter extends ActionAbstract
return entry.delete(entry.length() - 2, entry.length()).toString();
}
- private String getConnectors() throws Exception
- {
+ private String getConnectors() throws Exception {
StringBuilder entry = new StringBuilder();
boolean endLoop = false;
- while (reader.hasNext())
- {
+ while (reader.hasNext()) {
int eventType = reader.getEventType();
- switch (eventType)
- {
+ switch (eventType) {
case XMLStreamConstants.START_ELEMENT:
- if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTOR.equals(reader.getLocalName()))
- {
+ if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTOR.equals(reader.getLocalName())) {
entry.append(reader.getElementText()).append(", ");
}
break;
case XMLStreamConstants.END_ELEMENT:
- if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTORS.equals(reader.getLocalName()))
- {
+ if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTORS.equals(reader.getLocalName())) {
endLoop = true;
}
break;
}
- if (endLoop)
- {
+ if (endLoop) {
break;
}
reader.next();
@@ -1080,8 +892,7 @@ public final class XmlDataImporter extends ActionAbstract
// Private -------------------------------------------------------
- private static byte[] decode(String data)
- {
+ private static byte[] decode(String data) {
return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java
index 8a241b7..fea2f3e 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.cli.commands.util;
-
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -30,8 +29,7 @@ import javax.jms.Topic;
import java.util.Enumeration;
import java.util.concurrent.CountDownLatch;
-public class ConsumerThread extends Thread
-{
+public class ConsumerThread extends Thread {
int messageCount = 1000;
int receiveTimeOut = 3000;
@@ -52,57 +50,45 @@ public class ConsumerThread extends Thread
CountDownLatch finished;
boolean bytesAsText;
- public ConsumerThread(Session session, Destination destination, int threadNr)
- {
+ public ConsumerThread(Session session, Destination destination, int threadNr) {
super("Consumer " + destination.toString() + ", thread=" + threadNr);
this.destination = destination;
this.session = session;
}
@Override
- public void run()
- {
- if (browse)
- {
+ public void run() {
+ if (browse) {
browse();
}
- else
- {
+ else {
consume();
}
}
- public void browse()
- {
+ public void browse() {
running = true;
QueueBrowser consumer = null;
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " wait until " + messageCount + " messages are consumed");
- try
- {
- if (filter != null)
- {
- consumer = session.createBrowser((Queue)destination, filter);
+ try {
+ if (filter != null) {
+ consumer = session.createBrowser((Queue) destination, filter);
}
- else
- {
- consumer = session.createBrowser((Queue)destination);
+ else {
+ consumer = session.createBrowser((Queue) destination);
}
Enumeration<Message> enumBrowse = consumer.getEnumeration();
- while (enumBrowse.hasMoreElements())
- {
+ while (enumBrowse.hasMoreElements()) {
Message msg = enumBrowse.nextElement();
- if (msg != null)
- {
+ if (msg != null) {
System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
- if (verbose)
- {
- System.out.println("..." + msg);
+ if (verbose) {
+ System.out.println("..." + msg);
}
- if (bytesAsText && (msg instanceof BytesMessage))
- {
+ if (bytesAsText && (msg instanceof BytesMessage)) {
long length = ((BytesMessage) msg).getBodyLength();
byte[] bytes = new byte[(int) length];
((BytesMessage) msg).readBytes(bytes);
@@ -110,18 +96,15 @@ public class ConsumerThread extends Thread
}
received++;
- if (received >= messageCount)
- {
+ if (received >= messageCount) {
break;
}
}
- else
- {
+ else {
break;
}
- if (sleep > 0)
- {
+ if (sleep > 0) {
Thread.sleep(sleep);
}
@@ -129,25 +112,19 @@ public class ConsumerThread extends Thread
consumer.close();
}
- catch (Exception e)
- {
+ catch (Exception e) {
e.printStackTrace();
}
- finally
- {
- if (finished != null)
- {
+ finally {
+ if (finished != null) {
finished.countDown();
}
- if (consumer != null)
- {
+ if (consumer != null) {
System.out.println(threadName + " Consumed: " + this.getReceived() + " messages");
- try
- {
+ try {
consumer.close();
}
- catch (JMSException e)
- {
+ catch (JMSException e) {
e.printStackTrace();
}
}
@@ -156,48 +133,36 @@ public class ConsumerThread extends Thread
System.out.println(threadName + " Consumer thread finished");
}
- public void consume()
- {
+ public void consume() {
running = true;
MessageConsumer consumer = null;
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " wait until " + messageCount + " messages are consumed");
- try
- {
- if (durable && destination instanceof Topic)
- {
- if (filter != null)
- {
+ try {
+ if (durable && destination instanceof Topic) {
+ if (filter != null) {
consumer = session.createDurableSubscriber((Topic) destination, getName(), filter, false);
}
- else
- {
+ else {
consumer = session.createDurableSubscriber((Topic) destination, getName());
}
}
- else
- {
- if (filter != null)
- {
+ else {
+ if (filter != null) {
consumer = session.createConsumer(destination, filter);
}
- else
- {
+ else {
consumer = session.createConsumer(destination);
}
}
- while (running && received < messageCount)
- {
+ while (running && received < messageCount) {
Message msg = consumer.receive(receiveTimeOut);
- if (msg != null)
- {
+ if (msg != null) {
System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
- if (verbose)
- {
- System.out.println("..." + msg);
+ if (verbose) {
+ System.out.println("..." + msg);
}
- if (bytesAsText && (msg instanceof BytesMessage))
- {
+ if (bytesAsText && (msg instanceof BytesMessage)) {
long length = ((BytesMessage) msg).getBodyLength();
byte[] bytes = new byte[(int) length];
((BytesMessage) msg).readBytes(bytes);
@@ -205,64 +170,49 @@ public class ConsumerThread extends Thread
}
received++;
}
- else
- {
- if (breakOnNull)
- {
+ else {
+ if (breakOnNull) {
break;
}
}
- if (session.getTransacted())
- {
- if (batchSize > 0 && received > 0 && received % batchSize == 0)
- {
+ if (session.getTransacted()) {
+ if (batchSize > 0 && received > 0 && received % batchSize == 0) {
System.out.println(threadName + " Committing transaction: " + transactions++);
session.commit();
}
}
- else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- if (batchSize > 0 && received > 0 && received % batchSize == 0)
- {
+ else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
+ if (batchSize > 0 && received > 0 && received % batchSize == 0) {
System.out.println("Acknowledging last " + batchSize + " messages; messages so far = " + received);
msg.acknowledge();
}
}
- if (sleep > 0)
- {
+ if (sleep > 0) {
Thread.sleep(sleep);
}
}
- try
- {
+ try {
session.commit();
}
- catch (Throwable ignored)
- {
+ catch (Throwable ignored) {
}
}
- catch (Exception e)
- {
+ catch (Exception e) {
e.printStackTrace();
}
- finally
- {
- if (finished != null)
- {
+ finally {
+ if (finished != null) {
finished.countDown();
}
- if (consumer != null)
- {
+ if (consumer != null) {
System.out.println(threadName + " Consumed: " + this.getReceived() + " messages");
- try
- {
+ try {
consumer.close();
}
- catch (JMSException e)
- {
+ catch (JMSException e) {
e.printStackTrace();
}
}
@@ -271,139 +221,114 @@ public class ConsumerThread extends Thread
System.out.println(threadName + " Consumer thread finished");
}
- public int getReceived()
- {
+ public int getReceived() {
return received;
}
- public boolean isDurable()
- {
+ public boolean isDurable() {
return durable;
}
- public ConsumerThread setDurable(boolean durable)
- {
+ public ConsumerThread setDurable(boolean durable) {
this.durable = durable;
return this;
}
- public ConsumerThread setMessageCount(int messageCount)
- {
+ public ConsumerThread setMessageCount(int messageCount) {
this.messageCount = messageCount;
return this;
}
- public ConsumerThread setBreakOnNull(boolean breakOnNull)
- {
+ public ConsumerThread setBreakOnNull(boolean breakOnNull) {
this.breakOnNull = breakOnNull;
return this;
}
- public int getBatchSize()
- {
+ public int getBatchSize() {
return batchSize;
}
- public ConsumerThread setBatchSize(int batchSize)
- {
+ public ConsumerThread setBatchSize(int batchSize) {
this.batchSize = batchSize;
return this;
}
- public int getMessageCount()
- {
+ public int getMessageCount() {
return messageCount;
}
- public boolean isBreakOnNull()
- {
+ public boolean isBreakOnNull() {
return breakOnNull;
}
- public int getReceiveTimeOut()
- {
+ public int getReceiveTimeOut() {
return receiveTimeOut;
}
- public ConsumerThread setReceiveTimeOut(int receiveTimeOut)
- {
+ public ConsumerThread setReceiveTimeOut(int receiveTimeOut) {
this.receiveTimeOut = receiveTimeOut;
return this;
}
- public boolean isRunning()
- {
+ public boolean isRunning() {
return running;
}
- public ConsumerThread setRunning(boolean running)
- {
+ public ConsumerThread setRunning(boolean running) {
this.running = running;
return this;
}
- public int getSleep()
- {
+ public int getSleep() {
return sleep;
}
- public ConsumerThread setSleep(int sleep)
- {
+ public ConsumerThread setSleep(int sleep) {
this.sleep = sleep;
return this;
}
- public CountDownLatch getFinished()
- {
+ public CountDownLatch getFinished() {
return finished;
}
- public ConsumerThread setFinished(CountDownLatch finished)
- {
+ public ConsumerThread setFinished(CountDownLatch finished) {
this.finished = finished;
return this;
}
- public boolean isBytesAsText()
- {
+ public boolean isBytesAsText() {
return bytesAsText;
}
- public boolean isVerbose()
- {
+ public boolean isVerbose() {
return verbose;
}
- public ConsumerThread setVerbose(boolean verbose)
- {
+ public ConsumerThread setVerbose(boolean verbose) {
this.verbose = verbose;
return this;
}
- public ConsumerThread setBytesAsText(boolean bytesAsText)
- {
+ public ConsumerThread setBytesAsText(boolean bytesAsText) {
this.bytesAsText = bytesAsText;
return this;
}
- public String getFilter()
- {
+ public String getFilter() {
return filter;
}
- public ConsumerThread setFilter(String filter)
- {
+ public ConsumerThread setFilter(String filter) {
this.filter = filter;
return this;
}
- public boolean isBrowse()
- {
+ public boolean isBrowse() {
return browse;
}
- public ConsumerThread setBrowse(boolean browse)
- {
+ public ConsumerThread setBrowse(boolean browse) {
this.browse = browse;
return this;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ProducerThread.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ProducerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ProducerThread.java
index 820ebbd..427443f 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ProducerThread.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ProducerThread.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.cli.commands.util;
-
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -33,8 +32,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.utils.ReusableLatch;
-public class ProducerThread extends Thread
-{
+public class ProducerThread extends Thread {
+
protected final Session session;
boolean verbose;
@@ -59,20 +58,16 @@ public class ProducerThread extends Thread
final ReusableLatch finished = new ReusableLatch(1);
final ReusableLatch paused = new ReusableLatch(0);
-
- public ProducerThread(Session session, Destination destination, int threadNr)
- {
+ public ProducerThread(Session session, Destination destination, int threadNr) {
super("Producer " + destination.toString() + ", thread=" + threadNr);
this.destination = destination;
this.session = session;
}
- public void run()
- {
+ public void run() {
MessageProducer producer = null;
String threadName = Thread.currentThread().getName();
- try
- {
+ try {
producer = session.createProducer(destination);
producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
producer.setTimeToLive(msgTTL);
@@ -82,30 +77,24 @@ public class ProducerThread extends Thread
System.out.println(threadName + " Started to calculate elapsed time ...\n");
long tStart = System.currentTimeMillis();
- if (runIndefinitely)
- {
- while (running)
- {
+ if (runIndefinitely) {
+ while (running) {
paused.await();
sendMessage(producer, threadName);
sentCount.incrementAndGet();
}
}
- else
- {
- for (sentCount.set(0); sentCount.get() < messageCount && running; sentCount.incrementAndGet())
- {
+ else {
+ for (sentCount.set(0); sentCount.get() < messageCount && running; sentCount.incrementAndGet()) {
paused.await();
sendMessage(producer, threadName);
}
}
- try
- {
+ try {
session.commit();
}
- catch (Throwable ignored)
- {
+ catch (Throwable ignored) {
}
System.out.println(threadName + " Produced: " + this.getSentCount() + " messages");
@@ -115,96 +104,74 @@ public class ProducerThread extends Thread
System.out.println(threadName + " Elapsed time in milli second : " + (tEnd - tStart) + " milli seconds");
}
- catch (Exception e)
- {
+ catch (Exception e) {
e.printStackTrace();
}
- finally
- {
- if (finished != null)
- {
+ finally {
+ if (finished != null) {
finished.countDown();
}
- if (producer != null)
- {
- try
- {
+ if (producer != null) {
+ try {
producer.close();
}
- catch (JMSException e)
- {
+ catch (JMSException e) {
e.printStackTrace();
}
}
}
}
- private void sendMessage(MessageProducer producer, String threadName) throws Exception
- {
+ private void sendMessage(MessageProducer producer, String threadName) throws Exception {
Message message = createMessage(sentCount.get(), threadName);
producer.send(message);
- if (verbose)
- {
+ if (verbose) {
System.out.println(threadName + " Sent: " + (message instanceof TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID()));
}
- if (transactionBatchSize > 0 && sentCount.get() > 0 && sentCount.get() % transactionBatchSize == 0)
- {
+ if (transactionBatchSize > 0 && sentCount.get() > 0 && sentCount.get() % transactionBatchSize == 0) {
System.out.println(threadName + " Committing transaction: " + transactions++);
session.commit();
}
- if (sleep > 0)
- {
+ if (sleep > 0) {
Thread.sleep(sleep);
}
}
- private void initPayLoad()
- {
- if (messageSize > 0)
- {
+ private void initPayLoad() {
+ if (messageSize > 0) {
payload = new byte[messageSize];
- for (int i = 0; i < payload.length; i++)
- {
+ for (int i = 0; i < payload.length; i++) {
payload[i] = '.';
}
}
}
- protected Message createMessage(int i, String threadName) throws Exception
- {
+ protected Message createMessage(int i, String threadName) throws Exception {
Message answer;
- if (payload != null)
- {
+ if (payload != null) {
answer = session.createBytesMessage();
((BytesMessage) answer).writeBytes(payload);
}
- else
- {
- if (textMessageSize > 0)
- {
- if (messageText == null)
- {
+ else {
+ if (textMessageSize > 0) {
+ if (messageText == null) {
messageText = readInputStream(getClass().getResourceAsStream("demo.txt"), textMessageSize, i);
}
}
- else if (payloadUrl != null)
- {
+ else if (payloadUrl != null) {
messageText = readInputStream(new URL(payloadUrl).openStream(), -1, i);
}
- else if (message != null)
- {
+ else if (message != null) {
messageText = message;
}
- else
- {
+ else {
messageText = createDefaultMessage(i);
}
answer = session.createTextMessage(messageText);
}
- if ((msgGroupID != null) && (!msgGroupID.isEmpty()))
- {
+ if ((msgGroupID != null) && (!msgGroupID.isEmpty())) {
answer.setStringProperty("JMSXGroupID", msgGroupID);
}
@@ -213,218 +180,178 @@ public class ProducerThread extends Thread
return answer;
}
- private String readInputStream(InputStream is, int size, int messageNumber) throws IOException
- {
+ private String readInputStream(InputStream is, int size, int messageNumber) throws IOException {
InputStreamReader reader = new InputStreamReader(is);
- try
- {
+ try {
char[] buffer;
- if (size > 0)
- {
+ if (size > 0) {
buffer = new char[size];
}
- else
- {
+ else {
buffer = new char[1024];
}
int count;
StringBuilder builder = new StringBuilder();
- while ((count = reader.read(buffer)) != -1)
- {
+ while ((count = reader.read(buffer)) != -1) {
builder.append(buffer, 0, count);
- if (size > 0) break;
+ if (size > 0)
+ break;
}
return builder.toString();
}
- catch (IOException ioe)
- {
+ catch (IOException ioe) {
return createDefaultMessage(messageNumber);
}
- finally
- {
+ finally {
reader.close();
}
}
- private String createDefaultMessage(int messageNumber)
- {
+ private String createDefaultMessage(int messageNumber) {
return "test message: " + messageNumber;
}
- public ProducerThread setMessageCount(int messageCount)
- {
+ public ProducerThread setMessageCount(int messageCount) {
this.messageCount = messageCount;
return this;
}
- public int getSleep()
- {
+ public int getSleep() {
return sleep;
}
- public ProducerThread setSleep(int sleep)
- {
+ public ProducerThread setSleep(int sleep) {
this.sleep = sleep;
return this;
}
- public int getMessageCount()
- {
+ public int getMessageCount() {
return messageCount;
}
- public int getSentCount()
- {
+ public int getSentCount() {
return sentCount.get();
}
- public boolean isPersistent()
- {
+ public boolean isPersistent() {
return persistent;
}
- public ProducerThread setPersistent(boolean persistent)
- {
+ public ProducerThread setPersistent(boolean persistent) {
this.persistent = persistent;
return this;
}
- public boolean isRunning()
- {
+ public boolean isRunning() {
return running;
}
- public ProducerThread setRunning(boolean running)
- {
+ public ProducerThread setRunning(boolean running) {
this.running = running;
return this;
}
- public long getMsgTTL()
- {
+ public long getMsgTTL() {
return msgTTL;
}
- public ProducerThread setMsgTTL(long msgTTL)
- {
+ public ProducerThread setMsgTTL(long msgTTL) {
this.msgTTL = msgTTL;
return this;
}
- public int getTransactionBatchSize()
- {
+ public int getTransactionBatchSize() {
return transactionBatchSize;
}
- public ProducerThread setTransactionBatchSize(int transactionBatchSize)
- {
+ public ProducerThread setTransactionBatchSize(int transactionBatchSize) {
this.transactionBatchSize = transactionBatchSize;
return this;
}
- public String getMsgGroupID()
- {
+ public String getMsgGroupID() {
return msgGroupID;
}
- public ProducerThread setMsgGroupID(String msgGroupID)
- {
+ public ProducerThread setMsgGroupID(String msgGroupID) {
this.msgGroupID = msgGroupID;
return this;
}
- public int getTextMessageSize()
- {
+ public int getTextMessageSize() {
return textMessageSize;
}
- public ProducerThread setTextMessageSize(int textMessageSize)
- {
+ public ProducerThread setTextMessageSize(int textMessageSize) {
this.textMessageSize = textMessageSize;
return this;
}
- public int getMessageSize()
- {
+ public int getMessageSize() {
return messageSize;
}
- public ProducerThread setMessageSize(int messageSize)
- {
+ public ProducerThread setMessageSize(int messageSize) {
this.messageSize = messageSize;
return this;
}
- public ReusableLatch getFinished()
- {
+ public ReusableLatch getFinished() {
return finished;
}
- public ProducerThread setFinished(int value)
- {
+ public ProducerThread setFinished(int value) {
finished.setCount(value);
return this;
}
- public String getPayloadUrl()
- {
+ public String getPayloadUrl() {
return payloadUrl;
}
- public ProducerThread setPayloadUrl(String payloadUrl)
- {
+ public ProducerThread setPayloadUrl(String payloadUrl) {
this.payloadUrl = payloadUrl;
return this;
}
- public String getMessage()
- {
+ public String getMessage() {
return message;
}
- public ProducerThread setMessage(String message)
- {
+ public ProducerThread setMessage(String message) {
this.message = message;
return this;
}
- public boolean isRunIndefinitely()
- {
+ public boolean isRunIndefinitely() {
return runIndefinitely;
}
- public ProducerThread setRunIndefinitely(boolean runIndefinitely)
- {
+ public ProducerThread setRunIndefinitely(boolean runIndefinitely) {
this.runIndefinitely = runIndefinitely;
return this;
}
- public ProducerThread pauseProducer()
- {
+ public ProducerThread pauseProducer() {
this.paused.countUp();
return this;
}
- public ProducerThread resumeProducer()
- {
+ public ProducerThread resumeProducer() {
this.paused.countDown();
return this;
}
- public ProducerThread resetCounters()
- {
+ public ProducerThread resetCounters() {
this.sentCount.set(0);
return this;
}
-
- public boolean isVerbose()
- {
+ public boolean isVerbose() {
return verbose;
}
- public ProducerThread setVerbose(boolean verbose)
- {
+ public ProducerThread setVerbose(boolean verbose) {
this.verbose = verbose;
return this;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
index b5a8845..e3799d9 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
@@ -35,19 +35,22 @@ import org.apache.activemq.artemis.utils.ReusableLatch;
* It will perform a simple test to evaluate how many syncs a disk can make per second
* * *
*/
-public class SyncCalculation
-{
+public class SyncCalculation {
+
/**
* It will perform a write test of blockSize * bocks, sinc on each write, for N tries.
* It will return the lowest spent time from the tries.
*/
- public static long syncTest(File datafolder, int blockSize, int blocks, int tries, boolean verbose, boolean aio) throws Exception
- {
+ public static long syncTest(File datafolder,
+ int blockSize,
+ int blocks,
+ int tries,
+ boolean verbose,
+ boolean aio) throws Exception {
SequentialFileFactory factory = newFactory(datafolder, aio);
SequentialFile file = factory.createSequentialFile("test.tmp");
- try
- {
+ try {
file.delete();
file.open();
@@ -57,8 +60,7 @@ public class SyncCalculation
byte[] block = new byte[blockSize];
- for (int i = 0; i < block.length; i++)
- {
+ for (int i = 0; i < block.length; i++) {
block[i] = (byte) 't';
}
@@ -68,39 +70,32 @@ public class SyncCalculation
final ReusableLatch latch = new ReusableLatch(0);
- IOCallback callback = new IOCallback()
- {
+ IOCallback callback = new IOCallback() {
@Override
- public void done()
- {
+ public void done() {
latch.countDown();
}
@Override
- public void onError(int errorCode, String errorMessage)
- {
+ public void onError(int errorCode, String errorMessage) {
}
};
DecimalFormat dcformat = new DecimalFormat("###.##");
- for (int ntry = 0; ntry < tries; ntry++)
- {
+ for (int ntry = 0; ntry < tries; ntry++) {
- if (verbose)
- {
+ if (verbose) {
System.out.println("**************************************************");
System.out.println(ntry + " of " + tries + " calculation");
}
file.position(0);
long start = System.currentTimeMillis();
- for (int i = 0; i < blocks; i++)
- {
+ for (int i = 0; i < blocks; i++) {
bufferBlock.position(0);
latch.countUp();
file.writeDirect(bufferBlock, true, callback);
- if (!latch.await(5, TimeUnit.SECONDS))
- {
+ if (!latch.await(5, TimeUnit.SECONDS)) {
throw new IOException("Callback wasn't called");
}
}
@@ -108,9 +103,8 @@ public class SyncCalculation
result[ntry] = (end - start);
- if (verbose)
- {
- double writesPerMillisecond = (double)blocks / (double) result[ntry];
+ if (verbose) {
+ double writesPerMillisecond = (double) blocks / (double) result[ntry];
System.out.println("Time = " + result[ntry]);
System.out.println("Writes / millisecond = " + dcformat.format(writesPerMillisecond));
System.out.println("bufferTimeout = " + toNanos(result[ntry], blocks));
@@ -121,45 +115,34 @@ public class SyncCalculation
factory.releaseDirectBuffer(bufferBlock);
long totalTime = Long.MAX_VALUE;
- for (int i = 0; i < tries; i++)
- {
- if (result[i] < totalTime)
- {
+ for (int i = 0; i < tries; i++) {
+ if (result[i] < totalTime) {
totalTime = result[i];
}
}
return totalTime;
}
- finally
- {
- try
- {
+ finally {
+ try {
file.close();
}
- catch (Exception e)
- {
+ catch (Exception e) {
}
- try
- {
+ try {
file.delete();
}
- catch (Exception e)
- {
+ catch (Exception e) {
}
- try
- {
+ try {
factory.stop();
}
- catch (Exception e)
- {
+ catch (Exception e) {
}
}
}
-
- public static long toNanos(long time, long blocks)
- {
+ public static long toNanos(long time, long blocks) {
double blocksPerMillisecond = (double) blocks / (double) (time);
@@ -170,18 +153,15 @@ public class SyncCalculation
return timeWait;
}
- private static SequentialFileFactory newFactory(File datafolder, boolean aio)
- {
- if (aio && LibaioContext.isLoaded())
- {
+ private static SequentialFileFactory newFactory(File datafolder, boolean aio) {
+ if (aio && LibaioContext.isLoaded()) {
SequentialFileFactory factory = new AIOSequentialFileFactory(datafolder, 1);
factory.start();
((AIOSequentialFileFactory) factory).disableBufferReuse();
return factory;
}
- else
- {
+ else {
SequentialFileFactory factory = new NIOSequentialFileFactory(datafolder, 1);
factory.start();
return factory;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/process/ProcessBuilder.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/process/ProcessBuilder.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/process/ProcessBuilder.java
index eaf4865..bbfb82f 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/process/ProcessBuilder.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/process/ProcessBuilder.java
@@ -24,19 +24,15 @@ import java.io.InputStreamReader;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
-public class ProcessBuilder
-{
+public class ProcessBuilder {
+
static ConcurrentHashSet<Process> processes = new ConcurrentHashSet<>();
- static
- {
- Runtime.getRuntime().addShutdownHook(new Thread()
- {
- public void run()
- {
- for (Process p : processes)
- {
-// if (p.isAlive())
+ static {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ for (Process p : processes) {
+ // if (p.isAlive())
{
p.destroy();
}
@@ -45,43 +41,36 @@ public class ProcessBuilder
});
}
-
/**
* it will lookup for process that are dead already, eliminating leaks.
*/
- public static void cleanupProcess()
- {
- for (Process p: processes)
- {
-// if (!p.isAlive())
+ public static void cleanupProcess() {
+ for (Process p : processes) {
+ // if (!p.isAlive())
{
processes.remove(p);
}
}
}
-
-
/**
* *
- * @param logname the prefix for log output
+ *
+ * @param logname the prefix for log output
* @param location The location where this command is being executed from
- * @param hook it will finish the process upon shutdown of the VM
- * @param args The arguments being passwed to the the CLI tool
+ * @param hook it will finish the process upon shutdown of the VM
+ * @param args The arguments being passed to the CLI tool
* @return
* @throws Exception
*/
- public static Process build(String logname, File location, boolean hook, String... args) throws Exception
- {
+ public static Process build(String logname, File location, boolean hook, String... args) throws Exception {
boolean IS_WINDOWS = System.getProperty("os.name").toLowerCase().trim().startsWith("win");
String[] newArgs;
- if (IS_WINDOWS)
- {
+ if (IS_WINDOWS) {
newArgs = rebuildArgs(args, "cmd", "/c", "artemis.cmd");
}
- else
- {
+ else {
newArgs = rebuildArgs(args, "./artemis");
}
@@ -91,17 +80,11 @@ public class ProcessBuilder
Process process = builder.start();
- ProcessLogger outputLogger = new ProcessLogger(true,
- process.getInputStream(),
- logname,
- false);
+ ProcessLogger outputLogger = new ProcessLogger(true, process.getInputStream(), logname, false);
outputLogger.start();
// Adding a reader to System.err, so the VM won't hang on a System.err.println as identified on this forum thread:
- ProcessLogger errorLogger = new ProcessLogger(true,
- process.getErrorStream(),
- logname,
- true);
+ ProcessLogger errorLogger = new ProcessLogger(true, process.getErrorStream(), logname, true);
errorLogger.start();
processes.add(process);
@@ -111,31 +94,27 @@ public class ProcessBuilder
return process;
}
- public static String[] rebuildArgs(String[] args, String ... prefixArgs)
- {
+ public static String[] rebuildArgs(String[] args, String... prefixArgs) {
String[] resultArgs = new String[args.length + prefixArgs.length];
int i = 0;
- for (String arg: prefixArgs)
- {
+ for (String arg : prefixArgs) {
resultArgs[i++] = arg;
}
- for (String arg: args)
- {
+ for (String arg : args) {
resultArgs[i++] = arg;
}
return resultArgs;
}
-
/**
* Redirect the input stream to a logger (as debug logs)
*/
- static class ProcessLogger extends Thread
- {
+ static class ProcessLogger extends Thread {
+
private final InputStream is;
private final String logName;
@@ -149,8 +128,7 @@ public class ProcessBuilder
ProcessLogger(final boolean print,
final InputStream is,
final String logName,
- final boolean sendToErr) throws ClassNotFoundException
- {
+ final boolean sendToErr) throws ClassNotFoundException {
this.is = is;
this.print = print;
this.logName = logName;
@@ -159,30 +137,23 @@ public class ProcessBuilder
}
@Override
- public void run()
- {
- try
- {
+ public void run() {
+ try {
InputStreamReader isr = new InputStreamReader(is);
BufferedReader br = new BufferedReader(isr);
String line;
- while ((line = br.readLine()) != null)
- {
- if (print)
- {
- if (sendToErr)
- {
+ while ((line = br.readLine()) != null) {
+ if (print) {
+ if (sendToErr) {
System.err.println(logName + "-err:" + line);
}
- else
- {
+ else {
System.out.println(logName + "-out:" + line);
}
}
}
}
- catch (IOException e)
- {
+ catch (IOException e) {
// ok, stream closed
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-cli/src/main/java/org/apache/activemq/artemis/components/ExternalComponent.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/components/ExternalComponent.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/components/ExternalComponent.java
index 434fc71..4969d6a 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/components/ExternalComponent.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/components/ExternalComponent.java
@@ -19,7 +19,7 @@ package org.apache.activemq.artemis.components;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.dto.ComponentDTO;
-public interface ExternalComponent extends ActiveMQComponent
-{
+public interface ExternalComponent extends ActiveMQComponent {
+
void configure(ComponentDTO config, String artemisInstance, String artemisHome) throws Exception;
}