You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/09/12 20:22:28 UTC
git commit: FLUME-2420. HDFS Bucketwriter must access sfWriters map
only within synchronized blocks.
Repository: flume
Updated Branches:
refs/heads/trunk 5c5b96a8c -> 4e08bf7d3
FLUME-2420. HDFS Bucketwriter must access sfWriters map only within synchronized blocks.
(chenshangan via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4e08bf7d
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4e08bf7d
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4e08bf7d
Branch: refs/heads/trunk
Commit: 4e08bf7d38bea365d35a6d391d1507a129cc9ba9
Parents: 5c5b96a
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Sep 12 11:21:26 2014 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Sep 12 11:21:26 2014 -0700
----------------------------------------------------------------------
.../apache/flume/sink/hdfs/HDFSEventSink.java | 26 +++++++++++---------
1 file changed, 15 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/4e08bf7d/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 4f3b3f0..33f73a9 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -495,16 +495,18 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
@Override
public void stop() {
// do not constrain close() calls with a timeout
- for (Entry<String, BucketWriter> entry : sfWriters.entrySet()) {
- LOG.info("Closing {}", entry.getKey());
+ synchronized (sfWritersLock) {
+ for (Entry<String, BucketWriter> entry : sfWriters.entrySet()) {
+ LOG.info("Closing {}", entry.getKey());
- try {
- entry.getValue().close();
- } catch (Exception ex) {
- LOG.warn("Exception while closing " + entry.getKey() + ". " +
- "Exception follows.", ex);
- if (ex instanceof InterruptedException) {
- Thread.currentThread().interrupt();
+ try {
+ entry.getValue().close();
+ } catch (Exception ex) {
+ LOG.warn("Exception while closing " + entry.getKey() + ". " +
+ "Exception follows.", ex);
+ if (ex instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
}
}
}
@@ -526,8 +528,10 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
callTimeoutPool = null;
timedRollerPool = null;
- sfWriters.clear();
- sfWriters = null;
+ synchronized (sfWritersLock) {
+ sfWriters.clear();
+ sfWriters = null;
+ }
sinkCounter.stop();
super.stop();
}