You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/12/18 04:17:34 UTC

[1/8] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5352

Repository: activemq
Updated Branches:
  refs/heads/activemq-5.10.x 22f2f3dde -> e3d218a97


https://issues.apache.org/jira/browse/AMQ-5352

Applied and tested, all tests still passing after this change.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d7e65a3c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d7e65a3c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d7e65a3c

Branch: refs/heads/activemq-5.10.x
Commit: d7e65a3c1962cf29292c38f9e43b6c6b0399e17d
Parents: 22f2f3d
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Sep 9 14:19:39 2014 -0400
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 21:42:50 2014 -0500

----------------------------------------------------------------------
 .../activemq/transport/amqp/AmqpProtocolConverter.java | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/d7e65a3c/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index df8509d..69bf856 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -610,7 +610,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
                 LOG.trace("Inbound Message:{} from Producer:{}", message.getMessageId(), producerId + ":" + messageId.getProducerSequenceId());
 
-                DeliveryState remoteState = delivery.getRemoteState();
+                final DeliveryState remoteState = delivery.getRemoteState();
                 if (remoteState != null && remoteState instanceof TransactionalState) {
                     TransactionalState s = (TransactionalState) remoteState;
                     long txid = toLong(s.getTxnId());
@@ -648,7 +648,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                                     receiver.flow(prefetch - receiver.getCredit());
                                 }
 
-                                delivery.disposition(Accepted.getInstance());
+                                if (remoteState != null && remoteState instanceof TransactionalState) {
+                                    TransactionalState txAccepted = new TransactionalState();
+                                    txAccepted.setOutcome(Accepted.getInstance());
+                                    txAccepted.setTxnId(((TransactionalState) remoteState).getTxnId());
+
+                                    delivery.disposition(txAccepted);
+                                } else {
+                                    delivery.disposition(Accepted.getInstance());
+                                }
+
                                 delivery.settle();
                             }
 


[8/8] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5381

Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5381

Apply fix and test for error when restoring old content and treating it
as compressed when it was not.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e3d218a9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e3d218a9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e3d218a9

Branch: refs/heads/activemq-5.10.x
Commit: e3d218a97ab3f502366b7cc1fc62fc212a5591c5
Parents: c52c4ed
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 21 15:52:10 2014 -0400
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 22:00:52 2014 -0500

----------------------------------------------------------------------
 .../activemq/command/ActiveMQBytesMessage.java  |  12 +-
 .../org/apache/activemq/bugs/AMQ5381Test.java   | 182 +++++++++++++++++++
 2 files changed, 185 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e3d218a9/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
index 65e1036..8806028 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
@@ -19,15 +19,12 @@ package org.apache.activemq.command;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
-import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.zip.Deflater;
-import java.util.zip.DeflaterOutputStream;
 import java.util.zip.Inflater;
-import java.util.zip.InflaterInputStream;
 
 import javax.jms.BytesMessage;
 import javax.jms.JMSException;
@@ -131,7 +128,9 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
                 dataOut.close();
                 ByteSequence bs = bytesOut.toByteSequence();
                 setContent(bs);
-                if (compressed) {
+
+                ActiveMQConnection connection = getConnection();
+                if (connection != null && connection.isUseCompression()) {
                     doCompress();
                 }
             } catch (IOException ioe) {
@@ -834,11 +833,6 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
             this.dataOut = new DataOutputStream(os);
         }
 
