Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/02/11 09:00:12 UTC

[38/50] hadoop git commit: HDFS-9637. Tests for RollingFileSystemSink. (Daniel Templeton via kasha)

HDFS-9637. Tests for RollingFileSystemSink. (Daniel Templeton via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bc425a62
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bc425a62
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bc425a62

Branch: refs/heads/yarn-2877
Commit: bc425a623fd06c4f56106fb2ea662b19e4434d11
Parents: 4ef1324
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Feb 10 10:08:05 2016 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Wed Feb 10 10:08:05 2016 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../sink/TestRollingFileSystemSinkWithHdfs.java | 287 +++++++++++++++++++
 2 files changed, 289 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc425a62/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 4b8bdcb..d691ceb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -986,6 +986,8 @@ Release 2.9.0 - UNRELEASED
     HDFS-7764. DirectoryScanner shouldn't abort the scan if one directory had
     an error (Rakesh R via cmccabe)
 
+    HDFS-9637. Tests for RollingFileSystemSink. (Daniel Templeton via kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc425a62/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
new file mode 100644
index 0000000..56ff773
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Calendar;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.sink.RollingFileSystemSinkTestBase.MyMetrics1;
+import org.junit.After;
+import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+
+/**
+ * Test the {@link RollingFileSystemSink} class in the context of HDFS.
+ */
+public class TestRollingFileSystemSinkWithHdfs
+    extends RollingFileSystemSinkTestBase {
+  private static final int NUM_DATANODES = 4;
+  private MiniDFSCluster cluster;
+
+  /**
+   * Create a {@link MiniDFSCluster} instance with four nodes.  The
+   * node count is required to allow append to function. Also clear the
+   * sink's test flags.
+   *
+   * @throws IOException thrown if cluster creation fails
+   */
+  @Before
+  public void setupHdfs() throws IOException {
+    Configuration conf = new Configuration();
+
+    // It appears that since HDFS-265, append is always enabled.
+    cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+
+    // Also clear sink flags
+    RollingFileSystemSink.isTest = false;
+    RollingFileSystemSink.hasFlushed = false;
+  }
+
+  /**
+   * Stop the {@link MiniDFSCluster}.
+   */
+  @After
+  public void shutdownHdfs() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test writing logs to HDFS.
+   *
+   * @throws Exception thrown when things break
+   */
+  @Test
+  public void testWrite() throws Exception {
+    String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+    MetricsSystem ms = initMetricsSystem(path, false, true);
+
+    assertMetricsContents(doWriteTest(ms, path, 1));
+  }
+
+  /**
+   * Test writing logs to HDFS if append is enabled and the log file already
+   * exists.
+   *
+   * @throws Exception thrown when things break
+   */
+  @Test
+  public void testAppend() throws Exception {
+    String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+
+    assertExtraContents(doAppendTest(path, false, true, 1));
+  }
+
+  /**
+   * Test writing logs to HDFS if append is enabled, the log file already
+   * exists, and the sink is set to ignore errors.
+   *
+   * @throws Exception thrown when things break
+   */
+  @Test
+  public void testSilentAppend() throws Exception {
+    String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+
+    assertExtraContents(doAppendTest(path, true, true, 1));
+  }
+
+  /**
+   * Test writing logs to HDFS without append enabled, when the log file already
+   * exists.
+   *
+   * @throws Exception thrown when things break
+   */
+  @Test
+  public void testNoAppend() throws Exception {
+    String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+
+    assertMetricsContents(doAppendTest(path, false, false, 2));
+  }
+
+  /**
+   * Test writing logs to HDFS without append enabled, with ignore errors
+   * enabled, and when the log file already exists.
+   *
+   * @throws Exception thrown when things break
+   */
+  @Test
+  public void testSilentOverwrite() throws Exception {
+    String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+
+    assertMetricsContents(doAppendTest(path, true, false, 2));
+  }
+
+  /**
+   * Test that writing to HDFS fails when HDFS is unavailable.
+   *
+   * @throws IOException thrown when reading or writing log files
+   */
+  @Test
+  public void testFailedWrite() throws IOException {
+    final String path =
+        "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+    MetricsSystem ms = initMetricsSystem(path, false, false);
+
+    new MyMetrics1().registerWith(ms);
+
+    shutdownHdfs();
+    ErrorSink.errored = false;
+
+    ms.publishMetricsNow(); // publish the metrics
+
+    assertTrue("No exception was generated while writing metrics "
+        + "even though HDFS was unavailable", ErrorSink.errored);
+
+    ms.stop();
+    ms.shutdown();
+  }
+
+  /**
+   * Test that closing a file in HDFS fails when HDFS is unavailable.
+   *
+   * @throws IOException thrown when reading or writing log files
+   */
+  @Test
+  public void testFailedClose() throws IOException {
+    final String path =
+        "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+    MetricsSystem ms = initMetricsSystem(path, false, false);
+
+    new MyMetrics1().registerWith(ms);
+
+    ms.publishMetricsNow(); // publish the metrics
+
+    shutdownHdfs();
+    ErrorSink.errored = false;
+
+    ms.stop();
+
+    assertTrue("No exception was generated while stopping sink "
+        + "even though HDFS was unavailable", ErrorSink.errored);
+
+    ms.shutdown();
+  }
+
+  /**
+   * Test that writing to HDFS fails silently when HDFS is unavailable.
+   *
+   * @throws IOException thrown when reading or writing log files
+   * @throws java.lang.InterruptedException thrown if interrupted
+   */
+  @Test
+  public void testSilentFailedWrite() throws IOException, InterruptedException {
+    final String path =
+        "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+    MetricsSystem ms = initMetricsSystem(path, true, false);
+
+    new MyMetrics1().registerWith(ms);
+
+    shutdownHdfs();
+    ErrorSink.errored = false;
+
+    ms.publishMetricsNow(); // publish the metrics
+
+    assertFalse("An exception was generated writing metrics "
+        + "while HDFS was unavailable, even though the sink is set to "
+        + "ignore errors", ErrorSink.errored);
+
+    ms.stop();
+    ms.shutdown();
+  }
+
+  /**
+   * Test that closing a file in HDFS silently fails when HDFS is unavailable.
+   *
+   * @throws IOException thrown when reading or writing log files
+   */
+  @Test
+  public void testSilentFailedClose() throws IOException {
+    final String path =
+        "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+    MetricsSystem ms = initMetricsSystem(path, true, false);
+
+    new MyMetrics1().registerWith(ms);
+
+    ms.publishMetricsNow(); // publish the metrics
+
+    shutdownHdfs();
+    ErrorSink.errored = false;
+
+    ms.stop();
+
+    assertFalse("An exception was generated stopping sink "
+        + "while HDFS was unavailable, even though the sink is set to "
+        + "ignore errors", ErrorSink.errored);
+
+    ms.shutdown();
+  }
+
+  /**
+   * This test specifically checks whether the flusher thread is automatically
+   * flushing the files.  It unfortunately can only test with the alternative
+   * flushing schedule (because of test timing), but it's better than nothing.
+   *
+   * @throws Exception thrown if something breaks
+   */
+  @Test
+  public void testFlushThread() throws Exception {
+    RollingFileSystemSink.isTest = true;
+    RollingFileSystemSink.hasFlushed = false;
+
+    String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+    MetricsSystem ms = initMetricsSystem(path, true, false);
+
+    new MyMetrics1().registerWith(ms);
+
+    // Publish the metrics
+    ms.publishMetricsNow();
+    // Publish again because the first write seems to get flushed properly
+    // regardless.
+    ms.publishMetricsNow();
+
+    // Sleep until the flusher has run
+    while (!RollingFileSystemSink.hasFlushed) {
+      Thread.sleep(50L);
+    }
+
+    Calendar now = getNowNotTopOfHour();
+    FileSystem fs = FileSystem.newInstance(new URI(path), new Configuration());
+    Path currentDir = new Path(path, DATE_FORMAT.format(now.getTime()));
+    Path currentFile =
+        findMostRecentLogFile(fs, new Path(currentDir, getLogFilename()));
+    FileStatus status = fs.getFileStatus(currentFile);
+
+    // Two records were written; each is 118+ bytes, depending on hostname
+    assertTrue("The flusher thread didn't flush the log contents. Expected "
+        + "at least 236 bytes in the log file, but got " + status.getLen(),
+        status.getLen() >= 236);
+
+    ms.stop();
+  }
+}
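
----------------------------------------------------------------------

For context, the sink exercised by these tests is normally attached to a
daemon through hadoop-metrics2.properties. Below is a minimal sketch,
assuming the sink's basepath, ignore-error, and allow-append options
(which correspond to the ignoreErrors and allowAppend flags the tests
pass to initMetricsSystem); the "namenode" prefix and the "roll"
instance name are illustrative, not taken from this commit:

    # Attach RollingFileSystemSink to the NameNode's metrics system
    namenode.sink.roll.class=org.apache.hadoop.metrics2.sink.RollingFileSystemSink
    # Base path under which the sink creates its dated log directories
    namenode.sink.roll.basepath=hdfs://namenode:8020/tmp
    # Swallow write errors rather than propagating them (cf. the testSilent* cases)
    namenode.sink.roll.ignore-error=false
    # Append to an existing log file rather than creating a new one
    namenode.sink.roll.allow-append=true

With ignore-error=false and HDFS down, a write or close surfaces an error,
which is what testFailedWrite and testFailedClose assert; flipping it to
true gives the silent behavior asserted by the testSilent* cases.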