You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/05/30 08:44:48 UTC
svn commit: r1344115 -
/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
Author: arvind
Date: Wed May 30 06:44:47 2012
New Revision: 1344115
URL: http://svn.apache.org/viewvc?rev=1344115&view=rev
Log:
FLUME-1231. Deadlock in BucketWriter during shutdown.
(Mike Percy via Arvind Prabhakar)
Modified:
incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java?rev=1344115&r1=1344114&r2=1344115&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java Wed May 30 06:44:47 2012
@@ -30,7 +30,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,10 +45,9 @@ class BucketWriter {
private static final String IN_USE_EXT = ".tmp";
/**
- * In case of an error writing to HDFS (it hangs) this instance will be
- * tossed away and we will create a new instance. Gurantee unique files
- * in this case.
+ * This lock ensures that only one thread can open a file at a time.
*/
+ private static final Integer staticLock = new Integer(1);
private HDFSWriter writer;
private FlumeFormatter formatter;
private long eventCounter;
@@ -100,7 +98,6 @@ class BucketWriter {
/**
* open() is called by append()
- * WARNING: acquires a lock on the logged-in Kerberos user object!
* @throws IOException
*/
private void open() throws IOException {
@@ -108,19 +105,21 @@ class BucketWriter {
throw new IOException("Invalid file settings");
}
- long counter = fileExtensionCounter.incrementAndGet();
-
Configuration config = new Configuration();
// disable FileSystem JVM shutdown hook
config.setBoolean("fs.automatic.close", false);
// Hadoop is not thread safe when doing certain RPC operations,
- // including getFileSystem(), when running under Kerberos
- UserGroupInformation staticLogin = UserGroupInformation.getLoginUser();
- synchronized (staticLogin) {
+ // including getFileSystem(), when running under Kerberos.
+ // open() must be called by one thread at a time in the JVM.
+ // NOTE: tried synchronizing on the underlying Kerberos principal previously
+ // which caused deadlocks. See FLUME-1231.
+ synchronized (staticLock) {
+ long counter = fileExtensionCounter.incrementAndGet();
if (codeC == null) {
bucketPath = filePath + "." + counter;
- // need to get reference to FS before writer does to avoid shutdown hook
+ // Need to get reference to FS using above config before underlying
+ // writer does in order to avoid shutdown hook & IllegalStateExceptions
fileSystem = new Path(bucketPath).getFileSystem(config);
LOG.info("Creating " + bucketPath + IN_USE_EXT);
writer.open(bucketPath + IN_USE_EXT, formatter);