You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/09 17:29:00 UTC

[22/50] [abbrv] git commit: ACCUMULO-2343 Add AsyncSocketAppender

ACCUMULO-2343 Add AsyncSocketAppender

AsyncSocketAppender is a Log4J AsyncAppender with its own internal SocketAppender.
Configuration for either appender can be set on the AsyncSocketAppender itself. An
AsyncSocketAppender can be configured using a Log4J properties file, while an ordinary
AsyncAppender cannot.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5a72c37e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5a72c37e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5a72c37e

Branch: refs/heads/ACCUMULO-378
Commit: 5a72c37eadeca58e670517611f39824f4307321e
Parents: 4d5ba9d
Author: Bill Havanki <bh...@cloudera.com>
Authored: Thu Apr 17 17:01:20 2014 -0400
Committer: Bill Havanki <bh...@cloudera.com>
Committed: Mon May 5 09:16:36 2014 -0400

----------------------------------------------------------------------
 .../accumulo/core/util/AsyncSocketAppender.java | 112 +++++++++++++++++++
 .../core/util/AsyncSocketAppenderTest.java      |  79 +++++++++++++
 .../test/functional/ConfigurableMacIT.java      |   3 +
 .../test/functional/MonitorLoggingIT.java       | 101 +++++++++++++++++
 test/src/test/resources/conf/generic_logger.xml |  83 ++++++++++++++
 test/src/test/resources/conf/monitor_logger.xml |  64 +++++++++++
 6 files changed, 442 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5a72c37e/core/src/main/java/org/apache/accumulo/core/util/AsyncSocketAppender.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/AsyncSocketAppender.java b/core/src/main/java/org/apache/accumulo/core/util/AsyncSocketAppender.java
