You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/05/12 23:24:48 UTC

[2/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-4897 - rework with test that shows the blocking inactivity monitor task. Removed some redundant state

https://issues.apache.org/jira/browse/AMQ-4897 - rework, with a test that shows the inactivity monitor task blocking. Removed some redundant state.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3ef53896
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3ef53896
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3ef53896

Branch: refs/heads/master
Commit: 3ef53896917076981c505b087e9f6ff99042aa3f
Parents: d467220
Author: gtully <ga...@gmail.com>
Authored: Tue May 12 21:55:07 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Tue May 12 22:10:57 2015 +0100

----------------------------------------------------------------------
 .../transport/failover/FailoverTransport.java   |  41 +++--
 ...adInactivityBlockWriteTimeoutClientTest.java | 152 +++++++++++++++++++
 2 files changed, 169 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/3ef53896/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
index c9ef141..728d4b7 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
@@ -74,7 +74,6 @@ public class FailoverTransport implements CompositeTransport {
     private static final int INFINITE = -1;
     private TransportListener transportListener;
     private boolean disposed;
-    private boolean connected;
     private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>();
     private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>();
 
@@ -91,7 +90,6 @@ public class FailoverTransport implements CompositeTransport {
     private final TaskRunnerFactory reconnectTaskFactory;
     private final TaskRunner reconnectTask;
     private boolean started;
-    private boolean initialized;
     private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
     private long maxReconnectDelay = 1000 * 30;
     private double backOffMultiplier = 2d;
@@ -198,9 +196,6 @@ public class FailoverTransport implements CompositeTransport {
                         ((Tracked) object).onResponses(command);
                     }
                 }
-                if (!initialized) {
-                    initialized = true;
-                }
 
                 if (command.isConnectionControl()) {
                     handleConnectionControl((ConnectionControl) command);
@@ -242,35 +237,35 @@ public class FailoverTransport implements CompositeTransport {
     }
 
     public final void handleTransportFailure(IOException e) throws InterruptedException {
-        synchronized (reconnectMutex) {
-            if (shuttingDown) {
-                // shutdown info sent and remote socket closed and we see that before a local close
-                // let the close do the work
-                return;
-            }
-
-            if (LOG.isTraceEnabled()) {
-                LOG.trace(this + " handleTransportFailure: " + e, e);
-            }
+        if (shuttingDown) {
+            // shutdown info sent and remote socket closed and we see that before a local close
+            // let the close do the work
+            return;
+        }
 
-            Transport transport = connectedTransport.getAndSet(null);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace(this + " handleTransportFailure: " + e, e);
+        }
 
-            if (transport != null) {
+        // a sender could be blocked in a write with the reconnectMutex held; the failed transport must still be disposed, so do it before taking the mutex
+        Transport transport = connectedTransport.getAndSet(null);
+        if (transport != null) {
+            disposeTransport(transport);
+        }
 
-                disposeTransport(transport);
+        synchronized (reconnectMutex) {
+            if (transport != null && connectedTransport.get() == null) {
 
                 boolean reconnectOk = false;
 
                 if (canReconnect()) {
                     reconnectOk = true;
                 }
-                LOG.warn("Transport (" + transport + ") failed"
+                 LOG.warn("Transport (" + connectedTransportURI + ") failed"
                         + (reconnectOk ? "," : ", not") + " attempting to automatically reconnect", e);
 
-                initialized = false;
                 failedConnectTransportURI = connectedTransportURI;
                 connectedTransportURI = null;
-                connected = false;
                 connectedToPriority = false;
 
                 if (reconnectOk) {
@@ -377,7 +372,6 @@ public class FailoverTransport implements CompositeTransport {
                 }
                 started = false;
                 disposed = true;
-                connected = false;
 
                 if (connectedTransport.get() != null) {
                     transportToStop = connectedTransport.getAndSet(null);
@@ -1067,7 +1061,6 @@ public class FailoverTransport implements CompositeTransport {
                                 LOG.info("Successfully reconnected to " + uri);
                             }
 
-                            connected = true;
                             return false;
                         } catch (Exception e) {
                             failure = e;
@@ -1262,7 +1255,7 @@ public class FailoverTransport implements CompositeTransport {
 
     @Override
     public boolean isConnected() {
-        return connected;
+        return connectedTransport.get() != null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/3ef53896/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverReadInactivityBlockWriteTimeoutClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverReadInactivityBlockWriteTimeoutClientTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverReadInactivityBlockWriteTimeoutClientTest.java
new file mode 100644
index 0000000..95fce82
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverReadInactivityBlockWriteTimeoutClientTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.util.SocketProxy;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FailoverReadInactivityBlockWriteTimeoutClientTest extends JmsTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(FailoverReadInactivityBlockWriteTimeoutClientTest.class);
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker =  new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setConcurrentStoreAndDispatchQueues(false);
+        broker.setPersistenceAdapter(adapter);
+        broker.addConnector("tcp://localhost:0?wireFormat.maxInactivityDuration=0");
+        return broker;
+    }
+
+    public void testBlockedFailoverSendWillReactToReadInactivityTimeout() throws Exception {
+        final ActiveMQQueue dest = new ActiveMQQueue("testClientWriteTimeout");
+        messageTextPrefix = initMessagePrefix(80*1024);
+
+        URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
+        LOG.info("consuming using uri: " + tcpBrokerUri);
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
+        Connection c = factory.createConnection();
+        c.start();
+        Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(dest);
+
+        SocketProxy proxy = new SocketProxy();
+        proxy.setTarget(tcpBrokerUri);
+        proxy.open();
+
+        ActiveMQConnectionFactory pFactory = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl() + "?wireFormat.maxInactivityDuration=5000&ignoreRemoteWireFormat=true)?jms.useAsyncSend=true&trackMessages=true&maxCacheSize=6638400");
+        final ActiveMQConnection pc = (ActiveMQConnection) pFactory.createConnection();
+        final AtomicInteger interruptCounter = new AtomicInteger(0);
+        pc.addTransportListener(new TransportListener() {
+            @Override
+            public void onCommand(Object command) {
+
+            }
+
+            @Override
+            public void onException(IOException error) {
+                LOG.info("Got: " + error);
+
+            }
+
+            @Override
+            public void transportInterupted() {
+                interruptCounter.incrementAndGet();
+            }
+
+            @Override
+            public void transportResumed() {
+
+            }
+        });
+        pc.start();
+
+
+        final int messageCount = 200;
+        final CountDownLatch sentOne = new CountDownLatch(1);
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                try{
+                    Session session = pc.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = session.createProducer(dest);
+                    for (int i = 0; i < messageCount; i++) {
+                        producer.send(session.createTextMessage(messageTextPrefix  + i));
+                        sentOne.countDown();
+                    }
+                    producer.close();
+                    session.close();
+                    LOG.info("Done with send of: " + messageCount);
+                } catch (Exception ignored) {
+                    ignored.printStackTrace();
+                }
+            }
+        });
+
+        sentOne.await(5, TimeUnit.SECONDS);
+        proxy.pause();
+
+        assertTrue("Got interrupted", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return interruptCounter.get() > 0;
+            }
+        }));
+
+        proxy.goOn();
+        for (int i=0; i<messageCount; i++) {
+            assertNotNull("Got message " + i  + " after reconnect", consumer.receive(5000));
+        }
+
+        assertTrue("no pending messages when done", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+
+                LOG.info("current total message count: " + broker.getAdminView().getTotalMessageCount());
+                return broker.getAdminView().getTotalMessageCount() == 0;
+            }
+        }));
+    }
+
+    private String initMessagePrefix(int i) {
+        byte[] content = new byte[i];
+        return new String(content);
+    }
+}
\ No newline at end of file