You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/09/20 13:59:17 UTC

activemq git commit: AMQ-7057 - add transport connector option warnOnRemoteClose (default false) to suppress EOF and connection-reset warnings caused by load balancer or health check socket pings

Repository: activemq
Updated Branches:
  refs/heads/master 8f88dcda0 -> cdbddcafa


AMQ-7057 - add transport connector option warnOnRemoteClose (default false) to suppress EOF and connection-reset warnings caused by load balancer or health check socket pings


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cdbddcaf
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cdbddcaf
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cdbddcaf

Branch: refs/heads/master
Commit: cdbddcafa237027599dfd9ae030f42898ec5299c
Parents: 8f88dcd
Author: gtully <ga...@gmail.com>
Authored: Thu Sep 20 14:58:34 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Sep 20 14:58:34 2018 +0100

----------------------------------------------------------------------
 .../activemq/broker/TransportConnection.java    |  11 +-
 .../activemq/broker/TransportConnector.java     |  16 ++-
 .../tcp/TcpTransportCloseSocketNoWarnTest.java  | 141 +++++++++++++++++++
 3 files changed, 154 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/cdbddcaf/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 404e126..ba1f1eb 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -233,20 +233,15 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
             transportException.set(e);
             if (TRANSPORTLOG.isDebugEnabled()) {
                 TRANSPORTLOG.debug(this + " failed: " + e, e);
-            } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
+            } else if (TRANSPORTLOG.isWarnEnabled() && !suppressed(e)) {
                 TRANSPORTLOG.warn(this + " failed: " + e);
             }
             stopAsync(e);
         }
     }
 
-    private boolean expected(IOException e) {
-        return isStomp() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
-    }
-
-    private boolean isStomp() {
-        URI uri = connector.getUri();
-        return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1;
+    private boolean suppressed(IOException e) {
+        return !connector.isWarnOnRemoteClose() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/cdbddcaf/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
index f8425ad..ba2a3a9 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
@@ -74,7 +74,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
     private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE;
     private int maximumConsumersAllowedPerConnection  = Integer.MAX_VALUE;
     private PublishedAddressPolicy publishedAddressPolicy = new PublishedAddressPolicy();
-    private boolean allowLinkStealing;
+    private boolean warnOnRemoteClose = false;
 
     LinkedList<String> peerBrokers = new LinkedList<String>();
 
@@ -123,7 +123,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
         rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection());
         rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection());
         rc.setPublishedAddressPolicy(getPublishedAddressPolicy());
-        rc.setAllowLinkStealing(isAllowLinkStealing());
+        rc.setWarnOnRemoteClose(isWarnOnRemoteClose());
         return rc;
     }
 
@@ -587,10 +587,6 @@ public class TransportConnector implements Connector, BrokerServiceAware {
         return server.isAllowLinkStealing();
     }
 
-    public void setAllowLinkStealing (boolean allowLinkStealing) {
-        this.allowLinkStealing=allowLinkStealing;
-    }
-
     public boolean isAuditNetworkProducers() {
         return auditNetworkProducers;
     }
@@ -639,4 +635,12 @@ public class TransportConnector implements Connector, BrokerServiceAware {
     public void setPublishedAddressPolicy(PublishedAddressPolicy publishedAddressPolicy) {
         this.publishedAddressPolicy = publishedAddressPolicy;
     }
+
+    public boolean isWarnOnRemoteClose() {
+        return warnOnRemoteClose;
+    }
+
+    public void setWarnOnRemoteClose(boolean warnOnRemoteClose) {
+        this.warnOnRemoteClose = warnOnRemoteClose;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cdbddcaf/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportCloseSocketNoWarnTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportCloseSocketNoWarnTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportCloseSocketNoWarnTest.java
new file mode 100644
index 0000000..28c4c9d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportCloseSocketNoWarnTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnection;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.*;
+import java.net.Socket;
+import java.net.URI;
+import java.sql.Time;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TcpTransportCloseSocketNoWarnTest {
+
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TcpTransportCloseSocketNoWarnTest.class);
+
+    public static final String KEYSTORE_TYPE = "jks";
+    public static final String PASSWORD = "password";
+    public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
+    public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
+
+    private BrokerService brokerService;
+
+
+    static {
+        System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
+        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
+        System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
+        System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
+        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
+        System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
+    }
+
+    final AtomicBoolean gotExceptionInLog = new AtomicBoolean();
+    Appender appender = new DefaultTestAppender() {
+        @Override
+        public void doAppend(LoggingEvent event) {
+            if (event.getLevel().equals(Level.WARN) && event.getRenderedMessage().contains("failed:")) {
+                gotExceptionInLog.set(Boolean.TRUE);
+                LOG.error("got event: " + event + ", ex:" + event.getRenderedMessage());
+                LOG.error("Event source: ", new Throwable("Here"));
+            }
+            return;
+        }
+    };
+
+    @Before
+    public void before() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+
+        Logger.getRootLogger().addAppender(appender);
+        Logger.getLogger(TransportConnection.class.getName() + ".Transport").setLevel(Level.WARN);
+    }
+
+    @After
+    public void after() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
+        Logger.getRootLogger().removeAppender(appender);
+    }
+
+    @Test(timeout = 60000)
+    public void testNoWarn() throws Exception {
+        doTest(false);
+    }
+
+    @Test(timeout = 60000)
+    public void testWarn() throws Exception {
+        doTest(true);
+    }
+
+    protected void doTest(boolean warn) throws Exception {
+        for (String protocol : new String[] {"tcp", "ssl", "stomp"}) {
+            TransportConnector transportConnector = brokerService.addConnector(protocol + "://localhost:0");
+            transportConnector.setWarnOnRemoteClose(warn);
+        }
+        this.brokerService = brokerService;
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        for (TransportConnector transportConnector : brokerService.getTransportConnectors()) {
+            URI uri = transportConnector.getPublishableConnectURI();
+            Socket socket;
+            if (uri.getScheme().equals("ssl")) {
+                SSLSocket sslSocket = (SSLSocket) SSLSocketFactory.getDefault().createSocket("127.0.0.1", uri.getPort());
+                final CountDownLatch doneHandShake = new CountDownLatch(1);
+                sslSocket.addHandshakeCompletedListener(new HandshakeCompletedListener() {
+                    @Override
+                    public void handshakeCompleted(HandshakeCompletedEvent handshakeCompletedEvent) {
+                        doneHandShake.countDown();
+                    }
+                });
+                sslSocket.startHandshake();
+                assertTrue("handshake done", doneHandShake.await(10, TimeUnit.SECONDS));
+
+                socket = sslSocket;
+            } else {
+                socket = new Socket("127.0.0.1", uri.getPort());
+            }
+            // ensure broker gets a chance to send on the new connection
+            TimeUnit.SECONDS.sleep(1);
+            LOG.info("testing socket: " + socket);
+            socket.close();
+        }
+        assertEquals("warn|no warn in log", warn, gotExceptionInLog.get());
+    }
+}