new file mode 100644
index 0000000..baae9ba
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/AsyncSocketAppender.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.log4j.AsyncAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.net.SocketAppender;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * An asynchronous Log4J appender that owns a single internal socket appender. A plain <code>AsyncAppender</code> cannot be set up from a Log4J properties
+ * file, but this appender can, because the socket settings are properties of this class; when set up that way, no additional appenders can be added.
+ */
+public class AsyncSocketAppender extends AsyncAppender {
+
+  private final AtomicBoolean activated = new AtomicBoolean(false);
+  private final SocketAppender socketAppender;
+
+  /**
+   * Creates a new appender backed by a fresh socket appender.
+   */
+  public AsyncSocketAppender() {
+    this.socketAppender = new SocketAppender();
+  }
+
+  /**
+   * Creates a new appender that uses the given socket appender internally. This constructor exists for testing only.
+   */
+  AsyncSocketAppender(SocketAppender socketAppender) {
+    this.socketAppender = socketAppender;
+  }
+
+  @Override
+  public void append(final LoggingEvent event) {
+    // Attach on first use instead of in a constructor, which would mean calling a non-final method there
+    final boolean alreadyAttached = isAttached(socketAppender);
+    if (!alreadyAttached) {
+      addAppender(socketAppender);
+    }
+    // Activate (and so connect) on first use as well, so that host and port can be set beforehand
+    final boolean firstAppend = activated.compareAndSet(false, true);
+    if (firstAppend) {
+      socketAppender.activateOptions();
+    }
+    super.append(event);
+  }
+
+  // Delegates to the internal SocketAppender, grouped by property
+
+  public String getApplication() {
+    return socketAppender.getApplication();
+  }
+
+  public void setApplication(String lapp) {
+    socketAppender.setApplication(lapp);
+  }
+
+  public String getRemoteHost() {
+    return socketAppender.getRemoteHost();
+  }
+
+  public void setRemoteHost(String host) {
+    socketAppender.setRemoteHost(host);
+  }
+
+  public int getPort() {
+    return socketAppender.getPort();
+  }
+
+  public void setPort(int port) {
+    socketAppender.setPort(port);
+  }
+
+  public int getReconnectionDelay() {
+    return socketAppender.getReconnectionDelay();
+  }
+
+  public void setReconnectionDelay(int delay) {
+    socketAppender.setReconnectionDelay(delay);
+  }
+
+  public boolean isAdvertiseViaMulticastDNS() {
+    return socketAppender.isAdvertiseViaMulticastDNS();
+  }
+
+  public void setAdvertiseViaMulticastDNS(boolean advertiseViaMulticastDNS) {
+    socketAppender.setAdvertiseViaMulticastDNS(advertiseViaMulticastDNS);
+  }
+
+  // No getLocationInfo() delegate is needed: this setter keeps super.getLocationInfo() in agreement with socketAppender
+  @Override
+  public void setLocationInfo(boolean locationInfo) {
+    super.setLocationInfo(locationInfo);
+    socketAppender.setLocationInfo(locationInfo);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5a72c37e/core/src/test/java/org/apache/accumulo/core/util/AsyncSocketAppenderTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/util/AsyncSocketAppenderTest.java b/core/src/test/java/org/apache/accumulo/core/util/AsyncSocketAppenderTest.java
new file mode 100644
index 0000000..414125a
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/util/AsyncSocketAppenderTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.Priority;
+import org.apache.log4j.net.SocketAppender;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.easymock.Capture;
+import static org.easymock.EasyMock.*;
+
+public class AsyncSocketAppenderTest { // unit tests for AsyncSocketAppender; an EasyMock SocketAppender is used where interactions are checked
+  private SocketAppender sa; // mock socket appender, recreated by setUp() before each test
+  private AsyncSocketAppender asa; // appender under test, constructed inside each test
+
+  @Before
+  public void setUp() throws Exception {
+    sa = createMock(SocketAppender.class);
+  }
+
+  @Test
+  public void testDelegates() { // values set on the appender must be readable back through its getters
+    asa = new AsyncSocketAppender(); // public constructor: uses a real internal SocketAppender, not the mock
+    asa.setApplication("myapp");
+    asa.setLocationInfo(true);
+    asa.setPort(1234);
+    asa.setReconnectionDelay(56);
+    asa.setRemoteHost("remotehost");
+    assertEquals("myapp", asa.getApplication());
+    assertEquals(true, asa.getLocationInfo()); // not really delegating: AsyncSocketAppender declares no getLocationInfo(), so this is the inherited getter
+    assertEquals(1234, asa.getPort());
+    assertEquals(56, asa.getReconnectionDelay());
+    assertEquals("remotehost", asa.getRemoteHost());
+  }
+
+  @Test
+  public void testSetLocationInfo() { // setLocationInfo must also be forwarded to the socket appender
+    sa.setLocationInfo(true); // expectation recorded on the mock (record phase lasts until replay)
+    replay(sa);
+    asa = new AsyncSocketAppender(sa);
+    asa.setLocationInfo(true);
+    verify(sa); // fails if the recorded call did not happen
+  }
+
+  @Test
+  public void testAppend() { // appending must attach and activate the socket appender and pass the events on to it
+    asa = new AsyncSocketAppender(sa);
+    assertFalse(asa.isAttached(sa)); // attachment is lazy, so nothing is attached right after construction
+    LoggingEvent event1 = new LoggingEvent("java.lang.String", Logger.getRootLogger(), Priority.INFO, "event1", null);
+    LoggingEvent event2 = new LoggingEvent("java.lang.Integer", Logger.getRootLogger(), Priority.WARN, "event2", null);
+    sa.activateOptions(); // expectations recorded on the mock, up to replay(sa) below
+    sa.doAppend(event1);
+    sa.doAppend(event2);
+    sa.close();
+    replay(sa);
+    asa.doAppend(event1);
+    asa.doAppend(event2);
+    asa.close(); // forces events to be appended to socket appender before verification
+    assertTrue(asa.isAttached(sa));
+    verify(sa); // fails if any recorded call did not happen
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5a72c37e/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java
index d9bed7f..59b0977 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java
@@ -44,6 +44,8 @@ public class ConfigurableMacIT extends AbstractMacIT {
 
   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {}
 
+  public void beforeClusterStart(MiniAccumuloConfigImpl cfg) throws Exception {}
+
   @Before
   public void setUp() throws Exception {
     MiniAccumuloConfigImpl cfg = new MiniAccumuloConfigImpl(
@@ -63,6 +65,7 @@ public class ConfigurableMacIT extends AbstractMacIT {
       coreSite.writeXml(out);
       out.close();
     }
+    beforeClusterStart(cfg);
     cluster.start();
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5a72c37e/test/src/test/java/org/apache/accumulo/test/functional/MonitorLoggingIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/MonitorLoggingIT.java b/test/src/test/java/org/apache/accumulo/test/functional/MonitorLoggingIT.java
new file mode 100644
index 0000000..2dadafe
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/MonitorLoggingIT.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.net.URL;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.monitor.Monitor;
+import org.apache.accumulo.server.util.Admin;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Test;
+
+/** Integration test checking that a log message forced in a mini Accumulo cluster shows up on the monitor's log page. */
+public class MonitorLoggingIT extends ConfigurableMacIT {
+  public static final Logger log = Logger.getLogger(MonitorLoggingIT.class);
+
+  /** Copies the Log4J XML test configurations into the cluster's conf dir; a failure to do so is rethrown with its cause. */
+  @Override
+  public void beforeClusterStart(MiniAccumuloConfigImpl cfg) throws Exception {
+    File confDir = cfg.getConfDir();
+    try {
+      FileUtils.copyFileToDirectory(new File(MonitorLoggingIT.class.getResource("/conf/generic_logger.xml").toURI()), confDir);
+      FileUtils.copyFileToDirectory(new File(MonitorLoggingIT.class.getResource("/conf/monitor_logger.xml").toURI()), confDir);
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to copy Log4J XML files to conf dir", e); // fail fast rather than run without the logger configs
+    }
+  }
+
+  private static final int NUM_LOCATION_PASSES = 5;
+  private static final long LOCATION_DELAY = 5000L;
+
+  /** Starts a monitor, forces a log message with an invalid scan iterator, and expects a log event on the monitor's /log page. */
+  @Test
+  public void logToMonitor() throws Exception {
+    log.debug("Starting Monitor");
+    Process monitor = cluster.exec(Monitor.class);
+    try {
+      // Get monitor location to ensure it is running.
+      String monitorLocation = null;
+      for (int i = 0; i < NUM_LOCATION_PASSES; i++) {
+        Thread.sleep(LOCATION_DELAY);
+        try {
+          monitorLocation = getMonitor();
+          break;
+        } catch (KeeperException e) {
+          log.debug("Monitor not up yet, trying again in " + LOCATION_DELAY + " ms");
+        }
+      }
+      assertNotNull("Monitor failed to start within " + (LOCATION_DELAY * NUM_LOCATION_PASSES) + " ms", monitorLocation);
+      log.debug("Monitor running at " + monitorLocation);
+
+      // Attempt a scan with an invalid iterator to force a log message in the monitor.
+      try {
+        Connector c = getConnector();
+        Scanner s = c.createScanner("accumulo.root", new Authorizations());
+        s.addScanIterator(new IteratorSetting(100, "incorrect", "java.lang.String"));
+        s.iterator().next();
+      } catch (Exception e) {
+        // expected, the iterator was bad
+      }
+      Thread.sleep(5000L); // extra precaution to ensure monitor has opportunity to log
+      // Verify messages were received at the monitor.
+      URL url = new URL("http://" + monitorLocation + "/log");
+      log.debug("Fetching web page " + url);
+      String result = FunctionalTestUtils.readAll(url.openStream());
+      assertTrue("No log messages found", result.contains("<pre class='logevent'>"));
+      // Shutdown cleanly.
+      log.debug("Stopping mini accumulo cluster");
+      Process shutdown = cluster.exec(Admin.class, "stopAll");
+      assertTrue("stopAll did not exit cleanly", shutdown.waitFor() == 0);
+      log.debug("success!");
+    } finally {
+      monitor.destroy(); // always stop the monitor process, even when a step above fails
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5a72c37e/test/src/test/resources/conf/generic_logger.xml
----------------------------------------------------------------------
diff --git a/test/src/test/resources/conf/generic_logger.xml b/test/src/test/resources/conf/generic_logger.xml
new file mode 100644
index 0000000..db79efe
--- /dev/null
+++ b/test/src/test/resources/conf/generic_logger.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+  <!-- Write out everything at DEBUG and higher to the debug log (rolls at 1000MB, keeps 10 backups) -->
+  <appender name="A2" class="org.apache.log4j.RollingFileAppender">
+     <param name="File"           value="${org.apache.accumulo.core.dir.log}/${org.apache.accumulo.core.application}_${org.apache.accumulo.core.ip.localhost.hostname}.debug.log"/>
+     <param name="MaxFileSize"    value="1000MB"/>
+     <param name="MaxBackupIndex" value="10"/>
+     <param name="Threshold"      value="DEBUG"/>
+     <layout class="org.apache.log4j.PatternLayout">
+       <param name="ConversionPattern" value="%d{ISO8601} [%-8c{2}] %-5p: %m%n"/>
+     </layout>
+  </appender>
+
+  <!-- Write out INFO and higher to the regular log (rolls at 1000MB, keeps 10 backups) -->
+  <appender name="A3" class="org.apache.log4j.RollingFileAppender">
+     <param name="File"           value="${org.apache.accumulo.core.dir.log}/${org.apache.accumulo.core.application}_${org.apache.accumulo.core.ip.localhost.hostname}.log"/>
+     <param name="MaxFileSize"    value="1000MB"/>
+     <param name="MaxBackupIndex" value="10"/>
+     <param name="Threshold"      value="INFO"/>
+     <layout class="org.apache.log4j.PatternLayout">
+       <param name="ConversionPattern" value="%d{ISO8601} [%-8c{2}] %-5p: %m%n"/>
+     </layout>
+  </appender>
+
+  <!-- Send logging data at WARN and higher to a centralized logger over a socket -->
+  <appender name="N1" class="org.apache.log4j.net.SocketAppender">
+     <param name="remoteHost"     value="${org.apache.accumulo.core.host.log}"/>
+     <param name="port"           value="${org.apache.accumulo.core.host.log.port}"/>
+     <param name="application"    value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
+     <param name="Threshold"      value="WARN"/>
+  </appender>
+
+  <!-- Wrap N1 asynchronously: if the centralized logger is down, buffer the log events, but drop them if it stays down -->
+  <appender name="ASYNC" class="org.apache.log4j.AsyncAppender">
+     <appender-ref ref="N1" />
+  </appender>
+
+  <!-- Log accumulo events to the debug, normal and remote logs. -->
+  <logger name="org.apache.accumulo" additivity="false">
+     <level value="DEBUG"/>
+     <appender-ref ref="A2" />
+     <appender-ref ref="A3" />
+     <appender-ref ref="ASYNC" />
+  </logger>
+
+  <logger name="org.apache.accumulo.core.file.rfile.bcfile">
+     <level value="INFO"/>
+  </logger>
+
+  <logger name="org.mortbay.log">
+     <level value="WARN"/>
+  </logger>
+
+  <logger name="org.apache.zookeeper">
+     <level value="ERROR"/>
+  </logger>
+
+  <!-- Log non-accumulo events to the debug and normal logs. -->
+  <root>
+     <level value="INFO"/>
+     <appender-ref ref="A2" />
+     <appender-ref ref="A3" />
+  </root>
+
+</log4j:configuration>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5a72c37e/test/src/test/resources/conf/monitor_logger.xml
----------------------------------------------------------------------
diff --git a/test/src/test/resources/conf/monitor_logger.xml b/test/src/test/resources/conf/monitor_logger.xml
new file mode 100644
index 0000000..91a7671
--- /dev/null
+++ b/test/src/test/resources/conf/monitor_logger.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+  <!-- Write out everything at DEBUG and higher to the debug log (rolls at 100MB, keeps 10 backups) -->
+  <appender name="A2" class="org.apache.log4j.RollingFileAppender">
+     <param name="File"           value="${org.apache.accumulo.core.dir.log}/${org.apache.accumulo.core.application}_${org.apache.accumulo.core.ip.localhost.hostname}.debug.log"/>
+     <param name="MaxFileSize"    value="100MB"/>
+     <param name="MaxBackupIndex" value="10"/>
+     <param name="Threshold"      value="DEBUG"/>
+     <layout class="org.apache.log4j.PatternLayout">
+       <param name="ConversionPattern" value="%d{ISO8601} [%-8c{2}] %-5p: %X{application} %m%n"/>
+     </layout>
+  </appender>
+
+  <!-- Write out INFO and higher to the regular log (rolls at 100MB, keeps 10 backups) -->
+  <appender name="A3" class="org.apache.log4j.RollingFileAppender">
+     <param name="File"           value="${org.apache.accumulo.core.dir.log}/${org.apache.accumulo.core.application}_${org.apache.accumulo.core.ip.localhost.hostname}.log"/>
+     <param name="MaxFileSize"    value="100MB"/>
+     <param name="MaxBackupIndex" value="10"/>
+     <param name="Threshold"      value="INFO"/>
+     <layout class="org.apache.log4j.PatternLayout">
+       <param name="ConversionPattern" value="%d{ISO8601} [%-8c{2}] %-5p: %X{application} %m%n"/>
+     </layout>
+  </appender>
+
+  <!-- Keep the last few log messages (keep=40) at WARN and higher for display to the user -->
+  <appender name="GUI" class="org.apache.accumulo.server.monitor.LogService">
+     <param name="keep"           value="40"/>
+     <param name="Threshold"      value="WARN"/>
+  </appender>
+
+  <!-- Log accumulo messages to the debug log, the normal log and the GUI appender -->
+  <logger name="org.apache.accumulo" additivity="false">
+     <level value="DEBUG"/>
+     <appender-ref ref="A2" />
+     <appender-ref ref="A3" />
+     <appender-ref ref="GUI" />
+  </logger>
+
+  <!-- Log non-accumulo messages to the debug and normal logs. -->
+  <root>
+     <level value="INFO"/>
+     <appender-ref ref="A2" />
+     <appender-ref ref="A3" />
+  </root>
+
+</log4j:configuration>