You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2015/01/16 21:57:10 UTC

ambari git commit: AMBARI-9154. Upgrade pack for Flume (Yurii Shylov via ncole)

Repository: ambari
Updated Branches:
  refs/heads/trunk 8d27ac2b7 -> 7330d0eac


AMBARI-9154. Upgrade pack for Flume (Yurii Shylov via ncole)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7330d0ea
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7330d0ea
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7330d0ea

Branch: refs/heads/trunk
Commit: 7330d0eacc2e825a441cddbfa1684e4aae6557f9
Parents: 8d27ac2
Author: Nate Cole <nc...@hortonworks.com>
Authored: Fri Jan 16 15:56:47 2015 -0500
Committer: Nate Cole <nc...@hortonworks.com>
Committed: Fri Jan 16 15:56:47 2015 -0500

----------------------------------------------------------------------
 .../1.4.0.2.0/package/scripts/flume_handler.py  | 19 ++++
 .../1.4.0.2.0/package/scripts/flume_upgrade.py  | 96 ++++++++++++++++++++
 .../FLUME/1.4.0.2.0/package/scripts/params.py   |  3 +
 .../stacks/HDP/2.2/upgrades/upgrade-2.2.xml     | 26 +++++-
 .../python/stacks/2.0.6/FLUME/test_flume.py     | 11 +++
 .../python/stacks/2.0.6/configs/flume_22.json   |  1 +
 6 files changed, 152 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/7330d0ea/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume_handler.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume_handler.py b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume_handler.py
