You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2017/03/09 01:53:22 UTC

[3/4] accumulo git commit: ACCUMULO-4409 Add tests for monitor appender

ACCUMULO-4409 Add tests for monitor appender

Changes to make AccumuloMonitorAppender more testable and the tests to
accompany it.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5afbacc0
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5afbacc0
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5afbacc0

Branch: refs/heads/master
Commit: 5afbacc0af9a6d7f9c610969eafcfed2734364a2
Parents: ff7525e
Author: Christopher Tubbs <ct...@apache.org>
Authored: Wed Mar 8 20:50:44 2017 -0500
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Wed Mar 8 20:50:44 2017 -0500

----------------------------------------------------------------------
 .../monitor/util/AccumuloMonitorAppender.java   | 193 +++++++++++++------
 .../util/AccumuloMonitorAppenderTest.java       | 184 ++++++++++++++++++
 2 files changed, 315 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5afbacc0/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java
index 8a855d0..a965f0c 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java
@@ -18,10 +18,13 @@ package org.apache.accumulo.monitor.util;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import java.util.Objects;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
@@ -30,16 +33,19 @@ import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooCache;
 import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
 import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.AsyncAppender;
 import org.apache.log4j.net.SocketAppender;
 import org.apache.zookeeper.data.Stat;
 
 import com.google.common.net.HostAndPort;
 
