You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2016/04/07 19:29:32 UTC
ambari git commit: AMBARI-15762. Component install post processing
can not be run in parallel (aonishuk)
Repository: ambari
Updated Branches:
refs/heads/branch-2.2 2d4bf7197 -> 9b86d41e1
AMBARI-15762. Component install post processing can not be run in parallel (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9b86d41e
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9b86d41e
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9b86d41e
Branch: refs/heads/branch-2.2
Commit: 9b86d41e197adc31ab29c1bc71a578a7d4b5da81
Parents: 2d4bf71
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Thu Apr 7 20:29:01 2016 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Thu Apr 7 20:29:01 2016 +0300
----------------------------------------------------------------------
.../TestFileBasedProcessLock.py | 61 ++++++++++++++++
.../functions/file_based_process_lock.py | 73 ++++++++++++++++++++
.../2.0.6/hooks/after-INSTALL/scripts/params.py | 5 ++
.../scripts/shared_initialization.py | 8 ++-
4 files changed, 145 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/9b86d41e/ambari-agent/src/test/python/resource_management/TestFileBasedProcessLock.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/resource_management/TestFileBasedProcessLock.py b/ambari-agent/src/test/python/resource_management/TestFileBasedProcessLock.py
new file mode 100644
index 0000000..e4606cc
--- /dev/null
+++ b/ambari-agent/src/test/python/resource_management/TestFileBasedProcessLock.py
@@ -0,0 +1,61 @@
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os
+import tempfile
+import time
+import shutil
+from unittest import TestCase
+from multiprocessing import Process
+from only_for_platform import not_for_platform, PLATFORM_WINDOWS
+from resource_management.libraries.functions.file_based_process_lock import FileBasedProcessLock
+
+class TestFileBasedProcessLock(TestCase):
+
+  @not_for_platform(PLATFORM_WINDOWS)
+  def test_file_based_lock(self):
+    """
+    Test FileBasedProcessLock using mkdir atomicity.
+
+    Several processes take the same lock and, while holding it, create and
+    then remove one shared directory. os.mkdir raises if the directory
+    already exists, so a process that enters the locked section while another
+    process is still inside it does not finish with exit code 0.
+    """
+    test_temp_dir = tempfile.mkdtemp(prefix="test_file_based_lock")
+    process_list = []
+    try:
+      indicator_dir = os.path.join(test_temp_dir, "indicator")
+      lock_file = os.path.join(test_temp_dir, "lock")
+
+      # Raises an exception if mkdir operation fails.
+      # It indicates that more than one process acquired the lock.
+      def dummy_task(index):
+        with FileBasedProcessLock(lock_file):
+          os.mkdir(indicator_dir)
+          time.sleep(0.1)
+          os.rmdir(indicator_dir)
+
+      for i in range(0, 3):
+        p = Process(target=dummy_task, args=(i,))
+        p.start()
+        process_list.append(p)
+
+      for p in process_list:
+        # Bounded wait: a process that is still running after the timeout
+        # has exitcode None and fails the assertion below.
+        p.join(2)
+        self.assertEquals(p.exitcode, 0)
+
+    finally:
+      # A process that outlived its join timeout (or was never joined because
+      # an earlier assertion failed) must not keep running while the
+      # directory it works in is being removed.
+      for p in process_list:
+        if p.is_alive():
+          p.terminate()
+          p.join()
+      shutil.rmtree(test_temp_dir)
+
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/9b86d41e/ambari-common/src/main/python/resource_management/libraries/functions/file_based_process_lock.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/file_based_process_lock.py b/ambari-common/src/main/python/resource_management/libraries/functions/file_based_process_lock.py
new file mode 100644
index 0000000..f9c981d
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/file_based_process_lock.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+import fcntl
+
+from resource_management.core.logger import Logger
+
+class FileBasedProcessLock(object):
+  """A file descriptor based lock for interprocess locking.
+  The lock is automatically released when process dies.
+
+  WARNING: Do not use this lock for synchronization between threads.
+  Multiple threads in a same process can simultaneously acquire this lock.
+  It should be used only for locking between processes.
+
+  Can be used as a context manager: the lock is acquired on entering the
+  "with" block and released on leaving it.
+  """
+
+  def __init__(self, lock_file_path):
+    """
+    :param lock_file_path: The path to the file used for locking
+    """
+    self.lock_file_name = lock_file_path
+    # Open file object of the lock file; None until blocking_lock() opens it
+    # and again after unlock() has closed it.
+    self.lock_file = None
+
+  def blocking_lock(self):
+    """
+    Creates the lock file if it doesn't exist.
+    Waits to acquire an exclusive lock on the lock file descriptor.
+    """
+    Logger.info("Trying to acquire a lock on {0}".format(self.lock_file_name))
+    if self.lock_file is None or self.lock_file.closed:
+      # Append mode creates a missing file and never truncates an existing one.
+      self.lock_file = open(self.lock_file_name, 'a')
+    fcntl.lockf(self.lock_file, fcntl.LOCK_EX)
+    Logger.info("Acquired the lock on {0}".format(self.lock_file_name))
+
+  def unlock(self):
+    """
+    Unlocks the lock file descriptor and closes the lock file.
+    Does nothing if the lock file is not open, i.e. the lock was never
+    acquired or has already been released.
+    """
+    if self.lock_file is None or self.lock_file.closed:
+      # Nothing to release; passing None or a closed file to lockf would raise.
+      self.lock_file = None
+      return
+    Logger.info("Releasing the lock on {0}".format(self.lock_file_name))
+    try:
+      fcntl.lockf(self.lock_file, fcntl.LOCK_UN)
+    finally:
+      # Close the file even if the explicit unlock failed, so the descriptor
+      # is not leaked. A failure to close is deliberately ignored.
+      try:
+        self.lock_file.close()
+      except IOError:
+        pass
+      self.lock_file = None
+
+  def __enter__(self):
+    """Acquires the lock, blocking until it is available."""
+    self.blocking_lock()
+    return None
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    """Releases the lock when the "with" block is left."""
+    self.unlock()
+    # Never suppress an exception raised inside the "with" block.
+    return False
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/9b86d41e/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py
index 68fe9f9..603875d 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py
@@ -17,6 +17,8 @@ limitations under the License.
"""
+import os
+
from ambari_commons.constants import AMBARI_SUDO_BINARY
from resource_management.libraries.script import Script
from resource_management.libraries.functions import default
@@ -26,6 +28,7 @@ from resource_management.libraries.functions import format_jvm_option
from resource_management.libraries.functions.version import format_hdp_stack_version
config = Script.get_config()
+tmp_dir = Script.get_tmp_dir()
dfs_type = default("/commandParams/dfs_type", "")
@@ -89,3 +92,5 @@ has_namenode = not len(namenode_host) == 0
if has_namenode or dfs_type == 'HCFS':
hadoop_conf_dir = conf_select.get_hadoop_conf_dir(force_latest_on_upgrade=True)
+
+link_configs_lock_file = os.path.join(tmp_dir, "link_configs_lock_file")
http://git-wip-us.apache.org/repos/asf/ambari/blob/9b86d41e/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py
index 8ee2f7a..1d61dfc 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py
@@ -24,6 +24,7 @@ from resource_management.libraries.functions import conf_select
from resource_management.libraries.functions import hdp_select
from resource_management.libraries.functions.format import format
from resource_management.libraries.functions.version import compare_versions
+from resource_management.libraries.functions.file_based_process_lock import FileBasedProcessLock
from resource_management.libraries.resources.xml_config import XmlConfig
from resource_management.libraries.script import Script
@@ -85,6 +86,7 @@ def link_configs(struct_out_file):
"""
Links configs, only on a fresh install of HDP-2.3 and higher
"""
+ import params
if not Script.is_hdp_stack_greater_or_equal("2.3"):
Logger.info("Can only link configs for HDP-2.3 and higher.")
@@ -96,5 +98,7 @@ def link_configs(struct_out_file):
Logger.info("Could not load 'version' from {0}".format(struct_out_file))
return
- for k, v in conf_select.PACKAGE_DIRS.iteritems():
- conf_select.convert_conf_directories_to_symlinks(k, json_version, v)
+ # On parallel command execution this should be executed by a single process at a time.
+ with FileBasedProcessLock(params.link_configs_lock_file):
+ for k, v in conf_select.PACKAGE_DIRS.iteritems():
+ conf_select.convert_conf_directories_to_symlinks(k, json_version, v)