-        ActiveMQConnection connection = getConnection();
-        if (connection != null && connection.isUseCompression()) {
-            compressed = true;
-        }
-
         restoreOldContent();
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/e3d218a9/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java
new file mode 100644
index 0000000..ff10b0d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.Random;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class AMQ5381Test {
+
+    public static final byte[] ORIG_MSG_CONTENT = randomByteArray();
+    public static final String AMQ5381_EXCEPTION_MESSAGE = "java.util.zip.DataFormatException: incorrect header check";
+
+    private BrokerService brokerService;
+    private String brokerURI;
+
+    @Rule public TestName name = new TestName();
+
+    @Before
+    public void startBroker() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+        brokerService.addConnector("tcp://localhost:0");
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        brokerURI = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+    }
+
+    private ActiveMQConnection createConnection(boolean useCompression) throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
+        factory.setUseCompression(useCompression);
+        Connection connection = factory.createConnection();
+        connection.start();
+        return (ActiveMQConnection) connection;
+    }
+
+    @Test
+    public void amq5381Test() throws Exception {
+
+        // Consumer Configured for (useCompression=true)
+        final ActiveMQConnection consumerConnection = createConnection(true);
+        final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue consumerQueue = consumerSession.createQueue(name.getMethodName());
+        final MessageConsumer consumer = consumerSession.createConsumer(consumerQueue);
+
+        // Producer Configured for (useCompression=false)
+        final ActiveMQConnection producerConnection = createConnection(false);
+        final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue producerQueue = producerSession.createQueue(name.getMethodName());
+
+        try {
+
+            final ActiveMQBytesMessage messageProduced = (ActiveMQBytesMessage) producerSession.createBytesMessage();
+            messageProduced.writeBytes(ORIG_MSG_CONTENT);
+            Assert.assertFalse(messageProduced.isReadOnlyBody());
+
+            Assert.assertFalse(
+                "Produced Message's 'compressed' flag should remain false until the message is sent (where it will be compressed, if necessary)",
+                messageProduced.isCompressed());
+
+            final MessageProducer producer = producerSession.createProducer(null);
+            producer.send(producerQueue, messageProduced);
+
+            Assert.assertEquals("Once sent, the produced Message's 'compressed' flag should match its Connection's 'useCompression' flag",
+                producerConnection.isUseCompression(), messageProduced.isCompressed());
+
+            final ActiveMQBytesMessage messageConsumed = (ActiveMQBytesMessage) consumer.receive();
+            Assert.assertNotNull(messageConsumed);
+            Assert.assertTrue("Consumed Message should be read-only", messageConsumed.isReadOnlyBody());
+            Assert.assertEquals("Consumed Message's 'compressed' flag should match the produced Message's 'compressed' flag",
+                                messageProduced.isCompressed(), messageConsumed.isCompressed());
+
+            // ensure consumed message content matches what was originally set
+            final byte[] consumedMsgContent = new byte[(int) messageConsumed.getBodyLength()];
+            messageConsumed.readBytes(consumedMsgContent);
+
+            Assert.assertTrue("Consumed Message content should match the original Message content", Arrays.equals(ORIG_MSG_CONTENT, consumedMsgContent));
+
+            // make message writable so the consumer can modify and reuse it
+            makeWritable(messageConsumed);
+
+            // modify message, attempt to trigger DataFormatException due
+            // to old incorrect compression logic
+            try {
+                messageConsumed.setStringProperty(this.getClass().getName(), "test");
+            } catch (JMSException jmsE) {
+                if (AMQ5381_EXCEPTION_MESSAGE.equals(jmsE.getMessage())) {
+                    StringWriter sw = new StringWriter();
+                    PrintWriter pw = new PrintWriter(sw);
+                    jmsE.printStackTrace(pw);
+
+                    Assert.fail("AMQ5381 Error State Achieved: attempted to decompress BytesMessage contents that are not compressed\n" + sw.toString());
+                } else {
+                    throw jmsE;
+                }
+            }
+
+            Assert.assertEquals(
+                "The consumed Message's 'compressed' flag should still match the produced Message's 'compressed' flag after it has been made writable",
+                messageProduced.isCompressed(), messageConsumed.isCompressed());
+
+            // simulate re-publishing message
+            simulatePublish(messageConsumed);
+
+            // ensure consumed message content matches what was originally set
+            final byte[] modifiedMsgContent = new byte[(int) messageConsumed.getBodyLength()];
+            messageConsumed.readBytes(modifiedMsgContent);
+
+            Assert.assertTrue(
+                "After the message properties are modified and it is re-published, its message content should still match the original message content",
+                Arrays.equals(ORIG_MSG_CONTENT, modifiedMsgContent));
+        } finally {
+            producerSession.close();
+            producerConnection.close();
+            consumerSession.close();
+            consumerConnection.close();
+        }
+    }
+
+    protected static final int MAX_RANDOM_BYTE_ARRAY_SIZE_KB = 128;
+
+    protected static byte[] randomByteArray() {
+        final Random random = new Random();
+        final byte[] byteArray = new byte[random.nextInt(MAX_RANDOM_BYTE_ARRAY_SIZE_KB * 1024)];
+        random.nextBytes(byteArray);
+
+        return byteArray;
+    }
+
+    protected static void makeWritable(final ActiveMQMessage message) {
+        message.setReadOnlyBody(false);
+        message.setReadOnlyProperties(false);
+    }
+
+    protected static void simulatePublish(final ActiveMQBytesMessage message) throws JMSException {
+        message.reset();
+        message.onSend();
+    }
+}


