You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/09/30 13:05:54 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6446 - use shared logger and pepend statements with connection counter. Old behaviour or per connection logger can be obtained with trace=true&jmxPort=0

Repository: activemq
Updated Branches:
  refs/heads/master c60d71696 -> 5385fd1bb


https://issues.apache.org/jira/browse/AMQ-6446 - use shared logger and pepend statements with connection counter. Old behaviour or per connection logger can be obtained with trace=true&jmxPort=0


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5385fd1b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5385fd1b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5385fd1b

Branch: refs/heads/master
Commit: 5385fd1bb36abe5a123db039b63d30cefd594ba4
Parents: c60d716
Author: gtully <ga...@gmail.com>
Authored: Fri Sep 30 14:05:42 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Fri Sep 30 14:05:42 2016 +0100

----------------------------------------------------------------------
 .../transport/TransportLoggerFactory.java       |  17 ++-
 .../transport/logwriters/CustomLogWriter.java   |   7 +-
 .../transport/logwriters/DefaultLogWriter.java  |  18 ++-
 .../apache/activemq/TransportLoggerSupport.java |   6 +
 .../apache/activemq/transport/LogWriter.java    |   7 +
 .../transport/tcp/TcpTransportServer.java       |  10 ++
 .../activemq/store/kahadb/KahaDBTest.java       |   2 +-
 .../apache/activemq/usecases/AMQ6446Test.java   | 152 +++++++++++++++++++
 8 files changed, 205 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/5385fd1b/activemq-broker/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java b/activemq-broker/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java
index 37ee004..599ea46 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java
@@ -27,6 +27,8 @@ import org.apache.activemq.util.LogWriterFinder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.activemq.TransportLoggerSupport.defaultJmxPort;
+
 /**
  * Singleton class to create TransportLogger objects.
  * When the method getInstance() is called for the first time,
@@ -61,10 +63,6 @@ public class TransportLoggerFactory {
      * This setting only has a meaning if
      */
     private static boolean defaultInitialBehavior = true;
-    /**
-     * Default port to control the transport loggers through JMX
-     */
-    private static int defaultJmxPort = 1099;
 
     private boolean transportLoggerControlCreated = false;
     private ManagementContext managementContext;
