You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/02/01 19:40:52 UTC

[33/50] [abbrv] hadoop git commit: HADOOP-12702. Add an HDFS metrics sink. (Daniel Templeton via kasha)

HADOOP-12702. Add an HDFS metrics sink. (Daniel Templeton via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ee005e01
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ee005e01
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ee005e01

Branch: refs/heads/HDFS-7240
Commit: ee005e010cff3f97a5daa8000ac2cd151e2631ca
Parents: 7f46636
Author: Karthik Kambatla <ka...@cloudera.com>
Authored: Thu Jan 28 17:43:17 2016 -0800
Committer: Karthik Kambatla <ka...@cloudera.com>
Committed: Thu Jan 28 17:43:17 2016 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   2 +
 .../metrics2/sink/RollingFileSystemSink.java    | 420 +++++++++++++++
 .../sink/RollingFileSystemSinkTestBase.java     | 506 +++++++++++++++++++
 .../sink/TestRollingFileSystemSink.java         | 156 ++++++
 4 files changed, 1084 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee005e01/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 4da20e0..4d01857 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -650,6 +650,8 @@ Release 2.9.0 - UNRELEASED
 
   NEW FEATURES
 
+    HADOOP-12702. Add an HDFS metrics sink. (Daniel Templeton via kasha)
+
   IMPROVEMENTS
 
     HADOOP-12321. Make JvmPauseMonitor an AbstractService.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee005e01/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
new file mode 100644
index 0000000..8271362
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+import java.util.TimeZone;
+
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.MetricsTag;
+
+/**
+ * This class is a metrics sink that uses
+ * {@link org.apache.hadoop.fs.FileSystem} to write the metrics logs.  Every
+ * hour a new directory will be created under the path specified by the
+ * <code>basepath</code> property. All metrics will be logged to a file in the
+ * current hour's directory in a file named &lt;hostname&gt;.log, where
+ * &lt;hostname&gt; is the name of the host on which the metrics logging
+ * process is running. The base path is set by the
+ * <code>&lt;prefix&gt;.sink.&lt;instance&gt;.basepath</code> property.  The
+ * time zone used to create the current hour's directory name is GMT.  If the
+ * <code>basepath</code> property isn't specified, it will default to
+ * &quot;/tmp&quot;, which is the temp directory on whatever default file
+ * system is configured for the cluster.
+ *
+ * The <code>&lt;prefix&gt;.sink.&lt;instance&gt;.ignore-error</code> property
+ * controls whether an exception is thrown when an error is encountered writing
+ * a log file.  The default value is <code>true</code>.  When set to
+ * <code>false</code>, file errors are quietly swallowed.
+ *
+ * The primary use of this class is for logging to HDFS.  As it uses
+ * {@link org.apache.hadoop.fs.FileSystem} to access the target file system,
+ * however, it can be used to write to the local file system, Amazon S3, or any
+ * other supported file system.  The base path for the sink will determine the
+ * file system used.  An unqualified path will write to the default file system
+ * set by the configuration.
+ *
+ * Not all file systems support the ability to append to files.  In file systems
+ * without the ability to append to files, only one writer can write to a file
+ * at a time.  To allow for concurrent writes from multiple daemons on a single
+ * host, the <code>source</code> property should be set to the name of the
+ * source daemon, e.g. <i>namenode</i>.  The value of the <code>source</code>
+ * property should typically be the same as the property's prefix.  If this
+ * property is not set, the source is taken to be <i>unknown</i>.
+ *
+ * Instead of appending to an existing file, by default the sink
+ * will create a new file with a suffix of &quot;.&lt;n&gt;&quet;, where
+ * <i>n</i> is the next lowest integer that isn't already used in a file name,
+ * similar to the Hadoop daemon logs.  NOTE: the file with the <b>highest</b>
+ * sequence number is the <b>newest</b> file, unlike the Hadoop daemon logs.
+ *
+ * For file systems that allow append, the sink supports appending to the
+ * existing file instead. If the <code>allow-append</code> property is set to
+ * true, the sink will instead append to the existing file on file systems that
+ * support appends. By default, the <code>allow-append</code> property is
+ * false.
+ *
+ * Note that when writing to HDFS with <code>allow-append</code> set to true,
+ * there is a minimum acceptable number of data nodes.  If the number of data
+ * nodes drops below that minimum, the append will succeed, but reading the
+ * data will fail with an IOException in the DataStreamer class.  The minimum
+ * number of data nodes required for a successful append is generally 2 or 3.
+ *
+ * Note also that when writing to HDFS, the file size information is not updated
+ * until the file is closed (e.g. at the top of the hour) even though the data
+ * is being written successfully. This is a known HDFS limitation that exists
+ * because of the performance cost of updating the metadata.  See
+ * <a href="https://issues.apache.org/jira/browse/HDFS-5478">HDFS-5478</a>.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RollingFileSystemSink implements MetricsSink, Closeable {
+  private static final String BASEPATH_KEY = "basepath";
+  private static final String SOURCE_KEY = "source";
+  private static final String IGNORE_ERROR_KEY = "ignore-error";
+  private static final String ALLOW_APPEND_KEY = "allow-append";
+  private static final String SOURCE_DEFAULT = "unknown";
+  private static final String BASEPATH_DEFAULT = "/tmp";
+  private static final FastDateFormat DATE_FORMAT =
+      FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT"));
+  private String source;
+  private boolean ignoreError;
+  private boolean allowAppend;
+  private Path basePath;
+  private FileSystem fileSystem;
+  // The current directory path into which we're writing files
+  private Path currentDirPath;
+  // The path to the current file into which we're writing data
+  private Path currentFilePath;
+  // The stream to which we're currently writing.
+  private PrintStream currentOutStream;
+  // We keep this only to be able to call hsynch() on it.
+  private FSDataOutputStream currentFSOutStream;
+
+  @Override
+  public void init(SubsetConfiguration conf) {
+    basePath = new Path(conf.getString(BASEPATH_KEY, BASEPATH_DEFAULT));
+    source = conf.getString(SOURCE_KEY, SOURCE_DEFAULT);
+    ignoreError = conf.getBoolean(IGNORE_ERROR_KEY, false);
+    allowAppend = conf.getBoolean(ALLOW_APPEND_KEY, false);
+
+    try {
+      fileSystem = FileSystem.get(new URI(basePath.toString()),
+          new Configuration());
+    } catch (URISyntaxException ex) {
+      throw new MetricsException("The supplied filesystem base path URI"
+          + " is not a valid URI: " + basePath.toString(), ex);
+    } catch (IOException ex) {
+      throw new MetricsException("Error connecting to file system: "
+          + basePath + " [" + ex.toString() + "]", ex);
+    }
+
+    // If we're permitted to append, check if we actually can
+    if (allowAppend) {
+      allowAppend = checkAppend(fileSystem);
+    }
+  }
+
+  /**
+   * Test whether the file system supports append and return the answer.
+   * @param fs the target file system
+   */
+  private boolean checkAppend(FileSystem fs) {
+    boolean canAppend = true;
+
+    try {
+      fs.append(basePath);
+    } catch (IOException ex) {
+      if (ex.getMessage().equals("Not supported")) {
+        canAppend = false;
+      }
+    }
+
+    return canAppend;
+  }
+
+  /**
+   * Check the current directory against the time stamp.  If they're not
+   * the same, create a new directory and a new log file in that directory.
+   *
+   * @throws MetricsException thrown if an error occurs while creating the
+   * new directory or new log file
+   */
+  private void rollLogDirIfNeeded() throws MetricsException {
+    String currentDir = DATE_FORMAT.format(new Date());
+    Path path = new Path(basePath, currentDir);
+
+    // We check whether currentOutStream is null instead of currentDirPath,
+    // because if currentDirPath is null, then currentOutStream is null, but
+    // currentOutStream can be null for other reasons.
+    if ((currentOutStream == null) || !path.equals(currentDirPath)) {
+      currentDirPath = path;
+
+      if (currentOutStream != null) {
+        currentOutStream.close();
+      }
+
+      try {
+        rollLogDir();
+      } catch (IOException ex) {
+        throwMetricsException("Failed to creating new log file", ex);
+      }
+    }
+  }
+
+  /**
+   * Create a new directory based on the current hour and a new log file in
+   * that directory.
+   *
+   * @throws IOException thrown if an error occurs while creating the
+   * new directory or new log file
+   */
+  private void rollLogDir() throws IOException {
+    String fileName =
+        source + "-" + InetAddress.getLocalHost().getHostName() + ".log";
+
+    Path targetFile = new Path(currentDirPath, fileName);
+    fileSystem.mkdirs(currentDirPath);
+
+    if (allowAppend) {
+      createOrAppendLogFile(targetFile);
+    } else {
+      createLogFile(targetFile);
+    }
+  }
+
+  /**
+   * Create a new log file and return the {@link FSDataOutputStream}. If a
+   * file with the specified path already exists, add a suffix, starting with 1
+   * and try again. Keep incrementing the suffix until a nonexistent target
+   * path is found.
+   *
+   * Once the file is open, update {@link #currentFSOutStream},
+   * {@link #currentOutStream}, and {@#link #currentFile} are set appropriately.
+   *
+   * @param initial the target path
+   * @throws IOException thrown if the call to see if the exists fails
+   */
+  private void createLogFile(Path initial) throws IOException {
+    Path currentAttempt = initial;
+    int id = 1;
+
+    while (true) {
+      // First try blindly creating the file. If we fail, it either means
+      // the file exists, or the operation actually failed.  We do it this way
+      // because if we check whether the file exists, it might still be created
+      // by the time we try to create it. Creating first works like a
+      // test-and-set.
+      try {
+        currentFSOutStream = fileSystem.create(currentAttempt, false);
+        currentOutStream = new PrintStream(currentFSOutStream, true,
+            StandardCharsets.UTF_8.name());
+        currentFilePath = currentAttempt;
+        break;
+      } catch (IOException ex) {
+        // Now we can check to see if the file exists to know why we failed
+        if (fileSystem.exists(currentAttempt)) {
+          currentAttempt = new Path(initial.toString() + "." + id);
+          id += 1;
+        } else {
+          throw ex;
+        }
+      }
+    }
+  }
+
+  /**
+   * Create a new log file and return the {@link FSDataOutputStream}. If a
+   * file with the specified path already exists, open the file for append
+   * instead.
+   *
+   * Once the file is open, update {@link #currentFSOutStream},
+   * {@link #currentOutStream}, and {@#link #currentFile} are set appropriately.
+   *
+   * @param initial the target path
+   * @throws IOException thrown if the call to see the append operation fails.
+   */
+  private void createOrAppendLogFile(Path targetFile) throws IOException {
+    // First try blindly creating the file. If we fail, it either means
+    // the file exists, or the operation actually failed.  We do it this way
+    // because if we check whether the file exists, it might still be created
+    // by the time we try to create it. Creating first works like a
+    // test-and-set.
+    try {
+      currentFSOutStream = fileSystem.create(targetFile, false);
+      currentOutStream = new PrintStream(currentFSOutStream, true,
+          StandardCharsets.UTF_8.name());
+    } catch (IOException ex) {
+      // Try appending instead.  If we fail, if means the file doesn't
+      // actually exist yet or the operation actually failed.
+      try {
+        currentFSOutStream = fileSystem.append(targetFile);
+        currentOutStream = new PrintStream(currentFSOutStream, true,
+            StandardCharsets.UTF_8.name());
+      } catch (IOException ex2) {
+        // If the original create failed for a legit but transitory
+        // reason, the append will fail because the file now doesn't exist,
+        // resulting in a confusing stack trace.  To avoid that, we set
+        // the cause of the second exception to be the first exception.
+        // It's still a tiny bit confusing, but it's enough
+        // information that someone should be able to figure it out.
+        ex2.initCause(ex);
+
+        throw ex2;
+      }
+    }
+
+    currentFilePath = targetFile;
+  }
+
+  @Override
+  public void putMetrics(MetricsRecord record) {
+    rollLogDirIfNeeded();
+
+    if (currentOutStream != null) {
+      currentOutStream.printf("%d %s.%s", record.timestamp(),
+          record.context(), record.name());
+
+      String separator = ": ";
+
+      for (MetricsTag tag : record.tags()) {
+        currentOutStream.printf("%s%s=%s", separator, tag.name(), tag.value());
+        separator = ", ";
+      }
+
+      for (AbstractMetric metric : record.metrics()) {
+        currentOutStream.printf("%s%s=%s", separator, metric.name(),
+            metric.value());
+      }
+
+      currentOutStream.println();
+
+      // If we don't hflush(), the data may not be written until the file is
+      // closed. The file won't be closed until the top of the hour *AND*
+      // another record is received. Calling hflush() makes sure that the data
+      // is complete at the top of the hour.
+      try {
+        currentFSOutStream.hflush();
+      } catch (IOException ex) {
+        throwMetricsException("Failed flushing the stream", ex);
+      }
+
+      checkForErrors("Unable to write to log file");
+    } else if (!ignoreError) {
+      throwMetricsException("Unable to write to log file");
+    }
+  }
+
+  @Override
+  public void flush() {
+    // currentOutStream is null if currentFSOutStream is null
+    if (currentFSOutStream != null) {
+      try {
+        currentFSOutStream.hflush();
+      } catch (IOException ex) {
+        throwMetricsException("Unable to flush log file", ex);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (currentOutStream != null) {
+      currentOutStream.close();
+
+      try {
+        checkForErrors("Unable to close log file");
+      } finally {
+        // Null out the streams just in case someone tries to reuse us.
+        currentOutStream = null;
+        currentFSOutStream = null;
+      }
+    }
+  }
+
+  /**
+   * If the sink isn't set to ignore errors, throw a {@link MetricsException}
+   * if the stream encountered an exception.  The message parameter will be used
+   * as the new exception's message with the current file name
+   * ({@link #currentFilePath}) appended to it.
+   *
+   * @param message the exception message. The message will have the current
+   * file name ({@link #currentFilePath}) appended to it.
+   * @throws MetricsException thrown if there was an error and the sink isn't
+   * ignoring errors
+   */
+  private void checkForErrors(String message)
+      throws MetricsException {
+    if (!ignoreError && currentOutStream.checkError()) {
+      throw new MetricsException(message + ": " + currentFilePath);
+    }
+  }
+
+  /**
+   * If the sink isn't set to ignore errors, wrap the Throwable in a
+   * {@link MetricsException} and throw it.  The message parameter will be used
+   * as the new exception's message with the current file name
+   * ({@link #currentFilePath}) and the Throwable's string representation
+   * appended to it.
+   *
+   * @param message the exception message. The message will have the current
+   * file name ({@link #currentFilePath}) and the Throwable's string
+   * representation appended to it.
+   * @param t the Throwable to wrap
+   */
+  private void throwMetricsException(String message, Throwable t) {
+    if (!ignoreError) {
+      throw new MetricsException(message + ": " + currentFilePath + " ["
+          + t.toString() + "]", t);
+    }
+  }
+
+  /**
+   * If the sink isn't set to ignore errors, throw a new
+   * {@link MetricsException}.  The message parameter will be used  as the
+   * new exception's message with the current file name
+   * ({@link #currentFilePath}) appended to it.
+   *
+   * @param message the exception message. The message will have the current
+   * file name ({@link #currentFilePath}) appended to it.
+   */
+  private void throwMetricsException(String message) {
+    if (!ignoreError) {
+      throw new MetricsException(message + ": " + currentFilePath);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee005e01/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
new file mode 100644
index 0000000..3213276
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.TimeZone;
+import java.util.regex.Pattern;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsRecord;
+
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.annotation.Metric.Type;
+import org.apache.hadoop.metrics2.impl.ConfigBuilder;
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+import org.apache.hadoop.metrics2.impl.TestMetricsConfig;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class is a base class for testing the {@link RollingFileSystemSink}
+ * class in various contexts. It provides the a number of useful utility
+ * methods for classes that extend it.
+ */
+public class RollingFileSystemSinkTestBase {
+  protected static final File ROOT_TEST_DIR =
+      new File(System.getProperty("test.build.data", "target/"),
+        "FileSystemSinkTest");
+  protected static final SimpleDateFormat DATE_FORMAT =
+      new SimpleDateFormat("yyyyMMddHH");
+  protected static File methodDir;
+
+  /**
+   * The name of the current test method.
+   */
+  @Rule
+  public TestName methodName = new TestName();
+
+  /**
+   * A sample metrics source with two tags and two gauges.  The metrics
+   * system discovers the tags and metrics reflectively from the annotations,
+   * so the member names and annotation values here are load-bearing.
+   */
+  @Metrics(name="testRecord1", context="test1")
+  protected class MyMetrics1 {
+    @Metric(value={"testTag1", ""}, type=Type.TAG)
+    String testTag1() { return "testTagValue1"; }
+
+    @Metric(value={"testTag2", ""}, type=Type.TAG)
+    String gettestTag2() { return "testTagValue2"; }
+
+    @Metric(value={"testMetric1", "An integer gauge"}, always=true)
+    MutableGaugeInt testMetric1;
+
+    @Metric(value={"testMetric2", "A long gauge"}, always=true)
+    MutableGaugeLong testMetric2;
+
+    // Register under a per-test-method name so repeated registrations from
+    // different test methods don't collide in the metrics system.
+    public MyMetrics1 registerWith(MetricsSystem ms) {
+      return ms.register(methodName.getMethodName() + "-m1", null, this);
+    }
+  }
+
+  /**
+   * Another sample metrics source, with a single tag.  As with
+   * MyMetrics1, the annotation values are discovered reflectively.
+   */
+  @Metrics(name="testRecord2", context="test1")
+  protected class MyMetrics2 {
+    @Metric(value={"testTag22", ""}, type=Type.TAG)
+    String testTag1() { return "testTagValue22"; }
+
+    // Register under a per-test-method name to avoid collisions between
+    // test methods.
+    public MyMetrics2 registerWith(MetricsSystem ms) {
+      return ms.register(methodName.getMethodName() + "-m2", null, this);
+    }
+  }
+
+  /**
+   * Set the date format's timezone to GMT.
+   */
+  @BeforeClass
+  public static void setTZ() {
+    DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT"));
+  }
+
+  /**
+   * Delete the test directory for this test.
+   *
+   * @throws IOException thrown if the delete fails
+   */
+  @AfterClass
+  public static void deleteBaseDir() throws IOException {
+    // Recursively removes ROOT_TEST_DIR and all per-method subdirectories.
+    FileUtils.deleteDirectory(ROOT_TEST_DIR);
+  }
+
+  /**
+   * Create the test directory for this test.
+   * @throws IOException thrown if the create fails
+   */
+  @Before
+  public void createMethodDir() throws IOException {
+    methodDir = new File(ROOT_TEST_DIR, methodName.getMethodName());
+
+    methodDir.mkdirs();
+  }
+
+  /**
+   * Set up the metrics system, start it, and return it.
+   * @param path the base path for the sink
+   * @param ignoreErrors whether the sink should ignore errors
+   * @param allowAppend whether the sink is allowed to append to existing files
+   * @return the metrics system
+   */
+  protected MetricsSystem initMetricsSystem(String path, boolean ignoreErrors,
+      boolean allowAppend) {
+    // If the prefix is not lower case, the metrics system won't be able to
+    // read any of the properties.
+    final String prefix = methodName.getMethodName().toLowerCase();
+
+    new ConfigBuilder().add("*.period", 10000)
+        .add(prefix + ".sink.mysink0.class", ErrorSink.class.getName())
+        .add(prefix + ".sink.mysink0.basepath", path)
+        .add(prefix + ".sink.mysink0.source", "testsrc")
+        .add(prefix + ".sink.mysink0.context", "test1")
+        .add(prefix + ".sink.mysink0.ignore-error", ignoreErrors)
+        .add(prefix + ".sink.mysink0.allow-append", allowAppend)
+        .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-" + prefix));
+
+    MetricsSystemImpl ms = new MetricsSystemImpl(prefix);
+
+    ms.start();
+
+    return ms;
+  }
+
+  /**
+   * Helper method that writes metrics files to a target path, reads those
+   * files, and returns the contents of all files as a single string. This
+   * method will assert that the correct number of files is found.
+   *
+   * @param ms an initialized MetricsSystem to use
+   * @param path the target path from which to read the logs
+   * @param count the number of log files to expect
+   * @return the contents of the log files
+   * @throws IOException when the log file can't be read
+   * @throws URISyntaxException when the target path is an invalid URL
+   */
+  protected String doWriteTest(MetricsSystem ms, String path, int count)
+      throws IOException, URISyntaxException {
+    final String then = DATE_FORMAT.format(new Date());
+
+    MyMetrics1 mm1 = new MyMetrics1().registerWith(ms);
+    new MyMetrics2().registerWith(ms);
+
+    mm1.testMetric1.incr();
+    mm1.testMetric2.incr(2);
+
+    ms.publishMetricsNow(); // publish the metrics
+    ms.stop();
+    ms.shutdown();
+
+    return readLogFile(path, then, count);
+  }
+
+  /**
+   * Read the log files at the target path and return the contents as a single
+   * string. This method will assert that the correct number of files is found.
+   *
+   * @param path the target path
+   * @param then when the test method began. Used to find the log directory in
+   * the case that the test run crosses the top of the hour.
+   * @param count the number of log files to expect
+   * @return
+   * @throws IOException
+   * @throws URISyntaxException
+   */
+  protected String readLogFile(String path, String then, int count)
+      throws IOException, URISyntaxException {
+    final String now = DATE_FORMAT.format(new Date());
+    final String logFile =
+        "testsrc-" + InetAddress.getLocalHost().getHostName() + ".log";
+    FileSystem fs = FileSystem.get(new URI(path), new Configuration());
+    StringBuilder metrics = new StringBuilder();
+    boolean found = false;
+
+    for (FileStatus status : fs.listStatus(new Path(path))) {
+      Path logDir = status.getPath();
+
+      // There are only two possible valid log directory names: the time when
+      // the test started and the current time.  Anything else can be ignored.
+      if (now.equals(logDir.getName()) || then.equals(logDir.getName())) {
+        readLogData(fs, findMostRecentLogFile(fs, new Path(logDir, logFile)),
+            metrics);
+        assertFileCount(fs, logDir, count);
+        found = true;
+      }
+    }
+
+    assertTrue("No valid log directories found", found);
+
+    return metrics.toString();
+  }
+
+  /**
+   * Read the target log file and append its contents to the StringBuilder.
+   * @param fs the target FileSystem
+   * @param logFile the target file path
+   * @param metrics where to append the file contents
+   * @throws IOException thrown if the file cannot be read
+   */
+  protected void readLogData(FileSystem fs, Path logFile, StringBuilder metrics)
+      throws IOException {
+    FSDataInputStream fsin = fs.open(logFile);
+    BufferedReader in = new BufferedReader(new InputStreamReader(fsin,
+        StandardCharsets.UTF_8));
+    String line = null;
+
+    while ((line = in.readLine()) != null) {
+      metrics.append(line).append("\n");
+    }
+  }
+
+  /**
+   * Return the path to the log file to use, based on the target path.
+   * @param fs the target FileSystem
+   * @param initial the path from which to start
+   * @return the path to use
+   * @throws IOException thrown if testing for file existence fails.
+   */
+  protected Path findMostRecentLogFile(FileSystem fs, Path initial)
+      throws IOException {
+    Path logFile = null;
+    Path nextLogFile = initial;
+    int id = 1;
+
+    do {
+      logFile = nextLogFile;
+      nextLogFile = new Path(initial.toString() + "." + id);
+      id += 1;
+    } while (fs.exists(nextLogFile));
+    return logFile;
+  }
+
+  /**
+   * Assert that the given contents match what is expected from the test
+   * metrics.
+   *
+   * @param contents the file contents to test
+   */
+  protected void assertMetricsContents(String contents) {
+    // Note that in the below expression we allow tags and metrics to go in
+    // arbitrary order, but the records must be in order.
+    // The pattern expects one testRecord1 line (from MyMetrics1) followed by
+    // one testRecord2 line (from MyMetrics2), each formatted by the sink as
+    // "<timestamp> <context>.<record>: <tags>, <metrics>".
+    final Pattern expectedContentPattern = Pattern.compile(
+        "^\\d+\\s+test1.testRecord1:\\s+Context=test1,\\s+"
+        + "(testTag1=testTagValue1,\\s+testTag2=testTagValue2|"
+        + "testTag2=testTagValue2,\\s+testTag1=testTagValue1),"
+        + "\\s+Hostname=.*,\\s+"
+        + "(testMetric1=1,\\s+testMetric2=2|testMetric2=2,\\s+testMetric1=1)"
+        + "[\\n\\r]*^\\d+\\s+test1.testRecord2:\\s+Context=test1,"
+        + "\\s+testTag22=testTagValue22,\\s+Hostname=.*$[\\n\\r]*",
+         Pattern.MULTILINE);
+
+    assertTrue("Sink did not produce the expected output. Actual output was: "
+        + contents, expectedContentPattern.matcher(contents).matches());
+  }
+
+  /**
+   * Assert that the given contents match what is expected from the test
+   * metrics when there is pre-existing data.
+   *
+   * @param contents the file contents to test
+   */
+  protected void assertExtraContents(String contents) {
+    // Note that in the below expression we allow tags and metrics to go in
+    // arbitrary order, but the records must be in order.
+    // Identical to the pattern in assertMetricsContents() except that the
+    // file must begin with the "Extra stuff" junk line written by
+    // preCreateLogFile() before the metrics records.
+    final Pattern expectedContentPattern = Pattern.compile(
+        "Extra stuff[\\n\\r]*"
+        + "^\\d+\\s+test1.testRecord1:\\s+Context=test1,\\s+"
+        + "(testTag1=testTagValue1,\\s+testTag2=testTagValue2|"
+        + "testTag2=testTagValue2,\\s+testTag1=testTagValue1),"
+        + "\\s+Hostname=.*,\\s+"
+        + "(testMetric1=1,\\s+testMetric2=2|testMetric2=2,\\s+testMetric1=1)"
+        + "[\\n\\r]*^\\d+\\s+test1.testRecord2:\\s+Context=test1,"
+        + "\\s+testTag22=testTagValue22,\\s+Hostname=.*$[\\n\\r]*",
+         Pattern.MULTILINE);
+
+    assertTrue("Sink did not produce the expected output. Actual output was: "
+        + contents, expectedContentPattern.matcher(contents).matches());
+  }
+
+  /**
+   * Run {@link #doWriteTest} against a log file that was pre-created and
+   * filled with junk data, forcing the sink onto its roll/append code path.
+   *
+   * @param path the base path for the test
+   * @param ignoreErrors whether to ignore errors
+   * @param allowAppend whether to allow appends
+   * @param count the number of files to expect
+   * @return the contents of the final log file
+   * @throws IOException if a file system operation fails
+   * @throws InterruptedException if interrupted while calling
+   * {@link #getNowNotTopOfHour()}
+   * @throws URISyntaxException if the path is not a valid URI
+   */
+  protected String doAppendTest(String path, boolean ignoreErrors,
+      boolean allowAppend, int count)
+      throws IOException, InterruptedException, URISyntaxException {
+    // Seed the target directory with an existing log file before the sink
+    // runs, so the sink cannot simply create a fresh file.
+    preCreateLogFile(path);
+
+    MetricsSystem ms = initMetricsSystem(path, ignoreErrors, allowAppend);
+
+    return doWriteTest(ms, path, count);
+  }
+
+  /**
+   * Create a single file at the target path containing the known data
+   * &quot;Extra stuff&quot;.  Equivalent to {@code preCreateLogFile(path, 1)}.
+   *
+   * If the test run is happening within 20 seconds of the top of the hour,
+   * this method will sleep until the top of the hour.
+   *
+   * @param path the target path under which to create the directory for the
+   * current hour that will contain the log file.
+   *
+   * @throws IOException thrown if the file creation fails
+   * @throws InterruptedException thrown if interrupted while waiting for the
+   * top of the hour.
+   * @throws URISyntaxException thrown if the path isn't a valid URI
+   */
+  protected void preCreateLogFile(String path)
+      throws IOException, InterruptedException, URISyntaxException {
+    // Delegate to the multi-file variant, asking for exactly one file.
+    preCreateLogFile(path, 1);
+  }
+
+  /**
+   * Create files at the target path with some known data in them.  Each file
+   * will have the same content: &quot;Extra stuff&quot;.  The first file is
+   * named &quot;testsrc-&lt;host&gt;.log&quot;; subsequent files carry the
+   * suffixes &quot;.1&quot;, &quot;.2&quot;, etc.
+   *
+   * If the test run is happening within 20 seconds of the top of the hour,
+   * this method will sleep until the top of the hour.
+   *
+   * @param path the target path under which to create the directory for the
+   * current hour that will contain the log files.
+   * @param numFiles the number of log files to create
+   * @throws IOException thrown if the file creation fails
+   * @throws InterruptedException thrown if interrupted while waiting for the
+   * top of the hour.
+   * @throws URISyntaxException thrown if the path isn't a valid URI
+   */
+  protected void preCreateLogFile(String path, int numFiles)
+      throws IOException, InterruptedException, URISyntaxException {
+    Calendar now = getNowNotTopOfHour();
+
+    FileSystem fs = FileSystem.get(new URI(path), new Configuration());
+    Path dir = new Path(path, DATE_FORMAT.format(now.getTime()));
+
+    fs.mkdirs(dir);
+
+    String baseName =
+        "testsrc-" + InetAddress.getLocalHost().getHostName() + ".log";
+
+    // The original implementation duplicated the file-creation code for the
+    // base file and the suffixed files; a single loop covers both cases.
+    // File 0 is the unsuffixed base log file; files 1..numFiles-1 get a
+    // numeric suffix, matching the sink's rolling naming scheme.
+    for (int i = 0; i < numFiles; i++) {
+      Path file = new Path(dir, (i == 0) ? baseName : baseName + "." + i);
+
+      // Create the log file to force the sink to roll or append
+      try (FSDataOutputStream out = fs.create(file)) {
+        out.write("Extra stuff\n".getBytes());
+        out.flush();
+      }
+    }
+  }
+
+  /**
+   * Return a calendar based on the current time.  If the current time is very
+   * near the top of the hour (less than 20 seconds), sleep until the new hour
+   * before returning a new Calendar instance.
+   *
+   * @return a new Calendar instance that isn't near the top of the hour
+   * @throws InterruptedException if interrupted while sleeping
+   */
+  public Calendar getNowNotTopOfHour() throws InterruptedException {
+    Calendar now = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
+
+    // If we're at the very top of the hour, sleep until the next hour
+    // so that we don't get confused by the directory rolling
+    if ((now.get(Calendar.MINUTE) == 59) && (now.get(Calendar.SECOND) > 40)) {
+      Thread.sleep((61 - now.get(Calendar.SECOND)) * 1000L);
+      now.setTime(new Date());
+    }
+
+    return now;
+  }
+
+  /**
+   * Assert that the number of log files in the target directory is as expected.
+   * @param fs the target FileSystem
+   * @param dir the target directory path
+   * @param expected the expected number of files
+   * @throws IOException thrown if listing files fails
+   */
+  public void assertFileCount(FileSystem fs, Path dir, int expected)
+      throws IOException {
+    RemoteIterator<LocatedFileStatus> i = fs.listFiles(dir, true);
+    int count = 0;
+
+    while (i.hasNext()) {
+      i.next();
+      count++;
+    }
+
+    // Bug fix: the original messages concatenated the count directly against
+    // "files", producing e.g. "2files were created".
+    assertTrue("The sink created additional unexpected log files. " + count
+        + " files were created", expected >= count);
+    assertTrue("The sink created too few log files. " + count + " files were "
+        + "created", expected <= count);
+  }
+
+  /**
+   * This class is a {@link RollingFileSystemSink} wrapper that tracks whether
+   * an exception has been thrown during operations.
+   */
+  public static class ErrorSink extends RollingFileSystemSink {
+    // Static and volatile so tests can reset and read the flag across the
+    // metrics system's worker threads.  NOTE(review): shared static state
+    // means tests using this sink must not run concurrently.
+    public static volatile boolean errored = false;
+
+    // Each override below records the failure in the errored flag and then
+    // rethrows, wrapping the original exception as the cause, so callers
+    // still observe the MetricsException while tests observe the flag.
+
+    @Override
+    public void putMetrics(MetricsRecord record) {
+      try {
+        super.putMetrics(record);
+      } catch (MetricsException ex) {
+        errored = true;
+
+        throw new MetricsException(ex);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        super.close();
+      } catch (MetricsException ex) {
+        errored = true;
+
+        throw new MetricsException(ex);
+      }
+    }
+
+    @Override
+    public void flush() {
+      try {
+        super.flush();
+      } catch (MetricsException ex) {
+        errored = true;
+
+        throw new MetricsException(ex);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee005e01/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java
new file mode 100644
index 0000000..da63235
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink;
+
+import org.apache.hadoop.metrics2.MetricsSystem;
+
+import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test the {@link RollingFileSystemSink} class in the context of the local file
+ * system.
+ */
+public class TestRollingFileSystemSink extends RollingFileSystemSinkTestBase {
+  /**
+   * Test writing logs to the local file system.
+   * @throws Exception when things break
+   */
+  @Test
+  public void testWrite() throws Exception {
+    String path = methodDir.getAbsolutePath();
+    MetricsSystem ms = initMetricsSystem(path, false, false);
+
+    assertMetricsContents(doWriteTest(ms, path, 1));
+  }
+
+  /**
+   * Test writing logs to the local file system with the sink set to ignore
+   * errors.
+   * @throws Exception when things break
+   */
+  @Test
+  public void testSilentWrite() throws Exception {
+    String path = methodDir.getAbsolutePath();
+    MetricsSystem ms = initMetricsSystem(path, true, false);
+
+    assertMetricsContents(doWriteTest(ms, path, 1));
+  }
+
+  /**
+   * Test writing logs to the local file system when the log file already
+   * exists.  (The original javadoc said "HDFS", but this class exercises the
+   * local file system.)
+   *
+   * @throws Exception when things break
+   */
+  @Test
+  public void testExistingWrite() throws Exception {
+    String path = methodDir.getAbsolutePath();
+
+    assertMetricsContents(doAppendTest(path, false, false, 2));
+  }
+
+  /**
+   * Test writing logs to the local file system when the log file and the .1
+   * log file already exist.
+   *
+   * @throws Exception when things break
+   */
+  @Test
+  public void testExistingWrite2() throws Exception {
+    String path = methodDir.getAbsolutePath();
+    MetricsSystem ms = initMetricsSystem(path, false, false);
+
+    preCreateLogFile(path, 2);
+
+    assertMetricsContents(doWriteTest(ms, path, 3));
+  }
+
+  /**
+   * Test writing logs to the local file system with ignore errors enabled
+   * when the log file already exists.
+   *
+   * @throws Exception when things break
+   */
+  @Test
+  public void testSilentExistingWrite() throws Exception {
+    String path = methodDir.getAbsolutePath();
+
+    // Bug fix: pass true for ignoreErrors.  The original passed false,
+    // which made this test an exact duplicate of testExistingWrite() and
+    // left the silent (ignore errors) code path untested.
+    assertMetricsContents(doAppendTest(path, true, false, 2));
+  }
+
+  /**
+   * Test that writing fails when the directory isn't writable.
+   */
+  @Test
+  public void testFailedWrite() {
+    String path = methodDir.getAbsolutePath();
+    MetricsSystem ms = initMetricsSystem(path, false, false);
+
+    new MyMetrics1().registerWith(ms);
+
+    // NOTE(review): setWritable(false) is ineffective when running as root,
+    // which would cause this test to fail in that environment -- confirm.
+    methodDir.setWritable(false);
+    ErrorSink.errored = false;
+
+    try {
+      // publish the metrics
+      ms.publishMetricsNow();
+
+      assertTrue("No exception was generated while writing metrics "
+          + "even though the target directory was not writable",
+          ErrorSink.errored);
+
+      ms.stop();
+      ms.shutdown();
+    } finally {
+      // Make sure the dir is writable again so we can delete it at the end
+      methodDir.setWritable(true);
+    }
+  }
+
+  /**
+   * Test that writing fails silently when the directory is not writable.
+   */
+  @Test
+  public void testSilentFailedWrite() {
+    String path = methodDir.getAbsolutePath();
+    MetricsSystem ms = initMetricsSystem(path, true, false);
+
+    new MyMetrics1().registerWith(ms);
+
+    methodDir.setWritable(false);
+    ErrorSink.errored = false;
+
+    try {
+      // publish the metrics
+      ms.publishMetricsNow();
+
+      assertFalse("An exception was generated while writing metrics "
+          + "when the target directory was not writable, even though the "
+          + "sink is set to ignore errors",
+          ErrorSink.errored);
+
+      ms.stop();
+      ms.shutdown();
+    } finally {
+      // Make sure the dir is writable again so we can delete it at the end
+      methodDir.setWritable(true);
+    }
+  }
+}