[2/8] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5354 - patch applied with thanks

Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5354 - patch applied with thanks


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c682f919
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c682f919
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c682f919

Branch: refs/heads/activemq-5.10.x
Commit: c682f91946c51ddcfa044aa06f41a74a4c22f83b
Parents: d7e65a3
Author: gtully <ga...@gmail.com>
Authored: Mon Sep 15 15:59:00 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 21:44:52 2014 -0500

----------------------------------------------------------------------
 .../java/org/apache/activemq/store/kahadb/MessageDatabase.java  | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/c682f919/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 755f214..337e188 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1332,13 +1332,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
         Long id = sd.messageIdIndex.get(tx, command.getMessageId());
         if (id != null) {
-            sd.orderIndex.put(
+            MessageKeys previousKeys = sd.orderIndex.put(
                     tx,
                     command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY,
                     id,
                     new MessageKeys(command.getMessageId(), location)
             );
             sd.locationIndex.put(tx, location, id);
+            if(previousKeys != null) {
+                sd.locationIndex.remove(tx, previousKeys.location);
+            }
         } else {
             LOG.warn("Non existent message update attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
         }


[6/8] activemq git commit: fix deadlock that blocks remote broker start in duplex case with durable sub recreation on restart - dynamicOnly=true works around

Posted by ha...@apache.org.
fix deadlock that blocks remote broker start in duplex case with durable sub recreation on restart - dynamicOnly=true works around


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1a241606
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1a241606
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1a241606

Branch: refs/heads/activemq-5.10.x
Commit: 1a24160641c2197b3e29a3f4851ca6c01aeace25
Parents: aeecf88
Author: gtully <ga...@gmail.com>
Authored: Tue Oct 7 14:38:20 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 21:52:23 2014 -0500

----------------------------------------------------------------------
 .../network/DemandForwardingBridgeSupport.java  |  24 +-
 ...DurableSubscriberWithNetworkRestartTest.java | 231 +++++++++++++++++++
 2 files changed, 244 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/1a241606/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 7d334ac..83eea31 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -427,6 +427,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
             startRemoteBridge();
         } catch (Throwable e) {
             serviceRemoteException(e);
+            return;
+        }
+
+        try {
+            if (safeWaitUntilStarted()) {
+                setupStaticDestinations();
+            }
+        } catch (Throwable e) {
+            serviceLocalException(e);
         }
     }
 
@@ -509,14 +518,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 startedLatch.countDown();
                 localStartedLatch.countDown();
             }
-
-            if (!disposed.get()) {
-                setupStaticDestinations();
-            } else {
-                LOG.warn("Network connection between {} and {} ({}) was interrupted during establishment.", new Object[]{
-                        localBroker, remoteBroker, remoteBrokerName
-                });
-            }
         }
     }
 
