You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2016/10/18 19:33:01 UTC
[09/12] ambari git commit: AMBARI-18629. HDFS goes down after
installing cluster (aonishuk)
AMBARI-18629. HDFS goes down after installing cluster (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e68cc10d
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e68cc10d
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e68cc10d
Branch: refs/heads/branch-feature-AMBARI-18456
Commit: e68cc10dd9e0c0c012aa684a69f743fee41310d5
Parents: ee2a125
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Tue Oct 18 18:35:15 2016 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Tue Oct 18 18:35:15 2016 +0300
----------------------------------------------------------------------
.../src/main/python/ambari_agent/ActionQueue.py | 26 +++++-------
.../main/python/ambari_commons/thread_utils.py | 43 ++++++++++++++++++++
2 files changed, 53 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/e68cc10d/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index c03ee4f..5962d94 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -36,6 +36,7 @@ from CommandStatusDict import CommandStatusDict
from CustomServiceOrchestrator import CustomServiceOrchestrator
from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
from ambari_commons.str_utils import split_on_chunks
+from ambari_commons.thread_utils import terminate_thread
logger = logging.getLogger()
@@ -83,7 +84,6 @@ class ActionQueue(threading.Thread):
self.controller = controller
self.configTags = {}
self._stop = threading.Event()
- self.hangingStatusCommands = {}
self.tmpdir = config.get('agent', 'prefix')
self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller)
self.parallel_execution = config.get_parallel_exec_option()
@@ -230,22 +230,16 @@ class ActionQueue(threading.Thread):
elif commandType == self.STATUS_COMMAND:
component_name = command['componentName']
- if component_name in self.hangingStatusCommands and not self.hangingStatusCommands[component_name].isAlive():
- del self.hangingStatusCommands[component_name]
+ thread = threading.Thread(target = self.execute_status_command, args = (command,))
+ thread.daemon = True # hanging status commands should not prevent ambari-agent from stopping
+ thread.start()
+ thread.join(timeout=self.status_command_timeout)
- if not component_name in self.hangingStatusCommands:
- thread = threading.Thread(target = self.execute_status_command, args = (command,))
- thread.daemon = True # hanging status commands should not be prevent ambari-agent from stopping
- thread.start()
- thread.join(timeout=self.status_command_timeout)
-
- if thread.isAlive():
- # Force context to reset to normal. By context we mean sys.path, imports, logger setting, etc. They are set by specific status command, and are not relevant to ambari-agent.
- PythonReflectiveExecutor.last_context.revert()
- logger.warn("Command {0} for {1} is running for more than {2} seconds. Skipping it for current pack of status commands.".format(commandType, component_name, self.status_command_timeout))
- self.hangingStatusCommands[component_name] = thread
- else:
- logger.info("Not running {0} for {1}, because previous one is still running.".format(commandType, component_name))
+ if thread.isAlive():
+ terminate_thread(thread)
+ # Force context to reset to normal. By context we mean sys.path, imports, logger setting, etc. They are set by specific status command, and are not relevant to ambari-agent.
+ PythonReflectiveExecutor.last_context.revert()
+ logger.warn("Command {0} for {1} was running for more than {2} seconds. Terminated due to timeout.".format(commandType, component_name, self.status_command_timeout))
else:
logger.error("Unrecognized command " + pprint.pformat(command))
except Exception:
http://git-wip-us.apache.org/repos/asf/ambari/blob/e68cc10d/ambari-common/src/main/python/ambari_commons/thread_utils.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/thread_utils.py b/ambari-common/src/main/python/ambari_commons/thread_utils.py
new file mode 100644
index 0000000..952022c
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_commons/thread_utils.py
@@ -0,0 +1,43 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+def terminate_thread(thread):
+  """Terminates a python thread abruptly by raising SystemExit inside it.
+
+  This is considered a bad pattern. If possible, stop the thread from inside
+  of it, or run the work as a separate process (multiprocessing module).
+
+  :param thread: a threading.Thread instance (nothing happens if it is not alive)
+  :raises ValueError: if no thread state matched; SystemError: if more than one did
+  """
+  import ctypes
+  if not thread.isAlive():
+    return
+
+  tid = ctypes.c_long(thread.ident)  # pass as C long; a bare int is converted to C int
+  exc = ctypes.py_object(SystemExit)
+  res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, exc)
+  if res == 0:
+    raise ValueError("nonexistent thread id")
+  elif res > 1:
+    # """if it returns a number greater than one, you're in trouble,
+    # and you should call it again with exc=NULL to revert the effect"""
+    ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)  # same C long id as above
+    raise SystemError("PyThreadState_SetAsyncExc failed")
\ No newline at end of file