You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2017/03/31 14:25:21 UTC
[1/3] activemq-artemis git commit: NO-JIRA: Fixing test hunging on
OpenWire
Repository: activemq-artemis
Updated Branches:
refs/heads/master 690b8d24d -> 33fff5265
NO-JIRA: Fixing test hunging on OpenWire
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d779afe8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d779afe8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d779afe8
Branch: refs/heads/master
Commit: d779afe874abdeebbfc893ddbf78c2c6f49d9ef5
Parents: 690b8d2
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Mar 30 21:56:46 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Mar 30 21:56:46 2017 -0400
----------------------------------------------------------------------
.../protocol/openwire/OpenWireConnection.java | 9 ++
.../amq/ProducerFlowControlBaseTest.java | 158 +++++++++++++++++++
.../amq/ProducerFlowControlSendFailTest.java | 62 +++-----
.../openwire/amq/ProducerFlowControlTest.java | 128 +--------------
4 files changed, 190 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d779afe8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 60a8dca..60876b9 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1371,6 +1371,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
session.getCoreSession().resetTX(tx);
try {
session.send(producerInfo, messageSend, sendProducerAck);
+ } catch (Exception e) {
+ if (tx != null) {
+ tx.markAsRollbackOnly(new ActiveMQException(e.getMessage()));
+ }
+ throw e;
} finally {
session.getCoreSession().resetTX(null);
}
@@ -1387,6 +1392,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
try {
AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId());
consumerBrokerExchange.acknowledge(ack);
+ } catch (Exception e) {
+ if (tx != null) {
+ tx.markAsRollbackOnly(new ActiveMQException(e.getMessage()));
+ }
} finally {
session.getCoreSession().resetTX(null);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d779afe8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlBaseTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlBaseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlBaseTest.java
new file mode 100644
index 0000000..2166201
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlBaseTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.junit.After;
+import org.junit.Before;
+
+public class ProducerFlowControlBaseTest extends BasicOpenWireTest {
+ ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
+ ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
+ protected ActiveMQConnection flowControlConnection;
+ // used to test sendFailIfNoSpace on SystemUsage
+ protected final AtomicBoolean gotResourceException = new AtomicBoolean(false);
+ private Thread asyncThread = null;
+
+
+ protected void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException {
+ final AtomicBoolean done = new AtomicBoolean(true);
+ final AtomicBoolean keepGoing = new AtomicBoolean(true);
+
+ try {
+ // Starts an async thread that every time it publishes it sets the done
+ // flag to false.
+ // Once the send starts to block it will not reset the done flag
+ // anymore.
+ asyncThread = new Thread("Fill thread.") {
+ @Override
+ public void run() {
+ Session session = null;
+ try {
+ session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ while (keepGoing.get()) {
+ done.set(false);
+ producer.send(session.createTextMessage("Hello World"));
+ }
+ } catch (JMSException e) {
+ } finally {
+ safeClose(session);
+ }
+ }
+ };
+ asyncThread.start();
+
+ waitForBlockedOrResourceLimit(done);
+ } finally {
+ keepGoing.set(false);
+ }
+ }
+
+ protected void waitForBlockedOrResourceLimit(final AtomicBoolean done) throws InterruptedException {
+ while (true) {
+ Thread.sleep(100);
+ // the producer is blocked once the done flag stays true or there is a
+ // resource exception
+ if (done.get() || gotResourceException.get()) {
+ break;
+ }
+ done.set(true);
+ }
+ }
+
+ protected CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws JMSException {
+ final CountDownLatch done = new CountDownLatch(1);
+ new Thread("Send thread.") {
+ @Override
+ public void run() {
+ Session session = null;
+ try {
+ session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ producer.send(session.createTextMessage(message));
+ done.countDown();
+ } catch (JMSException e) {
+ e.printStackTrace();
+ } finally {
+ safeClose(session);
+ }
+ }
+ }.start();
+ return done;
+ }
+
+ @Override
+ protected void extraServerConfig(Configuration serverConfig) {
+ String match = "#";
+ Map<String, AddressSettings> asMap = serverConfig.getAddressesSettings();
+ asMap.get(match).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ this.makeSureCoreQueueExist("QUEUE.A");
+ this.makeSureCoreQueueExist("QUEUE.B");
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ try {
+ if (flowControlConnection != null) {
+ TcpTransport t = flowControlConnection.getTransport().narrow(TcpTransport.class);
+ try {
+ flowControlConnection.getTransport().stop();
+ flowControlConnection.close();
+ } catch (Throwable ignored) {
+ // sometimes the disposed up can make the test to fail
+ // even worse I have seen this breaking every single test after this
+ // if not caught here
+ }
+ t.getTransportListener().onException(new IOException("Disposed."));
+ }
+ if (asyncThread != null) {
+ asyncThread.join();
+ asyncThread = null;
+ }
+ } finally {
+ super.tearDown();
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d779afe8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlSendFailTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlSendFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlSendFailTest.java
index baacd16..e03ae27 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlSendFailTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlSendFailTest.java
@@ -34,13 +34,14 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* adapted from: org.apache.activemq.ProducerFlowControlSendFailTest
*/
-public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest {
+public class ProducerFlowControlSendFailTest extends ProducerFlowControlBaseTest {
@Override
@Before
@@ -61,20 +62,8 @@ public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest {
asMap.get(match).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
}
- @Override
- public void test2ndPublisherWithStandardConnectionThatIsBlocked() throws Exception {
- // with sendFailIfNoSpace set, there is no blocking of the connection
- }
-
- @Override
- public void testAsyncPublisherRecoverAfterBlock() throws Exception {
- // sendFail means no flowControllwindow as there is no producer ack, just
- // an exception
- }
-
- @Override
@Test
- public void testPublisherRecoverAfterBlock() throws Exception {
+ public void testPublishWithTX() throws Exception {
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) getConnectionFactory();
// with sendFail, there must be no flowControllwindow
// sendFail is an alternative flow control mechanism that does not block
@@ -82,45 +71,38 @@ public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest {
this.flowControlConnection = (ActiveMQConnection) factory.createConnection();
this.flowControlConnection.start();
- final Session session = this.flowControlConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ final Session session = this.flowControlConnection.createSession(true, Session.SESSION_TRANSACTED);
final MessageProducer producer = session.createProducer(queueA);
- final AtomicBoolean keepGoing = new AtomicBoolean(true);
-
- Thread thread = new Thread("Filler") {
- @Override
- public void run() {
- while (keepGoing.get()) {
- try {
- producer.send(session.createTextMessage("Test message"));
- if (gotResourceException.get()) {
- System.out.println("got exception");
- // do not flood the broker with requests when full as we
- // are sending async and they
- // will be limited by the network buffers
- Thread.sleep(200);
- }
- } catch (Exception e) {
- // with async send, there will be no exceptions
- e.printStackTrace();
- }
- }
+ int successSent = 0;
+ boolean exception = false;
+ try {
+ for (int i = 0; i < 5000; i++) {
+ producer.send(session.createTextMessage("Test message"));
+ session.commit();
+ successSent++;
}
- };
- thread.start();
- waitForBlockedOrResourceLimit(new AtomicBoolean(false));
+ } catch (Exception e) {
+ exception = true;
+ // with async send, there will be no exceptions
+ e.printStackTrace();
+ }
+
+ Assert.assertTrue(exception);
// resourceException on second message, resumption if we
// can receive 10
MessageConsumer consumer = session.createConsumer(queueA);
TextMessage msg;
- for (int idx = 0; idx < 10; ++idx) {
+ for (int idx = 0; idx < successSent; ++idx) {
msg = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(msg);
+ System.out.println("Received " + msg);
if (msg != null) {
msg.acknowledge();
}
+ session.commit();
}
- keepGoing.set(false);
consumer.close();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d779afe8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlTest.java
index c085d0f..bde8b79 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlTest.java
@@ -16,40 +16,22 @@
*/
package org.apache.activemq.artemis.tests.integration.openwire.amq;
-import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
-import java.io.IOException;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.transport.tcp.TcpTransport;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
/**
* adapted from: org.apache.activemq.ProducerFlowControlTest
*/
-public class ProducerFlowControlTest extends BasicOpenWireTest {
-
- ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
- ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
- protected ActiveMQConnection flowControlConnection;
- // used to test sendFailIfNoSpace on SystemUsage
- protected final AtomicBoolean gotResourceException = new AtomicBoolean(false);
- private Thread asyncThread = null;
+public class ProducerFlowControlTest extends ProducerFlowControlBaseTest {
@Test
public void test2ndPublisherWithProducerWindowSendConnectionThatIsBlocked() throws Exception {
@@ -247,112 +229,4 @@ public class ProducerFlowControlTest extends BasicOpenWireTest {
CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
assertFalse(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
}
-
- private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException {
- final AtomicBoolean done = new AtomicBoolean(true);
- final AtomicBoolean keepGoing = new AtomicBoolean(true);
-
- // Starts an async thread that every time it publishes it sets the done
- // flag to false.
- // Once the send starts to block it will not reset the done flag
- // anymore.
- asyncThread = new Thread("Fill thread.") {
- @Override
- public void run() {
- Session session = null;
- try {
- session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(queue);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- while (keepGoing.get()) {
- done.set(false);
- producer.send(session.createTextMessage("Hello World"));
- }
- } catch (JMSException e) {
- } finally {
- safeClose(session);
- }
- }
- };
- asyncThread.start();
-
- waitForBlockedOrResourceLimit(done);
- keepGoing.set(false);
- }
-
- protected void waitForBlockedOrResourceLimit(final AtomicBoolean done) throws InterruptedException {
- while (true) {
- Thread.sleep(100);
- System.out.println("check done: " + done.get() + " ex: " + gotResourceException.get());
- // the producer is blocked once the done flag stays true or there is a
- // resource exception
- if (done.get() || gotResourceException.get()) {
- break;
- }
- done.set(true);
- }
- }
-
- private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws JMSException {
- final CountDownLatch done = new CountDownLatch(1);
- new Thread("Send thread.") {
- @Override
- public void run() {
- Session session = null;
- try {
- session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(queue);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- producer.send(session.createTextMessage(message));
- done.countDown();
- } catch (JMSException e) {
- e.printStackTrace();
- } finally {
- safeClose(session);
- }
- }
- }.start();
- return done;
- }
-
- @Override
- protected void extraServerConfig(Configuration serverConfig) {
- String match = "#";
- Map<String, AddressSettings> asMap = serverConfig.getAddressesSettings();
- asMap.get(match).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
- }
-
- @Override
- @Before
- public void setUp() throws Exception {
- super.setUp();
- this.makeSureCoreQueueExist("QUEUE.A");
- this.makeSureCoreQueueExist("QUEUE.B");
- }
-
- @Override
- @After
- public void tearDown() throws Exception {
- try {
- if (flowControlConnection != null) {
- TcpTransport t = flowControlConnection.getTransport().narrow(TcpTransport.class);
- try {
- flowControlConnection.getTransport().stop();
- flowControlConnection.close();
- } catch (Throwable ignored) {
- // sometimes the disposed up can make the test to fail
- // even worse I have seen this breaking every single test after this
- // if not caught here
- }
- t.getTransportListener().onException(new IOException("Disposed."));
- }
- if (asyncThread != null) {
- asyncThread.join();
- asyncThread = null;
- }
- } finally {
- super.tearDown();
- }
- }
-
}
[3/3] activemq-artemis git commit: This closes #1164
Posted by ma...@apache.org.
This closes #1164
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/33fff526
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/33fff526
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/33fff526
Branch: refs/heads/master
Commit: 33fff526518afba6f4c89a657b884be26e30c727
Parents: 690b8d2 1ca1b4b
Author: Martyn Taylor <mt...@redhat.com>
Authored: Fri Mar 31 15:24:50 2017 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Fri Mar 31 15:24:50 2017 +0100
----------------------------------------------------------------------
.../protocol/openwire/OpenWireConnection.java | 9 ++
.../integration/amqp/AmqpNettyFailoverTest.java | 3 +-
.../amq/ProducerFlowControlBaseTest.java | 158 +++++++++++++++++++
.../amq/ProducerFlowControlSendFailTest.java | 62 +++-----
.../openwire/amq/ProducerFlowControlTest.java | 128 +--------------
5 files changed, 192 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
[2/3] activemq-artemis git commit: ARTEMIS-1042 Commenting out test
Posted by ma...@apache.org.
ARTEMIS-1042 Commenting out test
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1ca1b4ba
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1ca1b4ba
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1ca1b4ba
Branch: refs/heads/master
Commit: 1ca1b4baf92472756085b4e9e1865265c1ed8782
Parents: d779afe
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Mar 30 22:23:20 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Mar 30 22:23:20 2017 -0400
----------------------------------------------------------------------
.../artemis/tests/integration/amqp/AmqpNettyFailoverTest.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ca1b4ba/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java
index 5fb4e35..16cd7c3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java
@@ -45,7 +45,8 @@ public class AmqpNettyFailoverTest extends FailoverTestBase {
public static Collection getParameters() {
// these 3 are for comparison
- return Arrays.asList(new Object[][]{{"NON_SSL", 0}, {"SSL", 1}});
+ return Arrays.asList(new Object[][]{{"NON_SSL", 0}
+ /*, {"SSL", 1} */ });
}