You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/09/12 20:22:28 UTC

git commit: FLUME-2420. HDFS Bucketwriter must access sfWriters map only within synchronized blocks.

Repository: flume
Updated Branches:
  refs/heads/trunk 5c5b96a8c -> 4e08bf7d3


FLUME-2420. HDFS Bucketwriter must access sfWriters map only within synchronized blocks.

(chenshangan via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4e08bf7d
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4e08bf7d
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4e08bf7d

Branch: refs/heads/trunk
Commit: 4e08bf7d38bea365d35a6d391d1507a129cc9ba9
Parents: 5c5b96a
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Sep 12 11:21:26 2014 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Sep 12 11:21:26 2014 -0700

----------------------------------------------------------------------
 .../apache/flume/sink/hdfs/HDFSEventSink.java   | 26 +++++++++++---------
 1 file changed, 15 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/4e08bf7d/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 4f3b3f0..33f73a9 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -495,16 +495,18 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   @Override
   public void stop() {
     // do not constrain close() calls with a timeout
-    for (Entry<String, BucketWriter> entry : sfWriters.entrySet()) {
-      LOG.info("Closing {}", entry.getKey());
+    synchronized (sfWritersLock) {
+      for (Entry<String, BucketWriter> entry : sfWriters.entrySet()) {
+        LOG.info("Closing {}", entry.getKey());
 
-      try {
-        entry.getValue().close();
-      } catch (Exception ex) {
-        LOG.warn("Exception while closing " + entry.getKey() + ". " +
-                "Exception follows.", ex);
-        if (ex instanceof InterruptedException) {
-          Thread.currentThread().interrupt();
+        try {
+          entry.getValue().close();
+        } catch (Exception ex) {
+          LOG.warn("Exception while closing " + entry.getKey() + ". " +
+                  "Exception follows.", ex);
+          if (ex instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+          }
         }
       }
     }
@@ -526,8 +528,10 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     callTimeoutPool = null;
     timedRollerPool = null;
 
-    sfWriters.clear();
-    sfWriters = null;
+    synchronized (sfWritersLock) {
+      sfWriters.clear();
+      sfWriters = null;
+    }
     sinkCounter.stop();
     super.stop();
   }