You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/04/10 06:20:19 UTC
samza git commit: SAMZA-924: Add disk space monitoring
Repository: samza
Updated Branches:
refs/heads/master 57aae364b -> dc67d1560
SAMZA-924: Add disk space monitoring
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dc67d156
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dc67d156
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dc67d156
Branch: refs/heads/master
Commit: dc67d1560c0ae76fe75e6b767b857ceb492cb7a8
Parents: 57aae36
Author: Chris Pettitt <cp...@linkedin.com>
Authored: Sat Apr 9 21:19:38 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Sat Apr 9 21:19:38 2016 -0700
----------------------------------------------------------------------
.../samza/container/disk/DiskSpaceMonitor.java | 59 +++++
.../disk/PollingScanDiskSpaceMonitor.java | 199 +++++++++++++++
.../apache/samza/container/SamzaContainer.scala | 53 +++-
.../samza/container/SamzaContainerMetrics.scala | 13 +-
.../disk/TestPollingScanDiskSpaceMonitor.java | 252 +++++++++++++++++++
5 files changed, 569 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/dc67d156/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java b/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java
new file mode 100644
index 0000000..2a565be
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.disk;
+
+/**
+ * An object that monitors the amount of disk space used and reports this usage via a
+ * {@link DiskSpaceMonitor.Listener}.
+ */
+public interface DiskSpaceMonitor {
+ /**
+ * Starts the disk space monitor.
+ */
+ void start();
+
+ /**
+ * Stops the disk space monitor. Once shutdown is complete listeners will no longer receive
+ * new samples. A stopped monitor cannot be restarted with {@link #start()}.
+ */
+ void stop();
+
+ /**
+ * Registers the specified listener with this monitor. The listener will be called
+ * when the monitor has a new sample. The update interval is implementation specific.
+ *
+ * @param listener the listener to register
+ * @return {@code true} if the registration was successful and {@code false} if not. Registration
+ * can fail if the monitor has been stopped or if the listener was already registered.
+ */
+ boolean registerListener(Listener listener);
+
+ /**
+ * A listener that is notified when the disk space monitor has sampled a new disk usage value.
+ * Register this listener with {@link #registerListener(Listener)} to receive updates.
+ */
+ interface Listener {
+ /**
+ * Invoked with new samples as they become available.
+ *
+ * @param diskUsageSample the measured disk usage size in bytes.
+ */
+ void onUpdate(long diskUsageSample);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/dc67d156/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java b/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
new file mode 100644
index 0000000..50c8500
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.disk;
+
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * An implementation of {@link DiskSpaceMonitor} that polls for disk usage based on a specified
+ * polling interval.
+ * <p>
+ * Each poll walks the watched paths on a single dedicated scheduler thread and hands the total
+ * number of bytes found to every registered {@link DiskSpaceMonitor.Listener}.
+ * <p>
+ * This class is thread-safe.
+ */
+public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor {
+  private enum State { INIT, RUNNING, STOPPED }
+
+  private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryImpl();
+  // Note: we use this as a set where the value is always Boolean.TRUE.
+  private final ConcurrentMap<Listener, Boolean> listenerSet = new ConcurrentHashMap<>();
+
+  // Used to guard write access to state and listenerSet.
+  private final Object lock = new Object();
+
+  private final ScheduledExecutorService schedulerService =
+      Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
+  private final Set<Path> watchPaths;
+  private final long pollingIntervalMillis;
+
+  private State state = State.INIT;
+
+  /**
+   * Returns the total size in bytes used by the specified paths. This function guarantees that it
+   * will not double count overlapping portions of the path set. For example, with a trivial
+   * overlap of /A and /A, it will count /A only once. It also handles other types of overlaps
+   * similarly, such as counting /A/B only once given the paths /A and /A/B.
+   * <p>
+   * Paths that cannot be resolved or read (for example because they do not exist, or were deleted
+   * between listing and stat'ing) contribute zero bytes rather than failing the scan.
+   * <p>
+   * This function is exposed as package private to simplify testing various cases without involving
+   * an executor. Alternatively this could have been pulled out to a utility class, but it would
+   * unnecessarily pollute the global namespace.
+   *
+   * @param paths the files and/or directories to measure
+   * @return the sum of the sizes of all distinct regular files reachable from {@code paths}
+   */
+  static long getSpaceUsed(Set<Path> paths) {
+    ArrayDeque<Path> pathStack = new ArrayDeque<>();
+
+    for (Path path : paths) {
+      pathStack.push(path);
+    }
+
+    // Track the directories we've visited to ensure we're not double counting. It would be
+    // preferable to resolve overlap once at startup, but the problem is that the filesystem may
+    // change over time and, in fact, at startup I found that the rocks DB store directory was not
+    // created by the time the disk space monitor was started.
+    Set<Path> visited = new HashSet<>();
+    long totalBytes = 0;
+    while (!pathStack.isEmpty()) {
+      try {
+        // We need to resolve to the real path to ensure that we don't inadvertently double count
+        // due to different paths to the same directory (e.g. /A and /A/../A).
+        Path current = pathStack.pop().toRealPath();
+
+        // Set.add reports whether the element was new, so one call both checks and records.
+        if (!visited.add(current)) {
+          continue;
+        }
+
+        BasicFileAttributes currentAttrs = Files.readAttributes(current,
+            BasicFileAttributes.class);
+        if (currentAttrs.isDirectory()) {
+          try (DirectoryStream<Path> directoryListing = Files.newDirectoryStream(current)) {
+            for (Path child : directoryListing) {
+              pathStack.push(child);
+            }
+          }
+        } else if (currentAttrs.isRegularFile()) {
+          totalBytes += currentAttrs.size();
+        }
+      } catch (IOException e) {
+        // If we can't stat the file, just ignore it. This can happen, for example, if we scan
+        // a directory, but by the time we get to stat'ing the file it has been deleted (e.g.
+        // due to compaction, rotation, etc.).
+      }
+    }
+
+    return totalBytes;
+  }
+
+  /**
+   * Creates a new disk space monitor that uses a periodic polling mechanism.
+   *
+   * @param watchPaths the set of paths to watch; the set is copied, so later changes made by the
+   *                   caller are not observed
+   * @param pollingIntervalMillis the polling interval in milliseconds
+   */
+  public PollingScanDiskSpaceMonitor(Set<Path> watchPaths, long pollingIntervalMillis) {
+    this.watchPaths = Collections.unmodifiableSet(new HashSet<>(watchPaths));
+    this.pollingIntervalMillis = pollingIntervalMillis;
+  }
+
+  /**
+   * Schedules the periodic scan. The first sample is taken one polling interval after this call.
+   * Calling this method on a monitor that is already running has no effect.
+   *
+   * @throws IllegalStateException if the monitor has already been stopped
+   */
+  @Override
+  public void start() {
+    synchronized (lock) {
+      switch (state) {
+        case INIT:
+          schedulerService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+              updateSample();
+            }
+          }, pollingIntervalMillis, pollingIntervalMillis, TimeUnit.MILLISECONDS);
+
+          state = State.RUNNING;
+          break;
+
+        case RUNNING:
+          // start is idempotent
+          return;
+
+        case STOPPED:
+          throw new IllegalStateException("PollingScanDiskSpaceMonitor was stopped and cannot be restarted.");
+      }
+    }
+  }
+
+  /**
+   * Shuts down the scheduler, drops all registered listeners and marks the monitor as stopped.
+   * This method does not wait for the scheduler thread to terminate.
+   */
+  @Override
+  public void stop() {
+    synchronized (lock) {
+      // We could also wait for full termination of the scheduler service, but it is overkill for
+      // our use case.
+      schedulerService.shutdownNow();
+
+      listenerSet.clear();
+      state = State.STOPPED;
+    }
+  }
+
+  /**
+   * Registers a listener to receive disk usage samples.
+   *
+   * @param listener the listener to register
+   * @return {@code true} if the listener was newly registered; {@code false} if the monitor has
+   *         been stopped or the listener was already registered
+   */
+  @Override
+  public boolean registerListener(Listener listener) {
+    synchronized (lock) {
+      if (state != State.STOPPED) {
+        // putIfAbsent returns the previous mapping, so null means this call added the listener.
+        return listenerSet.putIfAbsent(listener, Boolean.TRUE) == null;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Wait until this service has shutdown. Returns true if shutdown occurred within the timeout
+   * and false otherwise.
+   * <p>
+   * This is currently exposed at the package private level for tests only.
+   */
+  boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+    return schedulerService.awaitTermination(timeout, unit);
+  }
+
+  /**
+   * Takes one sample and delivers it to every registered listener on the scheduler thread.
+   * <p>
+   * NOTE: this runs as a fixed-rate scheduled task, and the executor suppresses all subsequent
+   * executions once a run throws. A listener that throws from {@code onUpdate} will therefore
+   * stop all future samples for every listener.
+   */
+  private void updateSample() {
+    long totalBytes = getSpaceUsed(watchPaths);
+    for (Listener listener : listenerSet.keySet()) {
+      listener.onUpdate(totalBytes);
+    }
+  }
+
+  /**
+   * Names scheduler threads "Samza-PollingScanDiskSpaceMonitor-N" so they are easy to identify
+   * in thread dumps.
+   */
+  private static class ThreadFactoryImpl implements ThreadFactory {
+    private static final String PREFIX = "Samza-" + PollingScanDiskSpaceMonitor.class.getSimpleName() + "-";
+    private static final AtomicInteger INSTANCE_NUM = new AtomicInteger();
+
+    @Override
+    public Thread newThread(Runnable runnable) {
+      return new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/dc67d156/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index bcbc90a..5462208 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -20,6 +20,8 @@
package org.apache.samza.container
import java.io.File
+import java.nio.file.Path
+import java.util
import org.apache.samza.SamzaException
import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
import org.apache.samza.config.MetricsConfig.Config2Metrics
@@ -29,6 +31,8 @@ import org.apache.samza.config.StorageConfig.Config2Storage
import org.apache.samza.config.StreamConfig.Config2Stream
import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
+import org.apache.samza.container.disk.{PollingScanDiskSpaceMonitor, DiskSpaceMonitor}
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
import org.apache.samza.metrics.JmxServer
import org.apache.samza.metrics.JvmMetrics
@@ -391,6 +395,14 @@ object SamzaContainer extends Logging {
.toSet
val containerContext = new SamzaContainerContext(containerId, config, taskNames)
+ // TODO not sure how we should make this config based, or not. Kind of
+ // strange, since it has some dynamic directories when used with YARN.
+ val defaultStoreBaseDir = new File(System.getProperty("user.dir"), "state")
+ info("Got default storage engine base directory: %s" format defaultStoreBaseDir)
+
+ val storeWatchPaths = new util.HashSet[Path]()
+ storeWatchPaths.add(defaultStoreBaseDir.toPath)
+
val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => {
debug("Setting up task instance: %s" format taskModel)
@@ -414,11 +426,6 @@ object SamzaContainer extends Logging {
info("Got store consumers: %s" format storeConsumers)
- // TODO not sure how we should make this config based, or not. Kind of
- // strange, since it has some dynamic directories when used with YARN.
- val defaultStoreBaseDir = new File(System.getProperty("user.dir"), "state")
- info("Got default storage engine base directory: %s" format defaultStoreBaseDir)
-
var loggedStorageBaseDir: File = null
if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) {
val jobNameAndId = Util.getJobNameAndId(config)
@@ -430,6 +437,8 @@ object SamzaContainer extends Logging {
loggedStorageBaseDir = defaultStoreBaseDir
}
+ storeWatchPaths.add(loggedStorageBaseDir.toPath)
+
info("Got base directory for logged data stores: %s" format loggedStorageBaseDir)
val taskStores = storageEngineFactories
@@ -504,6 +513,20 @@ object SamzaContainer extends Logging {
(taskName, taskInstance)
}).toMap
+ val diskPollMillis = config.getInt("container.disk.poll.interval.ms", 0)
+ var diskSpaceMonitor: DiskSpaceMonitor = null
+ if (diskPollMillis != 0) {
+ val diskUsage = samzaContainerMetrics.createOrGetDiskUsageGauge()
+
+ diskSpaceMonitor = new PollingScanDiskSpaceMonitor(storeWatchPaths, diskPollMillis)
+ diskSpaceMonitor.registerListener(new Listener {
+ override def onUpdate(diskUsageSample: Long): Unit =
+ diskUsage.set(diskUsageSample)
+ })
+
+ info("Initialized disk space monitor watch paths to: %s" format storeWatchPaths)
+ }
+
val runLoop = new RunLoop(
taskInstances = taskInstances,
consumerMultiplexer = consumerMultiplexer,
@@ -525,7 +548,8 @@ object SamzaContainer extends Logging {
metrics = samzaContainerMetrics,
reporters = reporters,
jvm = jvm,
- jmxServer = jmxServer)
+ jmxServer = jmxServer,
+ diskSpaceMonitor = diskSpaceMonitor)
}
}
@@ -537,6 +561,7 @@ class SamzaContainer(
producerMultiplexer: SystemProducers,
metrics: SamzaContainerMetrics,
jmxServer: JmxServer,
+ diskSpaceMonitor: DiskSpaceMonitor = null,
offsetManager: OffsetManager = new OffsetManager,
localityManager: LocalityManager = null,
reporters: Map[String, MetricsReporter] = Map(),
@@ -550,6 +575,7 @@ class SamzaContainer(
startOffsetManager
startLocalityManager
startStores
+ startDiskSpaceMonitor
startProducers
startTask
startConsumers
@@ -566,6 +592,7 @@ class SamzaContainer(
shutdownConsumers
shutdownTask
shutdownStores
+ shutdownDiskSpaceMonitor
shutdownProducers
shutdownLocalityManager
shutdownOffsetManager
@@ -575,6 +602,13 @@ class SamzaContainer(
}
}
+  /**
+   * Starts the disk space monitor if one was supplied to this container; otherwise does nothing.
+   */
+  def startDiskSpaceMonitor: Unit = {
+    // Option(null) is None, so the body is skipped when no monitor was configured.
+    Option(diskSpaceMonitor).foreach { monitor =>
+      info("Starting disk space monitor")
+      monitor.start()
+    }
+  }
+
def startMetrics {
info("Registering task instances with metrics.")
@@ -713,4 +747,11 @@ class SamzaContainer(
jvm.stop
}
}
+
+  /**
+   * Stops the disk space monitor if one was supplied to this container; otherwise does nothing.
+   */
+  def shutdownDiskSpaceMonitor: Unit = {
+    // Option(null) is None, so the body is skipped when no monitor was configured.
+    Option(diskSpaceMonitor).foreach { monitor =>
+      info("Shutting down disk space monitor.")
+      monitor.stop()
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/dc67d156/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index 6fae650..9e6641c 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -37,11 +37,22 @@ class SamzaContainerMetrics(
val windowNs = newTimer("window-ns")
val processNs = newTimer("process-ns")
val commitNs = newTimer("commit-ns")
- val utilization = newGauge("event-loop-utilization", 0.0F);
+ val utilization = newGauge("event-loop-utilization", 0.0F)
val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]()
def addStoreRestorationGauge(taskName: TaskName, storeName: String) {
taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" format(taskName.toString, storeName), -1L))
}
+
+ /**
+ * Creates or gets the disk usage gauge for the container and returns it.
+ */
+ def createOrGetDiskUsageGauge(): Gauge[Long] = {
+ // Despite the name, this function appears to be idempotent. A more defensive approach would be
+ // to ensure idempotency at this level, e.g. via a CAS operation. Unfortunately, it appears that
+ // the mechanism to register a Gauge is hidden. An alternative would be to use a mutex to
+ // ensure the gauge is created only once.
+ newGauge("disk-usage", 0L)
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/dc67d156/samza-core/src/test/java/org/apache/samza/container/disk/TestPollingScanDiskSpaceMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/disk/TestPollingScanDiskSpaceMonitor.java b/samza-core/src/test/java/org/apache/samza/container/disk/TestPollingScanDiskSpaceMonitor.java
new file mode 100644
index 0000000..2576437
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/disk/TestPollingScanDiskSpaceMonitor.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.disk;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link PollingScanDiskSpaceMonitor}, covering both the static size computation and
+ * the scheduled delivery of samples to listeners.
+ * <p>
+ * Every file, directory and symbolic link created by a test is tracked in {@code filesToDelete}
+ * so that {@link #tearDown()} can remove them in reverse order of creation.
+ */
+public class TestPollingScanDiskSpaceMonitor {
+  private Path testDir;
+  private ArrayDeque<Path> filesToDelete;
+
+  @Before
+  public void setUp() throws IOException {
+    filesToDelete = new ArrayDeque<>();
+    testDir = Files.createTempDirectory("samza-polling-scan-disk-monitor-test");
+    filesToDelete.push(testDir);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    // The deque is used as a stack, so children are deleted before the directories holding them.
+    while (!filesToDelete.isEmpty()) {
+      Path path = filesToDelete.pop();
+
+      try {
+        Files.delete(path);
+      } catch (IOException e) {
+        // Continue with best effort, this is just test code.
+      }
+    }
+  }
+
+  @Test
+  public void testSizeOfSingleFile() throws IOException {
+    writeFile(testDir, "single-file", new byte[1024]);
+    assertEquals(1024, PollingScanDiskSpaceMonitor.getSpaceUsed(Collections.singleton(testDir)));
+  }
+
+  @Test
+  public void testSizeOfDisjointDirectoriesFromRoot() throws IOException {
+    Path child1Dir = createDirectory(testDir, "child1");
+    writeFile(child1Dir, "foo", new byte[1024]);
+
+    Path child2Dir = createDirectory(testDir, "child2");
+    writeFile(child2Dir, "bar", new byte[4096]);
+
+    assertEquals(1024 + 4096, PollingScanDiskSpaceMonitor.getSpaceUsed(Collections.singleton(testDir)));
+  }
+
+  @Test
+  public void testSizeOfDisjointDirectoriesFromChildDirs() throws IOException {
+    Path child1Dir = createDirectory(testDir, "child1");
+    writeFile(child1Dir, "foo", new byte[1024]);
+
+    Path child2Dir = createDirectory(testDir, "child2");
+    writeFile(child2Dir, "bar", new byte[4096]);
+
+    Set<Path> pathSet = new HashSet<>(Arrays.asList(child1Dir, child2Dir));
+    assertEquals(1024 + 4096, PollingScanDiskSpaceMonitor.getSpaceUsed(pathSet));
+  }
+
+  @Test
+  public void testSizeOfOverlappedDirectories() throws IOException {
+    Path childDir = createDirectory(testDir, "child");
+    writeFile(childDir, "foo", new byte[1024]);
+
+    Path grandchildDir = createDirectory(childDir, "grandchild");
+    writeFile(grandchildDir, "bar", new byte[4096]);
+
+    // If getSpaceUsed were not handling overlapping directories we would expect to count
+    // grandchild twice, which would give us the erroneous total `1024 + 4096 * 2`.
+    Set<Path> pathSet = new HashSet<>(Arrays.asList(childDir, grandchildDir));
+    assertEquals(1024 + 4096, PollingScanDiskSpaceMonitor.getSpaceUsed(pathSet));
+  }
+
+  @Test
+  public void testSizeOfDirectoryAccessedWithDifferentPaths() throws IOException {
+    Path childDir = createDirectory(testDir, "child1");
+    writeFile(childDir, "foo", new byte[1024]);
+
+    Path otherPath = childDir.resolve("..").resolve(childDir.getFileName());
+    Set<Path> pathSet = new HashSet<>(Arrays.asList(childDir, otherPath));
+
+    // This test actually verifies that !childDir.equals(otherPath) and ensures that we properly
+    // handle duplicate paths to the same directory.
+    assertEquals(1024, PollingScanDiskSpaceMonitor.getSpaceUsed(pathSet));
+  }
+
+  @Test
+  public void testSizeOfAlreadyCountedSymlinkedFile() throws IOException {
+    // Link to the path that was actually created; writeFile appends a random suffix to the name,
+    // so resolving the bare name against testDir would produce a dangling symlink.
+    Path regularFile = writeFile(testDir, "regular-file", new byte[1024]);
+    createSymbolicLink(testDir.resolve("symlink"), regularFile);
+
+    // We should not double count a symlinked file
+    assertEquals(1024, PollingScanDiskSpaceMonitor.getSpaceUsed(Collections.singleton(testDir)));
+  }
+
+  @Test
+  public void testSizeOfUncountedSymlinkedFile() throws IOException {
+    Path childDir = createDirectory(testDir, "child");
+    Path regularFile = writeFile(testDir, "regular-file", new byte[1024]);
+    createSymbolicLink(childDir.resolve("symlink"), regularFile);
+
+    // We should count the space of the symlinked file even though it is outside of the root
+    // from which we started. The scan therefore starts at childDir, which contains only the link.
+    assertEquals(1024, PollingScanDiskSpaceMonitor.getSpaceUsed(Collections.singleton(childDir)));
+  }
+
+  @Test
+  public void testFollowSymlinkedDirectory() throws IOException {
+    Path childDir = createDirectory(testDir, "child");
+    writeFile(childDir, "regular-file", new byte[1024]);
+
+    Path dirSymlink = createSymbolicLink(testDir.resolve("symlink"), childDir);
+
+    // We should follow the symlink and read the symlinked file
+    assertEquals(1024, PollingScanDiskSpaceMonitor.getSpaceUsed(Collections.singleton(dirSymlink)));
+  }
+
+  @Test
+  public void testHandleCyclicalSymlink() throws IOException {
+    Path childDir = createDirectory(testDir, "child");
+    writeFile(childDir, "regular-file", new byte[1024]);
+    createSymbolicLink(childDir.resolve("symlink"), testDir);
+
+    // We have testDir/childDir/symlink -> testDir, which effectively creates a cycle.
+    assertEquals(1024, PollingScanDiskSpaceMonitor.getSpaceUsed(Collections.singleton(childDir)));
+  }
+
+  @Test
+  public void testMissingDirectory() throws IOException {
+    Set<Path> pathSet = Collections.singleton(testDir.resolve("non-existant-child"));
+    assertEquals(0, PollingScanDiskSpaceMonitor.getSpaceUsed(pathSet));
+  }
+
+  @Test
+  public void testGetSamplesFromListener() throws IOException, InterruptedException {
+    writeFile(testDir, "single-file", new byte[1024]);
+
+    final AtomicLong sample = new AtomicLong();
+    final CountDownLatch sampleReady = new CountDownLatch(1);
+    final PollingScanDiskSpaceMonitor monitor = new PollingScanDiskSpaceMonitor(Collections.singleton(testDir), 50);
+    monitor.registerListener(new DiskSpaceMonitor.Listener() {
+      @Override
+      public void onUpdate(long diskUsageSample) {
+        sample.set(diskUsageSample);
+        sampleReady.countDown();
+      }
+    });
+
+    monitor.start();
+
+    try {
+      if (!sampleReady.await(5, TimeUnit.SECONDS)) {
+        fail("Timed out waiting for listener to be provide disk usage sample");
+      }
+
+      assertEquals(1024, sample.get());
+    } finally {
+      monitor.stop();
+    }
+  }
+
+  @Test
+  public void testStartStop() throws IOException, InterruptedException {
+    writeFile(testDir, "single-file", new byte[1024]);
+
+    final int numSamplesToCollect = 5;
+
+    final AtomicInteger numCallbackInvocations = new AtomicInteger();
+    final CountDownLatch doneLatch = new CountDownLatch(1);
+    final PollingScanDiskSpaceMonitor monitor = new PollingScanDiskSpaceMonitor(Collections.singleton(testDir), 50);
+    monitor.registerListener(new DiskSpaceMonitor.Listener() {
+      @Override
+      public void onUpdate(long diskUsageSample) {
+        if (numCallbackInvocations.incrementAndGet() == numSamplesToCollect) {
+          monitor.stop();
+          doneLatch.countDown();
+        }
+      }
+    });
+
+    monitor.start();
+
+    try {
+      if (!doneLatch.await(5, TimeUnit.SECONDS)) {
+        fail(String.format("Timed out waiting for listener to be give %d updates", numSamplesToCollect));
+      }
+      if (!monitor.awaitTermination(5, TimeUnit.SECONDS)) {
+        fail("Timed out waiting for monitor to terminate");
+      }
+
+      // A number larger than numSamplesToCollect indicates that we got a callback after we stopped
+      // the monitor. We should safely be able to assert this will not happen as we stopped the
+      // monitor in the thread on which it is delivering notifications.
+      assertEquals(numSamplesToCollect, numCallbackInvocations.get());
+    } finally {
+      monitor.stop();
+    }
+  }
+
+  /**
+   * Creates a uniquely named directory under {@code parentDir} whose name starts with
+   * {@code name}, and registers it for cleanup.
+   */
+  private Path createDirectory(Path parentDir, String name) throws IOException {
+    name = name + "-";
+    Path path = Files.createTempDirectory(parentDir, name);
+    filesToDelete.push(path);
+    return path;
+  }
+
+  /**
+   * Creates a uniquely named empty file under {@code parentDir} whose name starts with
+   * {@code name}, and registers it for cleanup.
+   */
+  private Path createFile(Path parentDir, String name) throws IOException {
+    name = name + "-";
+    Path path = Files.createTempFile(parentDir, name, null);
+    filesToDelete.push(path);
+    return path;
+  }
+
+  /**
+   * Creates a symbolic link at {@code link} pointing to {@code target} and registers the link for
+   * cleanup, so that the directory containing it can be deleted during tear down.
+   */
+  private Path createSymbolicLink(Path link, Path target) throws IOException {
+    Path path = Files.createSymbolicLink(link, target);
+    filesToDelete.push(path);
+    return path;
+  }
+
+  /**
+   * Creates a new file via {@link #createFile(Path, String)} and fills it with {@code contents}.
+   *
+   * @return the path of the file that was actually created, which differs from
+   *         {@code parentDir.resolve(name)} because a unique suffix is appended
+   */
+  private Path writeFile(Path parentDir, String name, byte[] contents) throws IOException {
+    Path path = createFile(parentDir, name);
+    Files.write(path, contents);
+    return path;
+  }
+}