You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/12/18 04:17:34 UTC
[1/8] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5352
Repository: activemq
Updated Branches:
refs/heads/activemq-5.10.x 22f2f3dde -> e3d218a97
https://issues.apache.org/jira/browse/AMQ-5352
Applied and tested; all tests are still passing after this change.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d7e65a3c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d7e65a3c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d7e65a3c
Branch: refs/heads/activemq-5.10.x
Commit: d7e65a3c1962cf29292c38f9e43b6c6b0399e17d
Parents: 22f2f3d
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Sep 9 14:19:39 2014 -0400
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 21:42:50 2014 -0500
----------------------------------------------------------------------
.../activemq/transport/amqp/AmqpProtocolConverter.java | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/d7e65a3c/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index df8509d..69bf856 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -610,7 +610,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
LOG.trace("Inbound Message:{} from Producer:{}", message.getMessageId(), producerId + ":" + messageId.getProducerSequenceId());
- DeliveryState remoteState = delivery.getRemoteState();
+ final DeliveryState remoteState = delivery.getRemoteState();
if (remoteState != null && remoteState instanceof TransactionalState) {
TransactionalState s = (TransactionalState) remoteState;
long txid = toLong(s.getTxnId());
@@ -648,7 +648,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
receiver.flow(prefetch - receiver.getCredit());
}
- delivery.disposition(Accepted.getInstance());
+ if (remoteState != null && remoteState instanceof TransactionalState) {
+ TransactionalState txAccepted = new TransactionalState();
+ txAccepted.setOutcome(Accepted.getInstance());
+ txAccepted.setTxnId(((TransactionalState) remoteState).getTxnId());
+
+ delivery.disposition(txAccepted);
+ } else {
+ delivery.disposition(Accepted.getInstance());
+ }
+
delivery.settle();
}
[8/8] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5381
Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5381
Apply the fix and a test for the error raised when old content is restored
and treated as compressed although it was not.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e3d218a9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e3d218a9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e3d218a9
Branch: refs/heads/activemq-5.10.x
Commit: e3d218a97ab3f502366b7cc1fc62fc212a5591c5
Parents: c52c4ed
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 21 15:52:10 2014 -0400
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 22:00:52 2014 -0500
----------------------------------------------------------------------
.../activemq/command/ActiveMQBytesMessage.java | 12 +-
.../org/apache/activemq/bugs/AMQ5381Test.java | 182 +++++++++++++++++++
2 files changed, 185 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/e3d218a9/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
index 65e1036..8806028 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
@@ -19,15 +19,12 @@ package org.apache.activemq.command;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
-import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.zip.Deflater;
-import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
-import java.util.zip.InflaterInputStream;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
@@ -131,7 +128,9 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
dataOut.close();
ByteSequence bs = bytesOut.toByteSequence();
setContent(bs);
- if (compressed) {
+
+ ActiveMQConnection connection = getConnection();
+ if (connection != null && connection.isUseCompression()) {
doCompress();
}
} catch (IOException ioe) {
@@ -834,11 +833,6 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
this.dataOut = new DataOutputStream(os);
}
- ActiveMQConnection connection = getConnection();
- if (connection != null && connection.isUseCompression()) {
- compressed = true;
- }
-
restoreOldContent();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e3d218a9/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java
new file mode 100644
index 0000000..ff10b0d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.Random;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class AMQ5381Test {
+
+ public static final byte[] ORIG_MSG_CONTENT = randomByteArray();
+ public static final String AMQ5381_EXCEPTION_MESSAGE = "java.util.zip.DataFormatException: incorrect header check";
+
+ private BrokerService brokerService;
+ private String brokerURI;
+
+ @Rule public TestName name = new TestName();
+
+ @Before
+ public void startBroker() throws Exception {
+ brokerService = new BrokerService();
+ brokerService.setPersistent(false);
+ brokerService.setUseJmx(false);
+ brokerService.addConnector("tcp://localhost:0");
+ brokerService.start();
+ brokerService.waitUntilStarted();
+
+ brokerURI = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
+ }
+
+ @After
+ public void stopBroker() throws Exception {
+ if (brokerService != null) {
+ brokerService.stop();
+ }
+ }
+
+ private ActiveMQConnection createConnection(boolean useCompression) throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
+ factory.setUseCompression(useCompression);
+ Connection connection = factory.createConnection();
+ connection.start();
+ return (ActiveMQConnection) connection;
+ }
+
+ @Test
+ public void amq5381Test() throws Exception {
+
+ // Consumer Configured for (useCompression=true)
+ final ActiveMQConnection consumerConnection = createConnection(true);
+ final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Queue consumerQueue = consumerSession.createQueue(name.getMethodName());
+ final MessageConsumer consumer = consumerSession.createConsumer(consumerQueue);
+
+ // Producer Configured for (useCompression=false)
+ final ActiveMQConnection producerConnection = createConnection(false);
+ final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Queue producerQueue = producerSession.createQueue(name.getMethodName());
+
+ try {
+
+ final ActiveMQBytesMessage messageProduced = (ActiveMQBytesMessage) producerSession.createBytesMessage();
+ messageProduced.writeBytes(ORIG_MSG_CONTENT);
+ Assert.assertFalse(messageProduced.isReadOnlyBody());
+
+ Assert.assertFalse(
+ "Produced Message's 'compressed' flag should remain false until the message is sent (where it will be compressed, if necessary)",
+ messageProduced.isCompressed());
+
+ final MessageProducer producer = producerSession.createProducer(null);
+ producer.send(producerQueue, messageProduced);
+
+ Assert.assertEquals("Once sent, the produced Message's 'compressed' flag should match its Connection's 'useCompression' flag",
+ producerConnection.isUseCompression(), messageProduced.isCompressed());
+
+ final ActiveMQBytesMessage messageConsumed = (ActiveMQBytesMessage) consumer.receive();
+ Assert.assertNotNull(messageConsumed);
+ Assert.assertTrue("Consumed Message should be read-only", messageConsumed.isReadOnlyBody());
+ Assert.assertEquals("Consumed Message's 'compressed' flag should match the produced Message's 'compressed' flag",
+ messageProduced.isCompressed(), messageConsumed.isCompressed());
+
+ // ensure consumed message content matches what was originally set
+ final byte[] consumedMsgContent = new byte[(int) messageConsumed.getBodyLength()];
+ messageConsumed.readBytes(consumedMsgContent);
+
+ Assert.assertTrue("Consumed Message content should match the original Message content", Arrays.equals(ORIG_MSG_CONTENT, consumedMsgContent));
+
+ // make message writable so the consumer can modify and reuse it
+ makeWritable(messageConsumed);
+
+ // modify message, attempt to trigger DataFormatException due
+ // to old incorrect compression logic
+ try {
+ messageConsumed.setStringProperty(this.getClass().getName(), "test");
+ } catch (JMSException jmsE) {
+ if (AMQ5381_EXCEPTION_MESSAGE.equals(jmsE.getMessage())) {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ jmsE.printStackTrace(pw);
+
+ Assert.fail("AMQ5381 Error State Achieved: attempted to decompress BytesMessage contents that are not compressed\n" + sw.toString());
+ } else {
+ throw jmsE;
+ }
+ }
+
+ Assert.assertEquals(
+ "The consumed Message's 'compressed' flag should still match the produced Message's 'compressed' flag after it has been made writable",
+ messageProduced.isCompressed(), messageConsumed.isCompressed());
+
+ // simulate re-publishing message
+ simulatePublish(messageConsumed);
+
+ // ensure consumed message content matches what was originally set
+ final byte[] modifiedMsgContent = new byte[(int) messageConsumed.getBodyLength()];
+ messageConsumed.readBytes(modifiedMsgContent);
+
+ Assert.assertTrue(
+ "After the message properties are modified and it is re-published, its message content should still match the original message content",
+ Arrays.equals(ORIG_MSG_CONTENT, modifiedMsgContent));
+ } finally {
+ producerSession.close();
+ producerConnection.close();
+ consumerSession.close();
+ consumerConnection.close();
+ }
+ }
+
+ protected static final int MAX_RANDOM_BYTE_ARRAY_SIZE_KB = 128;
+
+ protected static byte[] randomByteArray() {
+ final Random random = new Random();
+ final byte[] byteArray = new byte[random.nextInt(MAX_RANDOM_BYTE_ARRAY_SIZE_KB * 1024)];
+ random.nextBytes(byteArray);
+
+ return byteArray;
+ }
+
+ protected static void makeWritable(final ActiveMQMessage message) {
+ message.setReadOnlyBody(false);
+ message.setReadOnlyProperties(false);
+ }
+
+ protected static void simulatePublish(final ActiveMQBytesMessage message) throws JMSException {
+ message.reset();
+ message.onSend();
+ }
+}
[2/8] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5354 - patch applied with thanks
Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5354 - patch applied with thanks
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c682f919
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c682f919
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c682f919
Branch: refs/heads/activemq-5.10.x
Commit: c682f91946c51ddcfa044aa06f41a74a4c22f83b
Parents: d7e65a3
Author: gtully <ga...@gmail.com>
Authored: Mon Sep 15 15:59:00 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 21:44:52 2014 -0500
----------------------------------------------------------------------
.../java/org/apache/activemq/store/kahadb/MessageDatabase.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/c682f919/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 755f214..337e188 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1332,13 +1332,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
Long id = sd.messageIdIndex.get(tx, command.getMessageId());
if (id != null) {
- sd.orderIndex.put(
+ MessageKeys previousKeys = sd.orderIndex.put(
tx,
command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY,
id,
new MessageKeys(command.getMessageId(), location)
);
sd.locationIndex.put(tx, location, id);
+ if(previousKeys != null) {
+ sd.locationIndex.remove(tx, previousKeys.location);
+ }
} else {
LOG.warn("Non existent message update attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
}
[6/8] activemq git commit: fix deadlock that blocks remote broker
start in duplex case with durable sub recreation on restart -
dynamicOnly=true works around it
Posted by ha...@apache.org.
fix deadlock that blocks remote broker start in duplex case with durable sub recreation on restart - dynamicOnly=true works around it
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1a241606
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1a241606
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1a241606
Branch: refs/heads/activemq-5.10.x
Commit: 1a24160641c2197b3e29a3f4851ca6c01aeace25
Parents: aeecf88
Author: gtully <ga...@gmail.com>
Authored: Tue Oct 7 14:38:20 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 21:52:23 2014 -0500
----------------------------------------------------------------------
.../network/DemandForwardingBridgeSupport.java | 24 +-
...DurableSubscriberWithNetworkRestartTest.java | 231 +++++++++++++++++++
2 files changed, 244 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/1a241606/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 7d334ac..83eea31 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -427,6 +427,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
startRemoteBridge();
} catch (Throwable e) {
serviceRemoteException(e);
+ return;
+ }
+
+ try {
+ if (safeWaitUntilStarted()) {
+ setupStaticDestinations();
+ }
+ } catch (Throwable e) {
+ serviceLocalException(e);
}
}
@@ -509,14 +518,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
startedLatch.countDown();
localStartedLatch.countDown();
}
-
- if (!disposed.get()) {
- setupStaticDestinations();
- } else {
- LOG.warn("Network connection between {} and {} ({}) was interrupted during establishment.", new Object[]{
- localBroker, remoteBroker, remoteBrokerName
- });
- }
}
}
@@ -841,7 +842,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
-
+ LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
if (!disposed.get()) {
if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
// not a reason to terminate the bridge - temps can disappear with
@@ -1398,12 +1399,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
* Performs a timed wait on the started latch and then checks for disposed
* before performing another wait each time the the started wait times out.
*/
- protected void safeWaitUntilStarted() throws InterruptedException {
+ protected boolean safeWaitUntilStarted() throws InterruptedException {
while (!disposed.get()) {
if (startedLatch.await(1, TimeUnit.SECONDS)) {
- return;
+ break;
}
}
+ return !disposed.get();
}
protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
http://git-wip-us.apache.org/repos/asf/activemq/blob/1a241606/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkRestartTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkRestartTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkRestartTest.java
new file mode 100644
index 0000000..3799c6c
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkRestartTest.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.Set;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+import static org.junit.Assume.assumeNotNull;
+
+
+public class DurableSubscriberWithNetworkRestartTest extends JmsMultipleBrokersTestSupport {
+ private static final Log LOG = LogFactory.getLog(DurableSubscriberWithNetworkRestartTest.class);
+ private static final String HUB = "HubBroker";
+ private static final String SPOKE = "SpokeBroker";
+ protected static final int MESSAGE_COUNT = 10;
+ public boolean dynamicOnly = false;
+
+ public void testSendOnAReceiveOnBWithTransportDisconnectDynamicOnly() throws Exception {
+ dynamicOnly = true;
+ try {
+ testSendOnAReceiveOnBWithTransportDisconnect();
+ } finally {
+ dynamicOnly = false;
+ }
+ }
+
+ public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception {
+ bridge(SPOKE, HUB);
+ startAllBrokers();
+
+ verifyDuplexBridgeMbean();
+
+ // Setup connection
+ URI hubURI = brokers.get(HUB).broker.getTransportConnectors().get(0).getPublishableConnectURI();
+ URI spokeURI = brokers.get(SPOKE).broker.getTransportConnectors().get(0).getPublishableConnectURI();
+ ActiveMQConnectionFactory facHub = new ActiveMQConnectionFactory(hubURI);
+ ActiveMQConnectionFactory facSpoke = new ActiveMQConnectionFactory(spokeURI);
+ Connection conHub = facHub.createConnection();
+ Connection conSpoke = facSpoke.createConnection();
+ conHub.setClientID("clientHUB");
+ conSpoke.setClientID("clientSPOKE");
+ conHub.start();
+ conSpoke.start();
+ Session sesHub = conHub.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sesSpoke = conSpoke.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ ActiveMQTopic topic = new ActiveMQTopic("TEST.FOO");
+ String consumerName = "consumerName";
+
+ // Setup consumers
+ MessageConsumer remoteConsumer = sesHub.createDurableSubscriber(topic, consumerName);
+ sleep(1000);
+ remoteConsumer.close();
+
+ // Setup producer
+ MessageProducer localProducer = sesSpoke.createProducer(topic);
+ localProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ final String payloadString = new String(new byte[10*1024]);
+ // Send messages
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message test = sesSpoke.createTextMessage("test-" + i);
+ test.setStringProperty("payload", payloadString);
+ localProducer.send(test);
+ }
+ localProducer.close();
+
+ final String options = "?persistent=true&useJmx=true&deleteAllMessagesOnStartup=false";
+ for (int i=0;i<2;i++) {
+ brokers.get(SPOKE).broker.stop();
+ sleep(1000);
+ createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE + options));
+ bridge(SPOKE, HUB);
+ brokers.get(SPOKE).broker.start();
+ LOG.info("restarted spoke..:" + i);
+
+ assertTrue("got mbeans on restart", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return countMbeans( brokers.get(HUB).broker, "networkBridge", 20000) == (dynamicOnly ? 1 : 2);
+ }
+ }));
+ }
+ }
+
+ private void verifyDuplexBridgeMbean() throws Exception {
+ assertEquals(1, countMbeans( brokers.get(HUB).broker, "networkBridge", 5000));
+ }
+
+ private int countMbeans(BrokerService broker, String type, int timeout) throws Exception {
+ final long expiryTime = System.currentTimeMillis() + timeout;
+
+ if (!type.contains("=")) {
+ type = type + "=*";
+ }
+
+ final ObjectName beanName = new ObjectName("org.apache.activemq:type=Broker,brokerName="
+ + broker.getBrokerName() + "," + type +",*");
+ Set<ObjectName> mbeans = null;
+ int count = 0;
+ do {
+ if (timeout > 0) {
+ Thread.sleep(100);
+ }
+
+ mbeans = broker.getManagementContext().queryNames(beanName, null);
+ if (mbeans != null) {
+ count = mbeans.size();
+ LOG.info("Found: " + count + ", matching type: " +type);
+ for (ObjectName objectName : mbeans) {
+ LOG.info("" + objectName);
+ }
+ //} else {
+ //logAllMbeans(broker);
+ }
+ } while ((mbeans == null || mbeans.isEmpty()) && expiryTime > System.currentTimeMillis());
+
+ // If port 1099 is in use when the Broker starts, starting the jmx connector
+ // will fail. So, if we have no mbsc to query, skip the test.
+ if (timeout > 0) {
+ assumeNotNull(mbeans);
+ }
+
+ return count;
+
+ }
+
+ private void logAllMbeans(BrokerService broker) throws MalformedURLException {
+ try {
+ // trace all existing MBeans
+ Set<?> all = broker.getManagementContext().queryNames(null, null);
+ LOG.info("Total MBean count=" + all.size());
+ for (Object o : all) {
+ //ObjectInstance bean = (ObjectInstance)o;
+ LOG.info(o);
+ }
+ } catch (Exception ignored) {
+ LOG.warn("getMBeanServer ex: " + ignored);
+ }
+ }
+
+ public NetworkConnector bridge(String from, String to) throws Exception {
+ NetworkConnector networkConnector = bridgeBrokers(from, to, dynamicOnly, -1, true);
+ networkConnector.setSuppressDuplicateQueueSubscriptions(true);
+ networkConnector.setDecreaseNetworkConsumerPriority(true);
+ networkConnector.setConsumerTTL(1);
+ networkConnector.setDuplex(true);
+ return networkConnector;
+ }
+
+ @Override
+ protected void startAllBrokers() throws Exception {
+ // Ensure HUB is started first so bridge will be active from the get go
+ BrokerItem brokerItem = brokers.get(HUB);
+ brokerItem.broker.start();
+ brokerItem = brokers.get(SPOKE);
+ brokerItem.broker.start();
+ sleep(600);
+ }
+
+ public void setUp() throws Exception {
+ super.setAutoFail(false);
+ super.setUp();
+ createBrokers(true);
+ }
+
+ private void createBrokers(boolean del) throws Exception {
+ final String options = "?persistent=true&useJmx=true&deleteAllMessagesOnStartup=" + del;
+ createBroker(new URI("broker:(tcp://localhost:61617)/" + HUB + options));
+ createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE + options));
+ }
+
+ protected void configureBroker(BrokerService broker) {
+ broker.setKeepDurableSubsActive(false);
+ broker.getManagementContext().setCreateConnector(false);
+ PolicyMap defaultPolcyMap = new PolicyMap();
+ PolicyEntry defaultPolicy = new PolicyEntry();
+ //defaultPolicy.setUseCache(false);
+ if (broker.getBrokerName().equals(HUB)) {
+ defaultPolicy.setStoreUsageHighWaterMark(2);
+ broker.getSystemUsage().getStoreUsage().setLimit(1*1024*1024);
+ }
+ defaultPolcyMap.setDefaultEntry(defaultPolicy);
+ broker.setDestinationPolicy(defaultPolcyMap);
+ broker.getSystemUsage().getMemoryUsage().setLimit(100*1024*1024);
+ }
+
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ private void sleep(int milliSecondTime) {
+ try {
+ Thread.sleep(milliSecondTime);
+ } catch (InterruptedException igonred) {
+ }
+ }
+}
[5/8] activemq git commit: improve trace logging on failure and tidy
up future - try to nail down intermittent duplex bridge half start
Posted by ha...@apache.org.
improve trace logging on failure and tidy up future - try to nail down intermittent duplex bridge half start
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/aeecf888
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/aeecf888
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/aeecf888
Branch: refs/heads/activemq-5.10.x
Commit: aeecf8880920981afeae24f47b0e1dfdf87e3e3a
Parents: a0af997
Author: gtully <ga...@gmail.com>
Authored: Thu Sep 18 22:40:04 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 21:50:09 2014 -0500
----------------------------------------------------------------------
.../network/DemandForwardingBridgeSupport.java | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/aeecf888/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index f61c5ac..7d334ac 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -347,7 +347,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
try {
remoteBrokerInfo = futureRemoteBrokerInfo.get();
if (remoteBrokerInfo == null) {
- fireBridgeFailed();
+ fireBridgeFailed(new Throwable("remoteBrokerInfo is null"));
return;
}
} catch (Exception e) {
@@ -358,7 +358,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
try {
localBrokerInfo = futureLocalBrokerInfo.get();
if (localBrokerInfo == null) {
- fireBridgeFailed();
+ fireBridgeFailed(new Throwable("localBrokerInfo is null"));
return;
}
@@ -592,7 +592,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
ServiceSupport.dispose(getControllingService());
}
});
- fireBridgeFailed();
+ fireBridgeFailed(error);
}
}
@@ -871,7 +871,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
ServiceSupport.dispose(getControllingService());
}
});
- fireBridgeFailed();
+ fireBridgeFailed(error);
}
}
@@ -1430,7 +1430,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
this.networkBridgeListener = listener;
}
- private void fireBridgeFailed() {
+ private void fireBridgeFailed(Throwable reason) {
+ LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason);
NetworkBridgeListener l = this.networkBridgeListener;
if (l != null && this.bridgeFailed.compareAndSet(false, true)) {
l.bridgeFailed();
@@ -1596,7 +1597,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
private final CountDownLatch slot = new CountDownLatch(1);
private final AtomicBoolean disposed;
- private BrokerInfo info = null;
+ private volatile BrokerInfo info = null;
public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) {
this.info = info;
[3/8] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-4705 - verify lock sanity on
acquire so master cannot start and immediately fail with a keepalive failure
Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-4705 - verify lock sanity on acquire so master cannot start and immediately fail with a keepalive failure
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a1946162
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a1946162
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a1946162
Branch: refs/heads/activemq-5.10.x
Commit: a19461627ec3903be442d272fc088df58fcd49bc
Parents: c682f91
Author: gtully <ga...@gmail.com>
Authored: Wed Sep 17 16:14:51 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 21:45:34 2014 -0500
----------------------------------------------------------------------
.../src/main/java/org/apache/activemq/store/SharedFileLocker.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/a1946162/activemq-broker/src/main/java/org/apache/activemq/store/SharedFileLocker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/SharedFileLocker.java b/activemq-broker/src/main/java/org/apache/activemq/store/SharedFileLocker.java
index e1b51df..e14eb03 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/SharedFileLocker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/SharedFileLocker.java
@@ -52,7 +52,7 @@ public class SharedFileLocker extends AbstractLocker {
while ((!isStopped()) && (!isStopping())) {
try {
lockFile.lock();
- locked = true;
+ locked = keepAlive();
break;
} catch (IOException e) {
LOG.info("Database "
[7/8] activemq git commit: avoid npe on network bridge failure,
dispose when not started on vm transport
Posted by ha...@apache.org.
avoid npe on network bridge failure, dispose when not started on vm transport
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c52c4ed0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c52c4ed0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c52c4ed0
Branch: refs/heads/activemq-5.10.x
Commit: c52c4ed0f850b1b1b0e58cdfba4332d18576627e
Parents: 1a24160
Author: gtully <ga...@gmail.com>
Authored: Thu Oct 9 13:44:30 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 21:53:13 2014 -0500
----------------------------------------------------------------------
.../main/java/org/apache/activemq/transport/vm/VMTransport.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/c52c4ed0/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
index ba6ed13..ef1b1e2 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
@@ -171,7 +171,7 @@ public class VMTransport implements Transport, Task {
public void stop() throws Exception {
// Only need to do this once, all future oneway calls will now
// fail as will any asnyc jobs in the task runner.
- if (disposed.compareAndSet(false, true)) {
+ if (disposed.compareAndSet(false, true) && started.get()) {
TaskRunner tr = taskRunner;
LinkedBlockingQueue<Object> mq = this.messageQueue;
[4/8] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5121 - use jmxLocal in karaf only
for jmx-based commands
Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5121 - use jmxLocal in karaf only for jmx-based commands
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a0af997b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a0af997b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a0af997b
Branch: refs/heads/activemq-5.10.x
Commit: a0af997baa16ea7e00ace521109de805bae08e7a
Parents: a194616
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Thu Sep 18 15:25:22 2014 +0200
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 21:49:18 2014 -0500
----------------------------------------------------------------------
.../apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java | 1 +
.../apache/activemq/karaf/commands/ActiveMQCommandSupport.java | 5 ++++-
2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/a0af997b/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java
----------------------------------------------------------------------
diff --git a/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java b/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java
index ede449e..3015fd1 100644
--- a/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java
+++ b/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java
@@ -67,6 +67,7 @@ public class ActiveMQBrokerFeatureTest extends AbstractJmsFeatureTest {
produceMessage(nameAndPayload);
System.err.println(executeCommand("activemq:bstat").trim());
+ assertEquals("JMS_BODY_FIELD:JMSText = " + nameAndPayload, executeCommand("activemq:browse --amqurl tcp://localhost:61616 --user karaf --password karaf -Vbody " + nameAndPayload).trim());
assertEquals("got our message", nameAndPayload, consumeMessage(nameAndPayload));
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/a0af997b/activemq-karaf/src/main/java/org/apache/activemq/karaf/commands/ActiveMQCommandSupport.java
----------------------------------------------------------------------
diff --git a/activemq-karaf/src/main/java/org/apache/activemq/karaf/commands/ActiveMQCommandSupport.java b/activemq-karaf/src/main/java/org/apache/activemq/karaf/commands/ActiveMQCommandSupport.java
index c05e7d0..58986c5 100644
--- a/activemq-karaf/src/main/java/org/apache/activemq/karaf/commands/ActiveMQCommandSupport.java
+++ b/activemq-karaf/src/main/java/org/apache/activemq/karaf/commands/ActiveMQCommandSupport.java
@@ -17,6 +17,7 @@
package org.apache.activemq.karaf.commands;
import org.apache.activemq.console.CommandContext;
+import org.apache.activemq.console.command.AbstractJmxCommand;
import org.apache.activemq.console.command.Command;
import org.apache.activemq.console.formatter.CommandShellOutputFormatter;
import org.apache.felix.gogo.commands.Argument;
@@ -42,7 +43,9 @@ public class ActiveMQCommandSupport extends OsgiCommandSupport {
try {
currentCommand.setCommandContext(context2);
// must be added first
- arguments.add(0, "--jmxlocal");
+ if (command instanceof AbstractJmxCommand) {
+ arguments.add(0, "--jmxlocal");
+ }
currentCommand.execute(arguments);
return null;
} catch (Throwable e) {