You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by sz...@apache.org on 2018/03/09 11:07:42 UTC

flume git commit: FLUME-2786 FLUME-3056 FLUME-3117 Application enters a deadlock when stopped while handleConfigurationEvent

Repository: flume
Updated Branches:
  refs/heads/trunk d1f24f56c -> 0d437810d


FLUME-2786 FLUME-3056 FLUME-3117 Application enters a deadlock when stopped while handleConfigurationEvent

Adding a better locking mechanism to the Application class to prevent deadlock.

this closes #108
this closes #144

Reviewers: Denes Arvay, Attila Simon, Benedict Jin, Ferenc Szabo

(Andras Beni, Yan Jian via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0d437810
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0d437810
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0d437810

Branch: refs/heads/trunk
Commit: 0d437810dc850192b48fa3b31608ffcd23b1f1e9
Parents: d1f24f5
Author: Andras Beni <an...@cloudera.com>
Authored: Fri Mar 9 12:06:25 2018 +0100
Committer: Ferenc Szabo <sz...@apache.org>
Committed: Fri Mar 9 12:06:25 2018 +0100

----------------------------------------------------------------------
 .../java/org/apache/flume/node/Application.java | 45 +++++++++++++++-----
 ...lingPropertiesFileConfigurationProvider.java |  8 +++-
 .../org/apache/flume/node/TestApplication.java  | 42 ++++++++++++++++++
 .../test/resources/flume-conf.properties.2786   | 35 +++++++++++++++
 4 files changed, 117 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/0d437810/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
index d6d92f0..7893fcc 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
@@ -52,6 +52,7 @@ import java.util.Locale;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
 
 public class Application {
 
@@ -65,6 +66,7 @@ public class Application {
   private final LifecycleSupervisor supervisor;
   private MaterializedConfiguration materializedConfiguration;
   private MonitorService monitorServer;
+  private final ReentrantLock lifecycleLock = new ReentrantLock();
 
   public Application() {
     this(new ArrayList<LifecycleAware>(0));
@@ -75,23 +77,44 @@ public class Application {
     supervisor = new LifecycleSupervisor();
   }
 
-  public synchronized void start() {
-    for (LifecycleAware component : components) {
-      supervisor.supervise(component,
-          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+  public void start() {
+    lifecycleLock.lock();
+    try {
+      for (LifecycleAware component : components) {
+        supervisor.supervise(component,
+            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+      }
+    } finally {
+      lifecycleLock.unlock();
     }
   }
 
   @Subscribe
-  public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
-    stopAllComponents();
-    startAllComponents(conf);
+  public void handleConfigurationEvent(MaterializedConfiguration conf) {
+    try {
+      lifecycleLock.lockInterruptibly();
+      stopAllComponents();
+      startAllComponents(conf);
+    } catch (InterruptedException e) {
+      logger.info("Interrupted while trying to handle configuration event");
+      return;
+    } finally {
+      // If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock
+      if (lifecycleLock.isHeldByCurrentThread()) {
+        lifecycleLock.unlock();
+      }
+    }
   }
 
-  public synchronized void stop() {
-    supervisor.stop();
-    if (monitorServer != null) {
-      monitorServer.stop();
+  public void stop() {
+    lifecycleLock.lock();
+    try {
+      supervisor.stop();
+      if (monitorServer != null) {
+        monitorServer.stop();
+      }
+    } finally {
+      lifecycleLock.unlock();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/0d437810/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
index 91a09f0..13cb38f 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
@@ -85,8 +85,12 @@ public class PollingPropertiesFileConfigurationProvider
 
     executorService.shutdown();
     try {
-      while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
-        LOGGER.debug("Waiting for file watcher to terminate");
+      if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
+        LOGGER.debug("File watcher has not terminated. Forcing shutdown of executor.");
+        executorService.shutdownNow();
+        while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
+          LOGGER.debug("Waiting for file watcher to terminate");
+        }
       }
     } catch (InterruptedException e) {
       LOGGER.debug("Interrupted while waiting for file watcher to terminate");

http://git-wip-us.apache.org/repos/asf/flume/blob/0d437810/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java
index affbd8c..3853d50 100644
--- a/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java
@@ -153,4 +153,46 @@ public class TestApplication {
       application.stop();
     }
   }
+
+  @Test(timeout = 10000L)
+  public void testFLUME2786() throws Exception {
+    final String agentName = "test";
+    final int interval = 1;
+    final long intervalMs = 1000L;
+
+    File configFile = new File(baseDir, "flume-conf.properties");
+    Files.copy(new File(getClass().getClassLoader()
+        .getResource("flume-conf.properties.2786").getFile()), configFile);
+    File mockConfigFile = spy(configFile);
+    when(mockConfigFile.lastModified()).then(new Answer<Long>() {
+      @Override
+      public Long answer(InvocationOnMock invocation) throws Throwable {
+        Thread.sleep(intervalMs);
+        return System.currentTimeMillis();
+      }
+    });
+
+    EventBus eventBus = new EventBus(agentName + "-event-bus");
+    PollingPropertiesFileConfigurationProvider configurationProvider =
+        new PollingPropertiesFileConfigurationProvider(agentName,
+            mockConfigFile, eventBus, interval);
+    PollingPropertiesFileConfigurationProvider mockConfigurationProvider =
+        spy(configurationProvider);
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        Thread.sleep(intervalMs);
+        invocation.callRealMethod();
+        return null;
+      }
+    }).when(mockConfigurationProvider).stop();
+
+    List<LifecycleAware> components = Lists.newArrayList();
+    components.add(mockConfigurationProvider);
+    Application application = new Application(components);
+    eventBus.register(application);
+    application.start();
+    Thread.sleep(1500L);
+    application.stop();
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/0d437810/flume-ng-node/src/test/resources/flume-conf.properties.2786
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/resources/flume-conf.properties.2786 b/flume-ng-node/src/test/resources/flume-conf.properties.2786
new file mode 100755
index 0000000..2a7bea0
--- /dev/null
+++ b/flume-ng-node/src/test/resources/flume-conf.properties.2786
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Flume Configuration for testing FLUME-2786
+#
+
+test.sources = source1
+test.channels = channel1
+test.sinks = sink1
+
+test.sources.source1.type = seq
+test.sources.source1.totalEvents = 10000
+test.sources.source1.channels = channel1
+
+test.channels.channel1.type = memory
+test.channels.channel1.capacity = 10000
+
+test.sinks.sink1.type = null
+test.sinks.sink1.channel = channel1