index 849fcf1..a24e6ce 100644
--- a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume_handler.py
+++ b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume_handler.py
@@ -17,12 +17,15 @@ limitations under the License.
 
 """
 
+import flume_upgrade
+
 from flume import flume
 from flume import get_desired_state
 
 from resource_management import *
 from resource_management.libraries.functions.flume_agent_helper import find_expected_agent_names
 from resource_management.libraries.functions.flume_agent_helper import get_flume_status
+from resource_management.libraries.functions.version import compare_versions, format_hdp_stack_version
 
 class FlumeHandler(Script):
 
@@ -50,6 +53,9 @@ class FlumeHandler(Script):
 
     flume(action='stop')
 
+    if rolling_restart:
+      flume_upgrade.post_stop_backup()
+
   def configure(self, env):
     import params
 
@@ -79,5 +85,18 @@ class FlumeHandler(Script):
     elif len(expected_agents) == 0 and 'INSTALLED' == get_desired_state():
       raise ComponentIsNotRunning()
 
+  def pre_rolling_restart(self, env):
+    import params
+    env.set_params(params)
+
+    # this function should not execute if the version can't be determined or
+    # is not at least HDP 2.2.0.0
+    if not params.version or compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') < 0:
+      return
+
+    Logger.info("Executing Flume Rolling Upgrade pre-restart")
+    Execute(format("hdp-select set flume-server {version}"))
+    flume_upgrade.pre_start_restore()
+
 if __name__ == "__main__":
   FlumeHandler().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/7330d0ea/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume_upgrade.py b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume_upgrade.py
new file mode 100644
index 0000000..e2028fc
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume_upgrade.py
@@ -0,0 +1,96 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import os
+import shutil
+import tarfile
+import tempfile
+
+from resource_management.core.logger import Logger
+from resource_management.core.exceptions import Fail
+
+BACKUP_TEMP_DIR = "flume-upgrade-backup"
+BACKUP_CONF_DIR_ARCHIVE = "flume-conf-backup.tar"
+
+def post_stop_backup():
+  """
+  Backs up the flume config, config dir, file/spillable channels as part of the
+  upgrade process.
+  :return:
+  """
+  Logger.info('Backing up Flume data and configuration before upgrade...')
+  directoryMappings = _get_directory_mappings()
+
+  absolute_backup_dir = os.path.join(tempfile.gettempdir(), BACKUP_TEMP_DIR)
+  if not os.path.isdir(absolute_backup_dir):
+    os.makedirs(absolute_backup_dir)
+
+  for directory in directoryMappings:
+    if not os.path.isdir(directory):
+      raise Fail("Unable to backup missing directory {0}".format(directory))
+
+    archive = os.path.join(absolute_backup_dir, directoryMappings[directory])
+    Logger.info('Compressing {0} to {1}'.format(directory, archive))
+
+    if os.path.exists(archive):
+      os.remove(archive)
+
+    tarball = None
+    try:
+      tarball = tarfile.open(archive, "w")
+      tarball.add(directory, arcname=os.path.basename(directory))
+    finally:
+      if tarball:
+        tarball.close()
+
+def pre_start_restore():
+  """
+  Restores the flume config, config dir, file/spillable channels to their proper locations
+  after an upgrade has completed.
+  :return:
+  """
+  Logger.info('Restoring Flume data and configuration after upgrade...')
+  directoryMappings = _get_directory_mappings()
+
+  for directory in directoryMappings:
+    archive = os.path.join(tempfile.gettempdir(), BACKUP_TEMP_DIR,
+      directoryMappings[directory])
+
+    if os.path.isfile(archive):
+      Logger.info('Extracting {0} to {1}'.format(archive, directory))
+      tarball = None
+      try:
+        tarball = tarfile.open(archive, "r")
+        tarball.extractall(directory)
+      finally:
+        if tarball:
+          tarball.close()
+
+    # cleanup
+    if os.path.exists(os.path.join(tempfile.gettempdir(), BACKUP_TEMP_DIR)):
+      shutil.rmtree(os.path.join(tempfile.gettempdir(), BACKUP_TEMP_DIR))
+
+def _get_directory_mappings():
+  """
+  Gets a dictionary of directory to archive name that represents the
+  directories that need to be backed up and their output tarball archive targets
+  :return:  the dictionary of directory to tarball mappings
+  """
+  import params
+
+  return { params.flume_conf_dir : BACKUP_CONF_DIR_ARCHIVE}

http://git-wip-us.apache.org/repos/asf/ambari/blob/7330d0ea/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
index 28c4240..efa22d4 100644
--- a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
@@ -25,6 +25,9 @@ config = Script.get_config()
 
 stack_name = default("/hostLevelParams/stack_name", None)
 
+# New Cluster Stack Version that is defined during the RESTART of a Rolling Upgrade
+version = default("/commandParams/version", None)
+
 user_group = config['configurations']['cluster-env']['user_group']
 proxyuser_group =  config['configurations']['hadoop-env']['proxyuser_group']
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7330d0ea/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml
index 4a4b158..325df21 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml
@@ -118,6 +118,20 @@
       </service>
     </group>
 
+    <group name="SLIDER" title="Slider">
+      <skippable>true</skippable>
+      <service name="SLIDER">
+        <component>SLIDER</component>
+      </service>
+    </group>
+
+    <group name="FLUME" title="Flume">
+      <skippable>true</skippable>
+      <service name="FLUME">
+        <component>FLUME_HANDLER</component>
+      </service>
+    </group>
+
     <group name="CLIENTS" title="Client Components">
       <service name="HDFS">
         <component>HDFS_CLIENT</component>
@@ -147,10 +161,6 @@
         <component>HIVE_CLIENT</component>
         <component>HCAT</component>
       </service>
-
-      <service name="SLIDER">
-        <component>SLIDER</component>
-      </service>
     </group>
 
     <group xsi:type="cluster" name="POST_CLUSTER" title="Finalize Upgrade">
@@ -483,5 +493,13 @@
         </post-upgrade>
       </component>
     </service>
+
+    <service name="FLUME">
+      <component name="FLUME_HANDLER">
+        <upgrade>
+          <task xsi:type="restart" />
+        </upgrade>
+      </component>
+    </service>
   </processing>
 </upgrade>

http://git-wip-us.apache.org/repos/asf/ambari/blob/7330d0ea/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
index 193eefb..ada29c2 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
@@ -410,6 +410,17 @@ class TestFlumeHandler(RMFTestCase):
                               owner="flume",
                               content=content)
 
+  def test_pre_rolling_restart(self):
+    self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/flume_handler.py",
+                       classname = "FlumeHandler",
+                       command = "pre_rolling_restart",
+                       config_file="flume_22.json",
+                       hdp_stack_version = self.STACK_VERSION,
+                       target = RMFTestCase.TARGET_COMMON_SERVICES)
+
+    self.assertResourceCalled("Execute", "hdp-select set flume-server 2.2.1.0-2067")
+
+
 def build_flume(content):
   result = {}
   agent_names = []

http://git-wip-us.apache.org/repos/asf/ambari/blob/7330d0ea/ambari-server/src/test/python/stacks/2.0.6/configs/flume_22.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/configs/flume_22.json b/ambari-server/src/test/python/stacks/2.0.6/configs/flume_22.json
index 3a7aa33..909e965 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/configs/flume_22.json
+++ b/ambari-server/src/test/python/stacks/2.0.6/configs/flume_22.json
@@ -23,6 +23,7 @@
     "serviceName": "HIVE",
     "role": "HIVE_SERVER",
     "commandParams": {
+        "version": "2.2.1.0-2067",
         "command_timeout": "300", 
         "service_package_folder": "OOZIE",
         "script_type": "PYTHON",