You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/04/10 06:20:19 UTC

samza git commit: SAMZA-924: Add disk space monitoring

Repository: samza
Updated Branches:
  refs/heads/master 57aae364b -> dc67d1560


SAMZA-924: Add disk space monitoring


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dc67d156
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dc67d156
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dc67d156

Branch: refs/heads/master
Commit: dc67d1560c0ae76fe75e6b767b857ceb492cb7a8
Parents: 57aae36
Author: Chris Pettitt <cp...@linkedin.com>
Authored: Sat Apr 9 21:19:38 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Sat Apr 9 21:19:38 2016 -0700

----------------------------------------------------------------------
 .../samza/container/disk/DiskSpaceMonitor.java  |  59 +++++
 .../disk/PollingScanDiskSpaceMonitor.java       | 199 +++++++++++++++
 .../apache/samza/container/SamzaContainer.scala |  53 +++-
 .../samza/container/SamzaContainerMetrics.scala |  13 +-
 .../disk/TestPollingScanDiskSpaceMonitor.java   | 252 +++++++++++++++++++
 5 files changed, 569 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/dc67d156/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java b/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java
new file mode 100644
index 0000000..2a565be
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.disk;
+
+/**
+ * An object that monitors the amount of disk space used and reports this usage via a
+ * {@link DiskSpaceMonitor.Listener}.
+ */
+public interface DiskSpaceMonitor {
+  /**
+   * Starts the disk space monitor.
+   */
+  void start();
+
+  /**
+   * Stops the disk space monitor. Once shutdown is complete listeners will no longer receive
+   * new samples. A stopped monitor cannot be restarted with {@link #start()}.
+   */
+  void stop();
+
+  /**
+   * Registers the specified listener with this monitor. The listener will be called
+   * when the monitor has a new sample. The update interval is implementation specific.
+   *
+   * @param listener the listener to register
+   * @return {@code true} if the registration was successful and {@code false} if not. Registration
+   * can fail if the monitor has been stopped or if the listener was already registered.
+   */
+  boolean registerListener(Listener listener);
+
+  /**
+   * A listener that is notified when the disk space monitor has sampled a new disk usage value.
+   * Register this listener with {@link #registerListener(Listener)} to receive updates.
+   */
+  interface Listener {
+    /**
+     * Invoked with new samples as they become available.
+     *
+     * @param diskUsageSample the measured disk usage size in bytes.
+     */
+    void onUpdate(long diskUsageSample);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/dc67d156/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java b/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
new file mode 100644
index 0000000..50c8500
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.disk;
+
+import java.io.IOException;
+import java.nio.file.DirectoryIteratorException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * An implementation of {@link DiskSpaceMonitor} that polls for disk usage based on a specified
+ * polling interval.
+ * <p>
+ * On every poll the configured watch paths are walked (following symbolic links), the sizes of
+ * all distinct regular files are summed, and the total is passed to each registered listener.
+ * Polling and listener notification happen on a single background daemon thread.
+ * <p>
+ * This class is thread-safe.
+ */
+public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor {
+  private enum State { INIT, RUNNING, STOPPED }
+
+  private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryImpl();
+  // Note: we use this as a set where the value is always Boolean.TRUE. The polling thread reads it
+  // without locking; all writes happen while holding lock.
+  private final ConcurrentMap<Listener, Boolean> listenerSet = new ConcurrentHashMap<>();
+
+  // Guards all access to state and all writes to listenerSet.
+  private final Object lock = new Object();
+
+  private final ScheduledExecutorService schedulerService =
+      Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
+  private final Set<Path> watchPaths;
+  private final long pollingIntervalMillis;
+
+  // Guarded by lock.
+  private State state = State.INIT;
+
+  /**
+   * Returns the total size in bytes used by the specified paths. This function guarantees that it
+   * will not double count overlapping portions of the path set. For example, with a trivial
+   * overlap of /A and /A, it will count /A only once. It also handles other types of overlaps
+   * similarly, such as counting /A/B only once given the paths /A and /A/B.
+   * <p>
+   * Paths that cannot be resolved, read, or listed (for example because they were deleted while
+   * the scan was running) are skipped rather than failing the whole scan.
+   * <p>
+   * This function is exposed as package private to simplify testing various cases without involving
+   * an executor. Alternatively this could have been pulled out to a utility class, but it would
+   * unnecessarily pollute the global namespace.
+   *
+   * @param paths the root paths to scan; paths that do not exist contribute 0 bytes
+   * @return the summed size of every distinct regular file reachable from {@code paths}
+   */
+  static long getSpaceUsed(Set<Path> paths) {
+    ArrayDeque<Path> pathStack = new ArrayDeque<>();
+
+    for (Path path : paths) {
+      pathStack.push(path);
+    }
+
+    // Track the directories we've visited to ensure we're not double counting. It would be
+    // preferable to resolve overlap once at startup, but the problem is that the filesystem may
+    // change over time and, in fact, at startup I found that the rocks DB store directory was not
+    // created by the time the disk space monitor was started.
+    Set<Path> visited = new HashSet<>();
+    long totalBytes = 0;
+    while (!pathStack.isEmpty()) {
+      try {
+        // We need to resolve to the real path to ensure that we don't inadvertently double count
+        // due to different paths to the same directory (e.g. /A and /A/../A).
+        Path current = pathStack.pop().toRealPath();
+
+        if (!visited.add(current)) {
+          continue;
+        }
+
+        BasicFileAttributes currentAttrs = Files.readAttributes(current,
+                                                                BasicFileAttributes.class);
+        if (currentAttrs.isDirectory()) {
+          try (DirectoryStream<Path> directoryListing = Files.newDirectoryStream(current)) {
+            for (Path child : directoryListing) {
+              pathStack.push(child);
+            }
+          }
+        } else if (currentAttrs.isRegularFile()) {
+          totalBytes += currentAttrs.size();
+        }
+      } catch (IOException | DirectoryIteratorException e) {
+        // If we can't stat or list a path, just ignore it. This can happen, for example, if we scan
+        // a directory, but by the time we get to stat'ing the file it has been deleted (e.g.
+        // due to compaction, rotation, etc.). DirectoryStream's iterator reports such I/O errors
+        // as the unchecked DirectoryIteratorException; letting it escape would abort the poll
+        // and, because the poll runs via scheduleAtFixedRate, cancel all future polls.
+      }
+    }
+
+    return totalBytes;
+  }
+
+  /**
+   * Creates a new disk space monitor that uses a periodic polling mechanism. No polling happens
+   * until {@link #start()} is called.
+   *
+   * @param watchPaths the set of paths to watch; the set is copied
+   * @param pollingIntervalMillis the polling interval in milliseconds; must be positive
+   * @throws IllegalArgumentException if {@code pollingIntervalMillis} is not positive
+   */
+  public PollingScanDiskSpaceMonitor(Set<Path> watchPaths, long pollingIntervalMillis) {
+    if (pollingIntervalMillis <= 0) {
+      // Fail fast: scheduleAtFixedRate would otherwise reject the interval only later, in start().
+      throw new IllegalArgumentException(
+          "pollingIntervalMillis must be positive but was: " + pollingIntervalMillis);
+    }
+    this.watchPaths = Collections.unmodifiableSet(new HashSet<>(watchPaths));
+    this.pollingIntervalMillis = pollingIntervalMillis;
+  }
+
+  /**
+   * Schedules periodic polling; the first sample is taken one polling interval after this call.
+   * Calling this on a running monitor has no effect.
+   *
+   * @throws IllegalStateException if the monitor has already been stopped
+   */
+  @Override
+  public void start() {
+    synchronized (lock) {
+      switch (state) {
+        case INIT:
+          schedulerService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+              updateSample();
+            }
+          }, pollingIntervalMillis, pollingIntervalMillis, TimeUnit.MILLISECONDS);
+
+          state = State.RUNNING;
+          break;
+
+        case RUNNING:
+          // start is idempotent
+          return;
+
+        case STOPPED:
+          throw new IllegalStateException("PollingScanDiskSpaceMonitor was stopped and cannot be restarted.");
+      }
+    }
+  }
+
+  /**
+   * Cancels polling, interrupts the polling thread, and drops all registered listeners. Safe to
+   * call more than once and before {@link #start()}.
+   */
+  @Override
+  public void stop() {
+    synchronized (lock) {
+      // We could also wait for full termination of the scheduler service, but it is overkill for
+      // our use case.
+      schedulerService.shutdownNow();
+
+      listenerSet.clear();
+      state = State.STOPPED;
+    }
+  }
+
+  @Override
+  public boolean registerListener(Listener listener) {
+    synchronized (lock) {
+      if (state == State.STOPPED) {
+        return false;
+      }
+      // putIfAbsent returns null only when no mapping existed, i.e. when the listener is newly
+      // registered; a non-null result means it was already present.
+      return listenerSet.putIfAbsent(listener, Boolean.TRUE) == null;
+    }
+  }
+
+  /**
+   * Wait until this service has shutdown. Returns true if shutdown occurred within the timeout
+   * and false otherwise.
+   * <p>
+   * This is currently exposed at the package private level for tests only.
+   */
+  boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+    return schedulerService.awaitTermination(timeout, unit);
+  }
+
+  /**
+   * Takes one sample and delivers it to every registered listener. Runs on the polling thread.
+   * NOTE: an unchecked exception thrown from here (e.g. by a listener) suppresses all subsequent
+   * scheduled executions, per the scheduleAtFixedRate contract.
+   */
+  private void updateSample() {
+    long totalBytes = getSpaceUsed(watchPaths);
+    for (Listener listener : listenerSet.keySet()) {
+      listener.onUpdate(totalBytes);
+    }
+  }
+
+  private static class ThreadFactoryImpl implements ThreadFactory {
+    private static final String PREFIX = "Samza-" + PollingScanDiskSpaceMonitor.class.getSimpleName() + "-";
+    private static final AtomicInteger INSTANCE_NUM = new AtomicInteger();
+
+    @Override
+    public Thread newThread(Runnable runnable) {
+      Thread thread = new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement());
+      // A background metrics poller must never keep the JVM alive on its own, e.g. when the
+      // owner fails before reaching stop().
+      thread.setDaemon(true);
+      return thread;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/dc67d156/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index bcbc90a..5462208 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -20,6 +20,8 @@
 package org.apache.samza.container
 
 import java.io.File
+import java.nio.file.Path
+import java.util
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
 import org.apache.samza.config.MetricsConfig.Config2Metrics
@@ -29,6 +31,8 @@ import org.apache.samza.config.StorageConfig.Config2Storage
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
+import org.apache.samza.container.disk.{PollingScanDiskSpaceMonitor, DiskSpaceMonitor}
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 import org.apache.samza.metrics.JmxServer
 import org.apache.samza.metrics.JvmMetrics
@@ -391,6 +395,14 @@ object SamzaContainer extends Logging {
       .toSet
     val containerContext = new SamzaContainerContext(containerId, config, taskNames)
 
+    // TODO not sure how we should make this config based, or not. Kind of
+    // strange, since it has some dynamic directories when used with YARN.
+    val defaultStoreBaseDir = new File(System.getProperty("user.dir"), "state")
+    info("Got default storage engine base directory: %s" format defaultStoreBaseDir)
+
+    val storeWatchPaths = new util.HashSet[Path]()
+    storeWatchPaths.add(defaultStoreBaseDir.toPath)
+
     val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => {
       debug("Setting up task instance: %s" format taskModel)
 
@@ -414,11 +426,6 @@ object SamzaContainer extends Logging {
 
       info("Got store consumers: %s" format storeConsumers)
 
-      // TODO not sure how we should make this config based, or not. Kind of
-      // strange, since it has some dynamic directories when used with YARN.
-      val defaultStoreBaseDir = new File(System.getProperty("user.dir"), "state")
-      info("Got default storage engine base directory: %s" format defaultStoreBaseDir)
-
       var loggedStorageBaseDir: File = null
       if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) {
         val jobNameAndId = Util.getJobNameAndId(config)
@@ -430,6 +437,8 @@ object SamzaContainer extends Logging {
         loggedStorageBaseDir = defaultStoreBaseDir
       }
 
+      storeWatchPaths.add(loggedStorageBaseDir.toPath)
+
       info("Got base directory for logged data stores: %s" format loggedStorageBaseDir)
 
       val taskStores = storageEngineFactories
@@ -504,6 +513,20 @@ object SamzaContainer extends Logging {
       (taskName, taskInstance)
     }).toMap
 
+    val diskPollMillis = config.getInt("container.disk.poll.interval.ms", 0)
+    var diskSpaceMonitor: DiskSpaceMonitor = null
+    if (diskPollMillis != 0) {
+      val diskUsage = samzaContainerMetrics.createOrGetDiskUsageGauge()
+
+      diskSpaceMonitor = new PollingScanDiskSpaceMonitor(storeWatchPaths, diskPollMillis)
+      diskSpaceMonitor.registerListener(new Listener {
+        override def onUpdate(diskUsageSample: Long): Unit =
+          diskUsage.set(diskUsageSample)
+      })
+
+      info("Initialized disk space monitor watch paths to: %s" format storeWatchPaths)
+    }
+
     val runLoop = new RunLoop(
       taskInstances = taskInstances,
       consumerMultiplexer = consumerMultiplexer,
@@ -525,7 +548,8 @@ object SamzaContainer extends Logging {
       metrics = samzaContainerMetrics,
       reporters = reporters,
       jvm = jvm,
-      jmxServer = jmxServer)
+      jmxServer = jmxServer,
+      diskSpaceMonitor = diskSpaceMonitor)
   }
 }
 
@@ -537,6 +561,7 @@ class SamzaContainer(
   producerMultiplexer: SystemProducers,
   metrics: SamzaContainerMetrics,
   jmxServer: JmxServer,
+  diskSpaceMonitor: DiskSpaceMonitor = null,
   offsetManager: OffsetManager = new OffsetManager,
   localityManager: LocalityManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
@@ -550,6 +575,7 @@ class SamzaContainer(
       startOffsetManager
       startLocalityManager
       startStores
+      startDiskSpaceMonitor
       startProducers
       startTask
       startConsumers
@@ -566,6 +592,7 @@ class SamzaContainer(
       shutdownConsumers
       shutdownTask
       shutdownStores
+      shutdownDiskSpaceMonitor
       shutdownProducers
       shutdownLocalityManager
       shutdownOffsetManager
@@ -575,6 +602,13 @@ class SamzaContainer(
     }
   }
 
+  /**
+   * Starts the disk space monitor when one was configured for this container; otherwise a no-op.
+   */
+  def startDiskSpaceMonitor: Unit = {
+    Option(diskSpaceMonitor).foreach { monitor =>
+      info("Starting disk space monitor")
+      monitor.start()
+    }
+  }
+
   def startMetrics {
     info("Registering task instances with metrics.")
 
@@ -713,4 +747,11 @@ class SamzaContainer(
       jvm.stop
     }
   }
+
+  /**
+   * Stops the disk space monitor when one was configured for this container; otherwise a no-op.
+   */
+  def shutdownDiskSpaceMonitor: Unit = {
+    Option(diskSpaceMonitor).foreach { monitor =>
+      info("Shutting down disk space monitor.")
+      monitor.stop()
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/dc67d156/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index 6fae650..9e6641c 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -37,11 +37,22 @@ class SamzaContainerMetrics(
   val windowNs = newTimer("window-ns")
   val processNs = newTimer("process-ns")
   val commitNs = newTimer("commit-ns")
-  val utilization = newGauge("event-loop-utilization", 0.0F);
+  val utilization = newGauge("event-loop-utilization", 0.0F)
 
   val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]()
 
   def addStoreRestorationGauge(taskName: TaskName, storeName: String) {
     taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" format(taskName.toString, storeName), -1L))
   }
+
+  /**
+   * Creates or gets the "disk-usage" gauge (initial value 0) for the container and returns it.
+   */
+  def createOrGetDiskUsageGauge(): Gauge[Long] = {
+    // Despite its name, newGauge appears to be idempotent (NOTE(review): this relies on the
+    // registry returning the already-registered gauge on repeat calls — confirm). A more defensive
+    // approach would be to ensure idempotency at this level, e.g. via a CAS operation.
+    // Unfortunately, it appears that the mechanism to register a Gauge is hidden. An alternative
+    // would be to use a mutex to ensure the gauge is created only once.
+    newGauge("disk-usage", 0L)
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/dc67d156/samza-core/src/test/java/org/apache/samza/container/disk/TestPollingScanDiskSpaceMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/disk/TestPollingScanDiskSpaceMonitor.java b/samza-core/src/test/java/org/apache/samza/container/disk/TestPollingScanDiskSpaceMonitor.java
new file mode 100644
index 0000000..2576437
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/disk/TestPollingScanDiskSpaceMonitor.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.disk;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.*;
+
+public class TestPollingScanDiskSpaceMonitor {
+  private Path testDir;
+  private ArrayDeque<Path> filesToDelete;
+
+  @Before
+  public void setUp() throws IOException {
+    filesToDelete = new ArrayDeque<>();
+    testDir = Files.createTempDirectory("samza-polling-scan-disk-monitor-test");
+    filesToDelete.push(testDir);
+  }
+
+  /**
+   * Deletes every path created by the test in reverse creation order, so that files and symbolic
+   * links are removed before the directories that contain them.
+   */
+  @After
+  public void tearDown() throws IOException {
+    while (!filesToDelete.isEmpty()) {
+      Path path = filesToDelete.pop();
+
+      try {
+        Files.delete(path);
+      } catch (IOException e) {
+        // Continue with best effort, this is just test code.
+      }
+    }
+  }
+
+  @Test
+  public void testSizeOfSingleFile() throws IOException {
+    writeFile(testDir, "single-file", new byte[1024]);
+    assertEquals(1024, PollingScanDiskSpaceMonitor.getSpaceUsed(Collections.singleton(testDir)));
+  }
+
+  @Test
+  public void testSizeOfDisjointDirectoriesFromRoot() throws IOException {
+    Path child1Dir = createDirectory(testDir, "child1");
+    writeFile(child1Dir, "foo", new byte[1024]);
+
+    Path child2Dir = createDirectory(testDir, "child2");
+    writeFile(child2Dir, "bar", new byte[4096]);
+
+    assertEquals(1024 + 4096, PollingScanDiskSpaceMonitor.getSpaceUsed(Collections.singleton(testDir)));
+  }
+
+  @Test
+  public void testSizeOfDisjointDirectoriesFromChildDirs() throws IOException {
+    Path child1Dir = createDirectory(testDir, "child1");
+    writeFile(child1Dir, "foo", new byte[1024]);
+
+    Path child2Dir = createDirectory(testDir, "child2");
+    writeFile(child2Dir, "bar", new byte[4096]);
+
+    Set<Path> pathSet = new HashSet<>(Arrays.asList(child1Dir, child2Dir));
+    assertEquals(1024 + 4096, PollingScanDiskSpaceMonitor.getSpaceUsed(pathSet));
+  }
+
+  @Test
+  public void testSizeOfOverlappedDirectories() throws IOException {
+    Path childDir = createDirectory(testDir, "child");
+    writeFile(childDir, "foo", new byte[1024]);
+
+    Path grandchildDir = createDirectory(childDir, "grandchild");
+    writeFile(grandchildDir, "bar", new byte[4096]);
+
+    // If getSpaceUsed were not handling overlapping directories we would expect to count
+    // grandchild twice, which would give us the erroneous total `1024 + 4096 * 2`.
+    Set<Path> pathSet = new HashSet<>(Arrays.asList(childDir, grandchildDir));
+    assertEquals(1024 + 4096, PollingScanDiskSpaceMonitor.getSpaceUsed(pathSet));
+  }
+
+  @Test
+  public void testSizeOfDirectoryAccessedWithDifferentPaths() throws IOException {
+    Path childDir = createDirectory(testDir, "child1");
+    writeFile(childDir, "foo", new byte[1024]);
+
+    Path otherPath = childDir.resolve("..").resolve(childDir.getFileName());
+    // Path equality is purely syntactic, so both entries survive in the set and getSpaceUsed
+    // itself must detect that they refer to the same directory.
+    assertFalse(childDir.equals(otherPath));
+    Set<Path> pathSet = new HashSet<>(Arrays.asList(childDir, otherPath));
+
+    assertEquals(1024, PollingScanDiskSpaceMonitor.getSpaceUsed(pathSet));
+  }
+
+  @Test
+  public void testSizeOfAlreadyCountedSymlinkedFile() throws IOException {
+    // writeFile appends a random suffix to the name, so link to the path it actually created.
+    Path regularFile = writeFile(testDir, "regular-file", new byte[1024]);
+    createSymlink(testDir.resolve("symlink"), regularFile);
+
+    // We should not double count a symlinked file
+    assertEquals(1024, PollingScanDiskSpaceMonitor.getSpaceUsed(Collections.singleton(testDir)));
+  }
+
+  @Test
+  public void testSizeOfUncountedSymlinkedFile() throws IOException {
+    Path childDir = createDirectory(testDir, "child");
+    Path regularFile = writeFile(testDir, "regular-file", new byte[1024]);
+    createSymlink(childDir.resolve("symlink"), regularFile);
+
+    // We should count the space of the symlinked file even though it is outside of the root
+    // from which we started: childDir contains only the symlink.
+    assertEquals(1024, PollingScanDiskSpaceMonitor.getSpaceUsed(Collections.singleton(childDir)));
+  }
+
+  @Test
+  public void testFollowSymlinkedDirectory() throws IOException {
+    Path childDir = createDirectory(testDir, "child");
+    writeFile(childDir, "regular-file", new byte[1024]);
+
+    Path dirSymlink = createSymlink(testDir.resolve("symlink"), childDir);
+
+    // We should follow the symlink and read the symlinked file
+    assertEquals(1024, PollingScanDiskSpaceMonitor.getSpaceUsed(Collections.singleton(dirSymlink)));
+  }
+
+  @Test
+  public void testHandleCyclicalSymlink() throws IOException {
+    Path childDir = createDirectory(testDir, "child");
+    writeFile(childDir, "regular-file", new byte[1024]);
+    createSymlink(childDir.resolve("symlink"), testDir);
+
+    // We have testDir/childDir/symlink -> testDir, which effectively creates a cycle.
+    assertEquals(1024, PollingScanDiskSpaceMonitor.getSpaceUsed(Collections.singleton(childDir)));
+  }
+
+  @Test
+  public void testMissingDirectory() throws IOException {
+    Set<Path> pathSet = Collections.singleton(testDir.resolve("non-existant-child"));
+    assertEquals(0, PollingScanDiskSpaceMonitor.getSpaceUsed(pathSet));
+  }
+
+  @Test
+  public void testGetSamplesFromListener() throws IOException, InterruptedException {
+    writeFile(testDir, "single-file", new byte[1024]);
+
+    final AtomicLong sample = new AtomicLong();
+    final CountDownLatch sampleReady = new CountDownLatch(1);
+    final PollingScanDiskSpaceMonitor monitor = new PollingScanDiskSpaceMonitor(Collections.singleton(testDir), 50);
+    monitor.registerListener(new DiskSpaceMonitor.Listener() {
+      @Override
+      public void onUpdate(long diskUsageSample) {
+        sample.set(diskUsageSample);
+        sampleReady.countDown();
+      }
+    });
+
+    monitor.start();
+
+    try {
+      if (!sampleReady.await(5, TimeUnit.SECONDS)) {
+        fail("Timed out waiting for listener to provide a disk usage sample");
+      }
+
+      assertEquals(1024, sample.get());
+    } finally {
+      monitor.stop();
+    }
+  }
+
+  @Test
+  public void testStartStop() throws IOException, InterruptedException {
+    writeFile(testDir, "single-file", new byte[1024]);
+
+    final int numSamplesToCollect = 5;
+
+    final AtomicInteger numCallbackInvocations = new AtomicInteger();
+    final CountDownLatch doneLatch = new CountDownLatch(1);
+    final PollingScanDiskSpaceMonitor monitor = new PollingScanDiskSpaceMonitor(Collections.singleton(testDir), 50);
+    monitor.registerListener(new DiskSpaceMonitor.Listener() {
+      @Override
+      public void onUpdate(long diskUsageSample) {
+        if (numCallbackInvocations.incrementAndGet() == numSamplesToCollect) {
+          monitor.stop();
+          doneLatch.countDown();
+        }
+      }
+    });
+
+    monitor.start();
+
+    try {
+      if (!doneLatch.await(5, TimeUnit.SECONDS)) {
+        fail(String.format("Timed out waiting for listener to give %d updates", numSamplesToCollect));
+      }
+      if (!monitor.awaitTermination(5, TimeUnit.SECONDS)) {
+        fail("Timed out waiting for monitor to terminate");
+      }
+
+      // A number larger than numSamplesToCollect indicates that we got a callback after we stopped
+      // the monitor. We should safely be able to assert this will not happen as we stopped the
+      // monitor in the thread on which it is delivering notifications.
+      assertEquals(numSamplesToCollect, numCallbackInvocations.get());
+    } finally {
+      monitor.stop();
+    }
+  }
+
+  /** Creates a uniquely named directory whose name starts with {@code name + "-"}. */
+  private Path createDirectory(Path parentDir, String name) throws IOException {
+    name = name + "-";
+    Path path = Files.createTempDirectory(parentDir, name);
+    filesToDelete.push(path);
+    return path;
+  }
+
+  /** Creates a uniquely named empty file whose name starts with {@code name + "-"}. */
+  private Path createFile(Path parentDir, String name) throws IOException {
+    name = name + "-";
+    Path path = Files.createTempFile(parentDir, name, null);
+    filesToDelete.push(path);
+    return path;
+  }
+
+  /**
+   * Creates a uniquely named file (see {@link #createFile}) with the given contents and returns
+   * its actual path, which differs from {@code parentDir.resolve(name)}.
+   */
+  private Path writeFile(Path parentDir, String name, byte[] contents) throws IOException {
+    Path path = createFile(parentDir, name);
+    Files.write(path, contents);
+    return path;
+  }
+
+  /**
+   * Creates a symbolic link at {@code link} pointing to {@code target} and registers the link
+   * (not the target) for deletion, so that its parent directory can be cleaned up.
+   */
+  private Path createSymlink(Path link, Path target) throws IOException {
+    Path path = Files.createSymbolicLink(link, target);
+    filesToDelete.push(path);
+    return path;
+  }
+}