You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/05/30 08:44:48 UTC

svn commit: r1344115 - /incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java

Author: arvind
Date: Wed May 30 06:44:47 2012
New Revision: 1344115

URL: http://svn.apache.org/viewvc?rev=1344115&view=rev
Log:
FLUME-1231. Deadlock in BucketWriter during shutdown.

(Mike Percy via Arvind Prabhakar)

Modified:
    incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java

Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java?rev=1344115&r1=1344114&r2=1344115&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java Wed May 30 06:44:47 2012
@@ -30,7 +30,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,10 +45,9 @@ class BucketWriter {
 
   private static final String IN_USE_EXT = ".tmp";
   /**
-   * In case of an error writing to HDFS (it hangs) this instance will be
-   * tossed away and we will create a new instance. Gurantee unique files
-   * in this case.
+   * This lock ensures that only one thread can open a file at a time.
    */
+  private static final Integer staticLock = new Integer(1);
   private HDFSWriter writer;
   private FlumeFormatter formatter;
   private long eventCounter;
@@ -100,7 +98,6 @@ class BucketWriter {
 
   /**
    * open() is called by append()
-   * WARNING: acquires a lock on the logged-in Kerberos user object!
    * @throws IOException
    */
   private void open() throws IOException {
@@ -108,19 +105,21 @@ class BucketWriter {
       throw new IOException("Invalid file settings");
     }
 
-    long counter = fileExtensionCounter.incrementAndGet();
-
     Configuration config = new Configuration();
     // disable FileSystem JVM shutdown hook
     config.setBoolean("fs.automatic.close", false);
 
     // Hadoop is not thread safe when doing certain RPC operations,
-    // including getFileSystem(), when running under Kerberos
-    UserGroupInformation staticLogin = UserGroupInformation.getLoginUser();
-    synchronized (staticLogin) {
+    // including getFileSystem(), when running under Kerberos.
+    // open() must be called by one thread at a time in the JVM.
+    // NOTE: tried synchronizing on the underlying Kerberos principal previously
+    // which caused deadlocks. See FLUME-1231.
+    synchronized (staticLock) {
+      long counter = fileExtensionCounter.incrementAndGet();
       if (codeC == null) {
         bucketPath = filePath + "." + counter;
-        // need to get reference to FS before writer does to avoid shutdown hook
+        // Need to get reference to FS using above config before underlying
+        // writer does in order to avoid shutdown hook & IllegalStateExceptions
         fileSystem = new Path(bucketPath).getFileSystem(config);
         LOG.info("Creating " + bucketPath + IN_USE_EXT);
         writer.open(bucketPath + IN_USE_EXT, formatter);