You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2016/02/26 01:31:13 UTC
hadoop git commit: HDFS-9858. RollingFileSystemSink can throw an NPE
on non-secure clusters. (Daniel Templeton via kasha)
Repository: hadoop
Updated Branches:
refs/heads/trunk b2951f9fb -> c2460dad6
HDFS-9858. RollingFileSystemSink can throw an NPE on non-secure clusters. (Daniel Templeton via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c2460dad
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c2460dad
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c2460dad
Branch: refs/heads/trunk
Commit: c2460dad642feee1086442d33c30c24ec77236b9
Parents: b2951f9
Author: Karthik Kambatla <ka...@cloudera.com>
Authored: Thu Feb 25 16:31:01 2016 -0800
Committer: Karthik Kambatla <ka...@cloudera.com>
Committed: Thu Feb 25 16:31:09 2016 -0800
----------------------------------------------------------------------
.../metrics2/sink/RollingFileSystemSink.java | 143 +++++++++++++------
.../sink/RollingFileSystemSinkTestBase.java | 7 +-
.../sink/TestRollingFileSystemSink.java | 10 +-
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../sink/TestRollingFileSystemSinkWithHdfs.java | 34 +++--
...TestRollingFileSystemSinkWithSecureHdfs.java | 3 +-
6 files changed, 138 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2460dad/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
index 2c0a26a..9a43901 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
@@ -132,6 +132,9 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
private static final FastDateFormat DATE_FORMAT =
FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT"));
private final Object lock = new Object();
+ private boolean initialized = false;
+ private SubsetConfiguration properties;
+ private Configuration conf;
private String source;
private boolean ignoreError;
private boolean allowAppend;
@@ -163,63 +166,102 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
protected static FileSystem suppliedFilesystem = null;
@Override
- public void init(SubsetConfiguration conf) {
- basePath = new Path(conf.getString(BASEPATH_KEY, BASEPATH_DEFAULT));
- source = conf.getString(SOURCE_KEY, SOURCE_DEFAULT);
- ignoreError = conf.getBoolean(IGNORE_ERROR_KEY, false);
- allowAppend = conf.getBoolean(ALLOW_APPEND_KEY, false);
+ public void init(SubsetConfiguration metrics2Properties) {
+ properties = metrics2Properties;
+ basePath = new Path(properties.getString(BASEPATH_KEY, BASEPATH_DEFAULT));
+ source = properties.getString(SOURCE_KEY, SOURCE_DEFAULT);
+ ignoreError = properties.getBoolean(IGNORE_ERROR_KEY, false);
+ allowAppend = properties.getBoolean(ALLOW_APPEND_KEY, false);
- Configuration configuration = loadConf();
-
- UserGroupInformation.setConfiguration(configuration);
+ conf = loadConf();
+ UserGroupInformation.setConfiguration(conf);
// Don't do secure setup if it's not needed.
if (UserGroupInformation.isSecurityEnabled()) {
// Validate config so that we don't get an NPE
- checkForProperty(conf, KEYTAB_PROPERTY_KEY);
- checkForProperty(conf, USERNAME_PROPERTY_KEY);
+ checkForProperty(properties, KEYTAB_PROPERTY_KEY);
+ checkForProperty(properties, USERNAME_PROPERTY_KEY);
try {
// Login as whoever we're supposed to be and let the hostname be pulled
// from localhost. If security isn't enabled, this does nothing.
- SecurityUtil.login(configuration, conf.getString(KEYTAB_PROPERTY_KEY),
- conf.getString(USERNAME_PROPERTY_KEY));
+ SecurityUtil.login(conf, properties.getString(KEYTAB_PROPERTY_KEY),
+ properties.getString(USERNAME_PROPERTY_KEY));
} catch (IOException ex) {
throw new MetricsException("Error logging in securely: ["
+ ex.toString() + "]", ex);
}
}
+ }
+
+ /**
+ * Initialize the connection to HDFS and create the base directory. Also
+ * launch the flush thread.
+ */
+ private boolean initFs() {
+ boolean success = false;
- fileSystem = getFileSystem(configuration);
+ fileSystem = getFileSystem();
// This step isn't strictly necessary, but it makes debugging issues much
// easier. We try to create the base directory eagerly and fail with
// copious debug info if it fails.
try {
fileSystem.mkdirs(basePath);
+ success = true;
} catch (Exception ex) {
- throw new MetricsException("Failed to create " + basePath + "["
- + SOURCE_KEY + "=" + source + ", "
- + IGNORE_ERROR_KEY + "=" + ignoreError + ", "
- + ALLOW_APPEND_KEY + "=" + allowAppend + ", "
- + KEYTAB_PROPERTY_KEY + "="
- + conf.getString(KEYTAB_PROPERTY_KEY) + ", "
- + conf.getString(KEYTAB_PROPERTY_KEY) + "="
- + configuration.get(conf.getString(KEYTAB_PROPERTY_KEY)) + ", "
- + USERNAME_PROPERTY_KEY + "="
- + conf.getString(USERNAME_PROPERTY_KEY) + ", "
- + conf.getString(USERNAME_PROPERTY_KEY) + "="
- + configuration.get(conf.getString(USERNAME_PROPERTY_KEY))
- + "] -- " + ex.toString(), ex);
+ if (!ignoreError) {
+ throw new MetricsException("Failed to create " + basePath + "["
+ + SOURCE_KEY + "=" + source + ", "
+ + ALLOW_APPEND_KEY + "=" + allowAppend + ", "
+ + stringifySecurityProperty(KEYTAB_PROPERTY_KEY) + ", "
+ + stringifySecurityProperty(USERNAME_PROPERTY_KEY)
+ + "] -- " + ex.toString(), ex);
+ }
}
- // If we're permitted to append, check if we actually can
- if (allowAppend) {
- allowAppend = checkAppend(fileSystem);
+ if (success) {
+ // If we're permitted to append, check if we actually can
+ if (allowAppend) {
+ allowAppend = checkAppend(fileSystem);
+ }
+
+ flushTimer = new Timer("RollingFileSystemSink Flusher", true);
}
- flushTimer = new Timer("RollingFileSystemSink Flusher", true);
+ return success;
+ }
+
+ /**
+ * Turn a security property into a nicely formatted set of <i>name=value</i>
+ * strings, allowing for either the property or the configuration not to be
+ * set.
+ *
+ * @param properties the sink properties
+ * @param conf the conf
+ * @param property the property to stringify
+ * @return the stringified property
+ */
+ private String stringifySecurityProperty(String property) {
+ String securityProperty;
+
+ if (properties.containsKey(property)) {
+ String propertyValue = properties.getString(property);
+ String confValue = conf.get(properties.getString(property));
+
+ if (confValue != null) {
+ securityProperty = property + "=" + propertyValue
+ + ", " + properties.getString(property) + "=" + confValue;
+ } else {
+ securityProperty = property + "=" + propertyValue
+ + ", " + properties.getString(property) + "=<NOT SET>";
+ }
+ } else {
+ securityProperty = property + "=<NOT SET>";
+ }
+
+ return securityProperty;
}
/**
@@ -242,17 +284,17 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
* @return the configuration to use
*/
private Configuration loadConf() {
- Configuration conf;
+ Configuration c;
if (suppliedConf != null) {
- conf = suppliedConf;
+ c = suppliedConf;
} else {
// The config we're handed in init() isn't the one we want here, so we
// create a new one to pick up the full settings.
- conf = new Configuration();
+ c = new Configuration();
}
- return conf;
+ return c;
}
/**
@@ -263,7 +305,7 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
* @return the file system to use
* @throws MetricsException thrown if the file system could not be retrieved
*/
- private FileSystem getFileSystem(Configuration conf) throws MetricsException {
+ private FileSystem getFileSystem() throws MetricsException {
FileSystem fs = null;
if (suppliedFilesystem != null) {
@@ -317,22 +359,29 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
// because if currentDirPath is null, then currentOutStream is null, but
// currentOutStream can be null for other reasons.
if ((currentOutStream == null) || !path.equals(currentDirPath)) {
- // Close the stream. This step could have been handled already by the
- // flusher thread, but if it has, the PrintStream will just swallow the
- // exception, which is fine.
- if (currentOutStream != null) {
- currentOutStream.close();
+ // If we're not yet connected to HDFS, create the connection
+ if (!initialized) {
+ initialized = initFs();
}
- currentDirPath = path;
+ if (initialized) {
+ // Close the stream. This step could have been handled already by the
+ // flusher thread, but if it has, the PrintStream will just swallow the
+ // exception, which is fine.
+ if (currentOutStream != null) {
+ currentOutStream.close();
+ }
- try {
- rollLogDir();
- } catch (IOException ex) {
- throwMetricsException("Failed to create new log file", ex);
- }
+ currentDirPath = path;
- scheduleFlush(now);
+ try {
+ rollLogDir();
+ } catch (IOException ex) {
+ throwMetricsException("Failed to create new log file", ex);
+ }
+
+ scheduleFlush(now);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2460dad/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
index f1ad058..9914c5e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
@@ -175,7 +175,7 @@ public class RollingFileSystemSinkTestBase {
String prefix = methodName.getMethodName().toLowerCase();
ConfigBuilder builder = new ConfigBuilder().add("*.period", 10000)
- .add(prefix + ".sink.mysink0.class", ErrorSink.class.getName())
+ .add(prefix + ".sink.mysink0.class", MockSink.class.getName())
.add(prefix + ".sink.mysink0.basepath", path)
.add(prefix + ".sink.mysink0.source", "testsrc")
.add(prefix + ".sink.mysink0.context", "test1")
@@ -503,8 +503,9 @@ public class RollingFileSystemSinkTestBase {
* This class is a {@link RollingFileSystemSink} wrapper that tracks whether
* an exception has been thrown during operations.
*/
- public static class ErrorSink extends RollingFileSystemSink {
+ public static class MockSink extends RollingFileSystemSink {
public static volatile boolean errored = false;
+ public static volatile boolean initialized = false;
@Override
public void init(SubsetConfiguration conf) {
@@ -515,6 +516,8 @@ public class RollingFileSystemSinkTestBase {
throw new MetricsException(ex);
}
+
+ initialized = true;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2460dad/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java
index da63235..3c6cd27 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.metrics2.MetricsSystem;
import org.junit.Test;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
/**
* Test the {@link RollingFileSystemSink} class in the context of the local file
@@ -106,7 +108,7 @@ public class TestRollingFileSystemSink extends RollingFileSystemSinkTestBase {
new MyMetrics1().registerWith(ms);
methodDir.setWritable(false);
- ErrorSink.errored = false;
+ MockSink.errored = false;
try {
// publish the metrics
@@ -114,7 +116,7 @@ public class TestRollingFileSystemSink extends RollingFileSystemSinkTestBase {
assertTrue("No exception was generated while writing metrics "
+ "even though the target directory was not writable",
- ErrorSink.errored);
+ MockSink.errored);
ms.stop();
ms.shutdown();
@@ -135,7 +137,7 @@ public class TestRollingFileSystemSink extends RollingFileSystemSinkTestBase {
new MyMetrics1().registerWith(ms);
methodDir.setWritable(false);
- ErrorSink.errored = false;
+ MockSink.errored = false;
try {
// publish the metrics
@@ -144,7 +146,7 @@ public class TestRollingFileSystemSink extends RollingFileSystemSinkTestBase {
assertFalse("An exception was generated while writing metrics "
+ "when the target directory was not writable, even though the "
+ "sink is set to ignore errors",
- ErrorSink.errored);
+ MockSink.errored);
ms.stop();
ms.shutdown();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2460dad/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index e3990ea..104b46d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1047,6 +1047,9 @@ Release 2.9.0 - UNRELEASED
HDFS-9608. Disk IO imbalance in HDFS with heterogeneous storages.
(Wei Zhou via wang)
+ HDFS-9858. RollingFileSystemSink can throw an NPE on non-secure clusters.
+ (Daniel Templeton via kasha)
+
Release 2.8.0 - UNRELEASED
NEW FEATURES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2460dad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
index 0f3725b..9984b34 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
@@ -151,12 +151,12 @@ public class TestRollingFileSystemSinkWithHdfs
new MyMetrics1().registerWith(ms);
shutdownHdfs();
- ErrorSink.errored = false;
+ MockSink.errored = false;
ms.publishMetricsNow(); // publish the metrics
assertTrue("No exception was generated while writing metrics "
- + "even though HDFS was unavailable", ErrorSink.errored);
+ + "even though HDFS was unavailable", MockSink.errored);
ms.stop();
ms.shutdown();
@@ -178,12 +178,12 @@ public class TestRollingFileSystemSinkWithHdfs
ms.publishMetricsNow(); // publish the metrics
shutdownHdfs();
- ErrorSink.errored = false;
+ MockSink.errored = false;
ms.stop();
assertTrue("No exception was generated while stopping sink "
- + "even though HDFS was unavailable", ErrorSink.errored);
+ + "even though HDFS was unavailable", MockSink.errored);
ms.shutdown();
}
@@ -203,13 +203,13 @@ public class TestRollingFileSystemSinkWithHdfs
new MyMetrics1().registerWith(ms);
shutdownHdfs();
- ErrorSink.errored = false;
+ MockSink.errored = false;
ms.publishMetricsNow(); // publish the metrics
assertFalse("An exception was generated writing metrics "
+ "while HDFS was unavailable, even though the sink is set to "
- + "ignore errors", ErrorSink.errored);
+ + "ignore errors", MockSink.errored);
ms.stop();
ms.shutdown();
@@ -231,13 +231,13 @@ public class TestRollingFileSystemSinkWithHdfs
ms.publishMetricsNow(); // publish the metrics
shutdownHdfs();
- ErrorSink.errored = false;
+ MockSink.errored = false;
ms.stop();
assertFalse("An exception was generated stopping sink "
+ "while HDFS was unavailable, even though the sink is set to "
- + "ignore errors", ErrorSink.errored);
+ + "ignore errors", MockSink.errored);
ms.shutdown();
}
@@ -283,4 +283,22 @@ public class TestRollingFileSystemSinkWithHdfs
ms.stop();
}
+
+ /**
+ * Test that a failure to connect to HDFS does not cause the init() method
+ * to fail.
+ */
+ @Test
+ public void testInitWithNoHDFS() {
+ String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+
+ shutdownHdfs();
+ MockSink.errored = false;
+ initMetricsSystem(path, true, false);
+
+ assertTrue("The sink was not initialized as expected",
+ MockSink.initialized);
+ assertFalse("The sink threw an unexpected error on initialization",
+ MockSink.errored);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2460dad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java
index 12b71f8..dce4fdc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertTrue;
/**
* Test the {@link RollingFileSystemSink} class in the context of HDFS with
@@ -147,7 +148,7 @@ public class TestRollingFileSystemSinkWithSecureHdfs
assertTrue("No exception was generated initializing the sink against a "
+ "secure cluster even though the principal and keytab properties "
- + "were missing", ErrorSink.errored);
+ + "were missing", MockSink.errored);
} finally {
if (cluster != null) {
cluster.shutdown();