You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/09/20 13:59:17 UTC
activemq git commit: AMQ-7057 - support transport connector
warnOnRemoteClose to suppress eof warnings on loadbalance or health check
socket ping
Repository: activemq
Updated Branches:
refs/heads/master 8f88dcda0 -> cdbddcafa
AMQ-7057 - support transport connector warnOnRemoteClose to suppress eof warnings on loadbalance or health check socket ping
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cdbddcaf
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cdbddcaf
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cdbddcaf
Branch: refs/heads/master
Commit: cdbddcafa237027599dfd9ae030f42898ec5299c
Parents: 8f88dcd
Author: gtully <ga...@gmail.com>
Authored: Thu Sep 20 14:58:34 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Sep 20 14:58:34 2018 +0100
----------------------------------------------------------------------
.../activemq/broker/TransportConnection.java | 11 +-
.../activemq/broker/TransportConnector.java | 16 ++-
.../tcp/TcpTransportCloseSocketNoWarnTest.java | 141 +++++++++++++++++++
3 files changed, 154 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/cdbddcaf/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 404e126..ba1f1eb 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -233,20 +233,15 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
transportException.set(e);
if (TRANSPORTLOG.isDebugEnabled()) {
TRANSPORTLOG.debug(this + " failed: " + e, e);
- } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
+ } else if (TRANSPORTLOG.isWarnEnabled() && !suppressed(e)) {
TRANSPORTLOG.warn(this + " failed: " + e);
}
stopAsync(e);
}
}
- private boolean expected(IOException e) {
- return isStomp() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
- }
-
- private boolean isStomp() {
- URI uri = connector.getUri();
- return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1;
+ private boolean suppressed(IOException e) {
+ return !connector.isWarnOnRemoteClose() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
}
/**
http://git-wip-us.apache.org/repos/asf/activemq/blob/cdbddcaf/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
index f8425ad..ba2a3a9 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
@@ -74,7 +74,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE;
private int maximumConsumersAllowedPerConnection = Integer.MAX_VALUE;
private PublishedAddressPolicy publishedAddressPolicy = new PublishedAddressPolicy();
- private boolean allowLinkStealing;
+ private boolean warnOnRemoteClose = false;
LinkedList<String> peerBrokers = new LinkedList<String>();
@@ -123,7 +123,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection());
rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection());
rc.setPublishedAddressPolicy(getPublishedAddressPolicy());
- rc.setAllowLinkStealing(isAllowLinkStealing());
+ rc.setWarnOnRemoteClose(isWarnOnRemoteClose());
return rc;
}
@@ -587,10 +587,6 @@ public class TransportConnector implements Connector, BrokerServiceAware {
return server.isAllowLinkStealing();
}
- public void setAllowLinkStealing (boolean allowLinkStealing) {
- this.allowLinkStealing=allowLinkStealing;
- }
-
public boolean isAuditNetworkProducers() {
return auditNetworkProducers;
}
@@ -639,4 +635,12 @@ public class TransportConnector implements Connector, BrokerServiceAware {
public void setPublishedAddressPolicy(PublishedAddressPolicy publishedAddressPolicy) {
this.publishedAddressPolicy = publishedAddressPolicy;
}
+
+ public boolean isWarnOnRemoteClose() {
+ return warnOnRemoteClose;
+ }
+
+ public void setWarnOnRemoteClose(boolean warnOnRemoteClose) {
+ this.warnOnRemoteClose = warnOnRemoteClose;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/cdbddcaf/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportCloseSocketNoWarnTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportCloseSocketNoWarnTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportCloseSocketNoWarnTest.java
new file mode 100644
index 0000000..28c4c9d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportCloseSocketNoWarnTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnection;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.*;
+import java.net.Socket;
+import java.net.URI;
+import java.sql.Time;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TcpTransportCloseSocketNoWarnTest {
+
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TcpTransportCloseSocketNoWarnTest.class);
+
+ public static final String KEYSTORE_TYPE = "jks";
+ public static final String PASSWORD = "password";
+ public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
+ public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
+
+ private BrokerService brokerService;
+
+
+ static {
+ System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
+ System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
+ System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
+ System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
+ System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
+ System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
+ }
+
+ final AtomicBoolean gotExceptionInLog = new AtomicBoolean();
+ Appender appender = new DefaultTestAppender() {
+ @Override
+ public void doAppend(LoggingEvent event) {
+ if (event.getLevel().equals(Level.WARN) && event.getRenderedMessage().contains("failed:")) {
+ gotExceptionInLog.set(Boolean.TRUE);
+ LOG.error("got event: " + event + ", ex:" + event.getRenderedMessage());
+ LOG.error("Event source: ", new Throwable("Here"));
+ }
+ return;
+ }
+ };
+
+ @Before
+ public void before() throws Exception {
+ brokerService = new BrokerService();
+ brokerService.setPersistent(false);
+ brokerService.setUseJmx(false);
+
+ Logger.getRootLogger().addAppender(appender);
+ Logger.getLogger(TransportConnection.class.getName() + ".Transport").setLevel(Level.WARN);
+ }
+
+ @After
+ public void after() throws Exception {
+ if (brokerService != null) {
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+ }
+ Logger.getRootLogger().removeAppender(appender);
+ }
+
+ @Test(timeout = 60000)
+ public void testNoWarn() throws Exception {
+ doTest(false);
+ }
+
+ @Test(timeout = 60000)
+ public void testWarn() throws Exception {
+ doTest(true);
+ }
+
+ protected void doTest(boolean warn) throws Exception {
+ for (String protocol : new String[] {"tcp", "ssl", "stomp"}) {
+ TransportConnector transportConnector = brokerService.addConnector(protocol + "://localhost:0");
+ transportConnector.setWarnOnRemoteClose(warn);
+ }
+ this.brokerService = brokerService;
+ brokerService.start();
+ brokerService.waitUntilStarted();
+
+ for (TransportConnector transportConnector : brokerService.getTransportConnectors()) {
+ URI uri = transportConnector.getPublishableConnectURI();
+ Socket socket;
+ if (uri.getScheme().equals("ssl")) {
+ SSLSocket sslSocket = (SSLSocket) SSLSocketFactory.getDefault().createSocket("127.0.0.1", uri.getPort());
+ final CountDownLatch doneHandShake = new CountDownLatch(1);
+ sslSocket.addHandshakeCompletedListener(new HandshakeCompletedListener() {
+ @Override
+ public void handshakeCompleted(HandshakeCompletedEvent handshakeCompletedEvent) {
+ doneHandShake.countDown();
+ }
+ });
+ sslSocket.startHandshake();
+ assertTrue("handshake done", doneHandShake.await(10, TimeUnit.SECONDS));
+
+ socket = sslSocket;
+ } else {
+ socket = new Socket("127.0.0.1", uri.getPort());
+ }
+ // ensure broker gets a chance to send on the new connection
+ TimeUnit.SECONDS.sleep(1);
+ LOG.info("testing socket: " + socket);
+ socket.close();
+ }
+ assertEquals("warn|no warn in log", warn, gotExceptionInLog.get());
+ }
+}