You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/02/11 08:59:48 UTC
[14/50] hadoop git commit: HADOOP-12759. RollingFileSystemSink should
eagerly rotate directories. Contributed by Daniel Templeton.
HADOOP-12759. RollingFileSystemSink should eagerly rotate directories. Contributed by Daniel Templeton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5b59a0ea
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5b59a0ea
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5b59a0ea
Branch: refs/heads/yarn-2877
Commit: 5b59a0ea85c923384e36ad7c036e751551774142
Parents: 1495ff9
Author: Andrew Wang <wa...@apache.org>
Authored: Sat Feb 6 20:52:35 2016 -0800
Committer: Andrew Wang <wa...@apache.org>
Committed: Sat Feb 6 20:52:35 2016 -0800
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +
.../metrics2/sink/RollingFileSystemSink.java | 275 ++++++++++++++-----
.../sink/RollingFileSystemSinkTestBase.java | 27 +-
3 files changed, 224 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b59a0ea/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 3492fdb..dbfa482 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -1102,6 +1102,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12292. Make use of DeleteObjects optional.
(Thomas Demoor via stevel)
+ HADOOP-12759. RollingFileSystemSink should eagerly rotate directories.
+ (Daniel Templeton via wang)
+
OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b59a0ea/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
index 8271362..de403ca 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.metrics2.sink;
+import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
@@ -25,8 +26,11 @@ import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
+import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;
+import java.util.Timer;
+import java.util.TimerTask;
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.commons.lang.time.FastDateFormat;
@@ -35,7 +39,9 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsRecord;
@@ -43,7 +49,7 @@ import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
/**
- * This class is a metrics sink that uses
+ * <p>This class is a metrics sink that uses
* {@link org.apache.hadoop.fs.FileSystem} to write the metrics logs. Every
* hour a new directory will be created under the path specified by the
* <code>basepath</code> property. All metrics will be logged to a file in the
@@ -54,51 +60,53 @@ import org.apache.hadoop.metrics2.MetricsTag;
* time zone used to create the current hour's directory name is GMT. If the
* <code>basepath</code> property isn't specified, it will default to
* "/tmp", which is the temp directory on whatever default file
- * system is configured for the cluster.
+ * system is configured for the cluster.</p>
*
- * The <code><prefix>.sink.<instance>.ignore-error</code> property
- * controls whether an exception is thrown when an error is encountered writing
- * a log file. The default value is <code>true</code>. When set to
- * <code>false</code>, file errors are quietly swallowed.
+ * <p>The <code><prefix>.sink.<instance>.ignore-error</code>
+ * property controls whether an exception is thrown when an error is encountered
+ * writing a log file. The default value is <code>false</code>. When set to
+ * <code>true</code>, file errors are quietly swallowed.</p>
*
- * The primary use of this class is for logging to HDFS. As it uses
+ * <p>The primary use of this class is for logging to HDFS. As it uses
* {@link org.apache.hadoop.fs.FileSystem} to access the target file system,
* however, it can be used to write to the local file system, Amazon S3, or any
* other supported file system. The base path for the sink will determine the
* file system used. An unqualified path will write to the default file system
- * set by the configuration.
+ * set by the configuration.</p>
*
- * Not all file systems support the ability to append to files. In file systems
- * without the ability to append to files, only one writer can write to a file
- * at a time. To allow for concurrent writes from multiple daemons on a single
- * host, the <code>source</code> property should be set to the name of the
- * source daemon, e.g. <i>namenode</i>. The value of the <code>source</code>
- * property should typically be the same as the property's prefix. If this
- * property is not set, the source is taken to be <i>unknown</i>.
+ * <p>Not all file systems support the ability to append to files. In file
+ * systems without the ability to append to files, only one writer can write to
+ * a file at a time. To allow for concurrent writes from multiple daemons on a
+ * single host, the <code>source</code> property should be set to the name of
+ * the source daemon, e.g. <i>namenode</i>. The value of the
+ * <code>source</code> property should typically be the same as the property's
+ * prefix. If this property is not set, the source is taken to be
+ * <i>unknown</i>.</p>
*
- * Instead of appending to an existing file, by default the sink
+ * <p>Instead of appending to an existing file, by default the sink
* will create a new file with a suffix of ".<n>&quet;, where
* <i>n</i> is the next lowest integer that isn't already used in a file name,
* similar to the Hadoop daemon logs. NOTE: the file with the <b>highest</b>
- * sequence number is the <b>newest</b> file, unlike the Hadoop daemon logs.
+ * sequence number is the <b>newest</b> file, unlike the Hadoop daemon logs.</p>
*
- * For file systems that allow append, the sink supports appending to the
+ * <p>For file systems that allow append, the sink supports appending to the
* existing file instead. If the <code>allow-append</code> property is set to
* true, the sink will instead append to the existing file on file systems that
* support appends. By default, the <code>allow-append</code> property is
- * false.
+ * false.</p>
*
- * Note that when writing to HDFS with <code>allow-append</code> set to true,
+ * <p>Note that when writing to HDFS with <code>allow-append</code> set to true,
* there is a minimum acceptable number of data nodes. If the number of data
* nodes drops below that minimum, the append will succeed, but reading the
* data will fail with an IOException in the DataStreamer class. The minimum
- * number of data nodes required for a successful append is generally 2 or 3.
+ * number of data nodes required for a successful append is generally 2 or
+ * 3.</p>
*
- * Note also that when writing to HDFS, the file size information is not updated
- * until the file is closed (e.g. at the top of the hour) even though the data
- * is being written successfully. This is a known HDFS limitation that exists
- * because of the performance cost of updating the metadata. See
- * <a href="https://issues.apache.org/jira/browse/HDFS-5478">HDFS-5478</a>.
+ * <p>Note also that when writing to HDFS, the file size information is not
+ * updated until the file is closed (e.g. at the top of the hour) even though
+ * the data is being written successfully. This is a known HDFS limitation that
+ * exists because of the performance cost of updating the metadata. See
+ * <a href="https://issues.apache.org/jira/browse/HDFS-5478">HDFS-5478</a>.</p>
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@@ -111,6 +119,7 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
private static final String BASEPATH_DEFAULT = "/tmp";
private static final FastDateFormat DATE_FORMAT =
FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT"));
+ private final Object lock = new Object();
private String source;
private boolean ignoreError;
private boolean allowAppend;
@@ -124,6 +133,11 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
private PrintStream currentOutStream;
// We keep this only to be able to call hsynch() on it.
private FSDataOutputStream currentFSOutStream;
+ private Timer flushTimer;
+ @VisibleForTesting
+ protected static boolean isTest = false;
+ @VisibleForTesting
+ protected static volatile boolean hasFlushed = false;
@Override
public void init(SubsetConfiguration conf) {
@@ -147,6 +161,8 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
if (allowAppend) {
allowAppend = checkAppend(fileSystem);
}
+
+ flushTimer = new Timer("RollingFileSystemSink Flusher", true);
}
/**
@@ -175,28 +191,72 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
* new directory or new log file
*/
private void rollLogDirIfNeeded() throws MetricsException {
- String currentDir = DATE_FORMAT.format(new Date());
+ Date now = new Date();
+ String currentDir = DATE_FORMAT.format(now);
Path path = new Path(basePath, currentDir);
// We check whether currentOutStream is null instead of currentDirPath,
// because if currentDirPath is null, then currentOutStream is null, but
// currentOutStream can be null for other reasons.
if ((currentOutStream == null) || !path.equals(currentDirPath)) {
- currentDirPath = path;
-
+ // Close the stream. This step could have been handled already by the
+ // flusher thread, but if it has, the PrintStream will just swallow the
+ // exception, which is fine.
if (currentOutStream != null) {
currentOutStream.close();
}
+ currentDirPath = path;
+
try {
rollLogDir();
} catch (IOException ex) {
- throwMetricsException("Failed to creating new log file", ex);
+ throwMetricsException("Failed to create new log file", ex);
}
+
+ scheduleFlush(now);
}
}
/**
+ * Schedule the current hour's log file to be closed at the top of the next
+ * GMT hour. If this ends up running after the top of the next hour, it will
+ * execute immediately.
+ *
+ * @param now the current time
+ */
+ private void scheduleFlush(Date now) {
+ // Capture this hour's stream; currentOutStream changes on the next roll.
+ final PrintStream toClose = currentOutStream;
+ // Use GMT so "top of the hour" lines up with the GMT-based DATE_FORMAT.
+ Calendar next = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
+ next.setTime(now);
+ if (isTest) {
+ // If we're running unit tests, flush after a short pause
+ next.add(Calendar.MILLISECOND, 400);
+ } else {
+ // Otherwise flush at the top of the next hour
+ next.set(Calendar.SECOND, 0);
+ next.set(Calendar.MINUTE, 0);
+ next.add(Calendar.HOUR, 1);
+ }
+
+ flushTimer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ // toClose may be null; an uncaught NPE would kill the Timer thread.
+ if (toClose != null) {
+ synchronized (lock) {
+ // May already be closed by putMetrics(); PrintStream tolerates it.
+ toClose.close();
+ }
+ }
+ hasFlushed = true;
+ }
+ }, next.getTime());
+ }
+
+ /**
* Create a new directory based on the current hour and a new log file in
* that directory.
*
@@ -231,7 +291,9 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
*/
private void createLogFile(Path initial) throws IOException {
Path currentAttempt = initial;
- int id = 1;
+ // Start at 0 so that if the base filename exists, we start with the suffix
+ // ".1".
+ int id = 0;
while (true) {
// First try blindly creating the file. If we fail, it either means
@@ -248,8 +310,8 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
} catch (IOException ex) {
// Now we can check to see if the file exists to know why we failed
if (fileSystem.exists(currentAttempt)) {
+ id = getNextIdToTry(initial, id);
currentAttempt = new Path(initial.toString() + "." + id);
- id += 1;
} else {
throw ex;
}
@@ -258,6 +320,66 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
}
/**
+ * Return the next ID suffix to use when creating the log file. This method
+ * will look at the files in the directory, find the one with the highest
+ * ID suffix, add 1 to that suffix, and return it. This approach saves a full
+ * linear probe, which matters in the case where there are a large number of
+ * log files.
+ *
+ * @param initial the base file path
+ * @param lastId the last ID value that was used
+ * @return the next ID to try
+ * @throws IOException thrown if there's an issue querying the files in the
+ * directory
+ */
+ private int getNextIdToTry(Path initial, int lastId)
+ throws IOException {
+ RemoteIterator<LocatedFileStatus> files =
+ fileSystem.listFiles(currentDirPath, true);
+ // Listed entries are compared by bare name, so use the bare base name too.
+ String base = initial.getName();
+ int id = lastId;
+ while (files.hasNext()) {
+ String file = files.next().getPath().getName();
+
+ if (file.startsWith(base)) {
+ int fileId = extractId(file);
+
+ if (fileId > id) {
+ id = fileId;
+ }
+ }
+ }
+
+ // Return either 1 more than the highest we found or 1 more than the last
+ // ID used (if no ID was found).
+ return id + 1;
+ }
+
+ /**
+ * Extract the numeric ID that follows the last dot in the given file name.
+ *
+ * @param file the file name
+ * @return the ID, or -1 if no integer ID could be extracted
+ */
+ private int extractId(String file) {
+ int index = file.lastIndexOf(".");
+ int id = -1;
+
+ // Skip a missing or leading dot (a hostname has at least 1 character)
+ if (index > 0) {
+ try {
+ id = Integer.parseInt(file.substring(index + 1));
+ } catch (NumberFormatException ex) {
+ // The text after the last dot isn't an integer (e.g. there is no
+ // ".<n>" suffix and the name ends in another extension); keep -1.
+ }
+ }
+
+ return id;
+ }
+
+ /**
* Create a new log file and return the {@link FSDataOutputStream}. If a
* file with the specified path already exists, open the file for append
* instead.
@@ -303,65 +425,72 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
@Override
public void putMetrics(MetricsRecord record) {
- rollLogDirIfNeeded();
+ synchronized (lock) {
+ rollLogDirIfNeeded();
- if (currentOutStream != null) {
- currentOutStream.printf("%d %s.%s", record.timestamp(),
- record.context(), record.name());
+ if (currentOutStream != null) {
+ currentOutStream.printf("%d %s.%s", record.timestamp(),
+ record.context(), record.name());
- String separator = ": ";
+ String separator = ": ";
- for (MetricsTag tag : record.tags()) {
- currentOutStream.printf("%s%s=%s", separator, tag.name(), tag.value());
- separator = ", ";
- }
+ for (MetricsTag tag : record.tags()) {
+ currentOutStream.printf("%s%s=%s", separator, tag.name(),
+ tag.value());
+ separator = ", ";
+ }
- for (AbstractMetric metric : record.metrics()) {
- currentOutStream.printf("%s%s=%s", separator, metric.name(),
- metric.value());
- }
+ for (AbstractMetric metric : record.metrics()) {
+ currentOutStream.printf("%s%s=%s", separator, metric.name(),
+ metric.value());
+ }
- currentOutStream.println();
+ currentOutStream.println();
- // If we don't hflush(), the data may not be written until the file is
- // closed. The file won't be closed until the top of the hour *AND*
- // another record is received. Calling hflush() makes sure that the data
- // is complete at the top of the hour.
- try {
- currentFSOutStream.hflush();
- } catch (IOException ex) {
- throwMetricsException("Failed flushing the stream", ex);
- }
+ // If we don't hflush(), the data may not be written until the file is
+ // closed. The file won't be closed until the top of the hour *AND*
+ // another record is received. Calling hflush() makes sure that the data
+ // is complete at the top of the hour.
+ try {
+ currentFSOutStream.hflush();
+ } catch (IOException ex) {
+ throwMetricsException("Failed flushing the stream", ex);
+ }
- checkForErrors("Unable to write to log file");
- } else if (!ignoreError) {
- throwMetricsException("Unable to write to log file");
+ checkForErrors("Unable to write to log file");
+ } else if (!ignoreError) {
+ throwMetricsException("Unable to write to log file");
+ }
}
}
@Override
public void flush() {
- // currentOutStream is null if currentFSOutStream is null
- if (currentFSOutStream != null) {
- try {
- currentFSOutStream.hflush();
- } catch (IOException ex) {
- throwMetricsException("Unable to flush log file", ex);
+ synchronized (lock) {
+ // currentOutStream is null if currentFSOutStream is null
+ if (currentFSOutStream != null) {
+ try {
+ currentFSOutStream.hflush();
+ } catch (IOException ex) {
+ throwMetricsException("Unable to flush log file", ex);
+ }
}
}
}
@Override
- public void close() throws IOException {
- if (currentOutStream != null) {
- currentOutStream.close();
+ public void close() {
+ synchronized (lock) {
+ if (currentOutStream != null) {
+ currentOutStream.close();
- try {
- checkForErrors("Unable to close log file");
- } finally {
- // Null out the streams just in case someone tries to reuse us.
- currentOutStream = null;
- currentFSOutStream = null;
+ try {
+ checkForErrors("Unable to close log file");
+ } finally {
+ // Null out the streams just in case someone tries to reuse us.
+ currentOutStream = null;
+ currentFSOutStream = null;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b59a0ea/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
index 3213276..292d1fc 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
@@ -25,6 +25,7 @@ import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Calendar;
@@ -214,8 +215,7 @@ public class RollingFileSystemSinkTestBase {
protected String readLogFile(String path, String then, int count)
throws IOException, URISyntaxException {
final String now = DATE_FORMAT.format(new Date());
- final String logFile =
- "testsrc-" + InetAddress.getLocalHost().getHostName() + ".log";
+ final String logFile = getLogFilename();
FileSystem fs = FileSystem.get(new URI(path), new Configuration());
StringBuilder metrics = new StringBuilder();
boolean found = false;
@@ -258,7 +258,10 @@ public class RollingFileSystemSinkTestBase {
}
/**
- * Return the path to the log file to use, based on the target path.
+ * Return the path to the log file to use, based on the initial path. The
+ * initial path must be a valid log file path. This method will find the
+ * most recent version of the file.
+ *
* @param fs the target FileSystem
* @param initial the path from which to start
* @return the path to use
@@ -275,10 +278,20 @@ public class RollingFileSystemSinkTestBase {
nextLogFile = new Path(initial.toString() + "." + id);
id += 1;
} while (fs.exists(nextLogFile));
+
return logFile;
}
/**
+ * Return the name of the log file for this host.
+ *
+ * @return the name of the log file for this host
+ */
+ protected static String getLogFilename() throws UnknownHostException {
+ return "testsrc-" + InetAddress.getLocalHost().getHostName() + ".log";
+ }
+
+ /**
* Assert that the given contents match what is expected from the test
* metrics.
*
@@ -392,8 +405,7 @@ public class RollingFileSystemSinkTestBase {
fs.mkdirs(dir);
- Path file = new Path(dir,
- "testsrc-" + InetAddress.getLocalHost().getHostName() + ".log");
+ Path file = new Path(dir, getLogFilename());
// Create the log file to force the sink to append
try (FSDataOutputStream out = fs.create(file)) {
@@ -405,8 +417,7 @@ public class RollingFileSystemSinkTestBase {
int count = 1;
while (count < numFiles) {
- file = new Path(dir, "testsrc-"
- + InetAddress.getLocalHost().getHostName() + ".log." + count);
+ file = new Path(dir, getLogFilename() + "." + count);
// Create the log file to force the sink to append
try (FSDataOutputStream out = fs.create(file)) {
@@ -482,7 +493,7 @@ public class RollingFileSystemSinkTestBase {
}
@Override
- public void close() throws IOException {
+ public void close() {
try {
super.close();
} catch (MetricsException ex) {