-public class AccumuloMonitorAppender extends AsyncAppender {
+public class AccumuloMonitorAppender extends AsyncAppender implements AutoCloseable {
 
-  private final ScheduledExecutorService executorService;
-  private final AtomicBoolean trackerScheduled;
+  final ScheduledExecutorService executorService;
+  final AtomicBoolean trackerScheduled;
+  private int frequency = 0;
+  private MonitorTracker tracker = null;
 
   /**
    * A Log4j Appender which follows the registered location of the active Accumulo monitor service, and forwards log messages to it
@@ -54,12 +60,33 @@ public class AccumuloMonitorAppender extends AsyncAppender {
     });
   }
 
+  public void setFrequency(int millis) {
+    if (millis > 0) {
+      frequency = millis;
+    }
+  }
+
+  public int getFrequency() {
+    return frequency;
+  }
+
+  // this is just for testing
+  void setTracker(MonitorTracker monitorTracker) {
+    tracker = monitorTracker;
+  }
+
   @Override
   public void activateOptions() {
     // only schedule it once (in case options get activated more than once); not sure if this is possible
     if (trackerScheduled.compareAndSet(false, true)) {
-      // wait 5 seconds, then run every 5 seconds
-      executorService.scheduleAtFixedRate(new MonitorTracker(), 5, 5, TimeUnit.SECONDS);
+      if (frequency <= 0) {
+        // use default rate of 5 seconds between each check
+        frequency = 5000;
+      }
+      if (tracker == null) {
+        tracker = new MonitorTracker(this, new ZooCacheLocationSupplier(), new SocketAppenderFactory());
+      }
+      executorService.scheduleWithFixedDelay(tracker, frequency, frequency, TimeUnit.MILLISECONDS);
     }
     super.activateOptions();
   }
@@ -72,80 +99,122 @@ public class AccumuloMonitorAppender extends AsyncAppender {
     super.close();
   }
 
-  private class MonitorTracker implements Runnable {
+  static class MonitorLocation {
+    private final String location;
+    private final long modId;
 
-    private String path;
-    private ZooCache zooCache;
-
-    private long lastModifiedTransactionId;
-    private SocketAppender lastSocketAppender;
+    public MonitorLocation(long modId, byte[] location) {
+      this.modId = modId;
+      this.location = location == null ? null : new String(location, UTF_8);
+    }
 
-    public MonitorTracker() {
+    public boolean hasLocation() {
+      return location != null;
+    }
 
-      // path and zooCache are lazily set the first time this tracker is run
-      // this allows the tracker to be constructed and scheduled during log4j initialization without
-      // triggering any actual logs from the Accumulo or ZooKeeper code
-      this.path = null;
-      this.zooCache = null;
+    public String getLocation() {
+      return location;
+    }
 
-      this.lastModifiedTransactionId = 0;
-      this.lastSocketAppender = null;
+    @Override
+    public boolean equals(Object obj) {
+      if (obj != null && obj instanceof MonitorLocation) {
+        MonitorLocation other = (MonitorLocation) obj;
+        return modId == other.modId && Objects.equals(location, other.location);
+      }
+      return false;
     }
 
     @Override
-    public void run() {
-      try {
-        // lazily set up path and zooCache (see comment in constructor)
-        if (this.zooCache == null) {
-          Instance instance = HdfsZooInstance.getInstance();
-          this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_ADDR;
-          this.zooCache = new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
-        }
+    public int hashCode() {
+      return Long.hashCode(modId);
+    }
+  }
 
-        // get the current location from the cache and update if necessary
-        Stat stat = new Stat();
-        byte[] loc = zooCache.get(path, stat);
-        long modifiedTransactionId = stat.getMzxid();
-
-        // modifiedTransactionId will be 0 if no current location
-        // lastModifiedTransactionId will be 0 if we've never seen a location
-        if (modifiedTransactionId != lastModifiedTransactionId) {
-          // replace old socket on every change, even if new location is the same as old location
-          // if modifiedTransactionId changed, then the monitor restarted and the old socket is dead now
-          switchAppender(loc, modifiedTransactionId);
-        }
-      } catch (Exception e) {
-        // dump any non-fatal problems to the console, but let it run again
-        e.printStackTrace();
+  private static class ZooCacheLocationSupplier implements Supplier<MonitorLocation> {
+
+    // path and zooCache are lazily set the first time this supplier is invoked
+    // this allows the supplier to be constructed, and its tracker scheduled, during log4j initialization without
+    // triggering any actual logs from the Accumulo or ZooKeeper code
+    private String path = null;
+    private ZooCache zooCache = null;
+
+    @Override
+    public MonitorLocation get() {
+      // lazily set up path and zooCache (see comment on the field declarations above)
+      if (this.zooCache == null) {
+        Instance instance = HdfsZooInstance.getInstance();
+        this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_ADDR;
+        this.zooCache = new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
       }
+
+      // get the current location from the cache and update if necessary
+      Stat stat = new Stat();
+      byte[] loc = zooCache.get(path, stat);
+      // mzxid is 0 if location does not exist and the non-zero transaction id of the last modification otherwise
+      return new MonitorLocation(stat.getMzxid(), loc);
     }
+  }
 
-    private void switchAppender(byte[] newLocation, long newModifiedTransactionId) {
-      // remove and close the last one, if it was non-null
-      if (lastSocketAppender != null) {
-        AccumuloMonitorAppender.this.removeAppender(lastSocketAppender);
-        lastSocketAppender.close();
-      }
+  private static class SocketAppenderFactory implements Function<MonitorLocation,AppenderSkeleton> {
+    @Override
+    public AppenderSkeleton apply(MonitorLocation loc) {
+      int defaultPort = Integer.parseUnsignedInt(Property.MONITOR_LOG4J_PORT.getDefaultValue());
+      HostAndPort remote = HostAndPort.fromString(loc.getLocation());
+
+      SocketAppender socketAppender = new SocketAppender();
+      socketAppender.setApplication(System.getProperty("accumulo.application", "unknown"));
+      socketAppender.setRemoteHost(remote.getHostText());
+      socketAppender.setPort(remote.getPortOrDefault(defaultPort));
 
-      // create a new SocketAppender, if new location is non-null
-      if (newLocation != null) {
+      return socketAppender;
+    }
+  }
 
-        int defaultPort = Integer.parseUnsignedInt(Property.MONITOR_LOG4J_PORT.getDefaultValue());
-        HostAndPort remote = HostAndPort.fromString(new String(newLocation, UTF_8));
+  static class MonitorTracker implements Runnable {
 
-        SocketAppender socketAppender = new SocketAppender();
-        socketAppender.setApplication(System.getProperty("accumulo.application", "unknown"));
-        socketAppender.setRemoteHost(remote.getHostText());
-        socketAppender.setPort(remote.getPortOrDefault(defaultPort));
+    private final AccumuloMonitorAppender parentAsyncAppender;
+    private final Supplier<MonitorLocation> currentLocationSupplier;
+    private final Function<MonitorLocation,AppenderSkeleton> appenderFactory;
 
-        socketAppender.activateOptions();
-        AccumuloMonitorAppender.this.addAppender(socketAppender);
+    private MonitorLocation lastLocation;
+    private AppenderSkeleton lastSocketAppender;
 
-        lastSocketAppender = socketAppender;
-      }
+    public MonitorTracker(AccumuloMonitorAppender appender, Supplier<MonitorLocation> currentLocationSupplier,
+        Function<MonitorLocation,AppenderSkeleton> appenderFactory) {
+      this.parentAsyncAppender = Objects.requireNonNull(appender);
+      this.appenderFactory = Objects.requireNonNull(appenderFactory);
+      this.currentLocationSupplier = Objects.requireNonNull(currentLocationSupplier);
 
-      // update lastModifiedTransactionId, even if the new one is 0 (no new location)
-      lastModifiedTransactionId = newModifiedTransactionId;
+      this.lastLocation = new MonitorLocation(0, null);
+      this.lastSocketAppender = null;
+    }
+
+    @Override
+    public void run() {
+      try {
+        MonitorLocation currentLocation = currentLocationSupplier.get();
+        // detect change
+        if (!currentLocation.equals(lastLocation)) {
+          // clean up old appender
+          if (lastSocketAppender != null) {
+            parentAsyncAppender.removeAppender(lastSocketAppender);
+            lastSocketAppender.close();
+            lastSocketAppender = null;
+          }
+          // create a new one
+          if (currentLocation.hasLocation()) {
+            lastSocketAppender = appenderFactory.apply(currentLocation);
+            lastSocketAppender.activateOptions();
+            parentAsyncAppender.addAppender(lastSocketAppender);
+          }
+          // update the last location only if switching was successful
+          lastLocation = currentLocation;
+        }
+      } catch (Exception e) {
+        // dump any non-fatal problems to the console, but let it run again
+        e.printStackTrace();
+      }
     }
 
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5afbacc0/server/monitor/src/test/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppenderTest.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/test/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppenderTest.java b/server/monitor/src/test/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppenderTest.java
new file mode 100644
index 0000000..cd4eb03
--- /dev/null
+++ b/server/monitor/src/test/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppenderTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.monitor.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.fail;
+
+import java.util.Enumeration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import org.apache.accumulo.monitor.util.AccumuloMonitorAppender.MonitorLocation;
+import org.apache.accumulo.monitor.util.AccumuloMonitorAppender.MonitorTracker;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.Timeout;
+
+public class AccumuloMonitorAppenderTest {
+
+  @Rule
+  public Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void testActivateOptions() {
+    try (AccumuloMonitorAppender appender = new AccumuloMonitorAppender()) {
+      appender.executorService.shutdown();
+      // simulate tracker having already been scheduled
+      appender.trackerScheduled.compareAndSet(false, true);
+      appender.activateOptions();
+      // activateOptions should not trigger a RejectedExecutionException, because we tricked it into thinking it was already called, and therefore it did not
+      // schedule the tracker after shutting down
+    }
+
+    exception.expect(RejectedExecutionException.class);
+    try (AccumuloMonitorAppender appender = new AccumuloMonitorAppender()) {
+      appender.executorService.shutdown();
+      appender.activateOptions();
+      fail("Calling activateOptions should have triggered a RejectedExecutionException");
+      // this ensures that the activateOptions correctly attempts to schedule a worker
+    }
+  }
+
+  @Test
+  public void testExecutorService() throws InterruptedException, ExecutionException {
+    ScheduledExecutorService executorService = null;
+    AtomicLong counter = new AtomicLong(2);
+    try (AccumuloMonitorAppender appender = new AccumuloMonitorAppender()) {
+      executorService = appender.executorService;
+
+      // make sure executor service is started and running
+      Assert.assertEquals(false, executorService.isShutdown());
+      Assert.assertEquals(false, executorService.isTerminated());
+
+      // make sure executor service executes tasks
+      ScheduledFuture<Long> future = executorService.schedule(() -> counter.getAndIncrement(), 1, TimeUnit.MILLISECONDS);
+      Assert.assertEquals(Long.valueOf(2), future.get());
+      Assert.assertEquals(3, counter.get());
+
+      // schedule a task far enough in the future (1 day) that it will not run before the appender is closed
+      executorService.schedule(() -> counter.getAndIncrement(), 1, TimeUnit.DAYS);
+
+      // make sure executor service is still running
+      Assert.assertEquals(false, executorService.isShutdown());
+      Assert.assertEquals(false, executorService.isTerminated());
+    }
+    // verify that closing the appender shuts down the executor service threads
+    Assert.assertEquals(true, executorService.isShutdown());
+    executorService.awaitTermination(5, TimeUnit.SECONDS);
+    Assert.assertEquals(true, executorService.isTerminated());
+
+    // verify executor service did not wait for scheduled task to run
+    Assert.assertEquals(3, counter.get());
+  }
+
+  @Test
+  public void testMonitorTracker() throws InterruptedException {
+    AtomicLong currentLoc = new AtomicLong(0);
+    Supplier<MonitorLocation> locSupplier = () -> {
+      long loc = currentLoc.get();
+      // for simplicity, create the location name from a number (0 represents no location)
+      byte[] location = loc == 0 ? null : ("loc" + loc).getBytes(UTF_8);
+      return new MonitorLocation(loc, location);
+    };
+    Function<MonitorLocation,AppenderSkeleton> appenderFactory = newLocation -> new AppenderSkeleton() {
+
+      {
+        this.name = "Appender for " + newLocation.getLocation();
+      }
+
+      @Override
+      public boolean requiresLayout() {
+        return false;
+      }
+
+      @Override
+      public void close() {}
+
+      @Override
+      protected void append(LoggingEvent event) {}
+
+    };
+
+    try (AccumuloMonitorAppender parent = new AccumuloMonitorAppender()) {
+      parent.setFrequency(1); // make it check frequently (every 1 millisecond)
+      parent.setTracker(new MonitorTracker(parent, locSupplier, appenderFactory));
+      parent.activateOptions();
+
+      // initially there are no appenders
+      Assert.assertTrue(parent.getAllAppenders() == null);
+      updateLocAndVerify(currentLoc, parent, 0);
+      updateLocAndVerify(currentLoc, parent, 10);
+
+      // verify it's the same after a few times
+      // this verifies the logic in the tracker's run method which compares current location with last to see if a change occurred
+      AppenderSkeleton lastAppender = (AppenderSkeleton) parent.getAllAppenders().nextElement();
+      for (int x = 0; x < 10; x++) {
+        Thread.sleep(10);
+        AppenderSkeleton currentAppender = (AppenderSkeleton) parent.getAllAppenders().nextElement();
+        Assert.assertSame(lastAppender, currentAppender);
+      }
+
+      updateLocAndVerify(currentLoc, parent, 3);
+      updateLocAndVerify(currentLoc, parent, 0);
+      updateLocAndVerify(currentLoc, parent, 0);
+      updateLocAndVerify(currentLoc, parent, 12);
+      updateLocAndVerify(currentLoc, parent, 0);
+      updateLocAndVerify(currentLoc, parent, 335);
+
+      updateLocAndVerify(currentLoc, parent, 0);
+      // verify we removed all the appenders
+      Assert.assertFalse(parent.getAllAppenders().hasMoreElements());
+    }
+  }
+
+  private static void updateLocAndVerify(AtomicLong currentLoc, AccumuloMonitorAppender parent, int newLoc) {
+    // set the new location
+    currentLoc.set(newLoc);
+    // wait for the appender to notice the change
+    while (!verifyAppender(parent, newLoc == 0 ? null : ("loc" + newLoc))) {}
+  }
+
+  private static boolean verifyAppender(AccumuloMonitorAppender parent, String newLocName) {
+    Enumeration<?> childAppenders = parent.getAllAppenders();
+    if (newLocName == null) {
+      return childAppenders == null || !childAppenders.hasMoreElements();
+    }
+    if (childAppenders == null || !childAppenders.hasMoreElements()) {
+      return false;
+    }
+    AppenderSkeleton child = (AppenderSkeleton) childAppenders.nextElement();
+    if (childAppenders.hasMoreElements()) {
+      Assert.fail("Appender should never have more than one child");
+    }
+    return ("Appender for " + newLocName).equals(child.getName());
+  }
+
+}