@@ -841,7 +842,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
     }
 
     public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
-
+        LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
         if (!disposed.get()) {
             if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
                 // not a reason to terminate the bridge - temps can disappear with
@@ -1398,12 +1399,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
      * Performs a timed wait on the started latch and then checks for disposed
      * before performing another wait each time the the started wait times out.
      */
-    protected void safeWaitUntilStarted() throws InterruptedException {
+    protected boolean safeWaitUntilStarted() throws InterruptedException {
         while (!disposed.get()) {
             if (startedLatch.await(1, TimeUnit.SECONDS)) {
-                return;
+                break;
             }
         }
+        return !disposed.get();
     }
 
     protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {

http://git-wip-us.apache.org/repos/asf/activemq/blob/1a241606/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkRestartTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkRestartTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkRestartTest.java
new file mode 100644
index 0000000..3799c6c
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkRestartTest.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.Set;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+import static org.junit.Assume.assumeNotNull;
+
+
+public class DurableSubscriberWithNetworkRestartTest extends JmsMultipleBrokersTestSupport {
+    private static final Log LOG = LogFactory.getLog(DurableSubscriberWithNetworkRestartTest.class);
+    private static final String HUB = "HubBroker";
+    private static final String SPOKE = "SpokeBroker";
+    protected static final int MESSAGE_COUNT = 10;
+    public boolean dynamicOnly = false;
+
+    public void testSendOnAReceiveOnBWithTransportDisconnectDynamicOnly() throws Exception {
+        dynamicOnly = true;
+        try {
+            testSendOnAReceiveOnBWithTransportDisconnect();
+        } finally {
+            dynamicOnly = false;
+        }
+    }
+
+    public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception {
+        bridge(SPOKE, HUB);
+        startAllBrokers();
+
+        verifyDuplexBridgeMbean();
+
+        // Setup connection
+        URI hubURI = brokers.get(HUB).broker.getTransportConnectors().get(0).getPublishableConnectURI();
+        URI spokeURI = brokers.get(SPOKE).broker.getTransportConnectors().get(0).getPublishableConnectURI();
+        ActiveMQConnectionFactory facHub = new ActiveMQConnectionFactory(hubURI);
+        ActiveMQConnectionFactory facSpoke = new ActiveMQConnectionFactory(spokeURI);
+        Connection conHub = facHub.createConnection();
+        Connection conSpoke = facSpoke.createConnection();
+        conHub.setClientID("clientHUB");
+        conSpoke.setClientID("clientSPOKE");
+        conHub.start();
+        conSpoke.start();
+        Session sesHub = conHub.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session sesSpoke = conSpoke.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ActiveMQTopic topic = new ActiveMQTopic("TEST.FOO");
+        String consumerName = "consumerName";
+
+        // Setup consumers
+        MessageConsumer remoteConsumer = sesHub.createDurableSubscriber(topic, consumerName);
+        sleep(1000);
+        remoteConsumer.close();
+
+        // Setup producer
+        MessageProducer localProducer = sesSpoke.createProducer(topic);
+        localProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        final String payloadString = new String(new byte[10*1024]);
+        // Send messages
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message test = sesSpoke.createTextMessage("test-" + i);
+            test.setStringProperty("payload", payloadString);
+            localProducer.send(test);
+        }
+        localProducer.close();
+
+        final String options = "?persistent=true&useJmx=true&deleteAllMessagesOnStartup=false";
+        for (int i=0;i<2;i++) {
+            brokers.get(SPOKE).broker.stop();
+            sleep(1000);
+            createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE + options));
+            bridge(SPOKE, HUB);
+            brokers.get(SPOKE).broker.start();
+            LOG.info("restarted spoke..:" + i);
+
+            assertTrue("got mbeans on restart", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return countMbeans( brokers.get(HUB).broker, "networkBridge", 20000) == (dynamicOnly ? 1 : 2);
+                }
+            }));
+        }
+    }
+
+    private void verifyDuplexBridgeMbean() throws Exception {
+        assertEquals(1, countMbeans( brokers.get(HUB).broker, "networkBridge", 5000));
+    }
+
+    private int countMbeans(BrokerService broker, String type, int timeout) throws Exception {
+        final long expiryTime = System.currentTimeMillis() + timeout;
+
+        if (!type.contains("=")) {
+            type = type + "=*";
+        }
+
+        final ObjectName beanName = new ObjectName("org.apache.activemq:type=Broker,brokerName="
+                + broker.getBrokerName() + "," + type +",*");
+        Set<ObjectName> mbeans = null;
+        int count = 0;
+        do {
+            if (timeout > 0) {
+                Thread.sleep(100);
+            }
+
+            mbeans = broker.getManagementContext().queryNames(beanName, null);
+            if (mbeans != null) {
+                count = mbeans.size();
+                LOG.info("Found: " + count + ", matching type: " +type);
+                for (ObjectName objectName : mbeans) {
+                    LOG.info("" + objectName);
+                }
+                //} else {
+                //logAllMbeans(broker);
+            }
+        } while ((mbeans == null || mbeans.isEmpty()) && expiryTime > System.currentTimeMillis());
+
+        // If port 1099 is in use when the Broker starts, starting the jmx connector
+        // will fail.  So, if we have no mbsc to query, skip the test.
+        if (timeout > 0) {
+            assumeNotNull(mbeans);
+        }
+
+        return count;
+
+    }
+
+    private void logAllMbeans(BrokerService broker) throws MalformedURLException {
+        try {
+            // trace all existing MBeans
+            Set<?> all = broker.getManagementContext().queryNames(null, null);
+            LOG.info("Total MBean count=" + all.size());
+            for (Object o : all) {
+                //ObjectInstance bean = (ObjectInstance)o;
+                LOG.info(o);
+            }
+        } catch (Exception ignored) {
+            LOG.warn("getMBeanServer ex: " + ignored);
+        }
+    }
+
+    public NetworkConnector bridge(String from, String to) throws Exception {
+        NetworkConnector networkConnector = bridgeBrokers(from, to, dynamicOnly, -1, true);
+        networkConnector.setSuppressDuplicateQueueSubscriptions(true);
+        networkConnector.setDecreaseNetworkConsumerPriority(true);
+        networkConnector.setConsumerTTL(1);
+        networkConnector.setDuplex(true);
+        return networkConnector;
+    }
+
+    @Override
+    protected void startAllBrokers() throws Exception {
+        // Ensure HUB is started first so bridge will be active from the get go
+        BrokerItem brokerItem = brokers.get(HUB);
+        brokerItem.broker.start();
+        brokerItem = brokers.get(SPOKE);
+        brokerItem.broker.start();
+        sleep(600);
+    }
+
+    public void setUp() throws Exception {
+        super.setAutoFail(false);
+        super.setUp();
+        createBrokers(true);
+    }
+
+    private void createBrokers(boolean del) throws Exception {
+        final String options = "?persistent=true&useJmx=true&deleteAllMessagesOnStartup=" + del;
+        createBroker(new URI("broker:(tcp://localhost:61617)/" + HUB + options));
+        createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE + options));
+    }
+
+    protected void configureBroker(BrokerService broker) {
+        broker.setKeepDurableSubsActive(false);
+        broker.getManagementContext().setCreateConnector(false);
+        PolicyMap defaultPolcyMap = new PolicyMap();
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        //defaultPolicy.setUseCache(false);
+        if (broker.getBrokerName().equals(HUB)) {
+            defaultPolicy.setStoreUsageHighWaterMark(2);
+            broker.getSystemUsage().getStoreUsage().setLimit(1*1024*1024);
+        }
+        defaultPolcyMap.setDefaultEntry(defaultPolicy);
+        broker.setDestinationPolicy(defaultPolcyMap);
+        broker.getSystemUsage().getMemoryUsage().setLimit(100*1024*1024);
+    }
+
+    public void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    private void sleep(int milliSecondTime) {
+        try {
+            Thread.sleep(milliSecondTime);
+        } catch (InterruptedException igonred) {
+        }
+    }
+}


