You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/02/11 09:00:12 UTC
[38/50] hadoop git commit: HDFS-9637. Tests for
RollingFileSystemSink. (Daniel Templeton via kasha)
HDFS-9637. Tests for RollingFileSystemSink. (Daniel Templeton via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bc425a62
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bc425a62
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bc425a62
Branch: refs/heads/yarn-2877
Commit: bc425a623fd06c4f56106fb2ea662b19e4434d11
Parents: 4ef1324
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Feb 10 10:08:05 2016 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Wed Feb 10 10:08:05 2016 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../sink/TestRollingFileSystemSinkWithHdfs.java | 287 +++++++++++++++++++
2 files changed, 289 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc425a62/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 4b8bdcb..d691ceb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -986,6 +986,8 @@ Release 2.9.0 - UNRELEASED
HDFS-7764. DirectoryScanner shouldn't abort the scan if one directory had
an error (Rakesh R via cmccabe)
+ HDFS-9637. Tests for RollingFileSystemSink. (Daniel Templeton via kasha)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc425a62/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
new file mode 100644
index 0000000..56ff773
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Calendar;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.sink.RollingFileSystemSinkTestBase.MyMetrics1;
+import org.junit.After;
+import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+
+/**
+ * Test the {@link RollingFileSystemSink} class in the context of HDFS.
+ *
+ * Every test runs against a fresh {@link MiniDFSCluster} and writes its logs
+ * under <code>/tmp</code> on that cluster. The helpers used here that are not
+ * defined in this class (for example <code>initMetricsSystem()</code>,
+ * <code>doWriteTest()</code> and <code>doAppendTest()</code>) are presumably
+ * inherited from {@link RollingFileSystemSinkTestBase}. Judging by the test
+ * descriptions in this class, the two boolean arguments they take are the
+ * "ignore errors" flag followed by the "allow append" flag; confirm against
+ * the base class.
+ */
+public class TestRollingFileSystemSinkWithHdfs
+    extends RollingFileSystemSinkTestBase {
+  private static final int NUM_DATANODES = 4;
+  /** Longest time, in milliseconds, to wait for the flusher thread to run. */
+  private static final long FLUSH_WAIT_MILLIS = 60000L;
+  private MiniDFSCluster cluster;
+
+  /**
+   * Create a {@link MiniDFSCluster} instance with four nodes.  The
+   * node count is required to allow append to function. Also clear the
+   * sink's test flags.
+   *
+   * @throws IOException thrown if cluster creation fails
+   */
+  @Before
+  public void setupHdfs() throws IOException {
+    Configuration conf = new Configuration();
+
+    // It appears that since HDFS-265, append is always enabled.
+    cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+
+    // Also clear sink flags
+    RollingFileSystemSink.isTest = false;
+    RollingFileSystemSink.hasFlushed = false;
+  }
+
+  /**
+   * Stop the {@link MiniDFSCluster}. The tests that need HDFS to be
+   * unavailable call this method directly, so it may run more than once per
+   * test; the cluster reference is cleared to make the later calls no-ops.
+   */
+  @After
+  public void shutdownHdfs() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  /**
+   * Build the URI of the <code>/tmp</code> directory on the running cluster,
+   * which is where every test in this class writes its logs. Must be called
+   * before the cluster is shut down.
+   *
+   * @return the base log path as an <code>hdfs://</code> URI string
+   */
+  private String hdfsTmpPath() {
+    return "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+  }
+
+  /**
+   * Test writing logs to HDFS.
+   *
+   * @throws Exception thrown when things break
+   */
+  @Test
+  public void testWrite() throws Exception {
+    String path = hdfsTmpPath();
+    MetricsSystem ms = initMetricsSystem(path, false, true);
+
+    assertMetricsContents(doWriteTest(ms, path, 1));
+  }
+
+  /**
+   * Test writing logs to HDFS if append is enabled and the log file already
+   * exists.
+   *
+   * @throws Exception thrown when things break
+   */
+  @Test
+  public void testAppend() throws Exception {
+    String path = hdfsTmpPath();
+
+    assertExtraContents(doAppendTest(path, false, true, 1));
+  }
+
+  /**
+   * Test writing logs to HDFS if append is enabled, the log file already
+   * exists, and the sink is set to ignore errors.
+   *
+   * @throws Exception thrown when things break
+   */
+  @Test
+  public void testSilentAppend() throws Exception {
+    String path = hdfsTmpPath();
+
+    // Ignore errors must be switched on here; with it off this test would be
+    // an exact duplicate of testAppend().
+    assertExtraContents(doAppendTest(path, true, true, 1));
+  }
+
+  /**
+   * Test writing logs to HDFS without append enabled, when the log file already
+   * exists.
+   *
+   * @throws Exception thrown when things break
+   */
+  @Test
+  public void testNoAppend() throws Exception {
+    String path = hdfsTmpPath();
+
+    assertMetricsContents(doAppendTest(path, false, false, 2));
+  }
+
+  /**
+   * Test writing logs to HDFS without append enabled, with ignore errors
+   * enabled, and when the log file already exists.
+   *
+   * @throws Exception thrown when things break
+   */
+  @Test
+  public void testSilentOverwrite() throws Exception {
+    String path = hdfsTmpPath();
+
+    assertMetricsContents(doAppendTest(path, true, false, 2));
+  }
+
+  /**
+   * Test that writing to HDFS fails when HDFS is unavailable.
+   *
+   * @throws IOException thrown when reading or writing log files
+   */
+  @Test
+  public void testFailedWrite() throws IOException {
+    final String path = hdfsTmpPath();
+    MetricsSystem ms = initMetricsSystem(path, false, false);
+
+    new MyMetrics1().registerWith(ms);
+
+    // Take HDFS down before publishing so that the write has nowhere to go
+    shutdownHdfs();
+    ErrorSink.errored = false;
+
+    ms.publishMetricsNow(); // publish the metrics
+
+    assertTrue("No exception was generated while writing metrics "
+        + "even though HDFS was unavailable", ErrorSink.errored);
+
+    ms.stop();
+    ms.shutdown();
+  }
+
+  /**
+   * Test that closing a file in HDFS fails when HDFS is unavailable.
+   *
+   * @throws IOException thrown when reading or writing log files
+   */
+  @Test
+  public void testFailedClose() throws IOException {
+    final String path = hdfsTmpPath();
+    MetricsSystem ms = initMetricsSystem(path, false, false);
+
+    new MyMetrics1().registerWith(ms);
+
+    ms.publishMetricsNow(); // publish the metrics
+
+    // Take HDFS down after the write so that only the close can fail
+    shutdownHdfs();
+    ErrorSink.errored = false;
+
+    ms.stop();
+
+    assertTrue("No exception was generated while stopping sink "
+        + "even though HDFS was unavailable", ErrorSink.errored);
+
+    ms.shutdown();
+  }
+
+  /**
+   * Test that writing to HDFS fails silently when HDFS is unavailable.
+   *
+   * @throws IOException thrown when reading or writing log files
+   * @throws java.lang.InterruptedException thrown if interrupted
+   */
+  @Test
+  public void testSilentFailedWrite() throws IOException, InterruptedException {
+    final String path = hdfsTmpPath();
+    MetricsSystem ms = initMetricsSystem(path, true, false);
+
+    new MyMetrics1().registerWith(ms);
+
+    // Take HDFS down before publishing so that the write has nowhere to go
+    shutdownHdfs();
+    ErrorSink.errored = false;
+
+    ms.publishMetricsNow(); // publish the metrics
+
+    assertFalse("An exception was generated writing metrics "
+        + "while HDFS was unavailable, even though the sink is set to "
+        + "ignore errors", ErrorSink.errored);
+
+    ms.stop();
+    ms.shutdown();
+  }
+
+  /**
+   * Test that closing a file in HDFS silently fails when HDFS is unavailable.
+   *
+   * @throws IOException thrown when reading or writing log files
+   */
+  @Test
+  public void testSilentFailedClose() throws IOException {
+    final String path = hdfsTmpPath();
+    MetricsSystem ms = initMetricsSystem(path, true, false);
+
+    new MyMetrics1().registerWith(ms);
+
+    ms.publishMetricsNow(); // publish the metrics
+
+    // Take HDFS down after the write so that only the close can fail
+    shutdownHdfs();
+    ErrorSink.errored = false;
+
+    ms.stop();
+
+    assertFalse("An exception was generated stopping sink "
+        + "while HDFS was unavailable, even though the sink is set to "
+        + "ignore errors", ErrorSink.errored);
+
+    ms.shutdown();
+  }
+
+  /**
+   * This test specifically checks whether the flusher thread is automatically
+   * flushing the files.  It unfortunately can only test with the alternative
+   * flushing schedule (because of test timing), but it's better than nothing.
+   *
+   * The wait for the flusher is bounded by {@link #FLUSH_WAIT_MILLIS}, so a
+   * flusher that never runs fails the test rather than hanging it. The
+   * metrics system and the file system handle are released even when an
+   * assertion fails.
+   *
+   * @throws Exception thrown if something breaks
+   */
+  @Test
+  public void testFlushThread() throws Exception {
+    RollingFileSystemSink.isTest = true;
+    RollingFileSystemSink.hasFlushed = false;
+
+    String path = hdfsTmpPath();
+    MetricsSystem ms = initMetricsSystem(path, true, false);
+
+    try {
+      new MyMetrics1().registerWith(ms);
+
+      // Publish the metrics
+      ms.publishMetricsNow();
+      // Publish again because the first write seems to get properly flushed
+      // regardless.
+      ms.publishMetricsNow();
+
+      // Sleep until the flusher has run, but give up after a bounded wait
+      long deadline = System.currentTimeMillis() + FLUSH_WAIT_MILLIS;
+
+      while (!RollingFileSystemSink.hasFlushed
+          && System.currentTimeMillis() < deadline) {
+        Thread.sleep(50L);
+      }
+
+      assertTrue("The flusher thread didn't run within " + FLUSH_WAIT_MILLIS
+          + " ms", RollingFileSystemSink.hasFlushed);
+
+      Calendar now = getNowNotTopOfHour();
+      FileSystem fs =
+          FileSystem.newInstance(new URI(path), new Configuration());
+
+      try {
+        Path currentDir = new Path(path, DATE_FORMAT.format(now.getTime()));
+        Path currentFile =
+            findMostRecentLogFile(fs, new Path(currentDir, getLogFilename()));
+        FileStatus status = fs.getFileStatus(currentFile);
+
+        // Each metrics record is 118+ bytes, depending on hostname. The
+        // length is read before the metrics system is stopped so that it
+        // reflects the flusher's work rather than the final close.
+        assertTrue("The flusher thread didn't flush the log contents. "
+            + "Expected at least 236 bytes in the log file, but got "
+            + status.getLen(), status.getLen() >= 236);
+      } finally {
+        // The handle was created with newInstance(), so this test owns it
+        fs.close();
+      }
+    } finally {
+      ms.stop();
+      ms.shutdown();
+    }
+  }
+}