You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2017/03/09 01:53:22 UTC
[3/4] accumulo git commit: ACCUMULO-4409 Add tests for monitor appender
ACCUMULO-4409 Add tests for monitor appender
Changes to make AccumuloMonitorAppender more testable and the tests to
accompany it.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5afbacc0
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5afbacc0
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5afbacc0
Branch: refs/heads/master
Commit: 5afbacc0af9a6d7f9c610969eafcfed2734364a2
Parents: ff7525e
Author: Christopher Tubbs <ct...@apache.org>
Authored: Wed Mar 8 20:50:44 2017 -0500
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Wed Mar 8 20:50:44 2017 -0500
----------------------------------------------------------------------
.../monitor/util/AccumuloMonitorAppender.java | 193 +++++++++++++------
.../util/AccumuloMonitorAppenderTest.java | 184 ++++++++++++++++++
2 files changed, 315 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5afbacc0/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java
index 8a855d0..a965f0c 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java
@@ -18,10 +18,13 @@ package org.apache.accumulo.monitor.util;
import static java.nio.charset.StandardCharsets.UTF_8;
+import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Supplier;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
@@ -30,16 +33,19 @@ import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.AsyncAppender;
import org.apache.log4j.net.SocketAppender;
import org.apache.zookeeper.data.Stat;
import com.google.common.net.HostAndPort;
-public class AccumuloMonitorAppender extends AsyncAppender {
+public class AccumuloMonitorAppender extends AsyncAppender implements AutoCloseable {
- private final ScheduledExecutorService executorService;
- private final AtomicBoolean trackerScheduled;
+ final ScheduledExecutorService executorService;
+ final AtomicBoolean trackerScheduled;
+ private int frequency = 0;
+ private MonitorTracker tracker = null;
/**
* A Log4j Appender which follows the registered location of the active Accumulo monitor service, and forwards log messages to it
@@ -54,12 +60,33 @@ public class AccumuloMonitorAppender extends AsyncAppender {
});
}
+ public void setFrequency(int millis) {
+ if (millis > 0) {
+ frequency = millis;
+ }
+ }
+
+ public int getFrequency() {
+ return frequency;
+ }
+
+ // this is just for testing
+ void setTracker(MonitorTracker monitorTracker) {
+ tracker = monitorTracker;
+ }
+
@Override
public void activateOptions() {
// only schedule it once (in case options get activated more than once); not sure if this is possible
if (trackerScheduled.compareAndSet(false, true)) {
- // wait 5 seconds, then run every 5 seconds
- executorService.scheduleAtFixedRate(new MonitorTracker(), 5, 5, TimeUnit.SECONDS);
+ if (frequency <= 0) {
+ // use default rate of 5 seconds between each check
+ frequency = 5000;
+ }
+ if (tracker == null) {
+ tracker = new MonitorTracker(this, new ZooCacheLocationSupplier(), new SocketAppenderFactory());
+ }
+ executorService.scheduleWithFixedDelay(tracker, frequency, frequency, TimeUnit.MILLISECONDS);
}
super.activateOptions();
}
@@ -72,80 +99,122 @@ public class AccumuloMonitorAppender extends AsyncAppender {
super.close();
}
- private class MonitorTracker implements Runnable {
+ static class MonitorLocation {
+ private final String location;
+ private final long modId;
- private String path;
- private ZooCache zooCache;
-
- private long lastModifiedTransactionId;
- private SocketAppender lastSocketAppender;
+ public MonitorLocation(long modId, byte[] location) {
+ this.modId = modId;
+ this.location = location == null ? null : new String(location, UTF_8);
+ }
- public MonitorTracker() {
+ public boolean hasLocation() {
+ return location != null;
+ }
- // path and zooCache are lazily set the first time this tracker is run
- // this allows the tracker to be constructed and scheduled during log4j initialization without
- // triggering any actual logs from the Accumulo or ZooKeeper code
- this.path = null;
- this.zooCache = null;
+ public String getLocation() {
+ return location;
+ }
- this.lastModifiedTransactionId = 0;
- this.lastSocketAppender = null;
+ @Override
+ public boolean equals(Object obj) {
+ if (obj != null && obj instanceof MonitorLocation) {
+ MonitorLocation other = (MonitorLocation) obj;
+ return modId == other.modId && Objects.equals(location, other.location);
+ }
+ return false;
}
@Override
- public void run() {
- try {
- // lazily set up path and zooCache (see comment in constructor)
- if (this.zooCache == null) {
- Instance instance = HdfsZooInstance.getInstance();
- this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_ADDR;
- this.zooCache = new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
- }
+ public int hashCode() {
+ return Long.hashCode(modId);
+ }
+ }
- // get the current location from the cache and update if necessary
- Stat stat = new Stat();
- byte[] loc = zooCache.get(path, stat);
- long modifiedTransactionId = stat.getMzxid();
-
- // modifiedTransactionId will be 0 if no current location
- // lastModifiedTransactionId will be 0 if we've never seen a location
- if (modifiedTransactionId != lastModifiedTransactionId) {
- // replace old socket on every change, even if new location is the same as old location
- // if modifiedTransactionId changed, then the monitor restarted and the old socket is dead now
- switchAppender(loc, modifiedTransactionId);
- }
- } catch (Exception e) {
- // dump any non-fatal problems to the console, but let it run again
- e.printStackTrace();
+ private static class ZooCacheLocationSupplier implements Supplier<MonitorLocation> {
+
+ // path and zooCache are lazily set the first time this supplier's get() method is called
+ // this allows the tracker to be constructed and scheduled during log4j initialization without
+ // triggering any actual logs from the Accumulo or ZooKeeper code
+ private String path = null;
+ private ZooCache zooCache = null;
+
+ @Override
+ public MonitorLocation get() {
+ // lazily set up path and zooCache (see comment on the field declarations above)
+ if (this.zooCache == null) {
+ Instance instance = HdfsZooInstance.getInstance();
+ this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_ADDR;
+ this.zooCache = new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
}
+
+ // get the current location from the cache and update if necessary
+ Stat stat = new Stat();
+ byte[] loc = zooCache.get(path, stat);
+ // mzxid is 0 if location does not exist and the non-zero transaction id of the last modification otherwise
+ return new MonitorLocation(stat.getMzxid(), loc);
}
+ }
- private void switchAppender(byte[] newLocation, long newModifiedTransactionId) {
- // remove and close the last one, if it was non-null
- if (lastSocketAppender != null) {
- AccumuloMonitorAppender.this.removeAppender(lastSocketAppender);
- lastSocketAppender.close();
- }
+ private static class SocketAppenderFactory implements Function<MonitorLocation,AppenderSkeleton> {
+ @Override
+ public AppenderSkeleton apply(MonitorLocation loc) {
+ int defaultPort = Integer.parseUnsignedInt(Property.MONITOR_LOG4J_PORT.getDefaultValue());
+ HostAndPort remote = HostAndPort.fromString(loc.getLocation());
+
+ SocketAppender socketAppender = new SocketAppender();
+ socketAppender.setApplication(System.getProperty("accumulo.application", "unknown"));
+ socketAppender.setRemoteHost(remote.getHostText());
+ socketAppender.setPort(remote.getPortOrDefault(defaultPort));
- // create a new SocketAppender, if new location is non-null
- if (newLocation != null) {
+ return socketAppender;
+ }
+ }
- int defaultPort = Integer.parseUnsignedInt(Property.MONITOR_LOG4J_PORT.getDefaultValue());
- HostAndPort remote = HostAndPort.fromString(new String(newLocation, UTF_8));
+ static class MonitorTracker implements Runnable {
- SocketAppender socketAppender = new SocketAppender();
- socketAppender.setApplication(System.getProperty("accumulo.application", "unknown"));
- socketAppender.setRemoteHost(remote.getHostText());
- socketAppender.setPort(remote.getPortOrDefault(defaultPort));
+ private final AccumuloMonitorAppender parentAsyncAppender;
+ private final Supplier<MonitorLocation> currentLocationSupplier;
+ private final Function<MonitorLocation,AppenderSkeleton> appenderFactory;
- socketAppender.activateOptions();
- AccumuloMonitorAppender.this.addAppender(socketAppender);
+ private MonitorLocation lastLocation;
+ private AppenderSkeleton lastSocketAppender;
- lastSocketAppender = socketAppender;
- }
+ public MonitorTracker(AccumuloMonitorAppender appender, Supplier<MonitorLocation> currentLocationSupplier,
+ Function<MonitorLocation,AppenderSkeleton> appenderFactory) {
+ this.parentAsyncAppender = Objects.requireNonNull(appender);
+ this.appenderFactory = Objects.requireNonNull(appenderFactory);
+ this.currentLocationSupplier = Objects.requireNonNull(currentLocationSupplier);
- // update lastModifiedTransactionId, even if the new one is 0 (no new location)
- lastModifiedTransactionId = newModifiedTransactionId;
+ this.lastLocation = new MonitorLocation(0, null);
+ this.lastSocketAppender = null;
+ }
+
+ @Override
+ public void run() {
+ try {
+ MonitorLocation currentLocation = currentLocationSupplier.get();
+ // detect change
+ if (!currentLocation.equals(lastLocation)) {
+ // clean up old appender
+ if (lastSocketAppender != null) {
+ parentAsyncAppender.removeAppender(lastSocketAppender);
+ lastSocketAppender.close();
+ lastSocketAppender = null;
+ }
+ // create a new one
+ if (currentLocation.hasLocation()) {
+ lastSocketAppender = appenderFactory.apply(currentLocation);
+ lastSocketAppender.activateOptions();
+ parentAsyncAppender.addAppender(lastSocketAppender);
+ }
+ // update the last location only if switching was successful
+ lastLocation = currentLocation;
+ }
+ } catch (Exception e) {
+ // dump any non-fatal problems to the console, but let it run again
+ e.printStackTrace();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5afbacc0/server/monitor/src/test/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppenderTest.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/test/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppenderTest.java b/server/monitor/src/test/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppenderTest.java
new file mode 100644
index 0000000..cd4eb03
--- /dev/null
+++ b/server/monitor/src/test/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppenderTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.monitor.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.fail;
+
+import java.util.Enumeration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import org.apache.accumulo.monitor.util.AccumuloMonitorAppender.MonitorLocation;
+import org.apache.accumulo.monitor.util.AccumuloMonitorAppender.MonitorTracker;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.Timeout;
+
+public class AccumuloMonitorAppenderTest {
+
+ @Rule
+ public Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ @Test
+ public void testActivateOptions() {
+ try (AccumuloMonitorAppender appender = new AccumuloMonitorAppender()) {
+ appender.executorService.shutdown();
+ // simulate tracker having already been scheduled
+ appender.trackerScheduled.compareAndSet(false, true);
+ appender.activateOptions();
+ // activateOptions should not trigger a RejectedExecutionException, because we tricked it into thinking it was already called, and therefore it did not
+ // schedule the tracker after shutting down
+ }
+
+ exception.expect(RejectedExecutionException.class);
+ try (AccumuloMonitorAppender appender = new AccumuloMonitorAppender()) {
+ appender.executorService.shutdown();
+ appender.activateOptions();
+ fail("Calling activateOptions should have triggered a RejectedExecutionException");
+ // this ensures that activateOptions correctly attempts to schedule a worker
+ }
+ }
+
+ @Test
+ public void testExecutorService() throws InterruptedException, ExecutionException {
+ ScheduledExecutorService executorService = null;
+ AtomicLong counter = new AtomicLong(2);
+ try (AccumuloMonitorAppender appender = new AccumuloMonitorAppender()) {
+ executorService = appender.executorService;
+
+ // make sure executor service is started and running
+ Assert.assertEquals(false, executorService.isShutdown());
+ Assert.assertEquals(false, executorService.isTerminated());
+
+ // make sure executor service executes tasks
+ ScheduledFuture<Long> future = executorService.schedule(() -> counter.getAndIncrement(), 1, TimeUnit.MILLISECONDS);
+ Assert.assertEquals(Long.valueOf(2), future.get());
+ Assert.assertEquals(3, counter.get());
+
+ // schedule a task that won't finish
+ executorService.schedule(() -> counter.getAndIncrement(), 1, TimeUnit.DAYS);
+
+ // make sure executor service is still running
+ Assert.assertEquals(false, executorService.isShutdown());
+ Assert.assertEquals(false, executorService.isTerminated());
+ }
+ // verify that closing the appender shuts down the executor service threads
+ Assert.assertEquals(true, executorService.isShutdown());
+ executorService.awaitTermination(5, TimeUnit.SECONDS);
+ Assert.assertEquals(true, executorService.isTerminated());
+
+ // verify executor service did not wait for scheduled task to run
+ Assert.assertEquals(3, counter.get());
+ }
+
+ @Test
+ public void testMonitorTracker() throws InterruptedException {
+ AtomicLong currentLoc = new AtomicLong(0);
+ Supplier<MonitorLocation> locSupplier = () -> {
+ long loc = currentLoc.get();
+ // for simplicity, create the location name from a number (0 represents no location)
+ byte[] location = loc == 0 ? null : ("loc" + loc).getBytes(UTF_8);
+ return new MonitorLocation(loc, location);
+ };
+ Function<MonitorLocation,AppenderSkeleton> appenderFactory = newLocation -> new AppenderSkeleton() {
+
+ {
+ this.name = "Appender for " + newLocation.getLocation();
+ }
+
+ @Override
+ public boolean requiresLayout() {
+ return false;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ protected void append(LoggingEvent event) {}
+
+ };
+
+ try (AccumuloMonitorAppender parent = new AccumuloMonitorAppender()) {
+ parent.setFrequency(1); // make it check frequently (every 1 millisecond)
+ parent.setTracker(new MonitorTracker(parent, locSupplier, appenderFactory));
+ parent.activateOptions();
+
+ // initially there are no appenders
+ Assert.assertTrue(parent.getAllAppenders() == null);
+ updateLocAndVerify(currentLoc, parent, 0);
+ updateLocAndVerify(currentLoc, parent, 10);
+
+ // verify it's the same after a few times
+ // this verifies the logic in the tracker's run method which compares current location with last to see if a change occurred
+ AppenderSkeleton lastAppender = (AppenderSkeleton) parent.getAllAppenders().nextElement();
+ for (int x = 0; x < 10; x++) {
+ Thread.sleep(10);
+ AppenderSkeleton currentAppender = (AppenderSkeleton) parent.getAllAppenders().nextElement();
+ Assert.assertSame(lastAppender, currentAppender);
+ }
+
+ updateLocAndVerify(currentLoc, parent, 3);
+ updateLocAndVerify(currentLoc, parent, 0);
+ updateLocAndVerify(currentLoc, parent, 0);
+ updateLocAndVerify(currentLoc, parent, 12);
+ updateLocAndVerify(currentLoc, parent, 0);
+ updateLocAndVerify(currentLoc, parent, 335);
+
+ updateLocAndVerify(currentLoc, parent, 0);
+ // verify we removed all the appenders
+ Assert.assertFalse(parent.getAllAppenders().hasMoreElements());
+ }
+ }
+
+ private static void updateLocAndVerify(AtomicLong currentLoc, AccumuloMonitorAppender parent, int newLoc) {
+ // set the new location
+ currentLoc.set(newLoc);
+ // wait for the appender to notice the change
+ while (!verifyAppender(parent, newLoc == 0 ? null : ("loc" + newLoc))) {}
+ }
+
+ private static boolean verifyAppender(AccumuloMonitorAppender parent, String newLocName) {
+ Enumeration<?> childAppenders = parent.getAllAppenders();
+ if (newLocName == null) {
+ return childAppenders == null || !childAppenders.hasMoreElements();
+ }
+ if (childAppenders == null || !childAppenders.hasMoreElements()) {
+ return false;
+ }
+ AppenderSkeleton child = (AppenderSkeleton) childAppenders.nextElement();
+ if (childAppenders.hasMoreElements()) {
+ Assert.fail("Appender should never have more than one child");
+ }
+ return ("Appender for " + newLocName).equals(child.getName());
+ }
+
+}