[5/8] activemq git commit: improve trace logging on failure and tidy up future - try to nail down intermittent duplex bridge half start

Posted by ha...@apache.org.
improve trace logging on failure and tidy up future - try to nail down intermittent duplex bridge half start


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/aeecf888
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/aeecf888
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/aeecf888

Branch: refs/heads/activemq-5.10.x
Commit: aeecf8880920981afeae24f47b0e1dfdf87e3e3a
Parents: a0af997
Author: gtully <ga...@gmail.com>
Authored: Thu Sep 18 22:40:04 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 21:50:09 2014 -0500

----------------------------------------------------------------------
 .../network/DemandForwardingBridgeSupport.java         | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/aeecf888/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index f61c5ac..7d334ac 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -347,7 +347,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         try {
             remoteBrokerInfo = futureRemoteBrokerInfo.get();
             if (remoteBrokerInfo == null) {
-                fireBridgeFailed();
+                fireBridgeFailed(new Throwable("remoteBrokerInfo is null"));
                 return;
             }
         } catch (Exception e) {
@@ -358,7 +358,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         try {
             localBrokerInfo = futureLocalBrokerInfo.get();
             if (localBrokerInfo == null) {
-                fireBridgeFailed();
+                fireBridgeFailed(new Throwable("localBrokerInfo is null"));
                 return;
             }
 
@@ -592,7 +592,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                     ServiceSupport.dispose(getControllingService());
                 }
             });
