You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by yu...@apache.org on 2014/11/14 03:20:08 UTC

[26/29] ambari git commit: AMBARI-8269. Merge branch-windows-dev changes to trunk. (Jayush Luniya via yusaku)

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/ambari_commons/os_windows.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/os_windows.py b/ambari-common/src/main/python/ambari_commons/os_windows.py
new file mode 100644
index 0000000..2fb98e4
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_commons/os_windows.py
@@ -0,0 +1,563 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import os
+import getpass
+import shlex
+import subprocess
+import sys
+import time
+import win32api
+import win32event
+import win32service
+import win32con
+import win32serviceutil
+import wmi
+import random
+import string
+
+import ctypes
+
+from win32security import *
+from win32api import *
+from winerror import ERROR_INVALID_HANDLE
+from win32process import GetExitCodeProcess, STARTF_USESTDHANDLES, STARTUPINFO, CreateProcessAsUser
+from win32event import WaitForSingleObject, INFINITE
+import msvcrt
+import tempfile
+from win32event import *
+from win32api import CloseHandle
+
+from ambari_commons.exceptions import *
+from logging_utils import *
+
+from win32security import LsaOpenPolicy, POLICY_CREATE_ACCOUNT, POLICY_LOOKUP_NAMES, LookupAccountName, \
+  LsaAddAccountRights, LsaRemoveAccountRights, SE_SERVICE_LOGON_NAME
+from win32net import NetUserAdd
+from win32netcon import USER_PRIV_USER, UF_NORMAL_ACCOUNT, UF_SCRIPT
+import pywintypes
+
+SERVICE_STATUS_UNKNOWN = "unknown"
+SERVICE_STATUS_STARTING = "starting"
+SERVICE_STATUS_RUNNING = "running"
+SERVICE_STATUS_STOPPING = "stopping"
+SERVICE_STATUS_STOPPED = "stopped"
+SERVICE_STATUS_NOT_INSTALLED = "not installed"
+
+WHOAMI_GROUPS = "whoami /groups"
+ADMIN_ACCOUNT = "BUILTIN\\Administrators"
+
+class OSVERSIONINFOEXW(ctypes.Structure):
+    _fields_ = [('dwOSVersionInfoSize', ctypes.c_ulong),
+                ('dwMajorVersion', ctypes.c_ulong),
+                ('dwMinorVersion', ctypes.c_ulong),
+                ('dwBuildNumber', ctypes.c_ulong),
+                ('dwPlatformId', ctypes.c_ulong),
+                ('szCSDVersion', ctypes.c_wchar*128),
+                ('wServicePackMajor', ctypes.c_ushort),
+                ('wServicePackMinor', ctypes.c_ushort),
+                ('wSuiteMask', ctypes.c_ushort),
+                ('wProductType', ctypes.c_byte),
+                ('wReserved', ctypes.c_byte)]
+
+def get_windows_version():
+    """
+    Gets the OS version via RtlGetVersion.  Returns a tuple of
+    (OS_MAJOR, OS_MINOR, OS_BUILD); raises Exception on a non-zero status.
+    """
+    os_version = OSVERSIONINFOEXW()
+    os_version.dwOSVersionInfoSize = ctypes.sizeof(os_version)
+    retcode = ctypes.windll.Ntdll.RtlGetVersion(ctypes.byref(os_version))
+    if retcode != 0:
+        raise Exception("Failed to get OS version")
+
+    return os_version.dwMajorVersion, os_version.dwMinorVersion, os_version.dwBuildNumber
+
+CHECK_FIREWALL_SCRIPT = """[string]$CName = $env:computername
+$reg = [Microsoft.Win32.RegistryKey]::OpenRemoteBaseKey("LocalMachine",$computer)
+$domain = $reg.OpenSubKey("System\CurrentControlSet\Services\SharedAccess\Parameters\FirewallPolicy\DomainProfile").GetValue("EnableFirewall")
+$standart = $reg.OpenSubKey("System\CurrentControlSet\Services\SharedAccess\Parameters\FirewallPolicy\StandardProfile").GetValue("EnableFirewall")
+$public = $reg.OpenSubKey("System\CurrentControlSet\Services\SharedAccess\Parameters\FirewallPolicy\PublicProfile").GetValue("EnableFirewall")
+Write-Host $domain
+Write-Host $standart
+Write-Host $public
+"""
+
+def _create_tmp_files():
+  out_file = tempfile.TemporaryFile(mode="r+b")
+  err_file = tempfile.TemporaryFile(mode="r+b")
+  return (msvcrt.get_osfhandle(out_file.fileno()),
+          msvcrt.get_osfhandle(err_file.fileno()),
+          out_file,
+          err_file)
+
+
+def _get_files_output(out, err):
+  out.seek(0)
+  err.seek(0)
+  return out.read().strip(), err.read().strip()
+
+
+def _safe_duplicate_handle(h):
+  # Duplicate h as an inheritable handle. Returns (ok, handle); handle is None when h is invalid or on failure.
+  try:
+    h = DuplicateHandle(GetCurrentProcess(), h, GetCurrentProcess(),
+                        0,     # desired access is ignored with DUPLICATE_SAME_ACCESS
+                        True,  # inheritable
+                        win32con.DUPLICATE_SAME_ACCESS)
+    return True, h
+  except Exception as exc:
+    # Exceptions without a winerror attribute must yield (False, None) rather than an AttributeError.
+    if getattr(exc, "winerror", None) == ERROR_INVALID_HANDLE:
+      return True, None
+  return False, None
+
+
+def run_os_command_impersonated(cmd, user, password, domain='.'):
+  si = STARTUPINFO()
+
+  out_handle, err_handle, out_file, err_file = _create_tmp_files()
+
+  ok, si.hStdInput = _safe_duplicate_handle(GetStdHandle(STD_INPUT_HANDLE))
+
+  if not ok:
+    raise Exception("Unable to create StdInput for child process")
+  ok, si.hStdOutput = _safe_duplicate_handle(out_handle)
+  if not ok:
+    raise Exception("Unable to create StdOut for child process")
+  ok, si.hStdError = _safe_duplicate_handle(err_handle)
+  if not ok:
+    raise Exception("Unable to create StdErr for child process")
+
+  si.dwFlags = STARTF_USESTDHANDLES
+  si.lpDesktop = ""
+
+  user_token = LogonUser(user, domain, password, win32con.LOGON32_LOGON_SERVICE, win32con.LOGON32_PROVIDER_DEFAULT)
+  primary_token = DuplicateTokenEx(user_token, SecurityImpersonation, 0, TokenPrimary)
+  info = CreateProcessAsUser(primary_token, None, cmd, None, None, 1, 0, None, None, si)
+
+  hProcess, hThread, dwProcessId, dwThreadId = info
+  hThread.Close()
+
+  try:
+    WaitForSingleObject(hProcess, INFINITE)
+  except KeyboardInterrupt:
+    pass
+
+  out, err = _get_files_output(out_file, err_file)
+  exitcode = GetExitCodeProcess(hProcess)
+
+  return exitcode, out, err
+
+def run_os_command(cmd, env=None):
+  if isinstance(cmd,basestring):
+    cmd = cmd.replace("\\", "\\\\")
+    cmd = shlex.split(cmd)
+  process = subprocess.Popen(cmd,
+                             stdout=subprocess.PIPE,
+                             stdin=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=env
+  )
+  (stdoutdata, stderrdata) = process.communicate()
+  return process.returncode, stdoutdata, stderrdata
+
+# Execute the PowerShell script passed in script_content. The script is written to a temporary file to avoid
+# escaping and formatting problems; the file is removed even if launching PowerShell fails.
+def run_powershell_script(script_content):
+  script_path = os.path.join(tempfile.gettempdir(), ''.join(random.choice(string.lowercase) for i in range(10)) + ".ps1")
+  with open(script_path, "w") as script_file:
+    script_file.write(script_content)
+  try:
+    # An argument list bypasses the shlex split in run_os_command, so a temp path containing spaces stays intact.
+    return run_os_command(["powershell", "-ExecutionPolicy", "unrestricted", "-File", script_path])
+  finally:
+    os.remove(script_path)
+
+def os_change_owner(filePath, user):
+  cmd = ['icacls', filePath, '/setowner', user]
+  retcode, outdata, errdata = run_os_command(cmd)
+  return retcode
+
+def os_is_root():
+  '''
+  Checks whether the current user is a member of the Administrators group
+  Returns True if yes, otherwise False
+  '''
+  retcode, out, err = run_os_command(WHOAMI_GROUPS)
+  if retcode != 0:
+    err_msg = "Unable to check the current user's group memberships. Command {0} returned exit code {1} with message: {2}".format(WHOAMI_GROUPS, retcode, err)
+    print_warning_msg(err_msg)
+    raise FatalException(retcode, err_msg)
+
+  #Check for Administrators group membership
+  if -1 != out.find('\n' + ADMIN_ACCOUNT):
+    return True
+
+  return False
+
+def os_set_file_permissions(file, mod, recursive, user):
+  retcode = 0
+
+  #WARN_MSG = "Command {0} returned exit code {1} with message: {2}"
+  #if recursive:
+  #  params = " -R "
+  #else:
+  #  params = ""
+  #command = NR_CHMOD_CMD.format(params, mod, file)
+  #retcode, out, err = run_os_command(command)
+  #if retcode != 0:
+  #  print_warning_msg(WARN_MSG.format(command, file, err))
+  #command = NR_CHOWN_CMD.format(params, user, file)
+  #retcode, out, err = run_os_command(command)
+  #if retcode != 0:
+  #  print_warning_msg(WARN_MSG.format(command, file, err))
+
+  # rights = mod
+  # acls_remove_cmd = "icacls {0} /remove {1}".format(file, user)
+  # retcode, out, err = run_os_command(acls_remove_cmd)
+  # if retcode == 0:
+  #   acls_modify_cmd = "icacls {0} /grant {1}:{2}".format(file, user, rights)
+  #   retcode, out, err = run_os_command(acls_modify_cmd)
+  return retcode
+
+
+def os_set_open_files_limit(maxOpenFiles):
+  # No open files limit in Windows. Not messing around with the System Resource Manager, at least for now.
+  pass
+
+
+def os_getpass(prompt, stream=None):
+  """Prompt for password with echo off, using Windows getch()."""
+  if sys.stdin is not sys.__stdin__:
+    return getpass.fallback_getpass(prompt, stream)
+
+  import msvcrt
+
+  for c in prompt:
+    msvcrt.putch(c)
+
+  pw = ""
+  while True:
+    c = msvcrt.getch()
+    if c == '\r' or c == '\n':
+      break
+    if c == '\003':
+      raise KeyboardInterrupt
+    if c == '\b':
+      if pw == '':
+        pass
+      else:
+        pw = pw[:-1]
+        msvcrt.putch('\b')
+        msvcrt.putch(" ")
+        msvcrt.putch('\b')
+    else:
+      pw = pw + c
+      msvcrt.putch("*")
+
+  msvcrt.putch('\r')
+  msvcrt.putch('\n')
+  return pw
+
+#[fbarca] Not used for now, keep it around just in case
+def wait_for_pid_wmi(processName, parentPid, pattern, timeout):
+  """
+    Poll WMI about once a second, for up to timeout seconds, for a child of parentPid named processName whose
+    command line contains pattern. Returns the PID of the first match, or 0 if none appears in time.
+  """
+  tstart = time.time()
+  c = wmi.WMI(find_classes=False)
+  qry = "select * from Win32_Process where Name=\"%s\" and ParentProcessId=%d" % (processName, parentPid)
+
+  while int(time.time() - tstart) <= timeout:
+    for proc in c.query(qry):
+      cmdLine = proc.CommandLine
+      if cmdLine is not None and pattern in cmdLine:
+        # Return the matching process id; 0 is reserved for the timeout case.
+        return proc.ProcessId
+    time.sleep(1)
+  return 0
+
+
+#need this for redirecting output from python process to file
+class SyncStreamWriter(object):
+  def __init__(self, stream, hMutexWrite):
+    self.stream = stream
+    self.hMutexWrite = hMutexWrite
+
+  def write(self, data):
+    #Ensure that the output is thread-safe when writing from 2 separate streams into the same file
+    #  (typical when redirecting both stdout and stderr to the same file).
+    win32event.WaitForSingleObject(self.hMutexWrite, win32event.INFINITE)
+    try:
+      self.stream.write(data)
+      self.stream.flush()
+    finally:
+      win32event.ReleaseMutex(self.hMutexWrite)
+
+  def __getattr__(self, attr):
+    return getattr(self.stream, attr)
+
+
+class SvcStatusCallback(object):
+  def __init__(self, svc):
+    self.svc = svc
+
+  def reportStartPending(self):
+    self.svc.ReportServiceStatus(win32service.SERVICE_START_PENDING)
+
+  def reportStarted(self):
+    self.svc.ReportServiceStatus(win32service.SERVICE_RUNNING)
+
+  def reportStopPending(self):
+    self.svc.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
+
+  def reportStopped(self):
+    self.svc.ReportServiceStatus(win32service.SERVICE_STOPPED)
+
+
+class WinServiceController:
+  @staticmethod
+  def Start(serviceName, waitSecs=30):  # Returns 0 on success, otherwise the Windows error code.
+    err = 0
+    try:
+      win32serviceutil.StartService(serviceName)
+      if waitSecs:
+        win32serviceutil.WaitForServiceStatus(serviceName, win32service.SERVICE_RUNNING, waitSecs)
+    except win32service.error, exc:
+      print "Error starting service: %s" % exc.strerror
+      err = exc.winerror
+    return err
+
+  @staticmethod
+  def Stop(serviceName, waitSecs=30):  # Returns 0 on success, otherwise the Windows error code.
+    err = 0
+    try:
+      if waitSecs:
+        win32serviceutil.StopServiceWithDeps(serviceName, waitSecs=waitSecs)
+      else:
+        # waitSecs is falsy on this branch, so only request the stop and return
+        # without waiting for the service to reach the stopped state.
+        win32serviceutil.StopService(serviceName)
+    except win32service.error, exc:
+      print "Error stopping service: %s (%d)" % (exc.strerror, exc.winerror)
+      err = exc.winerror
+    return err
+
+  @staticmethod
+  def QueryStatus(serviceName):  # Returns one of the SERVICE_STATUS_* strings.
+    statusString = SERVICE_STATUS_UNKNOWN
+
+    try:
+      status = win32serviceutil.QueryServiceStatus(serviceName)[1]
+
+      if status == win32service.SERVICE_STOPPED:
+        statusString = SERVICE_STATUS_STOPPED
+      elif status == win32service.SERVICE_START_PENDING:
+        statusString = SERVICE_STATUS_STARTING
+      elif status == win32service.SERVICE_RUNNING:
+        statusString = SERVICE_STATUS_RUNNING
+      elif status == win32service.SERVICE_STOP_PENDING:
+        statusString = SERVICE_STATUS_STOPPING
+    except win32api.error:
+      # Any failure to query the service is reported as "not installed".
+      statusString = SERVICE_STATUS_NOT_INSTALLED
+
+    return statusString
+
+  @staticmethod
+  def EnsureServiceIsStarted(serviceName, waitSecs=30):  # Returns 0 on success, otherwise the Windows error code.
+    err = 0
+    try:
+      status = win32serviceutil.QueryServiceStatus(serviceName)[1]
+      if win32service.SERVICE_RUNNING != status:
+        if win32service.SERVICE_START_PENDING != status:
+          win32serviceutil.StartService(serviceName)
+        if waitSecs:
+          win32serviceutil.WaitForServiceStatus(serviceName, win32service.SERVICE_RUNNING, waitSecs)
+    except win32service.error, exc:
+      err = exc.winerror
+    return err
+
+
+class WinService(win32serviceutil.ServiceFramework):
+  # _svc_name_ = The service name
+  # _svc_display_name_ = The service display name
+  # _svc_description_ = The service description
+
+  _heventSvcStop = win32event.CreateEvent(None, 0, 0, None)
+  _hmtxOut = win32event.CreateMutex(None, False, None)  #[fbarca] Python doesn't support critical sections
+
+  def __init__(self, *args):
+    win32serviceutil.ServiceFramework.__init__(self, *args)
+
+  def SvcDoRun(self):
+    try:
+      self.ReportServiceStatus(win32service.SERVICE_RUNNING)
+      self.ServiceMain()
+    except Exception, x:
+      #TODO: Log exception
+      self.SvcStop()
+
+  def SvcStop(self):
+    self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
+    win32event.SetEvent(self._heventSvcStop)
+
+  # Service code entry point. Override it to implement the intended functionality.
+  def ServiceMain(self):
+    #Default implementation, does nothing.
+    win32event.WaitForSingleObject(self._heventSvcStop, win32event.INFINITE)
+    pass
+
+  def DefCtrlCHandler(self):
+    print_info_msg("Ctrl+C handler invoked. Stopping.")
+    win32event.SetEvent(self._heventSvcStop)
+    pass
+
+  #username domain\\username : The Username the service is to run under
+  #password password : The password for the username
+  #startup [manual|auto|disabled|delayed] : How the service starts, default = auto
+  #interactive : Allow the service to interact with the desktop.
+  #perfmonini file: .ini file to use for registering performance monitor data
+  #perfmondll file: .dll file to use when querying the service for performance data, default = perfmondata.dll
+  @classmethod
+  def Install(cls, startupMode = "auto", username = None, password = None, interactive = False,
+              perfMonIni = None, perfMonDll = None):
+    installArgs = [sys.argv[0], "--startup=" + startupMode]
+    if username is not None and username:
+      installArgs.append("--username=" + username)
+      if password is not None and password:
+        installArgs.append("--password=" + password)
+    if interactive:
+      installArgs.append("--interactive")
+    if perfMonIni is not None and perfMonIni:
+      installArgs.append("--perfmonini=" + perfMonIni)
+    if perfMonDll is not None and perfMonDll:
+      installArgs.append("--perfmondll=" + perfMonDll)
+    installArgs.append("install")
+    win32serviceutil.HandleCommandLine(cls, None, installArgs)
+
+  @classmethod
+  def Start(cls, waitSecs = 30):
+    return WinServiceController.Start(cls._svc_name_, waitSecs)
+
+  @classmethod
+  def Stop(cls, waitSecs = 30):
+    return WinServiceController.Stop(cls._svc_name_, waitSecs)
+
+  @classmethod
+  def QueryStatus(cls):
+    return WinServiceController.QueryStatus(cls._svc_name_)
+
+  @classmethod
+  def set_ctrl_c_handler(cls, ctrlHandler):
+    win32api.SetConsoleCtrlHandler(ctrlHandler, True)
+    pass
+
+  def _RedirectOutputStreamsToFile(self, outFilePath):
+    outFileDir = os.path.dirname(outFilePath)
+    if not os.path.exists(outFileDir):
+      os.makedirs(outFileDir)
+
+    out_writer = SyncStreamWriter(file(outFilePath, "w"), self._hmtxOut)
+    sys.stderr = out_writer
+    sys.stdout = out_writer
+    pass
+
+  def CheckForStop(self):
+    #Check for stop event to be signaled
+    return win32event.WAIT_OBJECT_0 == win32event.WaitForSingleObject(self._heventSvcStop, 1)
+
+  def _StopOrWaitForChildProcessToFinish(self, childProcess):
+    #Wait for the child process to finish or for the stop event to be signaled
+    if(win32event.WAIT_OBJECT_0 == win32event.WaitForMultipleObjects([self._heventSvcStop, childProcess._handle], False, win32event.INFINITE)):
+      # The OS only detaches the child process when the master process exits.
+      # We must kill it manually.
+      try:
+        #Sending signal.CTRL_BREAK_EVENT doesn't work. It only detaches the child process from the master.
+        #  Must brutally terminate the child process. Sorry Java.
+        childProcess.terminate()
+      except OSError, e:
+        print_info_msg("Unable to stop Ambari Server - " + str(e))
+        return False
+
+    return True
+
+class SystemWideLock(object):
+  # Lock backed by a named Windows mutex obtained through CreateMutex.
+  def __init__(self, name):
+    self._mutex = CreateMutex(None, 0, name)
+
+  def lock(self, timeout=0):
+    # timeout is in milliseconds; the default of 0 polls without blocking. Returns True when the mutex is owned.
+    result = WaitForSingleObject(self._mutex, timeout)
+    # WAIT_ABANDONED still transfers ownership to this thread (the previous owner ended without releasing),
+    # so it counts as acquired; reporting False would leave the mutex held with no matching unlock().
+    return result in (WAIT_OBJECT_0, WAIT_ABANDONED)
+
+  def unlock(self):
+    try:
+      ReleaseMutex(self._mutex)
+      return True
+    except Exception:
+      return False
+
+  def __del__(self):
+    CloseHandle(self._mutex)
+
+class UserHelper(object):
+  ACTION_OK = 0
+  USER_EXISTS = 1
+  ACTION_FAILED = -1
+
+  def __init__(self):
+    self._policy = LsaOpenPolicy(None, POLICY_CREATE_ACCOUNT | POLICY_LOOKUP_NAMES)
+
+  def create_user(self, name, password, comment="Ambari user"):
+    user_info = {}
+    user_info['name'] = name
+    user_info['password'] = password
+    user_info['priv'] = USER_PRIV_USER
+    user_info['comment'] = comment
+    user_info['flags'] = UF_NORMAL_ACCOUNT | UF_SCRIPT
+    try:
+      NetUserAdd(None, 1, user_info)
+    except pywintypes.error as e:
+      if e.winerror == 2224:
+        return UserHelper.USER_EXISTS, e.strerror
+      else:
+        return UserHelper.ACTION_FAILED, e.strerror
+    return UserHelper.ACTION_OK, "User created."
+
+  def add_user_privilege(self, name, privilege):
+    try:
+      acc_sid = LookupAccountName(None, name)[0]
+      LsaAddAccountRights(self._policy, acc_sid, (privilege,))
+    except pywintypes.error as e:
+      return UserHelper.ACTION_FAILED, e.strerror
+    return UserHelper.ACTION_OK, "Privilege added."
+
+  def remove_user_privilege(self, name, privilege):
+    try:
+      acc_sid = LookupAccountName(None, name)[0]
+      LsaRemoveAccountRights(self._policy, acc_sid, 0, (privilege,))
+    except pywintypes.error as e:
+      return UserHelper.ACTION_FAILED, e.strerror
+    return UserHelper.ACTION_OK, "Privilege removed."

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/ambari_commons/resources/os_family.json
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/resources/os_family.json b/ambari-common/src/main/python/ambari_commons/resources/os_family.json
index 2f2abcc..ac8b759 100644
--- a/ambari-common/src/main/python/ambari_commons/resources/os_family.json
+++ b/ambari-common/src/main/python/ambari_commons/resources/os_family.json
@@ -41,5 +41,16 @@
     "versions": [
       11
     ]