@@ -137,7 +135,11 @@ public class TransportLoggerFactory {
      */
     public TransportLogger createTransportLogger(Transport next, String logWriterName,
             boolean useJmx, boolean startLogging, int jmxport) throws IOException {
-        int id = getNextId();
+        int id = -1; // new default to single logger
+        // allow old behaviour with incantation
+        if (!useJmx && jmxport != defaultJmxPort) {
+            id = getNextId();
+        }
         return createTransportLogger(next, id, createLog(id), logWriterName, useJmx, startLogging, jmxport);
     }
 
@@ -162,6 +164,9 @@ public class TransportLoggerFactory {
             String logWriterName, boolean dynamicManagement, boolean startLogging, int jmxport) throws IOException {
         try {
             LogWriter logWriter = logWriterFinder.newInstance(logWriterName);
+            if (id == -1) {
+                logWriter.setPrefix(String.format("%08X: ", getNextId()));
+            }
             TransportLogger tl =  new TransportLogger (next, log, startLogging, logWriter);
             if (dynamicManagement) {
                 synchronized (this) {
@@ -183,7 +188,7 @@ public class TransportLoggerFactory {
     }
 
     private static Logger createLog(int id) {
-        return LoggerFactory.getLogger(TransportLogger.class.getName()+".Connection:" + id);
+        return LoggerFactory.getLogger(TransportLogger.class.getName()+".Connection" + (id > 0 ? ":"+id : "" ));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/5385fd1b/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/CustomLogWriter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/CustomLogWriter.java b/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/CustomLogWriter.java
index 65b9162..4387cdc 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/CustomLogWriter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/CustomLogWriter.java
@@ -36,7 +36,12 @@ import org.slf4j.Logger;
  * 
  */
 public class CustomLogWriter implements LogWriter {
-    
+
+    @Override
+    public void setPrefix(String prefix) {
+        // for the custom case, revert to the logger per connection
+    }
+
     // doc comment inherited from LogWriter
     public void initialMessage(Logger log) {
         

http://git-wip-us.apache.org/repos/asf/activemq/blob/5385fd1b/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/DefaultLogWriter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/DefaultLogWriter.java b/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/DefaultLogWriter.java
index b8261d9..4de3706 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/DefaultLogWriter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/DefaultLogWriter.java
@@ -30,6 +30,12 @@ import org.slf4j.Logger;
  */
 public class DefaultLogWriter implements LogWriter {
 
+    String prefix = "";
+    @Override
+    public void setPrefix(String prefix) {
+        this.prefix = prefix;
+    }
+
     // doc comment inherited from LogWriter
     public void initialMessage(Logger log) {
         // Default log writer does nothing here
@@ -37,32 +43,32 @@ public class DefaultLogWriter implements LogWriter {
 
     // doc comment inherited from LogWriter
     public void logRequest (Logger log, Object command) {
-        log.debug("SENDING REQUEST: "+command);
+        log.debug(prefix + "SENDING REQUEST: "+command);
     }
 
     // doc comment inherited from LogWriter
     public void logResponse (Logger log, Object response) {
-        log.debug("GOT RESPONSE: "+response);
+        log.debug(prefix + "GOT RESPONSE: "+response);
     }
 
     // doc comment inherited from LogWriter
     public void logAsyncRequest (Logger log, Object command) {
-        log.debug("SENDING ASNYC REQUEST: "+command);
+        log.debug(prefix + "SENDING ASNYC REQUEST: "+command);
     }
 
     // doc comment inherited from LogWriter
     public void logOneWay (Logger log, Object command) {
-        log.debug("SENDING: "+command);
+        log.debug(prefix + "SENDING: "+command);
     }
 
     // doc comment inherited from LogWriter
     public void logReceivedCommand (Logger log, Object command) {
-        log.debug("RECEIVED: " + command);
+        log.debug(prefix + "RECEIVED: " + command);
     }
 
     // doc comment inherited from LogWriter
     public void logReceivedException (Logger log, IOException error) {
-        log.debug("RECEIVED Exception: "+error, error);
+        log.debug(prefix + "RECEIVED Exception: "+error, error);
     }
 
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/5385fd1b/activemq-client/src/main/java/org/apache/activemq/TransportLoggerSupport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/TransportLoggerSupport.java b/activemq-client/src/main/java/org/apache/activemq/TransportLoggerSupport.java
index 2a4e600..80ad176 100644
--- a/activemq-client/src/main/java/org/apache/activemq/TransportLoggerSupport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/TransportLoggerSupport.java
@@ -27,6 +27,12 @@ public class TransportLoggerSupport {
 
     public static String defaultLogWriterName = "default";
 
+    /**
+     * Default port to control the transport loggers through JMX
+     */
+    public static int defaultJmxPort = 1099;
+
+
     public static interface SPI {
         public Transport createTransportLogger(Transport transport) throws IOException;
         public Transport createTransportLogger(Transport transport, String logWriterName, boolean dynamicManagement, boolean startLogging, int jmxPort) throws IOException;

http://git-wip-us.apache.org/repos/asf/activemq/blob/5385fd1b/activemq-client/src/main/java/org/apache/activemq/transport/LogWriter.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/LogWriter.java b/activemq-client/src/main/java/org/apache/activemq/transport/LogWriter.java
index afc2ca2..d92ccc6 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/LogWriter.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/LogWriter.java
@@ -35,6 +35,13 @@ import org.slf4j.Logger;
 public interface LogWriter {
 
     /**
+     * prefix each statement with this value. allows connections to be correlated
+     * when logger is shared
+     * @param prefix
+     */
+    public void setPrefix(String prefix);
+
+    /**
      * Writes a header message to the log.
      * @param log The log to be written to.
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/5385fd1b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
index 5d623b6..f3e225f 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
@@ -112,6 +112,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
      * TransportConnector URIs.
      */
     protected boolean startLogging = true;
+    protected int jmxPort = TransportLoggerSupport.defaultJmxPort;
     protected final ServerSocketFactory serverSocketFactory;
     protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
     protected Thread socketHandlerThread;
@@ -258,6 +259,14 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
         this.dynamicManagement = useJmx;
     }
 
+    public void setJmxPort(int jmxPort) {
+        this.jmxPort = jmxPort;
+    }
+
+    public int getJmxPort() {
+        return jmxPort;
+    }
+
     public boolean isStartLogging() {
         return startLogging;
     }
@@ -577,6 +586,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
             options.put("logWriterName", logWriterName);
             options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
             options.put("startLogging", Boolean.valueOf(startLogging));
+            options.put("jmxPort", Integer.valueOf(jmxPort));
             options.putAll(transportOptions);
 
             TransportInfo transportInfo = configureTransport(this, socket);

http://git-wip-us.apache.org/repos/asf/activemq/blob/5385fd1b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
index bd81524..74d2bb8 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
@@ -226,7 +226,7 @@ public class KahaDBTest extends TestCase {
         assertEquals("Expected to received all messages.", count, 100);
         broker.stop();
 
-        Logger.getRootLogger().addAppender(appender);
+        Logger.getRootLogger().removeAppender(appender);
         assertFalse("Did not replay any records from the journal", didSomeRecovery.get());
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/5385fd1b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6446Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6446Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6446Test.java
new file mode 100644
index 0000000..ecc570f
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6446Test.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class AMQ6446Test {
+
+    private BrokerService brokerService;
+    LinkedList<Connection> connections = new LinkedList<>();
+
+    @Test
+    public void test2Connections() throws Exception {
+        final String urlTraceParam = "?trace=true";
+        startBroker(urlTraceParam);
+        final HashSet<String> loggers = new HashSet<String>();
+        final HashSet<String> messages = new HashSet<String>();
+
+        DefaultTestAppender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                loggers.add(event.getLoggerName());
+                messages.add(event.getRenderedMessage());
+            }
+        };
+
+        Logger.getRootLogger().addAppender(appender);
+        Logger.getRootLogger().setLevel(Level.DEBUG);
+
+        String brokerUrlWithTrace = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString() +
+                urlTraceParam;
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrlWithTrace);
+
+        for (int i=0; i<2; i++) {
+            Connection c = factory.createConnection();
+            c.start();
+            connections.add(c);
+        }
+
+        Logger.getRootLogger().removeAppender(appender);
+
+        // no logger ends with :2
+        assertFalse(foundMatch(loggers, ".*:2$"));
+
+        // starts with 000000x:
+        assertTrue(foundMatch(messages, "^0+\\d:.*"));
+    }
+
+    public boolean foundMatch(Collection<String> values, String regex) {
+        boolean found = false;
+        Pattern p = Pattern.compile(regex);
+
+        for (String input: values) {
+            Matcher m = p.matcher(input);
+            found =  m.matches();
+            if (found) {
+                break;
+            }
+        }
+        return found;
+    }
+
+    @Test
+    public void test2ConnectionsLegacy() throws Exception {
+        final String legacySupportParam = "?trace=true&jmxPort=22";
+        startBroker(legacySupportParam);
+
+        final HashSet<String> loggers = new HashSet<String>();
+        final HashSet<String> messages = new HashSet<String>();
+
+        DefaultTestAppender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                loggers.add(event.getLoggerName());
+                messages.add(event.getRenderedMessage());
+            }
+        };
+
+        Logger.getRootLogger().addAppender(appender);
+        Logger.getRootLogger().setLevel(Level.TRACE);
+
+        String brokerUrlWithTrace = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString() +
+                legacySupportParam;
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrlWithTrace);
+
+        for (int i=0; i<2; i++) {
+            Connection c = factory.createConnection();
+            c.start();
+            connections.add(c);
+        }
+
+        Logger.getRootLogger().removeAppender(appender);
+
+        // logger ends with :2
+        assertTrue(foundMatch(loggers, ".*:2$"));
+
+        // starts with 000000x:
+        assertFalse(foundMatch(messages, "^0+\\d:.*"));
+
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        for (Connection connection : connections) {
+            try {
+                connection.close();
+            } catch (Exception ignored) {}
+        }
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+    }
+
+    public void startBroker(String urlParam) throws Exception {
+        brokerService = BrokerFactory.createBroker("broker:(tcp://0.0.0.0:0" + urlParam + ")/localhost?useJmx=false&persistent=false");
+        brokerService.start();
+        brokerService.waitUntilStarted();
+    }
+
+}