You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by yu...@apache.org on 2014/11/14 03:20:08 UTC
[26/29] ambari git commit: AMBARI-8269. Merge branch-windows-dev
changes to trunk. (Jayush Luniya via yusaku)
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/ambari_commons/os_windows.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/os_windows.py b/ambari-common/src/main/python/ambari_commons/os_windows.py
new file mode 100644
index 0000000..2fb98e4
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_commons/os_windows.py
@@ -0,0 +1,563 @@
+# !/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import os
+import getpass
+import shlex
+import subprocess
+import sys
+import time
+import win32api
+import win32event
+import win32service
+import win32con
+import win32serviceutil
+import wmi
+import random
+import string
+
+import ctypes
+
+from win32security import *
+from win32api import *
+from winerror import ERROR_INVALID_HANDLE
+from win32process import GetExitCodeProcess, STARTF_USESTDHANDLES, STARTUPINFO, CreateProcessAsUser
+from win32event import WaitForSingleObject, INFINITE
+import msvcrt
+import tempfile
+from win32event import *
+from win32api import CloseHandle
+
+from ambari_commons.exceptions import *
+from logging_utils import *
+
+from win32security import LsaOpenPolicy, POLICY_CREATE_ACCOUNT, POLICY_LOOKUP_NAMES, LookupAccountName, \
+ LsaAddAccountRights, LsaRemoveAccountRights, SE_SERVICE_LOGON_NAME
+from win32net import NetUserAdd
+from win32netcon import USER_PRIV_USER, UF_NORMAL_ACCOUNT, UF_SCRIPT
+import pywintypes
+
+SERVICE_STATUS_UNKNOWN = "unknown"
+SERVICE_STATUS_STARTING = "starting"
+SERVICE_STATUS_RUNNING = "running"
+SERVICE_STATUS_STOPPING = "stopping"
+SERVICE_STATUS_STOPPED = "stopped"
+SERVICE_STATUS_NOT_INSTALLED = "not installed"
+
+WHOAMI_GROUPS = "whoami /groups"
+ADMIN_ACCOUNT = "BUILTIN\\Administrators"
+
+class OSVERSIONINFOEXW(ctypes.Structure):
+ _fields_ = [('dwOSVersionInfoSize', ctypes.c_ulong),
+ ('dwMajorVersion', ctypes.c_ulong),
+ ('dwMinorVersion', ctypes.c_ulong),
+ ('dwBuildNumber', ctypes.c_ulong),
+ ('dwPlatformId', ctypes.c_ulong),
+ ('szCSDVersion', ctypes.c_wchar*128),
+ ('wServicePackMajor', ctypes.c_ushort),
+ ('wServicePackMinor', ctypes.c_ushort),
+ ('wSuiteMask', ctypes.c_ushort),
+ ('wProductType', ctypes.c_byte),
+ ('wReserved', ctypes.c_byte)]
+
+def get_windows_version():
+ """
+ Get's the OS major and minor versions. Returns a tuple of
+ (OS_MAJOR, OS_MINOR).
+ """
+ os_version = OSVERSIONINFOEXW()
+ os_version.dwOSVersionInfoSize = ctypes.sizeof(os_version)
+ retcode = ctypes.windll.Ntdll.RtlGetVersion(ctypes.byref(os_version))
+ if retcode != 0:
+ raise Exception("Failed to get OS version")
+
+ return os_version.dwMajorVersion, os_version.dwMinorVersion, os_version.dwBuildNumber
+
+CHECK_FIREWALL_SCRIPT = """[string]$CName = $env:computername
+$reg = [Microsoft.Win32.RegistryKey]::OpenRemoteBaseKey("LocalMachine",$computer)
+$domain = $reg.OpenSubKey("System\CurrentControlSet\Services\SharedAccess\Parameters\FirewallPolicy\DomainProfile").GetValue("EnableFirewall")
+$standart = $reg.OpenSubKey("System\CurrentControlSet\Services\SharedAccess\Parameters\FirewallPolicy\StandardProfile").GetValue("EnableFirewall")
+$public = $reg.OpenSubKey("System\CurrentControlSet\Services\SharedAccess\Parameters\FirewallPolicy\PublicProfile").GetValue("EnableFirewall")
+Write-Host $domain
+Write-Host $standart
+Write-Host $public
+"""
+
+def _create_tmp_files():
+ out_file = tempfile.TemporaryFile(mode="r+b")
+ err_file = tempfile.TemporaryFile(mode="r+b")
+ return (msvcrt.get_osfhandle(out_file.fileno()),
+ msvcrt.get_osfhandle(err_file.fileno()),
+ out_file,
+ err_file)
+
+
+def _get_files_output(out, err):
+ out.seek(0)
+ err.seek(0)
+ return out.read().strip(), err.read().strip()
+
+
+def _safe_duplicate_handle(h):
+ try:
+ h = DuplicateHandle(GetCurrentProcess(),
+ h,
+ GetCurrentProcess(),
+ 0,
+ True,
+ win32con.DUPLICATE_SAME_ACCESS)
+ return True, h
+ except Exception as exc:
+ if exc.winerror == ERROR_INVALID_HANDLE:
+ return True, None
+ return False, None
+
+
+def run_os_command_impersonated(cmd, user, password, domain='.'):
+ si = STARTUPINFO()
+
+ out_handle, err_handle, out_file, err_file = _create_tmp_files()
+
+ ok, si.hStdInput = _safe_duplicate_handle(GetStdHandle(STD_INPUT_HANDLE))
+
+ if not ok:
+ raise Exception("Unable to create StdInput for child process")
+ ok, si.hStdOutput = _safe_duplicate_handle(out_handle)
+ if not ok:
+ raise Exception("Unable to create StdOut for child process")
+ ok, si.hStdError = _safe_duplicate_handle(err_handle)
+ if not ok:
+ raise Exception("Unable to create StdErr for child process")
+
+ si.dwFlags = STARTF_USESTDHANDLES
+ si.lpDesktop = ""
+
+ user_token = LogonUser(user, domain, password, win32con.LOGON32_LOGON_SERVICE, win32con.LOGON32_PROVIDER_DEFAULT)
+ primary_token = DuplicateTokenEx(user_token, SecurityImpersonation, 0, TokenPrimary)
+ info = CreateProcessAsUser(primary_token, None, cmd, None, None, 1, 0, None, None, si)
+
+ hProcess, hThread, dwProcessId, dwThreadId = info
+ hThread.Close()
+
+ try:
+ WaitForSingleObject(hProcess, INFINITE)
+ except KeyboardInterrupt:
+ pass
+
+ out, err = _get_files_output(out_file, err_file)
+ exitcode = GetExitCodeProcess(hProcess)
+
+ return exitcode, out, err
+
+def run_os_command(cmd, env=None):
+ if isinstance(cmd,basestring):
+ cmd = cmd.replace("\\", "\\\\")
+ cmd = shlex.split(cmd)
+ process = subprocess.Popen(cmd,
+ stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=env
+ )
+ (stdoutdata, stderrdata) = process.communicate()
+ return process.returncode, stdoutdata, stderrdata
+
+# execute powershell script passed in script_content. Script will be in temporary file to avoid different escape
+# and formatting problems.
+def run_powershell_script(script_content):
+ tmp_dir = tempfile.gettempdir()
+ random_filename = ''.join(random.choice(string.lowercase) for i in range(10))
+ script_file = open(os.path.join(tmp_dir,random_filename+".ps1"),"w")
+ script_file.write(script_content)
+ script_file.close()
+ result = run_os_command("powershell -ExecutionPolicy unrestricted -File {0}".format(script_file.name))
+ os.remove(script_file.name)
+ return result
+
+def os_change_owner(filePath, user):
+ cmd = ['icacls', filePath, '/setowner', user]
+ retcode, outdata, errdata = run_os_command(cmd)
+ return retcode
+
+def os_is_root():
+ '''
+ Checks whether the current user is a member of the Administrators group
+ Returns True if yes, otherwise False
+ '''
+ retcode, out, err = run_os_command(WHOAMI_GROUPS)
+ if retcode != 0:
+ err_msg = "Unable to check the current user's group memberships. Command {0} returned exit code {1} with message: {2}".format(WHOAMI_GROUPS, retcode, err)
+ print_warning_msg(err_msg)
+ raise FatalException(retcode, err_msg)
+
+ #Check for Administrators group membership
+ if -1 != out.find('\n' + ADMIN_ACCOUNT):
+ return True
+
+ return False
+
+def os_set_file_permissions(file, mod, recursive, user):
+ retcode = 0
+
+ #WARN_MSG = "Command {0} returned exit code {1} with message: {2}"
+ #if recursive:
+ # params = " -R "
+ #else:
+ # params = ""
+ #command = NR_CHMOD_CMD.format(params, mod, file)
+ #retcode, out, err = run_os_command(command)
+ #if retcode != 0:
+ # print_warning_msg(WARN_MSG.format(command, file, err))
+ #command = NR_CHOWN_CMD.format(params, user, file)
+ #retcode, out, err = run_os_command(command)
+ #if retcode != 0:
+ # print_warning_msg(WARN_MSG.format(command, file, err))
+
+ # rights = mod
+ # acls_remove_cmd = "icacls {0} /remove {1}".format(file, user)
+ # retcode, out, err = run_os_command(acls_remove_cmd)
+ # if retcode == 0:
+ # acls_modify_cmd = "icacls {0} /grant {1}:{2}".format(file, user, rights)
+ # retcode, out, err = run_os_command(acls_modify_cmd)
+ return retcode
+
+
+def os_set_open_files_limit(maxOpenFiles):
+ # No open files limit in Windows. Not messing around with the System Resource Manager, at least for now.
+ pass
+
+
+def os_getpass(prompt, stream=None):
+ """Prompt for password with echo off, using Windows getch()."""
+ if sys.stdin is not sys.__stdin__:
+ return getpass.fallback_getpass(prompt, stream)
+
+ import msvcrt
+
+ for c in prompt:
+ msvcrt.putch(c)
+
+ pw = ""
+ while True:
+ c = msvcrt.getch()
+ if c == '\r' or c == '\n':
+ break
+ if c == '\003':
+ raise KeyboardInterrupt
+ if c == '\b':
+ if pw == '':
+ pass
+ else:
+ pw = pw[:-1]
+ msvcrt.putch('\b')
+ msvcrt.putch(" ")
+ msvcrt.putch('\b')
+ else:
+ pw = pw + c
+ msvcrt.putch("*")
+
+ msvcrt.putch('\r')
+ msvcrt.putch('\n')
+ return pw
+
+#[fbarca] Not used for now, keep it around just in case
+def wait_for_pid_wmi(processName, parentPid, pattern, timeout):
+ """
+ Check pid for existence during timeout
+ """
+ tstart = time.time()
+ pid_live = 0
+
+ c = wmi.WMI(find_classes=False)
+ qry = "select * from Win32_Process where Name=\"%s\" and ParentProcessId=%d" % (processName, parentPid)
+
+ while int(time.time() - tstart) <= timeout:
+ for proc in c.query(qry):
+ cmdLine = proc.CommandLine
+ if cmdLine is not None and pattern in cmdLine:
+ return pid_live
+ time.sleep(1)
+ return 0
+
+
+#need this for redirecting output form python process to file
+class SyncStreamWriter(object):
+ def __init__(self, stream, hMutexWrite):
+ self.stream = stream
+ self.hMutexWrite = hMutexWrite
+
+ def write(self, data):
+ #Ensure that the output is thread-safe when writing from 2 separate streams into the same file
+ # (typical when redirecting both stdout and stderr to the same file).
+ win32event.WaitForSingleObject(self.hMutexWrite, win32event.INFINITE)
+ try:
+ self.stream.write(data)
+ self.stream.flush()
+ finally:
+ win32event.ReleaseMutex(self.hMutexWrite)
+
+ def __getattr__(self, attr):
+ return getattr(self.stream, attr)
+
+
+class SvcStatusCallback(object):
+ def __init__(self, svc):
+ self.svc = svc
+
+ def reportStartPending(self):
+ self.svc.ReportServiceStatus(win32service.SERVICE_START_PENDING)
+
+ def reportStarted(self):
+ self.svc.ReportServiceStatus(win32service.SERVICE_RUNNING)
+
+ def reportStopPending(self):
+ self.svc.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
+
+ def reportStopped(self):
+ self.svc.ReportServiceStatus(win32service.SERVICE_STOPPED)
+
+
+class WinServiceController:
+ @staticmethod
+ def Start(serviceName, waitSecs=30):
+ err = 0
+ try:
+ win32serviceutil.StartService(serviceName)
+ if waitSecs:
+ win32serviceutil.WaitForServiceStatus(serviceName, win32service.SERVICE_RUNNING, waitSecs)
+ except win32service.error, exc:
+ print "Error starting service: %s" % exc.strerror
+ err = exc.winerror
+ return err
+
+ @staticmethod
+ def Stop(serviceName, waitSecs=30):
+ err = 0
+ try:
+ if waitSecs:
+ win32serviceutil.StopServiceWithDeps(serviceName, waitSecs=waitSecs)
+ else:
+ win32serviceutil.StopService(serviceName)
+ if waitSecs:
+ win32serviceutil.WaitForServiceStatus(serviceName, win32service.SERVICE_STOPPED, waitSecs)
+ except win32service.error, exc:
+ print "Error stopping service: %s (%d)" % (exc.strerror, exc.winerror)
+ err = exc.winerror
+ return err
+
+ @staticmethod
+ def QueryStatus(serviceName):
+ statusString = SERVICE_STATUS_UNKNOWN
+
+ try:
+ status = win32serviceutil.QueryServiceStatus(serviceName)[1]
+
+ if status == win32service.SERVICE_STOPPED:
+ statusString = SERVICE_STATUS_STOPPED
+ elif status == win32service.SERVICE_START_PENDING:
+ statusString = SERVICE_STATUS_STARTING
+ elif status == win32service.SERVICE_RUNNING:
+ statusString = SERVICE_STATUS_RUNNING
+ elif status == win32service.SERVICE_STOP_PENDING:
+ statusString = SERVICE_STATUS_STOPPING
+ except win32api.error:
+ statusString = SERVICE_STATUS_NOT_INSTALLED
+ pass
+
+ return statusString
+
+ @staticmethod
+ def EnsureServiceIsStarted(serviceName, waitSecs=30):
+ err = 0
+ try:
+ status = win32serviceutil.QueryServiceStatus(serviceName)[1]
+ if win32service.SERVICE_RUNNING != status:
+ if win32service.SERVICE_START_PENDING != status:
+ win32serviceutil.StartService(serviceName)
+ if waitSecs:
+ win32serviceutil.WaitForServiceStatus(serviceName, win32service.SERVICE_RUNNING, waitSecs)
+ except win32service.error, exc:
+ err = exc.winerror
+ return err
+
+
+class WinService(win32serviceutil.ServiceFramework):
+ # _svc_name_ = The service name
+ # _svc_display_name_ = The service display name
+ # _svc_description_ = The service description
+
+ _heventSvcStop = win32event.CreateEvent(None, 0, 0, None)
+ _hmtxOut = win32event.CreateMutex(None, False, None) #[fbarca] Python doesn't support critical sections
+
+ def __init__(self, *args):
+ win32serviceutil.ServiceFramework.__init__(self, *args)
+
+ def SvcDoRun(self):
+ try:
+ self.ReportServiceStatus(win32service.SERVICE_RUNNING)
+ self.ServiceMain()
+ except Exception, x:
+ #TODO: Log exception
+ self.SvcStop()
+
+ def SvcStop(self):
+ self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
+ win32event.SetEvent(self._heventSvcStop)
+
+ # Service code entry point. Override it to implement the intended functionality.
+ def ServiceMain(self):
+ #Default implementation, does nothing.
+ win32event.WaitForSingleObject(self._heventSvcStop, win32event.INFINITE)
+ pass
+
+ def DefCtrlCHandler(self):
+ print_info_msg("Ctrl+C handler invoked. Stopping.")
+ win32event.SetEvent(self._heventSvcStop)
+ pass
+
+ #username domain\\username : The Username the service is to run under
+ #password password : The password for the username
+ #startup [manual|auto|disabled|delayed] : How the service starts, default = auto
+ #interactive : Allow the service to interact with the desktop.
+ #perfmonini file: .ini file to use for registering performance monitor data
+ #perfmondll file: .dll file to use when querying the service for performance data, default = perfmondata.dll
+ @classmethod
+ def Install(cls, startupMode = "auto", username = None, password = None, interactive = False,
+ perfMonIni = None, perfMonDll = None):
+ installArgs = [sys.argv[0], "--startup=" + startupMode]
+ if username is not None and username:
+ installArgs.append("--username=" + username)
+ if password is not None and password:
+ installArgs.append("--password=" + password)
+ if interactive:
+ installArgs.append("--interactive")
+ if perfMonIni is not None and perfMonIni:
+ installArgs.append("--perfmonini=" + perfMonIni)
+ if perfMonDll is not None and perfMonDll:
+ installArgs.append("--perfmondll=" + perfMonDll)
+ installArgs.append("install")
+ win32serviceutil.HandleCommandLine(cls, None, installArgs)
+
+ @classmethod
+ def Start(cls, waitSecs = 30):
+ return WinServiceController.Start(cls._svc_name_, waitSecs)
+
+ @classmethod
+ def Stop(cls, waitSecs = 30):
+ return WinServiceController.Stop(cls._svc_name_, waitSecs)
+
+ @classmethod
+ def QueryStatus(cls):
+ return WinServiceController.QueryStatus(cls._svc_name_)
+
+ @classmethod
+ def set_ctrl_c_handler(cls, ctrlHandler):
+ win32api.SetConsoleCtrlHandler(ctrlHandler, True)
+ pass
+
+ def _RedirectOutputStreamsToFile(self, outFilePath):
+ outFileDir = os.path.dirname(outFilePath)
+ if not os.path.exists(outFileDir):
+ os.makedirs(outFileDir)
+
+ out_writer = SyncStreamWriter(file(outFilePath, "w"), self._hmtxOut)
+ sys.stderr = out_writer
+ sys.stdout = out_writer
+ pass
+
+ def CheckForStop(self):
+ #Check for stop event to be signaled
+ return win32event.WAIT_OBJECT_0 == win32event.WaitForSingleObject(self._heventSvcStop, 1)
+
+ def _StopOrWaitForChildProcessToFinish(self, childProcess):
+ #Wait for the child process to finish or for the stop event to be signaled
+ if(win32event.WAIT_OBJECT_0 == win32event.WaitForMultipleObjects([self._heventSvcStop, childProcess._handle], False, win32event.INFINITE)):
+ # The OS only detaches the child process when the master process exits.
+ # We must kill it manually.
+ try:
+ #Sending signal.CTRL_BREAK_EVENT doesn't work. It only detaches the child process from the master.
+ # Must brutally terminate the child process. Sorry Java.
+ childProcess.terminate()
+ except OSError, e:
+ print_info_msg("Unable to stop Ambari Server - " + str(e))
+ return False
+
+ return True
+
+class SystemWideLock(object):
+
+ def __init__(self, name):
+ self._mutex = CreateMutex(None, 0, name)
+
+ def lock(self, timeout=0):
+ result = WaitForSingleObject(self._mutex, timeout)
+ if result in [WAIT_TIMEOUT, WAIT_ABANDONED, WAIT_FAILED]:
+ return False
+ elif result == WAIT_OBJECT_0:
+ return True
+
+ def unlock(self):
+ try:
+ ReleaseMutex(self._mutex)
+ return True
+ except:
+ return False
+
+ def __del__(self):
+ CloseHandle(self._mutex)
+
+class UserHelper(object):
+ ACTION_OK = 0
+ USER_EXISTS = 1
+ ACTION_FAILED = -1
+
+ def __init__(self):
+ self._policy = LsaOpenPolicy(None, POLICY_CREATE_ACCOUNT | POLICY_LOOKUP_NAMES)
+
+ def create_user(self, name, password, comment="Ambari user"):
+ user_info = {}
+ user_info['name'] = name
+ user_info['password'] = password
+ user_info['priv'] = USER_PRIV_USER
+ user_info['comment'] = comment
+ user_info['flags'] = UF_NORMAL_ACCOUNT | UF_SCRIPT
+ try:
+ NetUserAdd(None, 1, user_info)
+ except pywintypes.error as e:
+ if e.winerror == 2224:
+ return UserHelper.USER_EXISTS, e.strerror
+ else:
+ return UserHelper.ACTION_FAILED, e.strerror
+ return UserHelper.ACTION_OK, "User created."
+
+ def add_user_privilege(self, name, privilege):
+ try:
+ acc_sid = LookupAccountName(None, name)[0]
+ LsaAddAccountRights(self._policy, acc_sid, (privilege,))
+ except pywintypes.error as e:
+ return UserHelper.ACTION_FAILED, e.strerror
+ return UserHelper.ACTION_OK, "Privilege added."
+
+ def remove_user_privilege(self, name, privilege):
+ try:
+ acc_sid = LookupAccountName(None, name)[0]
+ LsaRemoveAccountRights(self._policy, acc_sid, 0, (privilege,))
+ except pywintypes.error as e:
+ return UserHelper.ACTION_FAILED, e.strerror
+ return UserHelper.ACTION_OK, "Privilege removed."
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/ambari_commons/resources/os_family.json
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/resources/os_family.json b/ambari-common/src/main/python/ambari_commons/resources/os_family.json
index 2f2abcc..ac8b759 100644
--- a/ambari-common/src/main/python/ambari_commons/resources/os_family.json
+++ b/ambari-common/src/main/python/ambari_commons/resources/os_family.json
@@ -41,5 +41,16 @@
"versions": [
11
]
+ },
+ "winsrv": {
+ "distro": [
+ "win2008server",
+ "win2008serverr2",
+ "win2012server",
+ "win2012serverr2"
+ ],
+ "versions": [
+ 6
+ ]
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/ambari_commons/str_utils.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/str_utils.py b/ambari-common/src/main/python/ambari_commons/str_utils.py
new file mode 100644
index 0000000..9a9e954
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_commons/str_utils.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+def compress_backslashes(s):
+ s1 = s
+ while (-1 != s1.find('\\\\')):
+ s1 = s1.replace('\\\\', '\\')
+ return s1
+
+def ensure_double_backslashes(s):
+ s1 = compress_backslashes(s)
+ s2 = s1.replace('\\', '\\\\')
+ return s2
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/core/logger.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/logger.py b/ambari-common/src/main/python/resource_management/core/logger.py
index 5e45e94..3550247 100644
--- a/ambari-common/src/main/python/resource_management/core/logger.py
+++ b/ambari-common/src/main/python/resource_management/core/logger.py
@@ -26,7 +26,7 @@ from resource_management.libraries.script.config_dictionary import UnknownConfig
class Logger:
logger = logging.getLogger("resource_management")
-
+
# unprotected_strings : protected_strings map
sensitive_strings = {}
@@ -41,8 +41,8 @@ class Logger:
@staticmethod
def info(text):
Logger.logger.info(Logger.get_protected_text(text))
-
- @staticmethod
+
+ @staticmethod
def debug(text):
Logger.logger.debug(Logger.get_protected_text(text))
@@ -57,11 +57,11 @@ class Logger:
@staticmethod
def info_resource(resource):
Logger.info(Logger.get_protected_text(Logger._get_resource_repr(resource)))
-
- @staticmethod
+
+ @staticmethod
def debug_resource(resource):
Logger.debug(Logger.get_protected_text(Logger._get_resource_repr(resource)))
-
+
@staticmethod
def get_protected_text(text):
"""
@@ -69,17 +69,17 @@ class Logger:
"""
for unprotected_string, protected_string in Logger.sensitive_strings.iteritems():
text = text.replace(unprotected_string, protected_string)
-
+
return text
-
- @staticmethod
+
+ @staticmethod
def _get_resource_repr(resource):
MESSAGE_MAX_LEN = 256
logger_level = logging._levelNames[Logger.logger.level]
-
+
arguments_str = ""
for x,y in resource.arguments.iteritems():
-
+
# strip unicode 'u' sign
if isinstance(y, unicode):
# don't show long messages
@@ -87,7 +87,7 @@ class Logger:
y = '...'
val = repr(y).lstrip('u')
# don't show dicts of configurations
- # usually too long
+ # usually too long
elif logger_level != 'DEBUG' and isinstance(y, dict):
val = "..."
# for configs which didn't come
@@ -95,14 +95,17 @@ class Logger:
val = "[EMPTY]"
# correctly output 'mode' (as they are octal values like 0755)
elif y and x == 'mode':
- val = oct(y)
+ try:
+ val = oct(y)
+ except:
+ val = repr(y)
else:
val = repr(y)
-
-
+
+
arguments_str += "'{0}': {1}, ".format(x, val)
-
- if arguments_str:
+
+ if arguments_str:
arguments_str = arguments_str[:-2]
-
- return unicode("{0} {{{1}}}").format(resource, arguments_str)
\ No newline at end of file
+
+ return unicode("{0} {{{1}}}").format(resource, arguments_str)
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/core/providers/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/__init__.py b/ambari-common/src/main/python/resource_management/core/providers/__init__.py
index 67ca483..7f97336 100644
--- a/ambari-common/src/main/python/resource_management/core/providers/__init__.py
+++ b/ambari-common/src/main/python/resource_management/core/providers/__init__.py
@@ -50,6 +50,12 @@ PROVIDERS = dict(
ubuntu=dict(
Package="resource_management.core.providers.package.apt.AptProvider",
),
+ winsrv=dict(
+ Service="resource_management.core.providers.windows.service.ServiceProvider",
+ Execute="resource_management.core.providers.windows.system.ExecuteProvider",
+ File="resource_management.core.providers.windows.system.FileProvider",
+ Directory="resource_management.core.providers.windows.system.DirectoryProvider"
+ ),
default=dict(
File="resource_management.core.providers.system.FileProvider",
Directory="resource_management.core.providers.system.DirectoryProvider",
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/core/providers/windows/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/windows/__init__.py b/ambari-common/src/main/python/resource_management/core/providers/windows/__init__.py
new file mode 100644
index 0000000..b0b988b
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/providers/windows/__init__.py
@@ -0,0 +1,20 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/core/providers/windows/service.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/windows/service.py b/ambari-common/src/main/python/resource_management/core/providers/windows/service.py
new file mode 100644
index 0000000..cdf3137
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/providers/windows/service.py
@@ -0,0 +1,65 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+from resource_management.core.providers import Provider
+from resource_management.core.base import Fail
+import win32service
+import time
+
+
+_schSCManager = win32service.OpenSCManager(None, None, win32service.SC_MANAGER_ALL_ACCESS)
+
+
+class ServiceProvider(Provider):
+ def action_start(self):
+ self._service_handle = self._service_handle if hasattr(self, "_service_handle") else \
+ win32service.OpenService(_schSCManager, self.resource.service_name, win32service.SERVICE_ALL_ACCESS)
+ if not self.status():
+ win32service.StartService(self._service_handle, None)
+ self.wait_status(win32service.SERVICE_RUNNING)
+
+ def action_stop(self):
+ self._service_handle = self._service_handle if hasattr(self, "_service_handle") else \
+ win32service.OpenService(_schSCManager, self.resource.service_name, win32service.SERVICE_ALL_ACCESS)
+ if self.status():
+ win32service.ControlService(self._service_handle, win32service.SERVICE_CONTROL_STOP)
+ self.wait_status(win32service.SERVICE_STOPPED)
+
+ def action_restart(self):
+ self._service_handle = win32service.OpenService(_schSCManager, self.resource.service_name,
+ win32service.SERVICE_ALL_ACCESS)
+ self.action_stop()
+ self.action_start()
+
+ def action_reload(self):
+ raise Fail("Reload for Service resource not supported on windows")
+
+ def status(self):
+ if win32service.QueryServiceStatusEx(self._service_handle)["CurrentState"] == win32service.SERVICE_RUNNING:
+ return True
+ return False
+
+ def get_current_status(self):
+ return win32service.QueryServiceStatusEx(self._service_handle)["CurrentState"]
+
+ def wait_status(self, status, timeout=5):
+ begin = time.time()
+ while self.get_current_status() != status and (timeout == 0 or time.time() - begin < timeout):
+ time.sleep(1)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/core/providers/windows/system.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/windows/system.py b/ambari-common/src/main/python/resource_management/core/providers/windows/system.py
new file mode 100644
index 0000000..e7a98fc
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/providers/windows/system.py
@@ -0,0 +1,382 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from resource_management.core.providers import Provider
+from resource_management.core.logger import Logger
+from resource_management.core.base import Fail
+from resource_management.core import ExecuteTimeoutException
+import time
+import os
+import subprocess
+import shutil
+from resource_management.libraries.script import Script
+import win32con
+from win32security import *
+from win32api import *
+from winerror import ERROR_INVALID_HANDLE
+from win32profile import CreateEnvironmentBlock
+from win32process import GetExitCodeProcess, STARTF_USESTDHANDLES, STARTUPINFO, CreateProcessAsUser
+from win32event import WaitForSingleObject, INFINITE
+from win32security import *
+import msvcrt
+import tempfile
+
+def _create_tmp_files(env=None):
+ dirname = None
+ if env is None:
+ env = os.environ
+
+ for env_var_name in 'TMPDIR', 'TEMP', 'TMP':
+ if env.has_key(env_var_name):
+ dirname = env[env_var_name]
+ if dirname and os.path.exists(dirname):
+ break
+
+ if dirname is None:
+ for dirname2 in r'c:\temp', r'c:\tmp', r'\temp', r'\tmp':
+ try:
+ os.makedirs(dirname2)
+ dirname = dirname2
+ break
+ except:
+ pass
+
+ if dirname is None:
+ raise Exception('Unable to create temp dir. Insufficient access rights.')
+
+ out_file = tempfile.TemporaryFile(mode="r+b", dir=dirname)
+ err_file = tempfile.TemporaryFile(mode="r+b", dir=dirname)
+ return (msvcrt.get_osfhandle(out_file.fileno()),
+ msvcrt.get_osfhandle(err_file.fileno()),
+ out_file,
+ err_file)
+
+
+def _get_files_output(out, err):
+ out.seek(0)
+ err.seek(0)
+ return out.read().strip(), err.read().strip()
+
+
+def _safe_duplicate_handle(h):
+ try:
+ h = DuplicateHandle(GetCurrentProcess(),
+ h,
+ GetCurrentProcess(),
+ 0,
+ True,
+ win32con.DUPLICATE_SAME_ACCESS)
+ return True, h
+ except Exception as exc:
+ if exc.winerror == ERROR_INVALID_HANDLE:
+ return True, None
+ return False, None
+
+
+def _merge_env(env1, env2, merge_keys=['PYTHONPATH']):
+ """
+ Merge env2 into env1. Also current python instance variables from merge_keys list taken into account and they will be
+ merged with equivalent keys from env1 and env2 using system path separator.
+ :param env1: first environment, usually returned by CreateEnvironmentBlock
+ :param env2: custom environment
+ :param merge_keys: env variables to merge as PATH
+ :return: merged environment
+ """
+ env1 = dict(env1) # copy to new dict in case env1 is os.environ
+ if env2:
+ for key, value in env2.iteritems():
+ if not key in merge_keys:
+ env1[key] = value
+ # strnsform keys and values to str(windows can not accept unicode)
+ result_env = {}
+ for key, value in env1.iteritems():
+ if not key in merge_keys:
+ result_env[str(key)] = str(value)
+ #merge keys from merge_keys
+ def put_values(key, env, result):
+ if env and key in env:
+ result.extend(env[key].split(os.pathsep))
+
+ for key in merge_keys:
+ all_values = []
+ for env in [env1, env2, os.environ]:
+ put_values(key, env, all_values)
+ result_env[str(key)] = str(os.pathsep.join(set(all_values)))
+ return result_env
+
+def AdjustPrivilege(htoken, priv, enable = 1):
+ # Get the ID for the privilege.
+ privId = LookupPrivilegeValue(None, priv)
+ # Now obtain the privilege for this token.
+ # Create a list of the privileges to be added.
+ privState = SE_PRIVILEGE_ENABLED if enable else 0
+ newPrivileges = [(privId, privState)]
+ # and make the adjustment.
+ AdjustTokenPrivileges(htoken, 0, newPrivileges)
+
+def QueryPrivilegeState(hToken, priv):
+ # Get the ID for the privilege.
+ privId = LookupPrivilegeValue(None, priv)
+ privList = GetTokenInformation(hToken, TokenPrivileges)
+ privState = 0
+ for (id, attr) in privList:
+ if id == privId:
+ privState = attr
+ Logger.debug('Privilege state: {}={} ({}) Enabled={}'.format(privId, priv, LookupPrivilegeDisplayName(None, priv), privState))
+ return privState
+
+# Execute command. As windows hdp stack heavily relies on proper environment it is better to reload fresh environment
+# on every execution. env variable will me merged with fresh environment for user.
+def _call_command(command, logoutput=False, cwd=None, env=None, wait_for_finish=True, timeout=None, user=None):
+ # TODO implement timeout, wait_for_finish
+ Logger.info("Executing %s" % (command))
+ if user:
+ proc_token = OpenProcessToken(GetCurrentProcess(), TOKEN_QUERY | TOKEN_ADJUST_PRIVILEGES)
+
+ old_states = []
+
+ privileges = [
+ SE_ASSIGNPRIMARYTOKEN_NAME,
+ SE_INCREASE_QUOTA_NAME,
+ ]
+
+ for priv in privileges:
+ old_states.append(QueryPrivilegeState(proc_token, priv))
+ AdjustPrivilege(proc_token, priv)
+ QueryPrivilegeState(proc_token, priv)
+
+ user_token = LogonUser(user, ".", Script.get_password(user), win32con.LOGON32_LOGON_SERVICE,
+ win32con.LOGON32_PROVIDER_DEFAULT)
+ env_token = DuplicateTokenEx(user_token, SecurityIdentification, TOKEN_QUERY, TokenPrimary)
+ # getting updated environment for impersonated user and merge it with custom env
+ current_env = CreateEnvironmentBlock(env_token, False)
+ current_env = _merge_env(current_env, env)
+
+ si = STARTUPINFO()
+ out_handle, err_handle, out_file, err_file = _create_tmp_files(current_env)
+ ok, si.hStdInput = _safe_duplicate_handle(GetStdHandle(STD_INPUT_HANDLE))
+ if not ok:
+ raise Exception("Unable to create StdInput for child process")
+ ok, si.hStdOutput = _safe_duplicate_handle(out_handle)
+ if not ok:
+ raise Exception("Unable to create StdOut for child process")
+ ok, si.hStdError = _safe_duplicate_handle(err_handle)
+ if not ok:
+ raise Exception("Unable to create StdErr for child process")
+
+ Logger.debug("Redirecting stdout to '{}', stderr to '{}'".format(out_file.name, err_file.name))
+
+ si.dwFlags = win32con.STARTF_USESTDHANDLES
+ si.lpDesktop = ""
+
+ try:
+ info = CreateProcessAsUser(user_token, None, command, None, None, 1, win32con.CREATE_NO_WINDOW, current_env, cwd, si)
+ hProcess, hThread, dwProcessId, dwThreadId = info
+ hThread.Close()
+
+ try:
+ WaitForSingleObject(hProcess, INFINITE)
+ except KeyboardInterrupt:
+ pass
+ out, err = _get_files_output(out_file, err_file)
+ code = GetExitCodeProcess(hProcess)
+ finally:
+ for priv in privileges:
+ old_state = old_states.pop(0)
+ AdjustPrivilege(proc_token, priv, old_state)
+ else:
+ # getting updated environment for current process and merge it with custom env
+ cur_token = OpenProcessToken(GetCurrentProcess(), TOKEN_QUERY)
+ current_env = CreateEnvironmentBlock(cur_token, False)
+ current_env = _merge_env(current_env, env)
+ proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+ cwd=cwd, env=current_env, shell=False)
+ out, err = proc.communicate()
+ code = proc.returncode
+
+ if logoutput and out:
+ Logger.info(out)
+ if logoutput and err:
+ Logger.info(err)
+ return code, out, err
+
+
+# see msdn Icacls doc for rights
+def _set_file_acl(file, user, rights):
+ acls_modify_cmd = "icacls {0} /grant {1}:{2}".format(file, user, rights)
+ acls_remove_cmd = "icacls {0} /remove {1}".format(file, user)
+ code, out, err = _call_command(acls_remove_cmd)
+ if code != 0:
+ raise Fail("Can not remove rights for path {0} and user {1}".format(file, user))
+ code, out, err = _call_command(acls_modify_cmd)
+ if code != 0:
+ raise Fail("Can not set rights {0} for path {1} and user {2}".format(file, user))
+ else:
+ return
+
+
+class FileProvider(Provider):
+ def action_create(self):
+ path = self.resource.path
+
+ if os.path.isdir(path):
+ raise Fail("Applying %s failed, directory with name %s exists" % (self.resource, path))
+
+ dirname = os.path.dirname(path)
+ if not os.path.isdir(dirname):
+ raise Fail("Applying %s failed, parent directory %s doesn't exist" % (self.resource, dirname))
+
+ write = False
+ content = self._get_content()
+ if not os.path.exists(path):
+ write = True
+ reason = "it doesn't exist"
+ elif self.resource.replace:
+ if content is not None:
+ with open(path, "rb") as fp:
+ old_content = fp.read()
+ if content != old_content:
+ write = True
+ reason = "contents don't match"
+ if self.resource.backup:
+ self.resource.env.backup_file(path)
+
+ if write:
+ Logger.info("Writing %s because %s" % (self.resource, reason))
+ with open(path, "wb") as fp:
+ if content:
+ fp.write(content)
+
+ if self.resource.owner and self.resource.mode:
+ _set_file_acl(self.resource.path, self.resource.owner, self.resource.mode)
+
+ def action_delete(self):
+ path = self.resource.path
+
+ if os.path.isdir(path):
+ raise Fail("Applying %s failed, %s is directory not file!" % (self.resource, path))
+
+ if os.path.exists(path):
+ Logger.info("Deleting %s" % self.resource)
+ os.unlink(path)
+
+ def _get_content(self):
+ content = self.resource.content
+ if content is None:
+ return None
+ elif isinstance(content, basestring):
+ return content
+ elif hasattr(content, "__call__"):
+ return content()
+ raise Fail("Unknown source type for %s: %r" % (self, content))
+
+
+class ExecuteProvider(Provider):
+ def action_run(self):
+ if self.resource.creates:
+ if os.path.exists(self.resource.creates):
+ return
+
+ Logger.debug("Executing %s" % self.resource)
+
+ if self.resource.path != []:
+ if not self.resource.environment:
+ self.resource.environment = {}
+
+ self.resource.environment['PATH'] = os.pathsep.join(self.resource.path)
+
+ for i in range(0, self.resource.tries):
+ try:
+ code, _, _ = _call_command(self.resource.command, logoutput=self.resource.logoutput,
+ cwd=self.resource.cwd, env=self.resource.environment,
+ wait_for_finish=self.resource.wait_for_finish,
+ timeout=self.resource.timeout, user=self.resource.user)
+ if code != 0 and not self.resource.ignore_failures:
+ raise Fail("Failed to execute " + self.resource.command)
+ break
+ except Fail as ex:
+ if i == self.resource.tries - 1: # last try
+ raise ex
+ else:
+ Logger.info("Retrying after %d seconds. Reason: %s" % (self.resource.try_sleep, str(ex)))
+ time.sleep(self.resource.try_sleep)
+ except ExecuteTimeoutException:
+ err_msg = ("Execution of '%s' was killed due timeout after %d seconds") % (
+ self.resource.command, self.resource.timeout)
+
+ if self.resource.on_timeout:
+ Logger.info("Executing '%s'. Reason: %s" % (self.resource.on_timeout, err_msg))
+ _call_command(self.resource.on_timeout)
+ else:
+ raise Fail(err_msg)
+
+
+class DirectoryProvider(Provider):
+ def action_create(self):
+ path = DirectoryProvider._trim_uri(self.resource.path)
+ if not os.path.exists(path):
+ Logger.info("Creating directory %s" % self.resource)
+ if self.resource.recursive:
+ os.makedirs(path)
+ else:
+ dirname = os.path.dirname(path)
+ if not os.path.isdir(dirname):
+ raise Fail("Applying %s failed, parent directory %s doesn't exist" % (self.resource, dirname))
+
+ os.mkdir(path)
+
+ if not os.path.isdir(path):
+ raise Fail("Applying %s failed, file %s already exists" % (self.resource, path))
+
+ if self.resource.owner and self.resource.mode:
+ _set_file_acl(path, self.resource.owner, self.resource.mode)
+
+ def action_delete(self):
+ path = self.resource.path
+ if os.path.exists(path):
+ if not os.path.isdir(path):
+ raise Fail("Applying %s failed, %s is not a directory" % (self.resource, path))
+
+ Logger.info("Removing directory %s and all its content" % self.resource)
+ shutil.rmtree(path)
+
+ @staticmethod
+ def _trim_uri(file_uri):
+ if file_uri.startswith("file:///"):
+ return file_uri[8:]
+ return file_uri
+ # class res: pass
+ # resource = res()
+ # resource.creates = None
+ # resource.path =[]
+ # resource.tries = 1
+ # resource.logoutput = True
+ # resource.cwd = None
+ # resource.environment = None
+ # resource.wait_for_finish = True
+ # resource.timeout = None
+ # resource.command = "cmd /C echo 1 & echo 2"
+ # provider = ExecuteProvider(resource)
+ # provider.action_run()
+ # pass
+ # _set_file_acl("C:\\lol.txt", "Administrator","f")
+ # pass
+ # pass
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/functions/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/__init__.py b/ambari-common/src/main/python/resource_management/libraries/functions/__init__.py
index 36b8e51..9b32b92 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/__init__.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/__init__.py
@@ -20,6 +20,8 @@ Ambari Agent
"""
+import platform
+
from resource_management.libraries.functions.default import *
from resource_management.libraries.functions.format import *
from resource_management.libraries.functions.get_kinit_path import *
@@ -31,3 +33,10 @@ from resource_management.libraries.functions.get_port_from_url import *
from resource_management.libraries.functions.hive_check import *
from resource_management.libraries.functions.version import *
from resource_management.libraries.functions.format_jvm_option import *
+
+IS_WINDOWS = platform.system() == "Windows"
+
+if IS_WINDOWS:
+ from resource_management.libraries.functions.windows_service_utils import *
+ from resource_management.libraries.functions.install_hdp_msi import *
+ from resource_management.libraries.functions.reload_windows_env import *
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/functions/default.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/default.py b/ambari-common/src/main/python/resource_management/libraries/functions/default.py
index 733c03a..16782de 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/default.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/default.py
@@ -20,7 +20,7 @@ Ambari Agent
"""
-__all__ = ["default"]
+__all__ = ['default', 'default_string']
from resource_management.libraries.script import Script
from resource_management.libraries.script.config_dictionary import UnknownConfiguration
from resource_management.core.logger import Logger
@@ -37,4 +37,8 @@ def default(name, default_value):
Logger.debug("Cannot find configuration: '%s'. Using '%s' value as default" % (name, default_value))
return default_value
- return curr_dict
\ No newline at end of file
+ return curr_dict
+
+def default_string(name, default_value, delimiter):
+ default_list = default(name, default_value)
+ return delimiter.join(default_list)
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/functions/get_unique_id_and_date.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/get_unique_id_and_date.py b/ambari-common/src/main/python/resource_management/libraries/functions/get_unique_id_and_date.py
index a79a1e5..bb68270 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/get_unique_id_and_date.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/get_unique_id_and_date.py
@@ -23,12 +23,19 @@ Ambari Agent
__all__ = ["get_unique_id_and_date"]
import datetime
from resource_management.core import shell
-
+from ambari_commons import os_check
def get_unique_id_and_date():
+ if os_check.OSCheck.is_windows_os():
+ from ambari_commons.os_windows import run_os_command
+ code, out, err = run_os_command("cmd /c vol C:")
+ for line in out.splitlines():
+ if line.startswith(" Volume Serial Number is"):
+ id = line[25:]
+ else:
out = shell.checked_call("hostid")[1].split('\n')[-1] # bugfix: take the lastline (stdin is not tty part cut)
id = out.strip()
- now = datetime.datetime.now()
- date = now.strftime("%M%d%y")
+ now = datetime.datetime.now()
+ date = now.strftime("%M%d%y")
- return "id{id}_date{date}".format(id=id, date=date)
+ return "id{id}_date{date}".format(id=id, date=date)
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/functions/install_hdp_msi.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/install_hdp_msi.py b/ambari-common/src/main/python/resource_management/libraries/functions/install_hdp_msi.py
new file mode 100644
index 0000000..a7c2fe2
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/install_hdp_msi.py
@@ -0,0 +1,182 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+from ambari_commons import os_utils
+from ambari_commons.inet_utils import download_file
+from ambari_commons.os_windows import SystemWideLock
+
+from resource_management.core.resources.system import Execute
+from resource_management.core.resources.system import File
+from resource_management.core.logger import Logger
+from resource_management.core.exceptions import Fail
+from resource_management.libraries.functions.reload_windows_env import reload_windows_env
+from resource_management.libraries.functions.windows_service_utils import check_windows_service_exists
+import socket
+import os
+import glob
+
+
+__all__ = ['install_windows_msi']
+
+msi_save_dir = None
+hdp_log_dir = "c:\\hadoop\\logs"
+hdp_data_dir = "c:\\hadoopDefaultData"
+local_host = socket.getfqdn()
+db_flavor = "DERBY"
+cluster_properties = """#Log directory
+HDP_LOG_DIR={hdp_log_dir}
+
+#Data directory
+HDP_DATA_DIR={hdp_data_dir}
+
+#hosts
+NAMENODE_HOST={local_host}
+SECONDARY_NAMENODE_HOST={local_host}
+RESOURCEMANAGER_HOST={local_host}
+HIVE_SERVER_HOST={local_host}
+OOZIE_SERVER_HOST={local_host}
+WEBHCAT_HOST={local_host}
+SLAVE_HOSTS={local_host}
+ZOOKEEPER_HOSTS={local_host}
+CLIENT_HOSTS={local_host}
+HBASE_MASTER={local_host}
+HBASE_REGIONSERVERS={local_host}
+FLUME_HOSTS={local_host}
+FALCON_HOST={local_host}
+KNOX_HOST={local_host}
+STORM_NIMBUS={local_host}
+STORM_SUPERVISORS={local_host}
+
+#Database host
+DB_FLAVOR={db_flavor}
+DB_HOSTNAME={local_host}
+DB_PORT=1527
+
+#Hive properties
+HIVE_DB_NAME=hive
+HIVE_DB_USERNAME=hive
+HIVE_DB_PASSWORD=hive
+
+#Oozie properties
+OOZIE_DB_NAME=oozie
+OOZIE_DB_USERNAME=oozie
+OOZIE_DB_PASSWORD=oozie
+"""
+
+INSTALL_MSI_CMD = 'cmd /C start /wait msiexec /qn /i {hdp_msi_path} /lv {hdp_log_path} MSIUSEREALADMINDETECTION=1 ' \
+ 'HDP_LAYOUT={hdp_layout_path} DESTROY_DATA=yes HDP_USER_PASSWORD={hadoop_password_arg} HDP=yes ' \
+ 'KNOX=yes KNOX_MASTER_SECRET="AmbariHDP2Windows" FALCON=yes STORM=yes HBase=yes STORM=yes FLUME=yes'
+CREATE_SERVICE_SCRIPT = os.path.abspath("sbin\createservice.ps1")
+CREATE_SERVICE_CMD = 'cmd /C powershell -File "{script}" -username hadoop -password "{password}" -servicename ' \
+ '{servicename} -hdpresourcesdir "{resourcedir}" -servicecmdpath "{servicecmd}"'
+INSTALL_MARKER_OK = "msi.installed"
+INSTALL_MARKER_FAILED = "msi.failed"
+_working_dir = None
+
+
+def _ensure_services_created(hadoop_password):
+ resource_dir_hdfs = os.path.join(os.environ["HADOOP_HDFS_HOME"], "bin")
+ service_cmd_hdfs = os.path.join(os.environ["HADOOP_HDFS_HOME"], "bin", "hdfs.cmd")
+ if not check_windows_service_exists("journalnode"):
+ Execute(CREATE_SERVICE_CMD.format(script=CREATE_SERVICE_SCRIPT, password=hadoop_password, servicename="journalnode",
+ resourcedir=resource_dir_hdfs, servicecmd=service_cmd_hdfs), logoutput=True)
+ if not check_windows_service_exists("zkfc"):
+ Execute(CREATE_SERVICE_CMD.format(script=CREATE_SERVICE_SCRIPT, password=hadoop_password, servicename="zkfc",
+ resourcedir=resource_dir_hdfs, servicecmd=service_cmd_hdfs), logoutput=True)
+
+
+# creating symlinks to services folders to avoid using stack-dependent paths
+def _create_symlinks():
+ # folders
+ Execute("cmd /c mklink /d %HADOOP_NODE%\\hadoop %HADOOP_HOME%")
+ Execute("cmd /c mklink /d %HADOOP_NODE%\\hive %HIVE_HOME%")
+ # files pairs (symlink_path, path_template_to_target_file), use * to replace file version
+ links_pairs = [
+ ("%HADOOP_HOME%\\share\\hadoop\\tools\\lib\\hadoop-streaming.jar",
+ "%HADOOP_HOME%\\share\\hadoop\\tools\\lib\\hadoop-streaming-*.jar"),
+ ("%HIVE_HOME%\\hcatalog\\share\\webhcat\\svr\\lib\\hive-webhcat.jar",
+ "%HIVE_HOME%\\hcatalog\\share\\webhcat\\svr\\lib\\hive-webhcat-*.jar"),
+ ("%HIVE_HOME%\\lib\\zookeeper.jar", "%HIVE_HOME%\\lib\\zookeeper-*.jar")
+ ]
+ for link_pair in links_pairs:
+ link, target = link_pair
+ target = glob.glob(os.path.expandvars(target))[0].replace("\\\\", "\\")
+ Execute('cmd /c mklink "{0}" "{1}"'.format(link, target))
+
+
+# check if services exists and marker file present
+def _is_msi_installed():
+ return os.path.exists(os.path.join(_working_dir, INSTALL_MARKER_OK)) and check_windows_service_exists("namenode")
+
+
+# check if msi was installed correctly and raise Fail in case of broken install
+def _validate_msi_install():
+ if not _is_msi_installed() and os.path.exists(os.path.join(_working_dir, INSTALL_MARKER_FAILED)):
+ Fail("Current or previous hdp.msi install failed. Check hdp.msi install logs")
+ return _is_msi_installed()
+
+
+def _write_marker():
+ if check_windows_service_exists("namenode"):
+ open(os.path.join(_working_dir, INSTALL_MARKER_OK), "w").close()
+ else:
+ open(os.path.join(_working_dir, INSTALL_MARKER_FAILED), "w").close()
+
+
+def install_windows_msi(msi_url, save_dir, save_file, hadoop_password):
+ global _working_dir
+ _working_dir = save_dir
+ save_dir = os.path.abspath(save_dir)
+ msi_save_dir = save_dir
+ # system wide lock to prevent simultaneous installations(when first task failed on timeout)
+ install_lock = SystemWideLock("hdp_msi_lock")
+ try:
+ # try to acquire lock
+ if not install_lock.lock():
+ Logger.info("Some other task currently installing hdp.msi, waiting for 10 min for finish")
+ if not install_lock.lock(600000):
+ raise Fail("Timeout on acquiring lock")
+ if _validate_msi_install():
+ Logger.info("hdp.msi already installed")
+ return
+
+ # install msi
+ download_file(msi_url, os.path.join(msi_save_dir, save_file))
+ File(os.path.join(msi_save_dir, "properties.txt"), content=cluster_properties.format(hdp_log_dir=hdp_log_dir,
+ hdp_data_dir=hdp_data_dir,
+ local_host=local_host,
+ db_flavor=db_flavor))
+ hdp_msi_path = os_utils.quote_path(os.path.join(save_dir, "hdp.msi"))
+ hdp_log_path = os_utils.quote_path(os.path.join(save_dir, "hdp.log"))
+ hdp_layout_path = os_utils.quote_path(os.path.join(save_dir, "properties.txt"))
+ hadoop_password_arg = os_utils.quote_path(hadoop_password)
+
+ Execute(
+ INSTALL_MSI_CMD.format(hdp_msi_path=hdp_msi_path, hdp_log_path=hdp_log_path, hdp_layout_path=hdp_layout_path,
+ hadoop_password_arg=hadoop_password_arg))
+ reload_windows_env()
+ # create additional services manually due to hdp.msi limitaitons
+ _ensure_services_created(hadoop_password)
+ _create_symlinks()
+ # finalizing install
+ _write_marker()
+ _validate_msi_install()
+ finally:
+ install_lock.unlock()
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/functions/reload_windows_env.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/reload_windows_env.py b/ambari-common/src/main/python/resource_management/libraries/functions/reload_windows_env.py
new file mode 100644
index 0000000..f6f3626
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/reload_windows_env.py
@@ -0,0 +1,48 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from _winreg import (OpenKey, EnumValue, HKEY_LOCAL_MACHINE, KEY_READ, CloseKey)
+import os
+
+default_whitelist = ["FALCON_CONF_DIR", "FALCON_DATA_DIR", "FALCON_HOME", "FALCON_LOG_DIR", "FLUME_HOME",
+ "HADOOP_COMMON_HOME", "HADOOP_CONF_DIR", "HADOOP_HDFS_HOME", "HADOOP_HOME", "HADOOP_LOG_DIR",
+ "HADOOP_MAPRED_HOME", "HADOOP_NODE", "HADOOP_NODE_INSTALL_ROOT", "HADOOP_PACKAGES",
+ "HADOOP_SETUP_TOOLS", "HADOOP_YARN_HOME", "HBASE_CONF_DIR", "HBASE_HOME", "HCAT_HOME",
+ "HDFS_AUDIT_LOGGER", "HDFS_DATA_DIR", "HIVE_CONF_DIR", "HIVE_HOME", "HIVE_LIB_DIR", "HIVE_LOG_DIR",
+ "HIVE_OPTS", "KNOX_CONF_DIR", "KNOX_HOME", "KNOX_LOG_DIR", "MAHOUT_HOME", "OOZIE_DATA",
+ "OOZIE_HOME", "OOZIE_LOG", "OOZIE_ROOT", "PIG_HOME", "SQOOP_HOME", "STORM_CONF_DIR", "STORM_HOME",
+ "STORM_LOG_DIR", "WEBHCAT_CONF_DIR", "YARN_LOG_DIR", "ZOOKEEPER_CONF_DIR", "ZOOKEEPER_HOME",
+ "ZOOKEEPER_LIB_DIR", "ZOO_LOG_DIR"]
+def reload_windows_env(keys_white_list=default_whitelist):
+ root = HKEY_LOCAL_MACHINE
+ subkey = r'SYSTEM\CurrentControlSet\Control\Session Manager\Environment'
+ key = OpenKey(root, subkey, 0, KEY_READ)
+ finish = False
+ index = 0
+ while not finish:
+ try:
+ _key, _value, _ = EnumValue(key, index)
+ if (_key in keys_white_list):
+ os.environ[_key] = _value
+ except WindowsError:
+ finish = True
+ index += 1
+ CloseKey(key)
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/functions/tar_archive.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/tar_archive.py b/ambari-common/src/main/python/resource_management/libraries/functions/tar_archive.py
new file mode 100644
index 0000000..efbf933
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/tar_archive.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os
+import tarfile
+from contextlib import closing
+
+def archive_dir(output_filename, input_dir):
+ with closing(tarfile.open(output_filename, "w:gz")) as tar:
+ try:
+ tar.add(input_dir, arcname=os.path.basename("."))
+ finally:
+ tar.close()
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/functions/windows_service_utils.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/windows_service_utils.py b/ambari-common/src/main/python/resource_management/libraries/functions/windows_service_utils.py
new file mode 100644
index 0000000..7d994b7
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/windows_service_utils.py
@@ -0,0 +1,42 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from resource_management.core.exceptions import ComponentIsNotRunning
+from resource_management.core.logger import Logger
+__all__ = ['check_windows_service_status', 'check_windows_service_exists']
+
+import win32service
+
+_schSCManager = win32service.OpenSCManager(None, None, win32service.SC_MANAGER_ALL_ACCESS)
+
+def check_windows_service_status(service_name):
+ _service_handle = win32service.OpenService(_schSCManager, service_name, win32service.SERVICE_ALL_ACCESS)
+ if win32service.QueryServiceStatusEx(_service_handle)["CurrentState"] == win32service.SERVICE_STOPPED:
+ raise ComponentIsNotRunning()
+
+def check_windows_service_exists(service_name):
+ typeFilter = win32service.SERVICE_WIN32
+ stateFilter = win32service.SERVICE_STATE_ALL
+ statuses = win32service.EnumServicesStatus(_schSCManager, typeFilter, stateFilter)
+ for (short_name, desc, status) in statuses:
+ if short_name == service_name:
+ return True
+ return False
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/functions/zip_archive.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/zip_archive.py b/ambari-common/src/main/python/resource_management/libraries/functions/zip_archive.py
new file mode 100644
index 0000000..cab3627
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/zip_archive.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os
+import zipfile
+
+def _zip_dir(zip, root):
+ for dirname, dirnames, filenames in os.walk(root):
+ for filename in filenames:
+ if len(dirname) > len(root):
+ rel_path = os.path.relpath(dirname, root)
+ arch_name = rel_path + os.sep + filename
+ else:
+ arch_name = filename
+ zip.write(os.path.join(dirname, filename), arch_name)
+
+
+def archive_dir(output_filename, input_dir):
+ zipf = zipfile.ZipFile(output_filename, 'w')
+ try:
+ _zip_dir(zipf, input_dir)
+ finally:
+ zipf.close()
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/providers/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/providers/__init__.py b/ambari-common/src/main/python/resource_management/libraries/providers/__init__.py
index 5ca7bd9..80e0a14 100644
--- a/ambari-common/src/main/python/resource_management/libraries/providers/__init__.py
+++ b/ambari-common/src/main/python/resource_management/libraries/providers/__init__.py
@@ -30,6 +30,9 @@ PROVIDERS = dict(
ubuntu=dict(
Repository="resource_management.libraries.providers.repository.UbuntuRepositoryProvider",
),
+ winsrv=dict(
+
+ ),
default=dict(
ExecuteHadoop="resource_management.libraries.providers.execute_hadoop.ExecuteHadoopProvider",
TemplateConfig="resource_management.libraries.providers.template_config.TemplateConfigProvider",
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/providers/xml_config.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/providers/xml_config.py b/ambari-common/src/main/python/resource_management/libraries/providers/xml_config.py
index 87fc657..b5c2b54 100644
--- a/ambari-common/src/main/python/resource_management/libraries/providers/xml_config.py
+++ b/ambari-common/src/main/python/resource_management/libraries/providers/xml_config.py
@@ -21,13 +21,14 @@ Ambari Agent
"""
import time
+import os
from resource_management import *
class XmlConfigProvider(Provider):
def action_create(self):
filename = self.resource.filename
xml_config_provider_config_dir = self.resource.conf_dir
-
+
# |e - for html-like escaping of <,>,',"
config_content = InlineTemplate('''<!--{{time.asctime(time.localtime())}}-->
<configuration>
@@ -48,12 +49,12 @@ class XmlConfigProvider(Provider):
{% endfor %}
</configuration>''', extra_imports=[time], configurations_dict=self.resource.configurations,
configuration_attrs=self.resource.configuration_attributes)
-
-
- Logger.info(format("Generating config: {xml_config_provider_config_dir}/{filename}"))
-
+
+ xml_config_dest_file_path = os.path.join(xml_config_provider_config_dir, filename)
+ Logger.info("Generating config: {0}".format(xml_config_dest_file_path))
+
with Environment.get_instance_copy() as env:
- File (format("{xml_config_provider_config_dir}/{filename}"),
+ File (xml_config_dest_file_path,
content = config_content,
owner = self.resource.owner,
group = self.resource.group,
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/script/script.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/script/script.py b/ambari-common/src/main/python/resource_management/libraries/script/script.py
index 001922d..39511bc 100644
--- a/ambari-common/src/main/python/resource_management/libraries/script/script.py
+++ b/ambari-common/src/main/python/resource_management/libraries/script/script.py
@@ -17,7 +17,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
-import tarfile
import tempfile
__all__ = ["Script"]
@@ -26,19 +25,24 @@ import os
import sys
import json
import logging
-from contextlib import closing
-
+import platform
from resource_management.libraries.resources import XmlConfig
from resource_management.libraries.resources import PropertiesFile
from resource_management.core.resources import File, Directory
from resource_management.core.source import InlineTemplate
-
from resource_management.core.environment import Environment
from resource_management.core.exceptions import Fail, ClientComponentHasNoStatus, ComponentIsNotRunning
from resource_management.core.resources.packaging import Package
-from resource_management.libraries.script.config_dictionary import ConfigDictionary
+from resource_management.libraries.script.config_dictionary import ConfigDictionary, UnknownConfiguration
+IS_WINDOWS = platform.system() == "Windows"
+if IS_WINDOWS:
+ from resource_management.libraries.functions.install_hdp_msi import install_windows_msi
+ from resource_management.libraries.functions.reload_windows_env import reload_windows_env
+ from resource_management.libraries.functions.zip_archive import archive_dir
+else:
+ from resource_management.libraries.functions.tar_archive import archive_dir
USAGE = """Usage: {0} <COMMAND> <JSON_CONFIG> <BASEDIR> <STROUTPUT> <LOGGING_LEVEL> <TMP_DIR>
@@ -50,6 +54,19 @@ USAGE = """Usage: {0} <COMMAND> <JSON_CONFIG> <BASEDIR> <STROUTPUT> <LOGGING_LEV
<TMP_DIR> temporary directory for executable scripts. Ex: /var/lib/ambari-agent/data/tmp
"""
+_PASSWORD_MAP = {"/configurations/cluster-env/hadoop.user.name":"/configurations/cluster-env/hadoop.user.password"}
+
+def get_path_form_configuration(name, configuration):
+ subdicts = filter(None, name.split('/'))
+
+ for x in subdicts:
+ if x in configuration:
+ configuration = configuration[x]
+ else:
+ return None
+
+ return configuration
+
class Script(object):
"""
Executes a command for custom service. stdout and stderr are written to
@@ -91,13 +108,13 @@ class Script(object):
cherr.setFormatter(formatter)
logger.addHandler(cherr)
logger.addHandler(chout)
-
+
# parse arguments
- if len(sys.argv) < 7:
+ if len(sys.argv) < 7:
logger.error("Script expects at least 6 arguments")
print USAGE.format(os.path.basename(sys.argv[0])) # print to stdout
sys.exit(1)
-
+
command_name = str.lower(sys.argv[1])
command_data_file = sys.argv[2]
basedir = sys.argv[3]
@@ -108,11 +125,23 @@ class Script(object):
logging_level_str = logging._levelNames[logging_level]
chout.setLevel(logging_level_str)
logger.setLevel(logging_level_str)
-
+
+ # on windows we need to reload some of env variables manually because there is no default paths for configs(like
+ # /etc/something/conf on linux. When this env vars created by one of the Script execution, they can not be updated
+ # in agent, so other Script executions will not be able to access to new env variables
+ if platform.system() == "Windows":
+ reload_windows_env()
+
try:
with open(command_data_file, "r") as f:
pass
Script.config = ConfigDictionary(json.load(f))
+ #load passwords here(used on windows to impersonate different users)
+ Script.passwords = {}
+ for k, v in _PASSWORD_MAP.iteritems():
+ if get_path_form_configuration(k,Script.config) and get_path_form_configuration(v,Script.config ):
+ Script.passwords[get_path_form_configuration(k,Script.config)] = get_path_form_configuration(v,Script.config)
+
except IOError:
logger.exception("Can not read json file with command parameters: ")
sys.exit(1)
@@ -150,6 +179,9 @@ class Script(object):
"""
return Script.config
+ @staticmethod
+ def get_password(user):
+ return Script.passwords[user]
@staticmethod
def get_tmp_dir():
@@ -170,28 +202,39 @@ class Script(object):
self.install_packages(env)
- def install_packages(self, env, exclude_packages=[]):
- """
- List of packages that are required< by service is received from the server
- as a command parameter. The method installs all packages
- from this list
- """
- config = self.get_config()
-
- try:
- package_list_str = config['hostLevelParams']['package_list']
- if isinstance(package_list_str,basestring) and len(package_list_str) > 0:
- package_list = json.loads(package_list_str)
- for package in package_list:
- if not package['name'] in exclude_packages:
- name = package['name']
- Package(name)
- except KeyError:
- pass # No reason to worry
-
- #RepoInstaller.remove_repos(config)
-
-
+ if not IS_WINDOWS:
+ def install_packages(self, env, exclude_packages=[]):
+ """
+ List of packages that are required< by service is received from the server
+ as a command parameter. The method installs all packages
+ from this list
+ """
+ config = self.get_config()
+ try:
+ package_list_str = config['hostLevelParams']['package_list']
+ if isinstance(package_list_str, basestring) and len(package_list_str) > 0:
+ package_list = json.loads(package_list_str)
+ for package in package_list:
+ if not package['name'] in exclude_packages:
+ name = package['name']
+ Package(name)
+ except KeyError:
+ pass # No reason to worry
+
+ # RepoInstaller.remove_repos(config)
+ pass
+ else:
+ def install_packages(self, env, exclude_packages=[]):
+ """
+ List of packages that are required< by service is received from the server
+ as a command parameter. The method installs all packages
+ from this list
+ """
+ config = self.get_config()
+
+ install_windows_msi(os.path.join(config['hostLevelParams']['jdk_location'], "hdp.msi"),
+ config["hostLevelParams"]["agentCacheDir"], "hdp.msi", self.get_password("hadoop"))
+ pass
def fail_with_error(self, message):
"""
@@ -239,56 +282,60 @@ class Script(object):
self.fail_with_error('configure method isn\'t implemented')
def generate_configs_get_template_file_content(self, filename, dicts):
- import params
+ config = self.get_config()
content = ''
for dict in dicts.split(','):
- if dict.strip() in params.config['configurations']:
- content += params.config['configurations'][dict.strip()]['content']
+ if dict.strip() in config['configurations']:
+ try:
+ content += config['configurations'][dict.strip()]['content']
+ except Fail:
+ # 'content' section not available in the component client configuration
+ pass
return content
def generate_configs_get_xml_file_content(self, filename, dict):
- import params
- return {'configurations':params.config['configurations'][dict],
- 'configuration_attributes':params.config['configuration_attributes'][dict]}
+ config = self.get_config()
+ return {'configurations':config['configurations'][dict],
+ 'configuration_attributes':config['configuration_attributes'][dict]}
def generate_configs_get_xml_file_dict(self, filename, dict):
- import params
- return params.config['configurations'][dict]
+ config = self.get_config()
+ return config['configurations'][dict]
def generate_configs(self, env):
"""
Generates config files and stores them as an archive in tmp_dir
based on xml_configs_list and env_configs_list from commandParams
"""
- import params
- env.set_params(params)
- xml_configs_list = params.config['commandParams']['xml_configs_list']
- env_configs_list = params.config['commandParams']['env_configs_list']
- properties_configs_list = params.config['commandParams']['properties_configs_list']
-
- conf_tmp_dir = tempfile.mkdtemp()
- output_filename = os.path.join(self.get_tmp_dir(),params.config['commandParams']['output_file'])
+ config = self.get_config()
+
+ xml_configs_list = config['commandParams']['xml_configs_list']
+ env_configs_list = config['commandParams']['env_configs_list']
+ properties_configs_list = config['commandParams']['properties_configs_list']
Directory(self.get_tmp_dir(), recursive=True)
- for file_dict in xml_configs_list:
- for filename, dict in file_dict.iteritems():
- XmlConfig(filename,
- conf_dir=conf_tmp_dir,
- **self.generate_configs_get_xml_file_content(filename, dict)
- )
- for file_dict in env_configs_list:
- for filename,dicts in file_dict.iteritems():
- File(os.path.join(conf_tmp_dir, filename),
- content=InlineTemplate(self.generate_configs_get_template_file_content(filename, dicts)))
-
- for file_dict in properties_configs_list:
- for filename, dict in file_dict.iteritems():
- PropertiesFile(os.path.join(conf_tmp_dir, filename),
- properties=self.generate_configs_get_xml_file_dict(filename, dict)
- )
-
- with closing(tarfile.open(output_filename, "w:gz")) as tar:
- tar.add(conf_tmp_dir, arcname=os.path.basename("."))
- tar.close()
- Directory(conf_tmp_dir, action="delete")
+
+ conf_tmp_dir = tempfile.mkdtemp(dir=self.get_tmp_dir())
+ output_filename = os.path.join(self.get_tmp_dir(), config['commandParams']['output_file'])
+
+ try:
+ for file_dict in xml_configs_list:
+ for filename, dict in file_dict.iteritems():
+ XmlConfig(filename,
+ conf_dir=conf_tmp_dir,
+ **self.generate_configs_get_xml_file_content(filename, dict)
+ )
+ for file_dict in env_configs_list:
+ for filename,dicts in file_dict.iteritems():
+ File(os.path.join(conf_tmp_dir, filename),
+ content=InlineTemplate(self.generate_configs_get_template_file_content(filename, dicts)))
+
+ for file_dict in properties_configs_list:
+ for filename, dict in file_dict.iteritems():
+ PropertiesFile(os.path.join(conf_tmp_dir, filename),
+ properties=self.generate_configs_get_xml_file_dict(filename, dict)
+ )
+ archive_dir(output_filename, conf_tmp_dir)
+ finally:
+ Directory(conf_tmp_dir, action="delete")
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-server/conf/unix/ambari.properties
----------------------------------------------------------------------
diff --git a/ambari-server/conf/unix/ambari.properties b/ambari-server/conf/unix/ambari.properties
index ed1994c..e5b9e7b 100644
--- a/ambari-server/conf/unix/ambari.properties
+++ b/ambari-server/conf/unix/ambari.properties
@@ -35,6 +35,7 @@ bootstrap.setup_agent.script=/usr/lib/python2.6/site-packages/ambari_server/setu
recommendations.dir=/var/run/ambari-server/stack-recommendations
stackadvisor.script=/var/lib/ambari-server/resources/scripts/stack_advisor.py
server.tmp.dir=/var/lib/ambari-server/tmp
+ambari.python.wrap=ambari-python-wrap
api.authenticate=true
server.connection.max.idle.millis=900000
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-server/conf/windows/ambari-env.cmd
----------------------------------------------------------------------
diff --git a/ambari-server/conf/windows/ambari-env.cmd b/ambari-server/conf/windows/ambari-env.cmd
new file mode 100644
index 0000000..23600d4
--- /dev/null
+++ b/ambari-server/conf/windows/ambari-env.cmd
@@ -0,0 +1,19 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements. See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License. You may obtain a copy of the License at
+rem
+rem http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+
+set AMBARI_PASSHPHRASE=DEV
+set AMBARI_JVM_ARGS=%AMBARI_JVM_ARGS% -Xms512m -Xmx2048m -Djava.security.auth.login.config=conf\krb5JAASLogin.conf -Djava.security.krb5.conf=conf\krb5.conf -Djavax.security.auth.useSubjectCredsOnly=false
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-server/conf/windows/ambari.properties
----------------------------------------------------------------------
diff --git a/ambari-server/conf/windows/ambari.properties b/ambari-server/conf/windows/ambari.properties
new file mode 100644
index 0000000..fd3a7ba
--- /dev/null
+++ b/ambari-server/conf/windows/ambari.properties
@@ -0,0 +1,82 @@
+# Copyright 2011 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+security.server.keys_dir=keystore
+resources.dir=resources
+shared.resources.dir = sbin\\ambari_commons\\resources
+custom.action.definitions=resources\\custom_action_definitions
+
+#Comma-separated list of JDK versions
+#java.releases=jdk1.8.20,jdk1.6.31
+java.releases=jdk1.7.67
+jdk1.7.67.desc=Oracle JDK 1.7.67
+jdk1.7.67.url=http://public-repo-1.hortonworks.com/ARTIFACTS/jdk-7u67-windows-x64.exe
+jdk1.7.67.dest-file=jdk-7u67-windows-x64.exe
+jdk1.7.67.jcpol-url=http://public-repo-1.hortonworks.com/ARTIFACTS/UnlimitedJCEPolicyJDK7.zip
+jdk1.7.67.jcpol-file=UnlimitedJCEPolicyJDK7.zip
+jdk1.7.67.home=C:\\jdk1.7.0_67
+
+metadata.path=resources\\stacks
+server.version.file=version
+webapp.dir=web
+bootstrap.dir=bootstrap
+bootstrap.script=bootstrap\\bootstrap.py
+bootstrap.setup_agent.script=bootstrap\\setupAgent.py
+api.authenticate=true
+server.connection.max.idle.millis=900000
+server.fqdn.service.url=http://127.0.0.1/latest/meta-data/public-hostname
+server.stages.parallel=true
+
+# Scheduler settings
+server.execution.scheduler.isClustered=false
+server.execution.scheduler.maxThreads=5
+server.execution.scheduler.maxDbConnections=5
+server.execution.scheduler.misfire.toleration.minutes=480
+
+recommendations.dir=\\var\\run\\ambari-server\\stack-recommendations
+stackadvisor.script=resources\\scripts\\stack_advisor.py
+server.tmp.dir=\\var\\run\\ambari-server\\tmp
+views.dir=resources\\views
+ambari.python.wrap=python.exe
+
+# Default timeout in seconds before task is killed
+agent.task.timeout=600
+
+# thread pool maximums
+client.threadpool.size.max=25
+agent.threadpool.size.max=25
+
+# linux open-file limit
+ulimit.open.files=10000
+
+#java.home=C:\j2se1.8.0_05
+
+#server.jdbc.rca.driver=com.microsoft.sqlserver.jdbc.SQLServerDriver
+#server.jdbc.rca.url=jdbc:sqlserver://localhost\\SQLEXPRESS;databaseName=ambari;integratedSecurity=true
+##server.jdbc.rca.user.name=ambari
+##server.jdbc.rca.user.passwd=etc\\ambari-server\\conf\\password.dat
+
+#server.jdbc.driver=com.microsoft.sqlserver.jdbc.SQLServerDriver
+#server.jdbc.driver.path=C:\\Program Files\\Microsoft JDBC DRIVER 4.0 for SQL Server\\sqljdbc_4.0\\enu\\sqljdbc4.jar
+#server.jdbc.url=jdbc:sqlserver://localhost\\SQLEXPRESS;databaseName=ambari;integratedSecurity=true
+#server.jdbc.schema=ambari
+##server.jdbc.user.passwd=etc\\ambari-server\\conf\\password.dat
+##server.jdbc.user.name=ambari
+#scom.sink.db.driver=com.microsoft.sqlserver.jdbc.SQLServerDriver
+##scom.sink.db.url=jdbc:sqlserver://[server]:[port];databaseName=[databaseName];user=[user];password=[password]
+#scom.sink.db.url=jdbc:sqlserver://localhost\\SQLEXPRESS;databaseName=HadoopMetrics;integratedSecurity=true
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-server/conf/windows/ca.config
----------------------------------------------------------------------
diff --git a/ambari-server/conf/windows/ca.config b/ambari-server/conf/windows/ca.config
new file mode 100644
index 0000000..b4dd1c5
--- /dev/null
+++ b/ambari-server/conf/windows/ca.config
@@ -0,0 +1,29 @@
+[ ca ]
+default_ca = CA_CLIENT
+[ CA_CLIENT ]
+dir = keystore\\db
+certs = $dir\\certs
+new_certs_dir = $dir\\newcerts
+
+database = $dir\\index.txt
+serial = $dir\\serial
+default_days = 365
+
+default_crl_days = 7
+default_md = md5
+
+policy = policy_anything
+
+[ policy_anything ]
+countryName = optional
+stateOrProvinceName = optional
+localityName = optional
+organizationName = optional
+organizationalUnitName = optional
+commonName = optional
+emailAddress = optional
+
+[ jdk7_ca ]
+subjectKeyIdentifier = hash
+authorityKeyIdentifier = keyid:always,issuer:always
+basicConstraints = CA:true
\ No newline at end of file