You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by sz...@apache.org on 2018/03/09 11:07:42 UTC
flume git commit: FLUME-2786 FLUME-3056 FLUME-3117 Application enters
a deadlock when stopped while handleConfigurationEvent
Repository: flume
Updated Branches:
refs/heads/trunk d1f24f56c -> 0d437810d
FLUME-2786 FLUME-3056 FLUME-3117 Application enters a deadlock when stopped while handleConfigurationEvent
Adds a better locking mechanism to the Application class to prevent the deadlock.
this closes #108
this closes #144
Reviewers: Denes Arvay, Attila Simon, Benedict Jin, Ferenc Szabo
(Andras Beni, Yan Jian via Ferenc Szabo)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0d437810
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0d437810
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0d437810
Branch: refs/heads/trunk
Commit: 0d437810dc850192b48fa3b31608ffcd23b1f1e9
Parents: d1f24f5
Author: Andras Beni <an...@cloudera.com>
Authored: Fri Mar 9 12:06:25 2018 +0100
Committer: Ferenc Szabo <sz...@apache.org>
Committed: Fri Mar 9 12:06:25 2018 +0100
----------------------------------------------------------------------
.../java/org/apache/flume/node/Application.java | 45 +++++++++++++++-----
...lingPropertiesFileConfigurationProvider.java | 8 +++-
.../org/apache/flume/node/TestApplication.java | 42 ++++++++++++++++++
.../test/resources/flume-conf.properties.2786 | 35 +++++++++++++++
4 files changed, 117 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/0d437810/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
index d6d92f0..7893fcc 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
@@ -52,6 +52,7 @@ import java.util.Locale;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
public class Application {
@@ -65,6 +66,7 @@ public class Application {
private final LifecycleSupervisor supervisor;
private MaterializedConfiguration materializedConfiguration;
private MonitorService monitorServer;
+ private final ReentrantLock lifecycleLock = new ReentrantLock();
public Application() {
this(new ArrayList<LifecycleAware>(0));
@@ -75,23 +77,44 @@ public class Application {
supervisor = new LifecycleSupervisor();
}
- public synchronized void start() {
- for (LifecycleAware component : components) {
- supervisor.supervise(component,
- new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+ public void start() {
+ lifecycleLock.lock();
+ try {
+ for (LifecycleAware component : components) {
+ supervisor.supervise(component,
+ new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+ }
+ } finally {
+ lifecycleLock.unlock();
}
}
@Subscribe
- public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
- stopAllComponents();
- startAllComponents(conf);
+ public void handleConfigurationEvent(MaterializedConfiguration conf) {
+ try {
+ lifecycleLock.lockInterruptibly();
+ stopAllComponents();
+ startAllComponents(conf);
+ } catch (InterruptedException e) {
+ logger.info("Interrupted while trying to handle configuration event");
+ return;
+ } finally {
+ // If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock
+ if (lifecycleLock.isHeldByCurrentThread()) {
+ lifecycleLock.unlock();
+ }
+ }
}
- public synchronized void stop() {
- supervisor.stop();
- if (monitorServer != null) {
- monitorServer.stop();
+ public void stop() {
+ lifecycleLock.lock();
+ try {
+ supervisor.stop();
+ if (monitorServer != null) {
+ monitorServer.stop();
+ }
+ } finally {
+ lifecycleLock.unlock();
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/0d437810/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
index 91a09f0..13cb38f 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
@@ -85,8 +85,12 @@ public class PollingPropertiesFileConfigurationProvider
executorService.shutdown();
try {
- while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
- LOGGER.debug("Waiting for file watcher to terminate");
+ if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
+ LOGGER.debug("File watcher has not terminated. Forcing shutdown of executor.");
+ executorService.shutdownNow();
+ while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
+ LOGGER.debug("Waiting for file watcher to terminate");
+ }
}
} catch (InterruptedException e) {
LOGGER.debug("Interrupted while waiting for file watcher to terminate");
http://git-wip-us.apache.org/repos/asf/flume/blob/0d437810/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java
index affbd8c..3853d50 100644
--- a/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java
@@ -153,4 +153,46 @@ public class TestApplication {
application.stop();
}
}
+
+ @Test(timeout = 10000L)
+ public void testFLUME2786() throws Exception {
+ final String agentName = "test";
+ final int interval = 1;
+ final long intervalMs = 1000L;
+
+ File configFile = new File(baseDir, "flume-conf.properties");
+ Files.copy(new File(getClass().getClassLoader()
+ .getResource("flume-conf.properties.2786").getFile()), configFile);
+ File mockConfigFile = spy(configFile);
+ when(mockConfigFile.lastModified()).then(new Answer<Long>() {
+ @Override
+ public Long answer(InvocationOnMock invocation) throws Throwable {
+ Thread.sleep(intervalMs);
+ return System.currentTimeMillis();
+ }
+ });
+
+ EventBus eventBus = new EventBus(agentName + "-event-bus");
+ PollingPropertiesFileConfigurationProvider configurationProvider =
+ new PollingPropertiesFileConfigurationProvider(agentName,
+ mockConfigFile, eventBus, interval);
+ PollingPropertiesFileConfigurationProvider mockConfigurationProvider =
+ spy(configurationProvider);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ Thread.sleep(intervalMs);
+ invocation.callRealMethod();
+ return null;
+ }
+ }).when(mockConfigurationProvider).stop();
+
+ List<LifecycleAware> components = Lists.newArrayList();
+ components.add(mockConfigurationProvider);
+ Application application = new Application(components);
+ eventBus.register(application);
+ application.start();
+ Thread.sleep(1500L);
+ application.stop();
+ }
}
http://git-wip-us.apache.org/repos/asf/flume/blob/0d437810/flume-ng-node/src/test/resources/flume-conf.properties.2786
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/resources/flume-conf.properties.2786 b/flume-ng-node/src/test/resources/flume-conf.properties.2786
new file mode 100755
index 0000000..2a7bea0
--- /dev/null
+++ b/flume-ng-node/src/test/resources/flume-conf.properties.2786
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Flume Configuration for testing FLUME-2786
+#
+
+test.sources = source1
+test.channels = channel1
+test.sinks = sink1
+
+test.sources.source1.type = seq
+test.sources.source1.totalEvents = 10000
+test.sources.source1.channels = channel1
+
+test.channels.channel1.type = memory
+test.channels.channel1.capacity = 10000
+
+test.sinks.sink1.type = null
+test.sinks.sink1.channel = channel1