You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by fb...@apache.org on 2015/03/25 18:23:17 UTC

ambari git commit: AMBARI-10149 Agent Automatic Bootstrap: Install and setup Ambari Agent

Repository: ambari
Updated Branches:
  refs/heads/trunk 7f191208e -> 65b5b9c28


AMBARI-10149 Agent Automatic Bootstrap: Install and setup Ambari Agent

Implemented setupAgent.py on Windows.
+Downloading the Agent msi from the URL received as parameter
+Installing, configuring & starting the Agent as a Windows service


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/65b5b9c2
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/65b5b9c2
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/65b5b9c2

Branch: refs/heads/trunk
Commit: 65b5b9c287483deb478f6fc2109703b624952dd5
Parents: 7f19120
Author: Florian Barca <fb...@hortonworks.com>
Authored: Wed Mar 25 10:22:58 2015 -0700
Committer: Florian Barca <fb...@hortonworks.com>
Committed: Wed Mar 25 10:22:58 2015 -0700

----------------------------------------------------------------------
 ambari-agent/conf/windows/service_wrapper.py    |  17 +-
 .../main/python/ambari_agent/AmbariConfig.py    |  25 ++
 .../src/main/python/ambari_commons/os_linux.py  |   3 +-
 .../src/main/python/ambari_commons/os_utils.py  |   8 +-
 .../main/python/ambari_commons/os_windows.py    | 160 +++++++++-
 .../libraries/functions/install_hdp_msi.py      |   2 +-
 ambari-server/src/main/python/setupAgent.py     | 291 ++++++++++++++++++-
 ambari-server/src/test/python/TestSetupAgent.py |  28 +-
 8 files changed, 512 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/65b5b9c2/ambari-agent/conf/windows/service_wrapper.py
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/windows/service_wrapper.py b/ambari-agent/conf/windows/service_wrapper.py
index 5eb06c4..5831e11 100644
--- a/ambari-agent/conf/windows/service_wrapper.py
+++ b/ambari-agent/conf/windows/service_wrapper.py
@@ -15,6 +15,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 '''
+import ConfigParser
 import os
 import optparse
 import sys
@@ -29,7 +30,7 @@ from ambari_commons.exceptions import *
 from ambari_commons.logging_utils import *
 from ambari_commons.os_windows import WinServiceController
 from ambari_commons.os_utils import find_in_path
-from ambari_agent.AmbariConfig import AmbariConfig
+from ambari_agent.AmbariConfig import AmbariConfig, updateConfigServerHostname
 from ambari_agent.HeartbeatHandlers import HeartbeatStopHandlers
 
 AMBARI_VERSION_VAR = "AMBARI_VERSION_VAR"
@@ -120,10 +121,17 @@ def ctrlHandler(ctrlType):
   return True
 
 
-def svcsetup():
+#
+# Configures the Ambari Agent settings and registers the Windows service.
+#
+def setup(options):
+  config = AmbariConfig()
+  configFile = config.getConfigFile()
+
+  updateConfigServerHostname(configFile, options.host_name)
+
   AmbariAgentService.set_ctrl_c_handler(ctrlHandler)
   AmbariAgentService.Install()
-  pass
 
 
 #
@@ -210,8 +218,7 @@ def agent_main():
   options.exit_message = "Ambari Agent '%s' completed successfully." % action
   try:
     if action == SETUP_ACTION:
-      #TODO Insert setup(options) here upon need
-      svcsetup()
+      setup(options)
     elif action == START_ACTION:
       svcstart(options)
     elif action == DEBUG_ACTION:

http://git-wip-us.apache.org/repos/asf/ambari/blob/65b5b9c2/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index 8975729..05af243 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -23,6 +23,10 @@ import StringIO
 import json
 from NetUtil import NetUtil
 import os
+
+from ambari_commons import OSConst
+from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
+
 content = """
 
 [server]
@@ -173,6 +177,15 @@ class AmbariConfig:
     return self.config
 
   @staticmethod
+  @OsFamilyFuncImpl(OSConst.WINSRV_FAMILY)
+  def getConfigFile():
+    if 'AMBARI_AGENT_CONF_DIR' in os.environ:
+      return os.path.join(os.environ['AMBARI_AGENT_CONF_DIR'], "ambari-agent.ini")
+    else:
+      return "ambari-agent.ini"
+
+  @staticmethod
+  @OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
   def getConfigFile():
     if 'AMBARI_AGENT_CONF_DIR' in os.environ:
       return os.path.join(os.environ['AMBARI_AGENT_CONF_DIR'], "ambari-agent.ini")
@@ -232,6 +245,18 @@ class AmbariConfig:
       return False
 
 
+def updateConfigServerHostname(configFile, new_host):
+  # update agent config file
+  agent_config = ConfigParser.ConfigParser()
+  agent_config.read(configFile)
+  server_host = agent_config.get('server', 'hostname')
+  if new_host is not None and server_host != new_host:
+    print "Updating server host from " + server_host + " to " + new_host
+    agent_config.set('server', 'hostname', new_host)
+    with (open(configFile, "wb")) as new_agent_config:
+      agent_config.write(new_agent_config)
+
+
 def main():
   print AmbariConfig().config
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/65b5b9c2/ambari-common/src/main/python/ambari_commons/os_linux.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/os_linux.py b/ambari-common/src/main/python/ambari_commons/os_linux.py
index 8599a0d..00db872 100644
--- a/ambari-common/src/main/python/ambari_commons/os_linux.py
+++ b/ambari-common/src/main/python/ambari_commons/os_linux.py
@@ -33,7 +33,7 @@ NR_CHOWN_CMD = 'chown {0} {1} {2}'
 ULIMIT_CMD = "ulimit -n"
 
 
-def os_run_os_command(cmd, env=None, shell=False):
+def os_run_os_command(cmd, env=None, shell=False, cwd=None):
   print_info_msg('about to run command: ' + str(cmd))
   if type(cmd) == str:
     cmd = shlex.split(cmd)
@@ -42,6 +42,7 @@ def os_run_os_command(cmd, env=None, shell=False):
                              stdin=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              env=env,
+                             cwd=cwd,
                              shell=shell
                              )
   (stdoutdata, stderrdata) = process.communicate()

http://git-wip-us.apache.org/repos/asf/ambari/blob/65b5b9c2/ambari-common/src/main/python/ambari_commons/os_utils.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/os_utils.py b/ambari-common/src/main/python/ambari_commons/os_utils.py
index c6a3832..9126a5b 100644
--- a/ambari-common/src/main/python/ambari_commons/os_utils.py
+++ b/ambari-common/src/main/python/ambari_commons/os_utils.py
@@ -94,11 +94,11 @@ def set_file_permissions(file, mod, user, recursive):
   else:
     print_info_msg("File %s does not exist" % file)
 
-def run_os_command(cmd, env=None):
-  return os_run_os_command(cmd, env, False)
+def run_os_command(cmd, env=None, cwd=None):
+  return os_run_os_command(cmd, env, False, cwd)
 
-def run_in_shell(cmd, env=None):
-  return os_run_os_command(cmd, env, True)
+def run_in_shell(cmd, env=None, cwd=None):
+  return os_run_os_command(cmd, env, True, cwd)
 
 def is_root():
   return os_is_root()

http://git-wip-us.apache.org/repos/asf/ambari/blob/65b5b9c2/ambari-common/src/main/python/ambari_commons/os_windows.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/os_windows.py b/ambari-common/src/main/python/ambari_commons/os_windows.py
index e413b8e..190c817 100644
--- a/ambari-common/src/main/python/ambari_commons/os_windows.py
+++ b/ambari-common/src/main/python/ambari_commons/os_windows.py
@@ -32,16 +32,18 @@ import ctypes
 import msvcrt
 
 import pywintypes
-import winerror
 import win32api
 import win32con
 import win32event
+import win32file
 import win32net
 import win32netcon
 import win32process
 import win32security
 import win32service
 import win32serviceutil
+import winerror
+import winioctlcon
 import wmi
 
 from ambari_commons.exceptions import FatalException
@@ -78,6 +80,159 @@ def symlink(source, link_name):
 
 os.symlink = symlink
 
+# Win32file doesn't seem to have this attribute.
+FILE_ATTRIBUTE_REPARSE_POINT = 1024
+# To make things easier.
+REPARSE_FOLDER = (win32file.FILE_ATTRIBUTE_DIRECTORY | FILE_ATTRIBUTE_REPARSE_POINT)
+
+# For the parse_reparse_buffer function
+SYMBOLIC_LINK = 'symbolic'
+MOUNTPOINT = 'mountpoint'
+GENERIC = 'generic'
+
+def islink(fpath):
+  """ Windows islink implementation. """
+  if win32file.GetFileAttributes(fpath) & REPARSE_FOLDER == REPARSE_FOLDER:
+    return True
+  return False
+
+os.path.islink = islink
+
+def _parse_reparse_buffer(original, reparse_type=SYMBOLIC_LINK):
+  """ Parse the raw bytes of a REPARSE_DATA_BUFFER into a dictionary.
+
+  Python rendition of the structure below:
+
+  typedef struct _REPARSE_DATA_BUFFER {
+      ULONG  ReparseTag;
+      USHORT ReparseDataLength;
+      USHORT Reserved;
+      union {
+          struct {
+              USHORT SubstituteNameOffset;
+              USHORT SubstituteNameLength;
+              USHORT PrintNameOffset;
+              USHORT PrintNameLength;
+              ULONG Flags;
+              WCHAR PathBuffer[1];
+          } SymbolicLinkReparseBuffer;
+          struct {
+              USHORT SubstituteNameOffset;
+              USHORT SubstituteNameLength;
+              USHORT PrintNameOffset;
+              USHORT PrintNameLength;
+              WCHAR PathBuffer[1];
+          } MountPointReparseBuffer;
+          struct {
+              UCHAR  DataBuffer[1];
+          } GenericReparseBuffer;
+      } DUMMYUNIONNAME;
+  } REPARSE_DATA_BUFFER, *PREPARSE_DATA_BUFFER;
+
+  :param original: the raw reparse data, as a byte string
+  :param reparse_type: which union member to decode: SYMBOLIC_LINK, MOUNTPOINT or GENERIC
+  :return: a dictionary holding the raw 'tag', 'data_length' and 'reserved' header slices and,
+           under the reparse_type key, the decoded integer fields plus 'buffer' (the bytes
+           that follow the decoded fields)
+  """
+  # Size of our data types
+  SZULONG = 4 # sizeof(ULONG)
+  SZUSHORT = 2 # sizeof(USHORT)
+
+  # Our structure. Each integer entry initially holds the field size in bytes and is
+  # replaced by the decoded value; 'pkeys' lists the fields in their on-disk order,
+  # since a plain dictionary does not preserve ordering.
+  parsed = {
+    'tag' : SZULONG,
+    'data_length' : SZUSHORT,
+    'reserved' : SZUSHORT,
+    SYMBOLIC_LINK : {
+      'substitute_name_offset' : SZUSHORT,
+      'substitute_name_length' : SZUSHORT,
+      'print_name_offset' : SZUSHORT,
+      'print_name_length' : SZUSHORT,
+      'flags' : SZULONG,
+      'buffer' : u'',
+      'pkeys' : [
+        'substitute_name_offset',
+        'substitute_name_length',
+        'print_name_offset',
+        'print_name_length',
+        'flags',
+        ]
+    },
+    MOUNTPOINT : {
+      'substitute_name_offset' : SZUSHORT,
+      'substitute_name_length' : SZUSHORT,
+      'print_name_offset' : SZUSHORT,
+      'print_name_length' : SZUSHORT,
+      'buffer' : u'',
+      'pkeys' : [
+        'substitute_name_offset',
+        'substitute_name_length',
+        'print_name_offset',
+        'print_name_length',
+        ]
+    },
+    GENERIC : {
+      'pkeys' : [],
+      'buffer': ''
+    }
+  }
+
+  # Header stuff. Slice ends are absolute positions: [start:start + size].
+  parsed['tag'] = original[:SZULONG]
+  parsed['data_length'] = original[SZULONG:SZULONG + SZUSHORT]
+  parsed['reserved'] = original[SZULONG + SZUSHORT:SZULONG + 2 * SZUSHORT]
+  original = original[SZULONG + 2 * SZUSHORT:]
+
+  # Parsing
+  k = reparse_type
+  for c in parsed[k]['pkeys']:
+    if isinstance(parsed[k][c], int):
+      sz = parsed[k][c]
+      raw = original[:sz]
+      # Decode as a little-endian unsigned integer: byte i contributes ord(b) << (8 * i).
+      value = 0
+      for idx, b in enumerate(raw):
+        value |= ord(b) << (8 * idx)
+      parsed[k][c] = value
+      original = original[sz:]
+
+  # Whatever follows the decoded fields is the path buffer; the offsets and lengths
+  # grabbed above index into it.
+  parsed[k]['buffer'] = original
+  return parsed
+
+def readlink(fpath):
+  """ Windows readlink implementation.
+
+  :param fpath: path of the link to resolve (str or unicode)
+  :return: the substitute name stored in the reparse point, with a leading '\\??\\' prefix
+           stripped; None when fpath is not a link or the reparse data is too short
+  :raises OSError: when a win32 call fails
+  """
+  # islink() is expected to be false for a non-existing file as well.
+  if not islink(fpath):
+    return None
+
+  try:
+    # Open the file correctly depending on the string type.
+    if type(fpath) == unicode:
+      handle = win32file.CreateFileW(fpath, win32file.GENERIC_READ, 0, None, win32file.OPEN_EXISTING, win32file.FILE_FLAG_OPEN_REPARSE_POINT | win32file.FILE_FLAG_BACKUP_SEMANTICS, 0)
+    else:
+      handle = win32file.CreateFile(fpath, win32file.GENERIC_READ, 0, None, win32file.OPEN_EXISTING, win32file.FILE_FLAG_OPEN_REPARSE_POINT | win32file.FILE_FLAG_BACKUP_SEMANTICS, 0)
+
+    try:
+      # MAXIMUM_REPARSE_DATA_BUFFER_SIZE = 16384 = (16*1024)
+      buffer = win32file.DeviceIoControl(handle, winioctlcon.FSCTL_GET_REPARSE_POINT, None, 16*1024)
+      # Above will return an ugly string (byte array), so we'll need to parse it.
+    finally:
+      # Always release the handle, even when the control call fails, so the file is not left locked.
+      win32file.CloseHandle(handle)
+
+    # Minimum possible length (assuming that the length of the target is bigger than 0)
+    if len(buffer) < 9:
+      return None
+    # Parse and return our result.
+    result = _parse_reparse_buffer(buffer)
+    offset = result[SYMBOLIC_LINK]['substitute_name_offset']
+    ending = offset + result[SYMBOLIC_LINK]['substitute_name_length']
+    rpath = result[SYMBOLIC_LINK]['buffer'][offset:ending].replace('\x00','')
+    if len(rpath) > 4 and rpath[0:4] == '\\??\\':
+      rpath = rpath[4:]
+    return rpath
+  except pywintypes.error, e:
+    raise OSError(e.winerror, e.strerror, fpath)
+
+os.readlink = readlink
+
+
 class OSVERSIONINFOEXW(ctypes.Structure):
     _fields_ = [('dwOSVersionInfoSize', ctypes.c_ulong),
                 ('dwMajorVersion', ctypes.c_ulong),
@@ -182,7 +337,7 @@ def run_os_command_impersonated(cmd, user, password, domain='.'):
 
   return exitcode, out, err
 
-def os_run_os_command(cmd, env=None, shell=False):
+def os_run_os_command(cmd, env=None, shell=False, cwd=None):
   if isinstance(cmd,basestring):
     cmd = cmd.replace("\\", "\\\\")
     cmd = shlex.split(cmd)
@@ -191,6 +346,7 @@ def os_run_os_command(cmd, env=None, shell=False):
                              stdin=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              env=env,
+                             cwd=cwd,
                              shell=shell
   )
   (stdoutdata, stderrdata) = process.communicate()

http://git-wip-us.apache.org/repos/asf/ambari/blob/65b5b9c2/ambari-common/src/main/python/resource_management/libraries/functions/install_hdp_msi.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/install_hdp_msi.py b/ambari-common/src/main/python/resource_management/libraries/functions/install_hdp_msi.py
index 8f32378..0fa72fe 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/install_hdp_msi.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/install_hdp_msi.py
@@ -161,7 +161,7 @@ def install_windows_msi(msi_url, save_dir, save_file, hadoop_password, stack_ver
   save_dir = os.path.abspath(save_dir)
   msi_save_dir = save_dir
   # system wide lock to prevent simultaneous installations(when first task failed on timeout)
-  install_lock = SystemWideLock("hdp_msi_lock")
+  install_lock = SystemWideLock("Global\\hdp_msi_lock")
   try:
     # try to acquire lock
     if not install_lock.lock():

http://git-wip-us.apache.org/repos/asf/ambari/blob/65b5b9c2/ambari-server/src/main/python/setupAgent.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/setupAgent.py b/ambari-server/src/main/python/setupAgent.py
index 70dd91f..b3825ad 100755
--- a/ambari-server/src/main/python/setupAgent.py
+++ b/ambari-server/src/main/python/setupAgent.py
@@ -25,27 +25,199 @@ import logging
 import os
 import subprocess
 
-from ambari_commons import OSCheck
+from ambari_commons import OSCheck, OSConst
+from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
+
+if OSCheck.is_windows_family():
+  import urllib2
+
+  from ambari_commons.exceptions import FatalException
+  from ambari_commons.inet_utils import force_download_file
+  from ambari_commons.os_utils import run_os_command
 
 
 AMBARI_PASSPHRASE_VAR = "AMBARI_PASSPHRASE"
+PROJECT_VERSION_DEFAULT = "DEFAULT"
+
+def _init_ambari_agent_symlink():
+  installationDrive = os.path.splitdrive(__file__.replace('/', os.sep))[0]
+  return os.path.join(installationDrive, os.sep, "ambari", "ambari-agent")
+
+AMBARI_AGENT_INSTALL_SYMLINK = _init_ambari_agent_symlink()
+INSTALL_MARKER_OK = "ambari-agent.installed"
+
+def _ret_init(ret):
+  if not ret:
+    ret = {'exitstatus': 0, 'log': ('', '')}
+  return ret
+
+def _ret_append_stdout(ret, stdout):
+  temp_stdout = ret['log'][0]
+  temp_stderr = ret['log'][1]
+  if stdout:
+    if temp_stdout:
+      temp_stdout += "\n"
+    temp_stdout += stdout
+  ret['log'] = (temp_stdout, temp_stderr)
+
+def _ret_append_stderr(ret, stderr):
+  temp_stdout = ret['log'][0]
+  temp_stderr = ret['log'][1]
+  if stderr:
+    if temp_stderr:
+      temp_stderr += "\n"
+    temp_stderr += stderr
+  ret['log'] = (temp_stdout, temp_stderr)
+
+def _ret_merge(ret, retcode, stdout, stderr):
+  ret['exitstatus'] = retcode
+  temp_stdout = ret['log'][0]
+  temp_stderr = ret['log'][1]
+  if stdout:
+    if temp_stdout:
+      temp_stdout += "\n"
+    temp_stdout += stdout
+  if stderr:
+    if temp_stderr:
+      temp_stderr += "\n"
+    temp_stderr += stderr
+  ret['log'] = (temp_stdout, temp_stderr)
+  return ret
+
+def _ret_merge2(ret, ret2):
+  """Merge a second result dictionary into an accumulated one.
+
+  :param ret: accumulated result, {'exitstatus': int, 'log': (stdout, stderr)}; updated in place
+  :param ret2: result whose exit status and log are to be appended to ret
+  :return: ret
+  """
+  # Append ret2's output; passing ret's own log here would duplicate it and lose ret2's.
+  return _ret_merge(ret, ret2['exitstatus'], ret2['log'][0], ret2['log'][1])
+
 
+@OsFamilyFuncImpl(OSConst.WINSRV_FAMILY)
+def execOsCommand(osCommand, tries=1, try_sleep=0, ret=None, cwd=None):
+  """Run an OS command, retrying on a non-zero exit code.
+
+  :param osCommand: the command to run, passed to run_os_command
+  :param tries: maximum number of attempts
+  :param try_sleep: seconds to sleep between attempts
+  :param ret: accumulated result to extend; a fresh one is created when empty
+  :param cwd: working directory for the command
+  :return: {'exitstatus': exit code of the last attempt, 'log': (stdout, stderr)}
+  """
+  ret = _ret_init(ret)
+
+  for i in range(0, tries):
+    if i > 0:
+      time.sleep(try_sleep)
+
+    retcode, stdout, stderr = run_os_command(osCommand, cwd=cwd)
+    _ret_merge(ret, retcode, stdout, stderr)
+    if retcode == 0:
+      break
+
+    # Only announce a retry when another attempt will actually follow.
+    if i < tries - 1:
+      _ret_append_stdout(ret, "\nRetrying " + str(osCommand))
+
+  return ret
+
+@OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
+def execOsCommand(osCommand, tries=1, try_sleep=0, ret=None, cwd=None):
+  ret = _ret_init(ret)
 
-def execOsCommand(osCommand, tries=1, try_sleep=0):
   for i in range(0, tries):
     if i>0:
       time.sleep(try_sleep)
-    
-    osStat = subprocess.Popen(osCommand, stdout=subprocess.PIPE)
+
+    osStat = subprocess.Popen(osCommand, stdout=subprocess.PIPE, cwd=cwd)
     log = osStat.communicate(0)
     ret = {"exitstatus": osStat.returncode, "log": log}
-    
+
     if ret['exitstatus'] == 0:
       break
-      
+
+  return ret
+
+def _download_file(url, destFilePath, progress_function=None, ret=None):
+  ret = _ret_init(ret)
+
+  if os.path.exists(destFilePath):
+    _ret_append_stdout(ret, "\nFile {0} already exists, assuming it was downloaded before".format(destFilePath))
+  else:
+    try:
+      #Intrinsically reliable and resumable. Downloads to a temp file and renames the temp file to the destination file
+      # upon successful termination.
+      force_download_file(url, destFilePath, 16 * 1024, progress_function)
+    except FatalException, e:
+      _ret_merge(ret, e.code, None, "Failed to download {0} -> {1} : {2}".format(url, destFilePath, e.reason))
+    except urllib2.URLError, ue:
+      _ret_merge(ret, 2, None, "Failed to download {0} -> {1} : {2}".format(url, destFilePath, ue.reason))
+  return ret
+
+
+def _create_agent_symlink(symlinkPath, agentInstallDir, ret):
+  """Point the symbolic link symlinkPath at agentInstallDir.
+
+  Removes any existing link and re-creates it, retrying up to 1000 times.
+
+  :param symlinkPath: path of the symbolic link to (re)create
+  :param agentInstallDir: directory the link must resolve to
+  :param ret: accumulated result to extend; a fresh one is created when empty
+  :return: ret, with exit status 1000 and an error message when every attempt failed
+  """
+  ret = _ret_init(ret)
+
+  symLinkCreationAttempts = 0
+  while (symLinkCreationAttempts < 1000):
+    # Handle contention from other bootstrap processes
+    try:
+      os.rmdir(symlinkPath)
+    except OSError:
+      #It's ok to attempt to delete a non-existing link
+      pass
+
+    try:
+      os.symlink(agentInstallDir, symlinkPath)
+      if os.readlink(symlinkPath) == agentInstallDir:
+        break
+    except OSError:
+      pass
+
+    symLinkCreationAttempts += 1
+  if symLinkCreationAttempts == 1000:
+    # Report the link this call worked on, not the module-level default.
+    _ret_merge(ret, 1000, '',
+               'Failed creating the symbolic link {0} because of contention.'.format(symlinkPath))
   return ret
 
 
+@OsFamilyFuncImpl(OSConst.WINSRV_FAMILY)
+def installAgent(url, downloadDir, projectVersion, ret=None):
+  """ Download the agent msi and install it
+  :param url:
+  :param projectVersion:
+  :return: {"exitstatus": exit code, "log": log records string}
+  """
+  ret = _ret_init(ret)
+
+  installationDrive = os.path.splitdrive(__file__.replace('/', os.sep))[0]
+
+  agentInstallDir = os.path.join(installationDrive, os.sep, "ambari", "ambari-agent-" + projectVersion)
+  agentInstallMarkerFile = os.path.join(agentInstallDir, INSTALL_MARKER_OK)
+
+  if not os.path.exists(agentInstallMarkerFile):
+    destMsiFilePath = os.path.join(downloadDir, "ambari-agent-{0}.msi".format(projectVersion))
+    ret = _download_file(url, destMsiFilePath, ret=ret)
+    if ret['exitstatus'] != 0:
+      return ret
+
+    #ambari-agent-<version>.msi downloaded, proceed to the installation
+    installLogPath = os.path.join(downloadDir, "ambari-agent-{0}.install.log".format(projectVersion))
+    installCmd = [
+      "cmd",
+      "/c",
+      "start",
+      "/wait",
+      "msiexec",
+      "/i", destMsiFilePath,
+      "AGENT_INSTALL_DIRECTORY=" + agentInstallDir,
+      "/qn",
+      "/Lv", installLogPath]
+    ret = execOsCommand(installCmd, tries=3, try_sleep=10, ret=ret)
+    if ret['exitstatus'] != 0:
+      #TODO Check if the product was already installed. Only machine reimage can repair a broken installation.
+      return ret
+
+    try:
+      if os.readlink(AMBARI_AGENT_INSTALL_SYMLINK) != agentInstallDir:
+        ret = _create_agent_symlink(AMBARI_AGENT_INSTALL_SYMLINK, agentInstallDir, ret)
+    except OSError:
+      ret = _create_agent_symlink(AMBARI_AGENT_INSTALL_SYMLINK, agentInstallDir, ret)
+    if ret['exitstatus'] != 0:
+      return ret
+
+    try:
+      open(agentInstallMarkerFile, "w+").close()
+    except IOError:
+      pass
+
+  return ret
+
+@OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
 def installAgent(projectVersion):
   """ Run install and make sure the agent install alright """
   # The command doesn't work with file mask ambari-agent*.rpm, so rename it on agent host
@@ -59,6 +231,13 @@ def installAgent(projectVersion):
   return execOsCommand(Command, tries=3, try_sleep=10)
 
 
+@OsFamilyFuncImpl(OSConst.WINSRV_FAMILY)
+def configureAgent(server_hostname, cwd, ret=None):
+  #Customize ambari-agent.ini & register the Ambari Agent service
+  agentSetupCmd = ["cmd", "/c", "ambari-agent.cmd", "setup", "--hostname=" + server_hostname]
+  return execOsCommand(agentSetupCmd, tries=3, try_sleep=10, cwd=AMBARI_AGENT_INSTALL_SYMLINK, ret=ret)
+
+@OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
 def configureAgent(server_hostname, user_run_as):
   """ Configure the agent so that it has all the configs knobs properly installed """
   osCommand = ["sed", "-i.bak", "s/hostname=localhost/hostname=" + server_hostname +
@@ -71,6 +250,16 @@ def configureAgent(server_hostname, user_run_as):
   ret = execOsCommand(osCommand)
   return ret
 
+
+#Windows-specific
+def runAgentService(ret=None):
+  ret = _ret_init(ret)
+
+  #Invoke ambari-agent restart as a child process
+  agentRestartCmd = ["cmd", "/c", "ambari-agent.cmd", "restart"]
+  return execOsCommand(agentRestartCmd, tries=3, try_sleep=10, cwd=AMBARI_AGENT_INSTALL_SYMLINK, ret=ret)
+
+#Linux-specific
 def runAgent(passPhrase, expected_hostname, user_run_as, verbose):
   os.environ[AMBARI_PASSPHRASE_VAR] = passPhrase
   vo = ""
@@ -100,8 +289,20 @@ def tryStopAgent():
   if execOsCommand(cmds)["exitstatus"] == 0 or execOsCommand(cmdl)["exitstatus"] == 0:
     verbose = True
   subprocess.call("/usr/sbin/ambari-agent stop", shell=True)
-  return verbose  
+  return verbose
 
+@OsFamilyFuncImpl(OSConst.WINSRV_FAMILY)
+def getOptimalVersion(initialProjectVersion):
+  if initialProjectVersion == "null" or initialProjectVersion == "{ambariVersion}" or \
+          initialProjectVersion == PROJECT_VERSION_DEFAULT or not initialProjectVersion:
+    #Extract the project version from the current script path
+    scriptPath = os.path.dirname(__file__.replace('/', os.sep))
+    optimalVersion = os.path.split(scriptPath)[1]
+  else:
+    optimalVersion = initialProjectVersion
+  return optimalVersion
+
+@OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
 def getOptimalVersion(initialProjectVersion):
   optimalVersion = initialProjectVersion
   ret = findNearestAgentPackageVersion(optimalVersion)
@@ -174,8 +375,31 @@ def checkServerReachability(host, port):
                 "Please check the network connectivity between the Ambari Agent host and the Ambari Server"
   return ret
 
+#  Command line syntax help - Windows
+# IsOptional  Index     Description
+#               0        ambari-agent.msi URL
+#               1        Server host name
+#      X        2        Project version (Ambari)
+
+@OsFamilyFuncImpl(OSConst.WINSRV_FAMILY)
+def parseArguments(argv=None):
+  if argv is None:  # make sure that arguments were passed
+    return {"exitstatus": 2, "log": "No arguments were passed"}
+  args = argv[1:]  # shift path to script
+  if len(args) < 2:
+    return {"exitstatus": 1, "log": "Not all required arguments were passed"}
 
-#  Command line syntax help
+  agentUrl = args[0]
+  serverHostname = args[1]
+  projectVersion = PROJECT_VERSION_DEFAULT
+
+  if len(args) > 2:
+    projectVersion = args[2]
+
+  parsed_args = (agentUrl, serverHostname, projectVersion)
+  return {"exitstatus": 0, "log": ("", ""), "parsed_args": parsed_args}
+
+#  Command line syntax help - Linux
 # IsOptional  Index     Description
 #               0        Expected host name
 #               1        Password
@@ -185,6 +409,7 @@ def checkServerReachability(host, port):
 #      X        5        Server port
 
 
+@OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
 def parseArguments(argv=None):
   if argv is None:  # make sure that arguments was passed
     return {"exitstatus": 2, "log": "No arguments were passed"}
@@ -212,6 +437,46 @@ def parseArguments(argv=None):
   return {"exitstatus": 0, "log": "", "parsed_args": parsed_args}
 
 
+@OsFamilyFuncImpl(OSConst.WINSRV_FAMILY)
+def run_setup(argv=None):
+  """
+  Intended bootstrap sequence (the JDK steps are still a TODO below):
+
+  if the Agent is not downloaded or the download was interrupted
+    download the Agent msi package
+  install the Agent from the msi package
+  customize the Agent configuration
+  register the Ambari Agent Windows service
+  if JDK is not installed on the local machine
+    download and install JDK
+  set the machine-wide JAVA_HOME environment variable
+  if the Agent service is running from a previous session
+    stop the Agent service
+  create/switch the Agent dir symbolic link to the new version
+  start the Agent service
+
+  :param argv: command line arguments, script path first
+  :return: result dictionary of the first failing step, or of the service start
+  """
+
+  # Parse passed arguments
+  retcode = parseArguments(argv)
+  if (retcode["exitstatus"] != 0):
+    return retcode
+
+  (agent_url, server_hostname, projectVersion) = retcode["parsed_args"]
+
+  availableProjectVersion = getOptimalVersion(projectVersion)
+
+  retcode = installAgent(agent_url, os.getcwd(), availableProjectVersion, retcode)
+  if (not retcode["exitstatus"] == 0):
+    return retcode
+
+  # The second positional parameter of configureAgent is cwd, so the accumulated
+  # result has to be passed by keyword to keep the log gathered so far.
+  retcode = configureAgent(server_hostname, AMBARI_AGENT_INSTALL_SYMLINK, ret=retcode)
+  if retcode['exitstatus'] != 0:
+    return retcode
+
+  #TODO Install the JDK
+  #install_jdk(jdk_url, java_home_dir, jdk_name, ret)
+
+  return runAgentService(retcode)
+
+@OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
 def run_setup(argv=None):
   # Parse passed arguments
   retcode = parseArguments(argv)
@@ -247,6 +512,16 @@ def run_setup(argv=None):
     return retcode
   return runAgent(passPhrase, expected_hostname, user_run_as, verbose)
 
+
+@OsFamilyFuncImpl(OSConst.WINSRV_FAMILY)
+def main(argv=None):
+  try:
+    exitcode = run_setup(argv)
+  except Exception, e:
+    exitcode = {"exitstatus": -1, "log": str(e)}
+  return exitcode
+
+@OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
 def main(argv=None):
   #Try stop agent and check --verbose option if agent already run
   global verbose

http://git-wip-us.apache.org/repos/asf/ambari/blob/65b5b9c2/ambari-server/src/test/python/TestSetupAgent.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/TestSetupAgent.py b/ambari-server/src/test/python/TestSetupAgent.py
index 031a642..380893f 100644
--- a/ambari-server/src/test/python/TestSetupAgent.py
+++ b/ambari-server/src/test/python/TestSetupAgent.py
@@ -20,7 +20,23 @@ from mock.mock import MagicMock
 from unittest import TestCase
 from mock.mock import patch
 import sys
-setup_agent = __import__('setupAgent')
+
+from ambari_commons import OSCheck
+from only_for_platform import get_platform, not_for_platform, only_for_platform, PLATFORM_WINDOWS, PLATFORM_LINUX
+from mock.mock import MagicMock, patch, ANY, Mock
+
+if get_platform() != PLATFORM_WINDOWS:
+  os_distro_value = ('Suse','11','Final')
+else:
+  os_distro_value = ('win2012serverr2','6.3','WindowsServer')
+
+with patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)):
+#  from ambari_agent import NetUtil, security
+
+#  if get_platform() != PLATFORM_WINDOWS:
+#    from ambari_commons.shell import shellRunnerLinux
+
+  setup_agent = __import__('setupAgent')
 
 class TestSetupAgent(TestCase):
 
@@ -42,6 +58,7 @@ class TestSetupAgent(TestCase):
     pass
 
 
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
   @patch.object(setup_agent, 'execOsCommand')
   def test_configureAgent(self, execOsCommand_mock):
     # Test if expected_hostname is passed
@@ -110,6 +127,7 @@ class TestSetupAgent(TestCase):
     execOsCommand_mock.reset_mock()
     pass
 
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
   @patch.object(setup_agent, 'getAvaliableAgentPackageVersions')
   @patch('ambari_commons.OSCheck.is_suse_family')
   @patch('ambari_commons.OSCheck.is_ubuntu_family')
@@ -126,6 +144,7 @@ class TestSetupAgent(TestCase):
     self.assertTrue(result_version["exitstatus"] == 1)
     pass
 
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
   @patch.object(setup_agent, 'getAvaliableAgentPackageVersions')
   @patch('ambari_commons.OSCheck.is_suse_family')
   @patch('ambari_commons.OSCheck.is_ubuntu_family')
@@ -142,6 +161,7 @@ class TestSetupAgent(TestCase):
     self.assertTrue(result_version["exitstatus"] == 1)
     pass
 
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
   @patch('ambari_commons.OSCheck.is_suse_family')
   @patch('ambari_commons.OSCheck.is_ubuntu_family')
   @patch.object(setup_agent, 'findNearestAgentPackageVersion')
@@ -163,6 +183,7 @@ class TestSetupAgent(TestCase):
     self.assertTrue(result_version["exitstatus"] == 1)
     pass
 
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
   @patch('ambari_commons.OSCheck.is_suse_family')
   @patch('ambari_commons.OSCheck.is_ubuntu_family')
   @patch.object(setup_agent, 'findNearestAgentPackageVersion')
@@ -184,6 +205,7 @@ class TestSetupAgent(TestCase):
     self.assertTrue(result_version["exitstatus"] == 1)
     pass
 
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
   @patch.object(setup_agent, 'getAvaliableAgentPackageVersions')
   @patch('ambari_commons.OSCheck.is_suse_family')
   @patch('ambari_commons.OSCheck.is_ubuntu_family')
@@ -201,6 +223,7 @@ class TestSetupAgent(TestCase):
     self.assertTrue(result_version["log"] == projectVersion)
     pass
 
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
   @patch.object(setup_agent, 'getAvaliableAgentPackageVersions')
   @patch('ambari_commons.OSCheck.is_suse_family')
   @patch('ambari_commons.OSCheck.is_ubuntu_family')
@@ -223,10 +246,12 @@ class TestSetupAgent(TestCase):
     self.assertTrue(result_version["exitstatus"] == 1)
     pass
 
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
   @patch.object(subprocess, 'Popen')
   def test_execOsCommand(self, Popen_mock):
     self.assertFalse(setup_agent.execOsCommand("hostname -f") == None)
 
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
   @patch.object(setup_agent, 'tryStopAgent')
   @patch.object(setup_agent, 'isAgentPackageAlreadyInstalled')
   @patch.object(setup_agent, 'runAgent')
@@ -399,6 +424,7 @@ class TestSetupAgent(TestCase):
     self.assertTrue(execOsCommand_mock.called)
     pass
 
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
   @patch.object(setup_agent, 'execOsCommand')
   def test_installAgent(self, execOsCommand_mock):
     setup_agent.installAgent("1.1.1")