-            fireBridgeFailed();
+            fireBridgeFailed(error);
         }
     }
 
@@ -871,7 +871,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                     ServiceSupport.dispose(getControllingService());
                 }
             });
-            fireBridgeFailed();
+            fireBridgeFailed(error);
         }
     }
 
@@ -1430,7 +1430,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         this.networkBridgeListener = listener;
     }
 
-    private void fireBridgeFailed() {
+    private void fireBridgeFailed(Throwable reason) {
+        LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason);
         NetworkBridgeListener l = this.networkBridgeListener;
         if (l != null && this.bridgeFailed.compareAndSet(false, true)) {
             l.bridgeFailed();
@@ -1596,7 +1597,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
 
         private final CountDownLatch slot = new CountDownLatch(1);
         private final AtomicBoolean disposed;
-        private BrokerInfo info = null;
+        private volatile BrokerInfo info = null;
 
         public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) {
             this.info = info;


[3/8] activemq git commit: https://issues.apache.org/jira/browse/AMQ-4705 - verify lock sanity on acquire so master cannot start and immediatly fail with a keepalive failure

Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-4705 - verify lock sanity on acquire so master cannot start and immediatly fail with a keepalive failure


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a1946162
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a1946162
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a1946162

Branch: refs/heads/activemq-5.10.x
Commit: a19461627ec3903be442d272fc088df58fcd49bc
Parents: c682f91
Author: gtully <ga...@gmail.com>
Authored: Wed Sep 17 16:14:51 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 21:45:34 2014 -0500

----------------------------------------------------------------------
 .../src/main/java/org/apache/activemq/store/SharedFileLocker.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a1946162/activemq-broker/src/main/java/org/apache/activemq/store/SharedFileLocker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/SharedFileLocker.java b/activemq-broker/src/main/java/org/apache/activemq/store/SharedFileLocker.java
index e1b51df..e14eb03 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/SharedFileLocker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/SharedFileLocker.java
@@ -52,7 +52,7 @@ public class SharedFileLocker extends AbstractLocker {
                 while ((!isStopped()) && (!isStopping())) {
                     try {
                         lockFile.lock();
-                        locked = true;
+                        locked = keepAlive();
                         break;
                     } catch (IOException e) {
                         LOG.info("Database "


[7/8] activemq git commit: avoid npe on network bridge failure, dispose when not started on vm transport

Posted by ha...@apache.org.
avoid npe on network bridge failure, dispose when not started on vm transport


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c52c4ed0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c52c4ed0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c52c4ed0

Branch: refs/heads/activemq-5.10.x
Commit: c52c4ed0f850b1b1b0e58cdfba4332d18576627e
Parents: 1a24160
Author: gtully <ga...@gmail.com>
Authored: Thu Oct 9 13:44:30 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 21:53:13 2014 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/activemq/transport/vm/VMTransport.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/c52c4ed0/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
index ba6ed13..ef1b1e2 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
@@ -171,7 +171,7 @@ public class VMTransport implements Transport, Task {
     public void stop() throws Exception {
         // Only need to do this once, all future oneway calls will now
         // fail as will any asnyc jobs in the task runner.
-        if (disposed.compareAndSet(false, true)) {
+        if (disposed.compareAndSet(false, true) && started.get()) {
 
             TaskRunner tr = taskRunner;
             LinkedBlockingQueue<Object> mq = this.messageQueue;


[4/8] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5121 - use jmxLocal in karaf only for jmx-based commands

Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5121 - use jmxLocal in karaf only for jmx-based commands


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a0af997b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a0af997b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a0af997b

Branch: refs/heads/activemq-5.10.x
Commit: a0af997baa16ea7e00ace521109de805bae08e7a
Parents: a194616
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Thu Sep 18 15:25:22 2014 +0200
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 21:49:18 2014 -0500

----------------------------------------------------------------------
 .../apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java  | 1 +
 .../apache/activemq/karaf/commands/ActiveMQCommandSupport.java  | 5 ++++-
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a0af997b/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java
----------------------------------------------------------------------
diff --git a/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java b/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java
index ede449e..3015fd1 100644
--- a/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java
+++ b/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java
@@ -67,6 +67,7 @@ public class ActiveMQBrokerFeatureTest extends AbstractJmsFeatureTest {
         produceMessage(nameAndPayload);
 
         System.err.println(executeCommand("activemq:bstat").trim());
+        assertEquals("JMS_BODY_FIELD:JMSText = " + nameAndPayload, executeCommand("activemq:browse --amqurl tcp://localhost:61616 --user karaf --password karaf -Vbody " + nameAndPayload).trim());
 
         assertEquals("got our message", nameAndPayload, consumeMessage(nameAndPayload));
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a0af997b/activemq-karaf/src/main/java/org/apache/activemq/karaf/commands/ActiveMQCommandSupport.java
----------------------------------------------------------------------
diff --git a/activemq-karaf/src/main/java/org/apache/activemq/karaf/commands/ActiveMQCommandSupport.java b/activemq-karaf/src/main/java/org/apache/activemq/karaf/commands/ActiveMQCommandSupport.java
index c05e7d0..58986c5 100644
--- a/activemq-karaf/src/main/java/org/apache/activemq/karaf/commands/ActiveMQCommandSupport.java
+++ b/activemq-karaf/src/main/java/org/apache/activemq/karaf/commands/ActiveMQCommandSupport.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.karaf.commands;
 
 import org.apache.activemq.console.CommandContext;
+import org.apache.activemq.console.command.AbstractJmxCommand;
 import org.apache.activemq.console.command.Command;
 import org.apache.activemq.console.formatter.CommandShellOutputFormatter;
 import org.apache.felix.gogo.commands.Argument;
@@ -42,7 +43,9 @@ public class ActiveMQCommandSupport extends OsgiCommandSupport {
         try {
             currentCommand.setCommandContext(context2);
             // must be added first
-            arguments.add(0, "--jmxlocal");
+            if (command instanceof AbstractJmxCommand) {
+                arguments.add(0, "--jmxlocal");
+            }
             currentCommand.execute(arguments);
             return null;
         } catch (Throwable e) {