+  },
+  "winsrv": {
+    "distro": [
+      "win2008server",
+      "win2008serverr2",
+      "win2012server",
+      "win2012serverr2"
+    ],
+    "versions": [
+      6
+    ]
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/ambari_commons/str_utils.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/str_utils.py b/ambari-common/src/main/python/ambari_commons/str_utils.py
new file mode 100644
index 0000000..9a9e954
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_commons/str_utils.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+def compress_backslashes(s):
+  # Collapse each run of consecutive backslashes in s into one backslash.
+  while '\\\\' in s:
+    s = s.replace('\\\\', '\\')
+  return s
+
+def ensure_double_backslashes(s):
+  # Normalize first so backslashes that are already doubled are not doubled again.
+  single = compress_backslashes(s)
+  return single.replace('\\', '\\\\')

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/core/logger.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/logger.py b/ambari-common/src/main/python/resource_management/core/logger.py
index 5e45e94..3550247 100644
--- a/ambari-common/src/main/python/resource_management/core/logger.py
+++ b/ambari-common/src/main/python/resource_management/core/logger.py
@@ -26,7 +26,7 @@ from resource_management.libraries.script.config_dictionary import UnknownConfig
 
 class Logger:
   logger = logging.getLogger("resource_management")
-  
+
   # unprotected_strings : protected_strings map
   sensitive_strings = {}
 
@@ -41,8 +41,8 @@ class Logger:
   @staticmethod
   def info(text):
     Logger.logger.info(Logger.get_protected_text(text))
-  
-  @staticmethod  
+
+  @staticmethod
   def debug(text):
     Logger.logger.debug(Logger.get_protected_text(text))
 
@@ -57,11 +57,11 @@ class Logger:
   @staticmethod
   def info_resource(resource):
     Logger.info(Logger.get_protected_text(Logger._get_resource_repr(resource)))
-  
-  @staticmethod  
+
+  @staticmethod
   def debug_resource(resource):
     Logger.debug(Logger.get_protected_text(Logger._get_resource_repr(resource)))
-    
+
   @staticmethod
   def get_protected_text(text):
     """
@@ -69,17 +69,17 @@ class Logger:
     """
     for unprotected_string, protected_string in Logger.sensitive_strings.iteritems():
       text = text.replace(unprotected_string, protected_string)
-      
+
     return text
-    
-  @staticmethod  
+
+  @staticmethod
   def _get_resource_repr(resource):
     MESSAGE_MAX_LEN = 256
     logger_level = logging._levelNames[Logger.logger.level]
-    
+
     arguments_str = ""
     for x,y in resource.arguments.iteritems():
-      
+
       # strip unicode 'u' sign
       if isinstance(y, unicode):
         # don't show long messages
@@ -87,7 +87,7 @@ class Logger:
           y = '...'
         val = repr(y).lstrip('u')
       # don't show dicts of configurations
-      # usually too long  
+      # usually too long
       elif logger_level != 'DEBUG' and isinstance(y, dict):
         val = "..."
       # for configs which didn't come
@@ -95,14 +95,17 @@ class Logger:
         val = "[EMPTY]"
       # correctly output 'mode' (as they are octal values like 0755)
       elif y and x == 'mode':
-        val = oct(y)
+        try:
+          val = oct(y)
+        except:
+          val = repr(y)
       else:
         val = repr(y)
-      
-      
+
+
       arguments_str += "'{0}': {1}, ".format(x, val)
-      
-    if arguments_str:  
+
+    if arguments_str:
       arguments_str = arguments_str[:-2]
-    
-    return unicode("{0} {{{1}}}").format(resource, arguments_str)
\ No newline at end of file
+
+    return unicode("{0} {{{1}}}").format(resource, arguments_str)

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/core/providers/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/__init__.py b/ambari-common/src/main/python/resource_management/core/providers/__init__.py
index 67ca483..7f97336 100644
--- a/ambari-common/src/main/python/resource_management/core/providers/__init__.py
+++ b/ambari-common/src/main/python/resource_management/core/providers/__init__.py
@@ -50,6 +50,12 @@ PROVIDERS = dict(
   ubuntu=dict(
     Package="resource_management.core.providers.package.apt.AptProvider",
   ),
+  winsrv=dict(
+    Service="resource_management.core.providers.windows.service.ServiceProvider",
+    Execute="resource_management.core.providers.windows.system.ExecuteProvider",
+    File="resource_management.core.providers.windows.system.FileProvider",
+    Directory="resource_management.core.providers.windows.system.DirectoryProvider"
+  ),
   default=dict(
     File="resource_management.core.providers.system.FileProvider",
     Directory="resource_management.core.providers.system.DirectoryProvider",

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/core/providers/windows/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/windows/__init__.py b/ambari-common/src/main/python/resource_management/core/providers/windows/__init__.py
new file mode 100644
index 0000000..b0b988b
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/providers/windows/__init__.py
@@ -0,0 +1,20 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/core/providers/windows/service.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/windows/service.py b/ambari-common/src/main/python/resource_management/core/providers/windows/service.py
new file mode 100644
index 0000000..cdf3137
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/providers/windows/service.py
@@ -0,0 +1,65 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+from resource_management.core.providers import Provider
+from resource_management.core.base import Fail
+import win32service
+import time
+
+
+_schSCManager = win32service.OpenSCManager(None, None, win32service.SC_MANAGER_ALL_ACCESS)
+
+
+class ServiceProvider(Provider):
+  def action_start(self):
+    self._service_handle = self._service_handle if hasattr(self, "_service_handle") else \
+      win32service.OpenService(_schSCManager, self.resource.service_name, win32service.SERVICE_ALL_ACCESS)
+    if not self.status():
+      win32service.StartService(self._service_handle, None)
+      self.wait_status(win32service.SERVICE_RUNNING)
+
+  def action_stop(self):
+    self._service_handle = self._service_handle if hasattr(self, "_service_handle") else \
+      win32service.OpenService(_schSCManager, self.resource.service_name, win32service.SERVICE_ALL_ACCESS)
+    if self.status():
+      win32service.ControlService(self._service_handle, win32service.SERVICE_CONTROL_STOP)
+      self.wait_status(win32service.SERVICE_STOPPED)
+
+  def action_restart(self):
+    self._service_handle = win32service.OpenService(_schSCManager, self.resource.service_name,
+                                                    win32service.SERVICE_ALL_ACCESS)
+    self.action_stop()
+    self.action_start()
+
+  def action_reload(self):
+    raise Fail("Reload for Service resource not supported on windows")
+
+  def status(self):
+    if win32service.QueryServiceStatusEx(self._service_handle)["CurrentState"] == win32service.SERVICE_RUNNING:
+      return True
+    return False
+
+  def get_current_status(self):
+    return win32service.QueryServiceStatusEx(self._service_handle)["CurrentState"]
+
+  def wait_status(self, status, timeout=5):
+    begin = time.time()
+    while self.get_current_status() != status and (timeout == 0 or time.time() - begin < timeout):
+      time.sleep(1)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/core/providers/windows/system.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/windows/system.py b/ambari-common/src/main/python/resource_management/core/providers/windows/system.py
new file mode 100644
index 0000000..e7a98fc
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/providers/windows/system.py
@@ -0,0 +1,382 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from resource_management.core.providers import Provider
+from resource_management.core.logger import Logger
+from resource_management.core.base import Fail
+from resource_management.core import ExecuteTimeoutException
+import time
+import os
+import subprocess
+import shutil
+from resource_management.libraries.script import Script
+import win32con
+from win32security import *
+from win32api import *
+from winerror import ERROR_INVALID_HANDLE
+from win32profile import CreateEnvironmentBlock
+from win32process import GetExitCodeProcess, STARTF_USESTDHANDLES, STARTUPINFO, CreateProcessAsUser
+from win32event import WaitForSingleObject, INFINITE
+from win32security import *
+import msvcrt
+import tempfile
+
+def _create_tmp_files(env=None):
+  dirname = None
+  if env is None:
+    env = os.environ
+
+  for env_var_name in 'TMPDIR', 'TEMP', 'TMP':
+    if env.has_key(env_var_name):
+      dirname = env[env_var_name]
+      if dirname and os.path.exists(dirname):
+        break
+
+  if dirname is None:
+    for dirname2 in r'c:\temp', r'c:\tmp', r'\temp', r'\tmp':
+      try:
+        os.makedirs(dirname2)
+        dirname = dirname2
+        break
+      except:
+        pass
+
+  if dirname is None:
+    raise Exception('Unable to create temp dir. Insufficient access rights.')
+
+  out_file = tempfile.TemporaryFile(mode="r+b", dir=dirname)
+  err_file = tempfile.TemporaryFile(mode="r+b", dir=dirname)
+  return (msvcrt.get_osfhandle(out_file.fileno()),
+          msvcrt.get_osfhandle(err_file.fileno()),
+          out_file,
+          err_file)
+
+
+def _get_files_output(out, err):
+  out.seek(0)
+  err.seek(0)
+  return out.read().strip(), err.read().strip()
+
+
+def _safe_duplicate_handle(h):
+  """
+  Duplicate handle h inside the current process, requesting an inheritable copy with the same access.
+  :param h: handle to duplicate
+  :return: (True, new_handle) on success, (True, None) if h is an invalid handle, (False, None) on any other error
+  """
+  try:
+    return True, DuplicateHandle(GetCurrentProcess(), h, GetCurrentProcess(), 0, True, win32con.DUPLICATE_SAME_ACCESS)
+  except Exception as exc:
+    # not every exception carries a winerror attribute, so read it defensively instead of failing in the handler
+    if getattr(exc, "winerror", None) == ERROR_INVALID_HANDLE:
+      return True, None
+  return False, None
+
+
+def _merge_env(env1, env2, merge_keys=['PYTHONPATH']):
+  """
+  Merge env2 into env1. Variables listed in merge_keys are also read from the current python process environment
+  and merged with the equivalent keys from env1 and env2 using the system path separator.
+  :param env1: first environment, usually returned by CreateEnvironmentBlock
+  :param env2: custom environment
+  :param merge_keys: env variables to merge as PATH
+  :return: merged environment
+  """
+  env1 = dict(env1)  # copy to new dict in case env1 is os.environ
+  if env2:
+    for key, value in env2.iteritems():
+      if not key in merge_keys:
+        env1[key] = value
+  # transform keys and values to str (windows can not accept unicode)
+  result_env = {}
+  for key, value in env1.iteritems():
+    if not key in merge_keys:
+      result_env[str(key)] = str(value)
+  #merge keys from merge_keys
+  def put_values(key, env, result):
+    if env and key in env:
+      result.extend(env[key].split(os.pathsep))
+
+  for key in merge_keys:
+    all_values = []
+    for env in [env1, env2, os.environ]:
+      put_values(key, env, all_values)
+    result_env[str(key)] = str(os.pathsep.join(set(all_values)))
+  return result_env
+
+def AdjustPrivilege(htoken, priv, enable = 1):
+  # Get the ID for the privilege.
+  privId = LookupPrivilegeValue(None, priv)
+  # Now obtain the privilege for this token.
+  # Create a list of the privileges to be added.
+  privState = SE_PRIVILEGE_ENABLED if enable else 0
+  newPrivileges = [(privId, privState)]
+  # and make the adjustment.
+  AdjustTokenPrivileges(htoken, 0, newPrivileges)
+
+def QueryPrivilegeState(hToken, priv):
+  # Get the ID for the privilege.
+  privId = LookupPrivilegeValue(None, priv)
+  privList = GetTokenInformation(hToken, TokenPrivileges)
+  privState = 0
+  for (id, attr) in privList:
+    if id == privId:
+      privState = attr
+  Logger.debug('Privilege state: {}={} ({}) Enabled={}'.format(privId, priv, LookupPrivilegeDisplayName(None, priv), privState))
+  return privState
+
+# Execute command. As windows hdp stack heavily relies on proper environment it is better to reload fresh environment
+# on every execution. The env argument will be merged with the fresh environment for the user.
+def _call_command(command, logoutput=False, cwd=None, env=None, wait_for_finish=True, timeout=None, user=None):
+  # TODO implement timeout, wait_for_finish
+  Logger.info("Executing %s" % (command))
+  if user:
+    proc_token = OpenProcessToken(GetCurrentProcess(), TOKEN_QUERY | TOKEN_ADJUST_PRIVILEGES)
+
+    old_states = []
+
+    privileges = [
+      SE_ASSIGNPRIMARYTOKEN_NAME,
+      SE_INCREASE_QUOTA_NAME,
+    ]
+
+    for priv in privileges:
+      old_states.append(QueryPrivilegeState(proc_token, priv))
+      AdjustPrivilege(proc_token, priv)
+      QueryPrivilegeState(proc_token, priv)
+
+    user_token = LogonUser(user, ".", Script.get_password(user), win32con.LOGON32_LOGON_SERVICE,
+                           win32con.LOGON32_PROVIDER_DEFAULT)
+    env_token = DuplicateTokenEx(user_token, SecurityIdentification, TOKEN_QUERY, TokenPrimary)
+    # getting updated environment for impersonated user and merge it with custom env
+    current_env = CreateEnvironmentBlock(env_token, False)
+    current_env = _merge_env(current_env, env)
+
+    si = STARTUPINFO()
+    out_handle, err_handle, out_file, err_file = _create_tmp_files(current_env)
+    ok, si.hStdInput = _safe_duplicate_handle(GetStdHandle(STD_INPUT_HANDLE))
+    if not ok:
+      raise Exception("Unable to create StdInput for child process")
+    ok, si.hStdOutput = _safe_duplicate_handle(out_handle)
+    if not ok:
+      raise Exception("Unable to create StdOut for child process")
+    ok, si.hStdError = _safe_duplicate_handle(err_handle)
+    if not ok:
+      raise Exception("Unable to create StdErr for child process")
+
+    Logger.debug("Redirecting stdout to '{}', stderr to '{}'".format(out_file.name, err_file.name))
+
+    si.dwFlags = win32con.STARTF_USESTDHANDLES
+    si.lpDesktop = ""
+
+    try:
+      info = CreateProcessAsUser(user_token, None, command, None, None, 1, win32con.CREATE_NO_WINDOW, current_env, cwd, si)
+      hProcess, hThread, dwProcessId, dwThreadId = info
+      hThread.Close()
+
+      try:
+        WaitForSingleObject(hProcess, INFINITE)
+      except KeyboardInterrupt:
+        pass
+      out, err = _get_files_output(out_file, err_file)
+      code = GetExitCodeProcess(hProcess)
+    finally:
+      for priv in privileges:
+        old_state = old_states.pop(0)
+        AdjustPrivilege(proc_token, priv, old_state)
+  else:
+    # getting updated environment for current process and merge it with custom env
+    cur_token = OpenProcessToken(GetCurrentProcess(), TOKEN_QUERY)
+    current_env = CreateEnvironmentBlock(cur_token, False)
+    current_env = _merge_env(current_env, env)
+    proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+                            cwd=cwd, env=current_env, shell=False)
+    out, err = proc.communicate()
+    code = proc.returncode
+
+  if logoutput and out:
+    Logger.info(out)
+  if logoutput and err:
+    Logger.info(err)
+  return code, out, err
+
+
+# see msdn Icacls doc for rights
+def _set_file_acl(file, user, rights):
+  """Remove the icacls entries of user on file, then grant rights to user; raise Fail if either icacls call fails."""
+  acls_modify_cmd = "icacls {0} /grant {1}:{2}".format(file, user, rights)
+  acls_remove_cmd = "icacls {0} /remove {1}".format(file, user)
+  code, out, err = _call_command(acls_remove_cmd)
+  if code != 0:
+    raise Fail("Can not remove rights for path {0} and user {1}".format(file, user))
+  code, out, err = _call_command(acls_modify_cmd)
+  if code != 0:
+    # the message has three placeholders, so rights, path and user must all be supplied
+    raise Fail("Can not set rights {0} for path {1} and user {2}".format(rights, file, user))
+
+
+class FileProvider(Provider):
+  def action_create(self):
+    path = self.resource.path
+
+    if os.path.isdir(path):
+      raise Fail("Applying %s failed, directory with name %s exists" % (self.resource, path))
+
+    dirname = os.path.dirname(path)
+    if not os.path.isdir(dirname):
+      raise Fail("Applying %s failed, parent directory %s doesn't exist" % (self.resource, dirname))
+
+    write = False
+    content = self._get_content()
+    if not os.path.exists(path):
+      write = True
+      reason = "it doesn't exist"
+    elif self.resource.replace:
+      if content is not None:
+        with open(path, "rb") as fp:
+          old_content = fp.read()
+        if content != old_content:
+          write = True
+          reason = "contents don't match"
+          if self.resource.backup:
+            self.resource.env.backup_file(path)
+
+    if write:
+      Logger.info("Writing %s because %s" % (self.resource, reason))
+      with open(path, "wb") as fp:
+        if content:
+          fp.write(content)
+
+    if self.resource.owner and self.resource.mode:
+      _set_file_acl(self.resource.path, self.resource.owner, self.resource.mode)
+
+  def action_delete(self):
+    path = self.resource.path
+
+    if os.path.isdir(path):
+      raise Fail("Applying %s failed, %s is directory not file!" % (self.resource, path))
+
+    if os.path.exists(path):
+      Logger.info("Deleting %s" % self.resource)
+      os.unlink(path)
+
+  def _get_content(self):
+    content = self.resource.content
+    if content is None:
+      return None
+    elif isinstance(content, basestring):
+      return content
+    elif hasattr(content, "__call__"):
+      return content()
+    raise Fail("Unknown source type for %s: %r" % (self, content))
+
+
+class ExecuteProvider(Provider):
+  def action_run(self):
+    if self.resource.creates:
+      if os.path.exists(self.resource.creates):
+        return
+
+    Logger.debug("Executing %s" % self.resource)
+
+    if self.resource.path != []:
+      if not self.resource.environment:
+        self.resource.environment = {}
+
+      self.resource.environment['PATH'] = os.pathsep.join(self.resource.path)
+
+    for i in range(0, self.resource.tries):
+      try:
+        code, _, _ = _call_command(self.resource.command, logoutput=self.resource.logoutput,
+                                   cwd=self.resource.cwd, env=self.resource.environment,
+                                   wait_for_finish=self.resource.wait_for_finish,
+                                   timeout=self.resource.timeout, user=self.resource.user)
+        if code != 0 and not self.resource.ignore_failures:
+          raise Fail("Failed to execute " + self.resource.command)
+        break
+      except Fail as ex:
+        if i == self.resource.tries - 1:  # last try
+          raise ex
+        else:
+          Logger.info("Retrying after %d seconds. Reason: %s" % (self.resource.try_sleep, str(ex)))
+          time.sleep(self.resource.try_sleep)
+      except ExecuteTimeoutException:
+        err_msg = ("Execution of '%s' was killed due timeout after %d seconds") % (
+          self.resource.command, self.resource.timeout)
+
+        if self.resource.on_timeout:
+          Logger.info("Executing '%s'. Reason: %s" % (self.resource.on_timeout, err_msg))
+          _call_command(self.resource.on_timeout)
+        else:
+          raise Fail(err_msg)
+
+
+class DirectoryProvider(Provider):
+  def action_create(self):
+    path = DirectoryProvider._trim_uri(self.resource.path)
+    if not os.path.exists(path):
+      Logger.info("Creating directory %s" % self.resource)
+      if self.resource.recursive:
+        os.makedirs(path)
+      else:
+        dirname = os.path.dirname(path)
+        if not os.path.isdir(dirname):
+          raise Fail("Applying %s failed, parent directory %s doesn't exist" % (self.resource, dirname))
+
+        os.mkdir(path)
+
+    if not os.path.isdir(path):
+      raise Fail("Applying %s failed, file %s already exists" % (self.resource, path))
+
+    if self.resource.owner and self.resource.mode:
+      _set_file_acl(path, self.resource.owner, self.resource.mode)
+
+  def action_delete(self):
+    path = self.resource.path
+    if os.path.exists(path):
+      if not os.path.isdir(path):
+        raise Fail("Applying %s failed, %s is not a directory" % (self.resource, path))
+
+      Logger.info("Removing directory %s and all its content" % self.resource)
+      shutil.rmtree(path)
+
+  @staticmethod
+  def _trim_uri(file_uri):
+    if file_uri.startswith("file:///"):
+      return file_uri[8:]
+    return file_uri
+    # class res: pass
+    # resource = res()
+    # resource.creates = None
+    # resource.path =[]
+    # resource.tries = 1
+    # resource.logoutput = True
+    # resource.cwd = None
+    # resource.environment = None
+    # resource.wait_for_finish = True
+    # resource.timeout = None
+    # resource.command = "cmd /C echo 1 & echo 2"
+    # provider = ExecuteProvider(resource)
+    # provider.action_run()
+    # pass
+    # _set_file_acl("C:\\lol.txt", "Administrator","f")
+    # pass
+    # pass
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/functions/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/__init__.py b/ambari-common/src/main/python/resource_management/libraries/functions/__init__.py
index 36b8e51..9b32b92 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/__init__.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/__init__.py
@@ -20,6 +20,8 @@ Ambari Agent
 
 """
 
+import platform
+
 from resource_management.libraries.functions.default import *
 from resource_management.libraries.functions.format import *
 from resource_management.libraries.functions.get_kinit_path import *
@@ -31,3 +33,10 @@ from resource_management.libraries.functions.get_port_from_url import *
 from resource_management.libraries.functions.hive_check import *
 from resource_management.libraries.functions.version import *
 from resource_management.libraries.functions.format_jvm_option import *
+
+IS_WINDOWS = platform.system() == "Windows"
+
+if IS_WINDOWS:
+  from resource_management.libraries.functions.windows_service_utils import *
+  from resource_management.libraries.functions.install_hdp_msi import *
+  from resource_management.libraries.functions.reload_windows_env import *

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/functions/default.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/default.py b/ambari-common/src/main/python/resource_management/libraries/functions/default.py
index 733c03a..16782de 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/default.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/default.py
@@ -20,7 +20,7 @@ Ambari Agent
 
 """
 
-__all__ = ["default"]
+__all__ = ['default', 'default_string']
 from resource_management.libraries.script import Script
 from resource_management.libraries.script.config_dictionary import UnknownConfiguration
 from resource_management.core.logger import Logger
@@ -37,4 +37,8 @@ def default(name, default_value):
         Logger.debug("Cannot find configuration: '%s'. Using '%s' value as default" % (name, default_value))
       return default_value
 
-  return curr_dict
\ No newline at end of file
+  return curr_dict
+
+def default_string(name, default_value, delimiter):
+  default_list = default(name, default_value)
+  return delimiter.join(default_list)

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/functions/get_unique_id_and_date.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/get_unique_id_and_date.py b/ambari-common/src/main/python/resource_management/libraries/functions/get_unique_id_and_date.py
index a79a1e5..bb68270 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/get_unique_id_and_date.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/get_unique_id_and_date.py
@@ -23,12 +23,19 @@ Ambari Agent
 __all__ = ["get_unique_id_and_date"]
 import datetime
 from resource_management.core import shell
-
+from ambari_commons import os_check
 def get_unique_id_and_date():
+  if os_check.OSCheck.is_windows_os():
+    from ambari_commons.os_windows import run_os_command
+    code, out, err = run_os_command("cmd /c vol C:")
+    for line in out.splitlines():
+      if line.startswith(" Volume Serial Number is"):
+        id = line[25:]
+  else:
     out = shell.checked_call("hostid")[1].split('\n')[-1] # bugfix: take the lastline (stdin is not tty part cut)
     id = out.strip()
 
-    now = datetime.datetime.now()
-    date = now.strftime("%M%d%y")
+  now = datetime.datetime.now()
+  date = now.strftime("%M%d%y")
 
-    return "id{id}_date{date}".format(id=id, date=date)
+  return "id{id}_date{date}".format(id=id, date=date)

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/functions/install_hdp_msi.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/install_hdp_msi.py b/ambari-common/src/main/python/resource_management/libraries/functions/install_hdp_msi.py
new file mode 100644
index 0000000..a7c2fe2
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/install_hdp_msi.py
@@ -0,0 +1,182 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+from ambari_commons import os_utils
+from ambari_commons.inet_utils import download_file
+from ambari_commons.os_windows import SystemWideLock
+
+from resource_management.core.resources.system import Execute
+from resource_management.core.resources.system import File
+from resource_management.core.logger import Logger
+from resource_management.core.exceptions import Fail
+from resource_management.libraries.functions.reload_windows_env import reload_windows_env
+from resource_management.libraries.functions.windows_service_utils import check_windows_service_exists
+import socket
+import os
+import glob
+
+
+__all__ = ['install_windows_msi']
+
+msi_save_dir = None
+hdp_log_dir = "c:\\hadoop\\logs"
+hdp_data_dir = "c:\\hadoopDefaultData"
+local_host = socket.getfqdn()
+db_flavor = "DERBY"
+cluster_properties = """#Log directory
+HDP_LOG_DIR={hdp_log_dir}
+
+#Data directory
+HDP_DATA_DIR={hdp_data_dir}
+
+#hosts
+NAMENODE_HOST={local_host}
+SECONDARY_NAMENODE_HOST={local_host}
+RESOURCEMANAGER_HOST={local_host}
+HIVE_SERVER_HOST={local_host}
+OOZIE_SERVER_HOST={local_host}
+WEBHCAT_HOST={local_host}
+SLAVE_HOSTS={local_host}
+ZOOKEEPER_HOSTS={local_host}
+CLIENT_HOSTS={local_host}
+HBASE_MASTER={local_host}
+HBASE_REGIONSERVERS={local_host}
+FLUME_HOSTS={local_host}
+FALCON_HOST={local_host}
+KNOX_HOST={local_host}
+STORM_NIMBUS={local_host}
+STORM_SUPERVISORS={local_host}
+
+#Database host
+DB_FLAVOR={db_flavor}
+DB_HOSTNAME={local_host}
+DB_PORT=1527
+
+#Hive properties
+HIVE_DB_NAME=hive
+HIVE_DB_USERNAME=hive
+HIVE_DB_PASSWORD=hive
+
+#Oozie properties
+OOZIE_DB_NAME=oozie
+OOZIE_DB_USERNAME=oozie
+OOZIE_DB_PASSWORD=oozie
+"""
+
+INSTALL_MSI_CMD = 'cmd /C start /wait msiexec /qn /i  {hdp_msi_path} /lv {hdp_log_path} MSIUSEREALADMINDETECTION=1 ' \
+                  'HDP_LAYOUT={hdp_layout_path} DESTROY_DATA=yes HDP_USER_PASSWORD={hadoop_password_arg} HDP=yes ' \
+                  'KNOX=yes KNOX_MASTER_SECRET="AmbariHDP2Windows" FALCON=yes STORM=yes HBase=yes STORM=yes FLUME=yes'
+CREATE_SERVICE_SCRIPT = os.path.abspath("sbin\createservice.ps1")
+CREATE_SERVICE_CMD = 'cmd /C powershell -File "{script}" -username hadoop -password "{password}" -servicename ' \
+                     '{servicename} -hdpresourcesdir "{resourcedir}" -servicecmdpath "{servicecmd}"'
+INSTALL_MARKER_OK = "msi.installed"
+INSTALL_MARKER_FAILED = "msi.failed"
+_working_dir = None
+
+
+def _ensure_services_created(hadoop_password):
+  resource_dir_hdfs = os.path.join(os.environ["HADOOP_HDFS_HOME"], "bin")
+  service_cmd_hdfs = os.path.join(os.environ["HADOOP_HDFS_HOME"], "bin", "hdfs.cmd")
+  if not check_windows_service_exists("journalnode"):
+    Execute(CREATE_SERVICE_CMD.format(script=CREATE_SERVICE_SCRIPT, password=hadoop_password, servicename="journalnode",
+                                      resourcedir=resource_dir_hdfs, servicecmd=service_cmd_hdfs), logoutput=True)
+  if not check_windows_service_exists("zkfc"):
+    Execute(CREATE_SERVICE_CMD.format(script=CREATE_SERVICE_SCRIPT, password=hadoop_password, servicename="zkfc",
+                                      resourcedir=resource_dir_hdfs, servicecmd=service_cmd_hdfs), logoutput=True)
+
+
+# creating symlinks to services folders to avoid using stack-dependent paths
+def _create_symlinks():
+  # folders
+  Execute("cmd /c mklink /d %HADOOP_NODE%\\hadoop %HADOOP_HOME%")
+  Execute("cmd /c mklink /d %HADOOP_NODE%\\hive %HIVE_HOME%")
+  # files pairs (symlink_path, path_template_to_target_file), use * to replace file version
+  links_pairs = [
+    ("%HADOOP_HOME%\\share\\hadoop\\tools\\lib\\hadoop-streaming.jar",
+     "%HADOOP_HOME%\\share\\hadoop\\tools\\lib\\hadoop-streaming-*.jar"),
+    ("%HIVE_HOME%\\hcatalog\\share\\webhcat\\svr\\lib\\hive-webhcat.jar",
+     "%HIVE_HOME%\\hcatalog\\share\\webhcat\\svr\\lib\\hive-webhcat-*.jar"),
+    ("%HIVE_HOME%\\lib\\zookeeper.jar", "%HIVE_HOME%\\lib\\zookeeper-*.jar")
+  ]
+  for link_pair in links_pairs:
+    link, target = link_pair
+    target = glob.glob(os.path.expandvars(target))[0].replace("\\\\", "\\")
+    Execute('cmd /c mklink "{0}" "{1}"'.format(link, target))
+
+
+# check if the namenode service exists and the install marker file is present
+def _is_msi_installed():
+  return os.path.exists(os.path.join(_working_dir, INSTALL_MARKER_OK)) and check_windows_service_exists("namenode")
+
+
+# return whether the msi is installed; raise Fail if it is not installed and the failure marker file exists
+def _validate_msi_install():
+  if not _is_msi_installed() and os.path.exists(os.path.join(_working_dir, INSTALL_MARKER_FAILED)):
+    raise Fail("Current or previous hdp.msi install failed. Check hdp.msi install logs")
+  return _is_msi_installed()
+
+
+def _write_marker():
+  if check_windows_service_exists("namenode"):
+    open(os.path.join(_working_dir, INSTALL_MARKER_OK), "w").close()
+  else:
+    open(os.path.join(_working_dir, INSTALL_MARKER_FAILED), "w").close()
+
+
+def install_windows_msi(msi_url, save_dir, save_file, hadoop_password):
+  """Download the msi from msi_url to save_dir/save_file and install it under a system wide lock, unless installed."""
+  global _working_dir
+  _working_dir = save_dir
+  save_dir = os.path.abspath(save_dir)
+  # system wide lock to prevent simultaneous installations(when first task failed on timeout)
+  install_lock = SystemWideLock("hdp_msi_lock")
+  try:
+    if not install_lock.lock():
+      Logger.info("Some other task currently installing hdp.msi, waiting for 10 min for finish")
+      if not install_lock.lock(600000):
+        raise Fail("Timeout on acquiring lock")
+    if _validate_msi_install():
+      Logger.info("hdp.msi already installed")
+      return
+
+    # install msi
+    download_file(msi_url, os.path.join(save_dir, save_file))
+    File(os.path.join(save_dir, "properties.txt"),
+         content=cluster_properties.format(hdp_log_dir=hdp_log_dir,
+                                           hdp_data_dir=hdp_data_dir,
+                                           local_host=local_host, db_flavor=db_flavor))
+    # install the file that was just downloaded; a hard-coded "hdp.msi" breaks for any other save_file name
+    hdp_msi_path = os_utils.quote_path(os.path.join(save_dir, save_file))
+    hdp_log_path = os_utils.quote_path(os.path.join(save_dir, "hdp.log"))
+    hdp_layout_path = os_utils.quote_path(os.path.join(save_dir, "properties.txt"))
+    hadoop_password_arg = os_utils.quote_path(hadoop_password)
+
+    Execute(
+      INSTALL_MSI_CMD.format(hdp_msi_path=hdp_msi_path, hdp_log_path=hdp_log_path, hdp_layout_path=hdp_layout_path,
+                             hadoop_password_arg=hadoop_password_arg))
+    reload_windows_env()
+    # create additional services manually due to hdp.msi limitations
+    _ensure_services_created(hadoop_password)
+    _create_symlinks()
+    # finalizing install
+    _write_marker()
+    _validate_msi_install()
+  finally:
+    install_lock.unlock()

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/functions/reload_windows_env.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/reload_windows_env.py b/ambari-common/src/main/python/resource_management/libraries/functions/reload_windows_env.py
new file mode 100644
index 0000000..f6f3626
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/reload_windows_env.py
@@ -0,0 +1,48 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from _winreg import (OpenKey, EnumValue, HKEY_LOCAL_MACHINE, KEY_READ, CloseKey)
+import os
+
+default_whitelist = ["FALCON_CONF_DIR", "FALCON_DATA_DIR", "FALCON_HOME", "FALCON_LOG_DIR", "FLUME_HOME",
+                     "HADOOP_COMMON_HOME", "HADOOP_CONF_DIR", "HADOOP_HDFS_HOME", "HADOOP_HOME", "HADOOP_LOG_DIR",
+                     "HADOOP_MAPRED_HOME", "HADOOP_NODE", "HADOOP_NODE_INSTALL_ROOT", "HADOOP_PACKAGES",
+                     "HADOOP_SETUP_TOOLS", "HADOOP_YARN_HOME", "HBASE_CONF_DIR", "HBASE_HOME", "HCAT_HOME",
+                     "HDFS_AUDIT_LOGGER", "HDFS_DATA_DIR", "HIVE_CONF_DIR", "HIVE_HOME", "HIVE_LIB_DIR", "HIVE_LOG_DIR",
+                     "HIVE_OPTS", "KNOX_CONF_DIR", "KNOX_HOME", "KNOX_LOG_DIR", "MAHOUT_HOME", "OOZIE_DATA",
+                     "OOZIE_HOME", "OOZIE_LOG", "OOZIE_ROOT", "PIG_HOME", "SQOOP_HOME", "STORM_CONF_DIR", "STORM_HOME",
+                     "STORM_LOG_DIR", "WEBHCAT_CONF_DIR", "YARN_LOG_DIR", "ZOOKEEPER_CONF_DIR", "ZOOKEEPER_HOME",
+                     "ZOOKEEPER_LIB_DIR", "ZOO_LOG_DIR"]
+def reload_windows_env(keys_white_list=default_whitelist):
+  root = HKEY_LOCAL_MACHINE
+  subkey = r'SYSTEM\CurrentControlSet\Control\Session Manager\Environment'
+  key = OpenKey(root, subkey, 0, KEY_READ)
+  finish = False
+  index = 0
+  while not finish:
+    try:
+      _key, _value, _ = EnumValue(key, index)
+      if (_key in keys_white_list):
+        os.environ[_key] = _value
+    except WindowsError:
+      finish = True
+    index += 1
+  CloseKey(key)

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/functions/tar_archive.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/tar_archive.py b/ambari-common/src/main/python/resource_management/libraries/functions/tar_archive.py
new file mode 100644
index 0000000..efbf933
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/tar_archive.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os
+import tarfile
+from contextlib import closing
+
+def archive_dir(output_filename, input_dir):
+  with closing(tarfile.open(output_filename, "w:gz")) as tar:
+    try:
+      tar.add(input_dir, arcname=os.path.basename("."))
+    finally:
+      tar.close()

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/functions/windows_service_utils.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/windows_service_utils.py b/ambari-common/src/main/python/resource_management/libraries/functions/windows_service_utils.py
new file mode 100644
index 0000000..7d994b7
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/windows_service_utils.py
@@ -0,0 +1,52 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from resource_management.core.exceptions import ComponentIsNotRunning
+from resource_management.core.logger import Logger
+__all__ = ['check_windows_service_status', 'check_windows_service_exists']
+
+import win32service
+
+_schSCManager = win32service.OpenSCManager(None, None, win32service.SC_MANAGER_ALL_ACCESS)
+
+def check_windows_service_status(service_name):
+  """
+  Raise ComponentIsNotRunning if the Windows service service_name is stopped.
+
+  Any other state reported by the service control manager returns normally.
+  """
+  _service_handle = win32service.OpenService(_schSCManager, service_name, win32service.SERVICE_ALL_ACCESS)
+  try:
+    current_state = win32service.QueryServiceStatusEx(_service_handle)["CurrentState"]
+  finally:
+    # The handle is opened per call, so release it explicitly instead of waiting for garbage collection.
+    win32service.CloseServiceHandle(_service_handle)
+  if current_state == win32service.SERVICE_STOPPED:
+    raise ComponentIsNotRunning()
+
+def check_windows_service_exists(service_name):
+  """
+  Return True if a Win32 service whose short name equals service_name is
+  registered with the service control manager, whatever its state.
+  """
+  statuses = win32service.EnumServicesStatus(_schSCManager, win32service.SERVICE_WIN32,
+                                             win32service.SERVICE_STATE_ALL)
+  return any(short_name == service_name for (short_name, desc, status) in statuses)

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/functions/zip_archive.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/zip_archive.py b/ambari-common/src/main/python/resource_management/libraries/functions/zip_archive.py
new file mode 100644
index 0000000..cab3627
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/zip_archive.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os
+import zipfile
+
+def _zip_dir(zip, root):
+  """Add every file found under root to the open archive zip, named relative to root."""
+  for dirname, dirnames, filenames in os.walk(root):
+    # Files directly in root keep a bare name; deeper ones get their sub-directory as prefix.
+    prefix = ""
+    if len(dirname) > len(root):
+      prefix = os.path.relpath(dirname, root) + os.sep
+    for filename in filenames:
+      zip.write(os.path.join(dirname, filename), prefix + filename)
+
+def archive_dir(output_filename, input_dir):
+  """Create output_filename as a new zip archive holding the files under input_dir."""
+  archive = zipfile.ZipFile(output_filename, 'w')
+  try:
+    _zip_dir(archive, input_dir)
+  finally:
+    archive.close()

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/providers/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/providers/__init__.py b/ambari-common/src/main/python/resource_management/libraries/providers/__init__.py
index 5ca7bd9..80e0a14 100644
--- a/ambari-common/src/main/python/resource_management/libraries/providers/__init__.py
+++ b/ambari-common/src/main/python/resource_management/libraries/providers/__init__.py
@@ -30,6 +30,9 @@ PROVIDERS = dict(
   ubuntu=dict(
     Repository="resource_management.libraries.providers.repository.UbuntuRepositoryProvider",
   ),
+  winsrv=dict(
+
+  ),
   default=dict(
     ExecuteHadoop="resource_management.libraries.providers.execute_hadoop.ExecuteHadoopProvider",
     TemplateConfig="resource_management.libraries.providers.template_config.TemplateConfigProvider",

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/providers/xml_config.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/providers/xml_config.py b/ambari-common/src/main/python/resource_management/libraries/providers/xml_config.py
index 87fc657..b5c2b54 100644
--- a/ambari-common/src/main/python/resource_management/libraries/providers/xml_config.py
+++ b/ambari-common/src/main/python/resource_management/libraries/providers/xml_config.py
@@ -21,13 +21,14 @@ Ambari Agent
 """
 
 import time
+import os
 from resource_management import *
 
 class XmlConfigProvider(Provider):
   def action_create(self):
     filename = self.resource.filename
     xml_config_provider_config_dir = self.resource.conf_dir
-    
+
     # |e - for html-like escaping of <,>,',"
     config_content = InlineTemplate('''<!--{{time.asctime(time.localtime())}}-->
     <configuration>
@@ -48,12 +49,12 @@ class XmlConfigProvider(Provider):
     {% endfor %}
   </configuration>''', extra_imports=[time], configurations_dict=self.resource.configurations,
                                     configuration_attrs=self.resource.configuration_attributes)
-   
-  
-    Logger.info(format("Generating config: {xml_config_provider_config_dir}/{filename}"))
-    
+
+    xml_config_dest_file_path = os.path.join(xml_config_provider_config_dir, filename)
+    Logger.info("Generating config: {0}".format(xml_config_dest_file_path))
+
     with Environment.get_instance_copy() as env:
-      File (format("{xml_config_provider_config_dir}/{filename}"),
+      File (xml_config_dest_file_path,
         content = config_content,
         owner = self.resource.owner,
         group = self.resource.group,

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-common/src/main/python/resource_management/libraries/script/script.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/script/script.py b/ambari-common/src/main/python/resource_management/libraries/script/script.py
index 001922d..39511bc 100644
--- a/ambari-common/src/main/python/resource_management/libraries/script/script.py
+++ b/ambari-common/src/main/python/resource_management/libraries/script/script.py
@@ -17,7 +17,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 '''
-import tarfile
 import tempfile
 
 __all__ = ["Script"]
@@ -26,19 +25,24 @@ import os
 import sys
 import json
 import logging
-from contextlib import closing
-
+import platform
 
 from resource_management.libraries.resources import XmlConfig
 from resource_management.libraries.resources import PropertiesFile
 from resource_management.core.resources import File, Directory
 from resource_management.core.source import InlineTemplate
-
 from resource_management.core.environment import Environment
 from resource_management.core.exceptions import Fail, ClientComponentHasNoStatus, ComponentIsNotRunning
 from resource_management.core.resources.packaging import Package
-from resource_management.libraries.script.config_dictionary import ConfigDictionary
+from resource_management.libraries.script.config_dictionary import ConfigDictionary, UnknownConfiguration
 
+IS_WINDOWS = platform.system() == "Windows"
+if IS_WINDOWS:
+  from resource_management.libraries.functions.install_hdp_msi import install_windows_msi
+  from resource_management.libraries.functions.reload_windows_env import reload_windows_env
+  from resource_management.libraries.functions.zip_archive import archive_dir
+else:
+  from resource_management.libraries.functions.tar_archive import archive_dir
 
 USAGE = """Usage: {0} <COMMAND> <JSON_CONFIG> <BASEDIR> <STROUTPUT> <LOGGING_LEVEL> <TMP_DIR>
 
@@ -50,6 +54,19 @@ USAGE = """Usage: {0} <COMMAND> <JSON_CONFIG> <BASEDIR> <STROUTPUT> <LOGGING_LEV
 <TMP_DIR> temporary directory for executable scripts. Ex: /var/lib/ambari-agent/data/tmp
 """
 
+_PASSWORD_MAP = {"/configurations/cluster-env/hadoop.user.name":"/configurations/cluster-env/hadoop.user.password"}
+
+def get_path_form_configuration(name, configuration):
+  """Follow the '/'-separated path name through nested containers.
+  Returns the value found, or None as soon as a path component is missing."""
+  node = configuration
+  for key in [part for part in name.split('/') if part]:  # empty components are skipped
+    if key not in node:
+      return None
+    node = node[key]
+
+  return node
+
 class Script(object):
   """
   Executes a command for custom service. stdout and stderr are written to
@@ -91,13 +108,13 @@ class Script(object):
     cherr.setFormatter(formatter)
     logger.addHandler(cherr)
     logger.addHandler(chout)
-    
+
     # parse arguments
-    if len(sys.argv) < 7: 
+    if len(sys.argv) < 7:
      logger.error("Script expects at least 6 arguments")
      print USAGE.format(os.path.basename(sys.argv[0])) # print to stdout
      sys.exit(1)
-    
+
     command_name = str.lower(sys.argv[1])
     command_data_file = sys.argv[2]
     basedir = sys.argv[3]
@@ -108,11 +125,23 @@ class Script(object):
     logging_level_str = logging._levelNames[logging_level]
     chout.setLevel(logging_level_str)
     logger.setLevel(logging_level_str)
-      
+
+    # On Windows there are no default config paths (such as /etc/something/conf on Linux), so some
+    # env variables must be reloaded manually. Variables created by one Script execution are not
+    # propagated to the agent, so later Script executions would otherwise not see them.
+    if platform.system() == "Windows":
+      reload_windows_env()
+
     try:
       with open(command_data_file, "r") as f:
         pass
         Script.config = ConfigDictionary(json.load(f))
+        #load passwords here(used on windows to impersonate different users)
+        Script.passwords = {}
+        for k, v in _PASSWORD_MAP.iteritems():
+          if get_path_form_configuration(k,Script.config) and get_path_form_configuration(v,Script.config ):
+            Script.passwords[get_path_form_configuration(k,Script.config)] = get_path_form_configuration(v,Script.config)
+
     except IOError:
       logger.exception("Can not read json file with command parameters: ")
       sys.exit(1)
@@ -150,6 +179,9 @@ class Script(object):
     """
     return Script.config
 
+  @staticmethod
+  def get_password(user):  # password recorded in Script.passwords for `user`; KeyError if there is none
+    return Script.passwords[user]
 
   @staticmethod
   def get_tmp_dir():
@@ -170,28 +202,39 @@ class Script(object):
     self.install_packages(env)
 
 
-  def install_packages(self, env, exclude_packages=[]):
-    """
-    List of packages that are required< by service is received from the server
-    as a command parameter. The method installs all packages
-    from this list
-    """
-    config = self.get_config()
-    
-    try:
-      package_list_str = config['hostLevelParams']['package_list']
-      if isinstance(package_list_str,basestring) and len(package_list_str) > 0:
-        package_list = json.loads(package_list_str)
-        for package in package_list:
-          if not package['name'] in exclude_packages:
-            name = package['name']
-            Package(name)
-    except KeyError:
-      pass # No reason to worry
-    
-    #RepoInstaller.remove_repos(config)
-
-
+  if not IS_WINDOWS:
+    def install_packages(self, env, exclude_packages=[]):
+      """
+      Install the packages the server listed for this service.
+
+      hostLevelParams/package_list is expected to hold a JSON-encoded list of
+      dictionaries with a 'name' key; every name not found in exclude_packages
+      is passed to Package(). A KeyError raised while reading these values is
+      swallowed.
+      """
+      config = self.get_config()
+      try:
+        package_list_str = config['hostLevelParams']['package_list']
+        if not isinstance(package_list_str, basestring) or not package_list_str:
+          return
+        for package in json.loads(package_list_str):
+          name = package['name']
+          if name not in exclude_packages:
+            Package(name)
+      except KeyError:
+        pass  # No reason to worry
+  else:
+    def install_packages(self, env, exclude_packages=[]):
+      """
+      Windows variant: no per-package installation is done and exclude_packages
+      is unused. A single install_windows_msi() call is made for "hdp.msi",
+      located via hostLevelParams/jdk_location, with hostLevelParams/agentCacheDir
+      and the password stored for the "hadoop" user.
+      """
+      host_params = self.get_config()['hostLevelParams']
+      msi_location = os.path.join(host_params['jdk_location'], "hdp.msi")
+      install_windows_msi(msi_location, host_params["agentCacheDir"], "hdp.msi",
+                          self.get_password("hadoop"))
 
   def fail_with_error(self, message):
     """
@@ -239,56 +282,60 @@ class Script(object):
     self.fail_with_error('configure method isn\'t implemented')
 
   def generate_configs_get_template_file_content(self, filename, dicts):
-    import params
+    config = self.get_config()
     content = ''
     for dict in dicts.split(','):
-      if dict.strip() in params.config['configurations']:
-        content += params.config['configurations'][dict.strip()]['content']
+      if dict.strip() in config['configurations']:
+        try:
+          content += config['configurations'][dict.strip()]['content']
+        except Fail:
+          # 'content' section not available in the component client configuration
+          pass
 
     return content
 
   def generate_configs_get_xml_file_content(self, filename, dict):
-    import params
-    return {'configurations':params.config['configurations'][dict],
-            'configuration_attributes':params.config['configuration_attributes'][dict]}
+    config = self.get_config()
+    return {'configurations':config['configurations'][dict],
+            'configuration_attributes':config['configuration_attributes'][dict]}
     
   def generate_configs_get_xml_file_dict(self, filename, dict):
-    import params
-    return params.config['configurations'][dict]
+    config = self.get_config()
+    return config['configurations'][dict]
 
   def generate_configs(self, env):
     """
     Generates config files and stores them as an archive in tmp_dir
     based on xml_configs_list and env_configs_list from commandParams
     """
-    import params
-    env.set_params(params)
-    xml_configs_list = params.config['commandParams']['xml_configs_list']
-    env_configs_list = params.config['commandParams']['env_configs_list']
-    properties_configs_list = params.config['commandParams']['properties_configs_list']
-    
-    conf_tmp_dir = tempfile.mkdtemp()
-    output_filename = os.path.join(self.get_tmp_dir(),params.config['commandParams']['output_file'])
+    config = self.get_config()
+
+    xml_configs_list = config['commandParams']['xml_configs_list']
+    env_configs_list = config['commandParams']['env_configs_list']
+    properties_configs_list = config['commandParams']['properties_configs_list']
 
     Directory(self.get_tmp_dir(), recursive=True)
-    for file_dict in xml_configs_list:
-      for filename, dict in file_dict.iteritems():
-        XmlConfig(filename,
-                  conf_dir=conf_tmp_dir,
-                  **self.generate_configs_get_xml_file_content(filename, dict)
-        )
-    for file_dict in env_configs_list:
-      for filename,dicts in file_dict.iteritems():
-        File(os.path.join(conf_tmp_dir, filename),
-             content=InlineTemplate(self.generate_configs_get_template_file_content(filename, dicts)))
-        
-    for file_dict in properties_configs_list:
-      for filename, dict in file_dict.iteritems():
-        PropertiesFile(os.path.join(conf_tmp_dir, filename),
-          properties=self.generate_configs_get_xml_file_dict(filename, dict)
-        )
-      
-    with closing(tarfile.open(output_filename, "w:gz")) as tar:
-      tar.add(conf_tmp_dir, arcname=os.path.basename("."))
-      tar.close()
-    Directory(conf_tmp_dir, action="delete")
+
+    conf_tmp_dir = tempfile.mkdtemp(dir=self.get_tmp_dir())
+    output_filename = os.path.join(self.get_tmp_dir(), config['commandParams']['output_file'])
+
+    try:
+      for file_dict in xml_configs_list:
+        for filename, dict in file_dict.iteritems():
+          XmlConfig(filename,
+                    conf_dir=conf_tmp_dir,
+                    **self.generate_configs_get_xml_file_content(filename, dict)
+          )
+      for file_dict in env_configs_list:
+        for filename,dicts in file_dict.iteritems():
+          File(os.path.join(conf_tmp_dir, filename),
+               content=InlineTemplate(self.generate_configs_get_template_file_content(filename, dicts)))
+
+      for file_dict in properties_configs_list:
+        for filename, dict in file_dict.iteritems():
+          PropertiesFile(os.path.join(conf_tmp_dir, filename),
+            properties=self.generate_configs_get_xml_file_dict(filename, dict)
+          )
+      archive_dir(output_filename, conf_tmp_dir)
+    finally:
+      Directory(conf_tmp_dir, action="delete")

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-server/conf/unix/ambari.properties
----------------------------------------------------------------------
diff --git a/ambari-server/conf/unix/ambari.properties b/ambari-server/conf/unix/ambari.properties
index ed1994c..e5b9e7b 100644
--- a/ambari-server/conf/unix/ambari.properties
+++ b/ambari-server/conf/unix/ambari.properties
@@ -35,6 +35,7 @@ bootstrap.setup_agent.script=/usr/lib/python2.6/site-packages/ambari_server/setu
 recommendations.dir=/var/run/ambari-server/stack-recommendations
 stackadvisor.script=/var/lib/ambari-server/resources/scripts/stack_advisor.py
 server.tmp.dir=/var/lib/ambari-server/tmp
+ambari.python.wrap=ambari-python-wrap
 
 api.authenticate=true
 server.connection.max.idle.millis=900000

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-server/conf/windows/ambari-env.cmd
----------------------------------------------------------------------
diff --git a/ambari-server/conf/windows/ambari-env.cmd b/ambari-server/conf/windows/ambari-env.cmd
new file mode 100644
index 0000000..23600d4
--- /dev/null
+++ b/ambari-server/conf/windows/ambari-env.cmd
@@ -0,0 +1,19 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+
+set AMBARI_PASSHPHRASE=DEV
+set AMBARI_JVM_ARGS=%AMBARI_JVM_ARGS% -Xms512m -Xmx2048m -Djava.security.auth.login.config=conf\krb5JAASLogin.conf -Djava.security.krb5.conf=conf\krb5.conf -Djavax.security.auth.useSubjectCredsOnly=false

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-server/conf/windows/ambari.properties
----------------------------------------------------------------------
diff --git a/ambari-server/conf/windows/ambari.properties b/ambari-server/conf/windows/ambari.properties
new file mode 100644
index 0000000..fd3a7ba
--- /dev/null
+++ b/ambari-server/conf/windows/ambari.properties
@@ -0,0 +1,82 @@
+# Copyright 2011 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+security.server.keys_dir=keystore
+resources.dir=resources
+shared.resources.dir = sbin\\ambari_commons\\resources
+custom.action.definitions=resources\\custom_action_definitions
+
+#Comma-separated list of JDK versions
+#java.releases=jdk1.8.20,jdk1.6.31
+java.releases=jdk1.7.67
+jdk1.7.67.desc=Oracle JDK 1.7.67
+jdk1.7.67.url=http://public-repo-1.hortonworks.com/ARTIFACTS/jdk-7u67-windows-x64.exe
+jdk1.7.67.dest-file=jdk-7u67-windows-x64.exe
+jdk1.7.67.jcpol-url=http://public-repo-1.hortonworks.com/ARTIFACTS/UnlimitedJCEPolicyJDK7.zip
+jdk1.7.67.jcpol-file=UnlimitedJCEPolicyJDK7.zip
+jdk1.7.67.home=C:\\jdk1.7.0_67
+
+metadata.path=resources\\stacks
+server.version.file=version
+webapp.dir=web
+bootstrap.dir=bootstrap
+bootstrap.script=bootstrap\\bootstrap.py
+bootstrap.setup_agent.script=bootstrap\\setupAgent.py
+api.authenticate=true
+server.connection.max.idle.millis=900000
+server.fqdn.service.url=http://127.0.0.1/latest/meta-data/public-hostname
+server.stages.parallel=true
+
+# Scheduler settings
+server.execution.scheduler.isClustered=false
+server.execution.scheduler.maxThreads=5
+server.execution.scheduler.maxDbConnections=5
+server.execution.scheduler.misfire.toleration.minutes=480
+
+recommendations.dir=\\var\\run\\ambari-server\\stack-recommendations
+stackadvisor.script=resources\\scripts\\stack_advisor.py
+server.tmp.dir=\\var\\run\\ambari-server\\tmp
+views.dir=resources\\views
+ambari.python.wrap=python.exe
+
+# Default timeout in seconds before task is killed
+agent.task.timeout=600
+
+# thread pool maximums
+client.threadpool.size.max=25
+agent.threadpool.size.max=25
+
+# linux open-file limit
+ulimit.open.files=10000
+
+#java.home=C:\\j2se1.8.0_05
+
+#server.jdbc.rca.driver=com.microsoft.sqlserver.jdbc.SQLServerDriver
+#server.jdbc.rca.url=jdbc:sqlserver://localhost\\SQLEXPRESS;databaseName=ambari;integratedSecurity=true
+##server.jdbc.rca.user.name=ambari
+##server.jdbc.rca.user.passwd=etc\\ambari-server\\conf\\password.dat
+
+#server.jdbc.driver=com.microsoft.sqlserver.jdbc.SQLServerDriver
+#server.jdbc.driver.path=C:\\Program Files\\Microsoft JDBC DRIVER 4.0 for SQL Server\\sqljdbc_4.0\\enu\\sqljdbc4.jar
+#server.jdbc.url=jdbc:sqlserver://localhost\\SQLEXPRESS;databaseName=ambari;integratedSecurity=true
+#server.jdbc.schema=ambari
+##server.jdbc.user.passwd=etc\\ambari-server\\conf\\password.dat
+##server.jdbc.user.name=ambari
+#scom.sink.db.driver=com.microsoft.sqlserver.jdbc.SQLServerDriver
+##scom.sink.db.url=jdbc:sqlserver://[server]:[port];databaseName=[databaseName];user=[user];password=[password]
+#scom.sink.db.url=jdbc:sqlserver://localhost\\SQLEXPRESS;databaseName=HadoopMetrics;integratedSecurity=true

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-server/conf/windows/ca.config
----------------------------------------------------------------------
diff --git a/ambari-server/conf/windows/ca.config b/ambari-server/conf/windows/ca.config
new file mode 100644
index 0000000..b4dd1c5
--- /dev/null
+++ b/ambari-server/conf/windows/ca.config
@@ -0,0 +1,29 @@
+[ ca ]
+default_ca             = CA_CLIENT
+[ CA_CLIENT ]
+dir		               = keystore\\db
+certs                  = $dir\\certs
+new_certs_dir          = $dir\\newcerts
+
+database               = $dir\\index.txt
+serial                 = $dir\\serial
+default_days           = 365
+
+default_crl_days       = 7
+default_md             = md5
+
+policy                 = policy_anything
+
+[ policy_anything ]
+countryName            = optional
+stateOrProvinceName    = optional
+localityName           = optional
+organizationName       = optional
+organizationalUnitName = optional
+commonName             = optional
+emailAddress           = optional
+
+[ jdk7_ca ]
+subjectKeyIdentifier = hash
+authorityKeyIdentifier = keyid:always,issuer:always
+basicConstraints = CA:true
\ No newline at end of file