Posted to commits@bigtop.apache.org by kw...@apache.org on 2017/04/12 20:49:41 UTC

bigtop git commit: BIGTOP-2737: Spark charm doesn't handle HA or examples well (closes #194)

Repository: bigtop
Updated Branches:
  refs/heads/master da2c4292f -> c6bd2a2d9


BIGTOP-2737: Spark charm doesn't handle HA or examples well (closes #194)

Signed-off-by: Kevin W Monroe <ke...@canonical.com>


Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo
Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/c6bd2a2d
Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/c6bd2a2d
Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/c6bd2a2d

Branch: refs/heads/master
Commit: c6bd2a2d96c8e57167939abc65b7844931a26bc7
Parents: da2c429
Author: Kevin W Monroe <ke...@canonical.com>
Authored: Tue Mar 28 20:46:17 2017 +0000
Committer: Kevin W Monroe <ke...@canonical.com>
Committed: Wed Apr 12 15:47:49 2017 -0500

----------------------------------------------------------------------
 .../src/charm/spark/layer-spark/actions.yaml    |  11 +-
 .../charm/spark/layer-spark/actions/pagerank    | 135 ++++++++++++-
 .../src/charm/spark/layer-spark/actions/sparkpi |  15 +-
 .../src/charm/spark/layer-spark/config.yaml     |  20 +-
 .../lib/charms/layer/bigtop_spark.py            |  89 +++++----
 .../charm/spark/layer-spark/reactive/spark.py   | 188 ++++++++++++-------
 6 files changed, 341 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bigtop/blob/c6bd2a2d/bigtop-packages/src/charm/spark/layer-spark/actions.yaml
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/actions.yaml b/bigtop-packages/src/charm/spark/layer-spark/actions.yaml
index 6564b1c..7f0961d 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/actions.yaml
+++ b/bigtop-packages/src/charm/spark/layer-spark/actions.yaml
@@ -1,5 +1,12 @@
+pagerank:
+    description: Calculate PageRank for a sample data set
+    params:
+        iterations:
+            description: Number of iterations for the SparkPageRank job
+            type: string
+            default: "1"
 smoke-test:
-    description: Verify that Spark is working by calculating pi.
+    description: Verify that Spark is working by calculating pi
 sparkpi:
     description: Calculate Pi
     params:
@@ -19,8 +26,6 @@ logisticregression:
     description: Run the Spark Bench LogisticRegression benchmark.
 matrixfactorization:
     description: Run the Spark Bench MatrixFactorization benchmark.
-pagerank:
-    description: Run the Spark Bench PageRank benchmark.
 pca:
     description: Run the Spark Bench PCA benchmark.
 pregeloperation:
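
With the pagerank action defined above, it can be run against a deployed spark
unit via the Juju CLI, roughly as follows (the unit name "spark/0" and the
iteration count are illustrative):

    juju run-action spark/0 pagerank iterations=10
    juju show-action-output <returned-action-id>

The second command prints the action results once the job completes.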

http://git-wip-us.apache.org/repos/asf/bigtop/blob/c6bd2a2d/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank b/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank
deleted file mode 120000
index 9e15049..0000000
--- a/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank
+++ /dev/null
@@ -1 +0,0 @@
-sparkbench
\ No newline at end of file
diff --git a/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank b/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank
new file mode 100755
index 0000000..b2d85e6
--- /dev/null
+++ b/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank
@@ -0,0 +1,134 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import subprocess
+import sys
+
+from path import Path
+from time import time
+
+from charmhelpers.contrib.benchmark import Benchmark
+from charmhelpers.core import hookenv
+from charms.reactive import is_state
+from jujubigdata import utils
+
+
+def fail(msg, output):
+    print(msg)
+    hookenv.action_set({'output': output})
+    hookenv.action_fail(msg)
+    sys.exit(1)
+
+
+def main():
+    bench = Benchmark()
+
+    if not is_state('spark.started'):
+        msg = 'Spark is not started yet'
+        fail(msg, 'error')
+
+    # gather params and create dir to store results
+    num_iter = hookenv.action_get('iterations')
+    run = int(time())
+    result_dir = Path('/opt/sparkpagerank-results')
+    result_log = result_dir / '{}.log'.format(run)
+    if not result_dir.exists():
+        result_dir.mkdir()
+    result_dir.chown('ubuntu', 'ubuntu')
+    hookenv.log("values: {} {}".format(num_iter, result_log))
+
+    sample = "/home/ubuntu/SparkBench/PageRank/web-Google.txt"
+    if not os.path.isfile(sample):
+        msg = 'Could not find pagerank sample data'
+        fail('{}: {}'.format(msg, sample), 'error')
+
+    # Benchmark input data is packed into our sparkbench.tgz, which makes
+    # it available on all spark units. In yarn mode, however, the nodemanagers
+    # act as the spark workers and will not have access to this local data.
+    # In yarn mode, copy our input data to hdfs so nodemanagers can access it.
+    mode = hookenv.config()['spark_execution_mode']
+    if mode.startswith('yarn'):
+        if is_state('hadoop.hdfs.ready'):
+            try:
+                utils.run_as('ubuntu',
+                             'hdfs', 'dfs', '-put', '-f', sample, '/user/ubuntu',
+                             capture_output=True)
+            except subprocess.CalledProcessError as e:
+                msg = 'Unable to copy pagerank sample data to hdfs'
+                fail('{}: {}'.format(msg, e), 'error')
+            else:
+                sample = "/user/ubuntu/web-Google.txt"
+        else:
+            msg = 'Spark is configured for yarn mode, but HDFS is not ready yet'
+            fail(msg, 'error')
+
+    # find jar location
+    spark_home = "/usr/lib/spark"
+    example_jar_name = "spark-examples.jar"
+    example_jar_path = None
+    for root, dirs, files in os.walk(spark_home):
+        if example_jar_name in files:
+            example_jar_path = os.path.join(root, example_jar_name)
+
+    if not example_jar_path:
+        msg = 'Could not find {}'.format(example_jar_name)
+        fail(msg, 'error')
+
+    print('Calculating PageRank')
+    bench.start()
+    start = int(time())
+
+    with open(result_log, 'w') as log_file:
+        arg_list = [
+            'spark-submit',
+            '--class',
+            'org.apache.spark.examples.SparkPageRank',
+            example_jar_path,
+            sample,
+            num_iter,
+        ]
+
+        try:
+            subprocess.check_call(arg_list, stdout=log_file,
+                                  stderr=subprocess.STDOUT)
+        except subprocess.CalledProcessError as e:
+            msg = 'SparkPageRank command failed: {}'.format(' '.join(arg_list))
+            fail('{}: {}'.format(msg, e), 'error')
+
+    stop = int(time())
+    bench.finish()
+
+    duration = stop - start
+    bench.set_composite_score(duration, 'secs')
+    subprocess.check_call(['benchmark-raw', result_log])
+
+    with open(result_log) as log:
+        success = False
+        for line in log.readlines():
+            if 'rank' in line:
+                success = True
+                break
+
+    if not success:
+        msg = 'Spark-submit failed to calculate pagerank'
+        fail(msg, 'error')
+
+    hookenv.action_set({'output': {'status': 'completed'}})
+
+
+if __name__ == '__main__':
+    main()
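
For reference, the script above writes its spark-submit output to a timestamped
log under /opt/sparkpagerank-results on the unit; a sketch of inspecting it by
hand (unit name and log name are illustrative):

    juju ssh spark/0 ls /opt/sparkpagerank-results
    juju ssh spark/0 cat /opt/sparkpagerank-results/<epoch>.log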

http://git-wip-us.apache.org/repos/asf/bigtop/blob/c6bd2a2d/bigtop-packages/src/charm/spark/layer-spark/actions/sparkpi
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/actions/sparkpi b/bigtop-packages/src/charm/spark/layer-spark/actions/sparkpi
index 9afceaf..99fded6 100755
--- a/bigtop-packages/src/charm/spark/layer-spark/actions/sparkpi
+++ b/bigtop-packages/src/charm/spark/layer-spark/actions/sparkpi
@@ -14,8 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
 import sys
-sys.path.append('lib')
 
 from path import Path
 from time import time
@@ -55,12 +55,23 @@ def main():
 
     print('calculating pi')
 
+    # get the examples jar
+    spark_home = "/usr/lib/spark"
+    example_jar_name = "spark-examples.jar"
+    example_jar_path = None
+    for root, dirs, files in os.walk(spark_home):
+        if example_jar_name in files:
+            example_jar_path = os.path.join(root, example_jar_name)
+
+    if not example_jar_path:
+        fail('could not find {}'.format(example_jar_name), 'error')
+
     with open(result_log, 'w') as log_file:
         arg_list = [
             'spark-submit',
             '--class',
             'org.apache.spark.examples.SparkPi',
-            '/usr/lib/spark/lib/spark-examples.jar'
+            example_jar_path
         ]
         if num_partitions:
             # This is always blank. TODO: figure out what it was

http://git-wip-us.apache.org/repos/asf/bigtop/blob/c6bd2a2d/bigtop-packages/src/charm/spark/layer-spark/config.yaml
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/config.yaml b/bigtop-packages/src/charm/spark/layer-spark/config.yaml
index 2a88752..b923687 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/config.yaml
+++ b/bigtop-packages/src/charm/spark/layer-spark/config.yaml
@@ -1,10 +1,18 @@
 options:
-    resources_mirror:
+    driver_memory:
         type: string
-        default: ''
+        default: '1g'
         description: |
-            URL used to fetch resources (e.g., Hadoop binaries) instead of the
-            location specified in resources.yaml.
+            Specify gigabytes (e.g. 1g) or megabytes (e.g. 1024m). If running
+            in 'local' or 'standalone' mode, you may also specify a percentage
+            of total system memory (e.g. 50%).
+    executor_memory:
+        type: string
+        default: '1g'
+        description: |
+            Specify gigabytes (e.g. 1g) or megabytes (e.g. 1024m). If running
+            in 'local' or 'standalone' mode, you may also specify a percentage
+            of total system memory (e.g. 50%).
     spark_bench_enabled:
         type: boolean
         default: true
@@ -16,7 +24,7 @@ options:
             preserved.
     spark_bench_ppc64le:
         type: string
-        default: 'https://s3.amazonaws.com/jujubigdata/ibm/noarch/SparkBench-2.0-20161101.tgz#sha256=2a34150dc3ad4a1469ca09c202f4db4ee995e2932b8a633d8c006d46c1f61e9f'
+        default: 'https://s3.amazonaws.com/jujubigdata/ibm/noarch/SparkBench-2.0-20170403.tgz#sha256=709caec6667dd82e42de25eb8bcd5763ca894e99e5c83c97bdfcf62cb1aa00c8'
         description: |
             URL (including hash) of a ppc64le tarball of SparkBench. By
             default, this points to a pre-built SparkBench binary based on
@@ -24,7 +32,7 @@ options:
             'spark_bench_enabled' is 'true'.
     spark_bench_x86_64:
         type: string
-        default: 'https://s3.amazonaws.com/jujubigdata/ibm/noarch/SparkBench-2.0-20161101.tgz#sha256=2a34150dc3ad4a1469ca09c202f4db4ee995e2932b8a633d8c006d46c1f61e9f'
+        default: 'https://s3.amazonaws.com/jujubigdata/ibm/noarch/SparkBench-2.0-20170403.tgz#sha256=709caec6667dd82e42de25eb8bcd5763ca894e99e5c83c97bdfcf62cb1aa00c8'
         description: |
             URL (including hash) of an x86_64 tarball of SparkBench. By
             default, this points to a pre-built SparkBench binary based on
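
With the driver_memory and executor_memory options introduced above, memory
tuning becomes a charm config change, e.g. (application name "spark" is
illustrative; the percentage form only applies in local/standalone mode, per
the option descriptions):

    juju config spark driver_memory=2g
    juju config spark executor_memory=50%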

http://git-wip-us.apache.org/repos/asf/bigtop/blob/c6bd2a2d/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
index dc2e373..1be1072 100755
--- a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
+++ b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import os
+import time
 from jujubigdata import utils
 from path import Path
 
@@ -174,7 +175,7 @@ class Spark(object):
 
         Two flags are needed:
 
-          * Namenode exists aka HDFS is there
+          * Namenode exists aka HDFS is ready
           * Resource manager exists aka YARN is ready
 
         both flags are infered from the available hosts.
@@ -189,7 +190,9 @@ class Spark(object):
             self.setup()
             unitdata.kv().set('spark.bootstrapped', True)
 
+        mode = hookenv.config()['spark_execution_mode']
         master_ip = utils.resolve_private_address(available_hosts['spark-master'])
+        master_url = self.get_master_url(master_ip)
         hosts = {
             'spark': master_ip,
         }
@@ -206,7 +209,7 @@ class Spark(object):
         roles = self.get_roles()
 
         override = {
-            'spark::common::master_url': self.get_master_url(master_ip),
+            'spark::common::master_url': master_url,
             'spark::common::event_log_dir': events_log_dir,
             'spark::common::history_log_dir': events_log_dir,
         }
@@ -220,18 +223,13 @@ class Spark(object):
             zk_connect = ",".join(zks)
             override['spark::common::zookeeper_connection_string'] = zk_connect
         else:
-            override['spark::common::zookeeper_connection_string'] = ""
+            override['spark::common::zookeeper_connection_string'] = None
 
         bigtop = Bigtop()
         bigtop.render_site_yaml(hosts, roles, override)
         bigtop.trigger_puppet()
-        # There is a race condition here.
-        # The work role will not start the first time we trigger puppet apply.
-        # The exception in /var/logs/spark:
-        # Exception in thread "main" org.apache.spark.SparkException: Invalid master URL: spark://:7077
-        # The master url is not set at the time the worker start the first time.
-        # TODO(kjackal): ...do the needed... (investiate,debug,submit patch)
-        bigtop.trigger_puppet()
+
+        # Do this after our puppet bits in case puppet overrides needed perms
         if 'namenode' not in available_hosts:
             # Local event dir (not in HDFS) needs to be 777 so non-spark
             # users can write job history there. It needs to be g+s so
@@ -239,22 +237,54 @@ class Spark(object):
             # It needs to be +t so users cannot remove files they don't own.
             dc.path('spark_events').chmod(0o3777)
 
-        self.patch_worker_master_url(master_ip)
+        self.patch_worker_master_url(master_ip, master_url)
+
+        # handle tuning options that may be set as percentages
+        driver_mem = '1g'
+        req_driver_mem = hookenv.config()['driver_memory']
+        executor_mem = '1g'
+        req_executor_mem = hookenv.config()['executor_memory']
+        if req_driver_mem.endswith('%'):
+            if mode == 'standalone' or mode.startswith('local'):
+                mem_mb = host.get_total_ram() / 1024 / 1024
+                req_percentage = float(req_driver_mem.strip('%')) / 100
+                driver_mem = str(int(mem_mb * req_percentage)) + 'm'
+            else:
+                hookenv.log("driver_memory percentage in non-local mode. Using 1g default.",
+                            level=None)
+        else:
+            driver_mem = req_driver_mem
 
+        if req_executor_mem.endswith('%'):
+            if mode == 'standalone' or mode.startswith('local'):
+                mem_mb = host.get_total_ram() / 1024 / 1024
+                req_percentage = float(req_executor_mem.strip('%')) / 100
+                executor_mem = str(int(mem_mb * req_percentage)) + 'm'
+            else:
+                hookenv.log("executor_memory percentage in non-local mode. Using 1g default.",
+                            level=None)
+        else:
+            executor_mem = req_executor_mem
+
+        spark_env = '/etc/spark/conf/spark-env.sh'
+        utils.re_edit_in_place(spark_env, {
+            r'.*SPARK_DRIVER_MEMORY.*': 'export SPARK_DRIVER_MEMORY={}'.format(driver_mem),
+            r'.*SPARK_EXECUTOR_MEMORY.*': 'export SPARK_EXECUTOR_MEMORY={}'.format(executor_mem),
+        }, append_non_matches=True)
+
+        # Install SB (subsequent calls will reconfigure existing install)
         # SparkBench looks for the spark master in /etc/environment
         with utils.environment_edit_in_place('/etc/environment') as env:
-            env['MASTER'] = self.get_master_url(master_ip)
-        # Install SB (subsequent calls will reconfigure existing install)
+            env['MASTER'] = master_url
         self.install_benchmark()
 
-    def patch_worker_master_url(self, master_ip):
+    def patch_worker_master_url(self, master_ip, master_url):
         '''
         Patch the worker startup script to use the full master url istead of contracting it.
         The master url is placed in the spark-env.sh so that the startup script will use it.
         In HA mode the master_ip is set to be the local_ip instead of the one the leader
         elects. This requires a restart of the master service.
         '''
-        master_url = self.get_master_url(master_ip)
         zk_units = unitdata.kv().get('zookeeper.units', [])
         if master_url.startswith('spark://'):
             if zk_units:
@@ -268,8 +298,6 @@ class Spark(object):
             self.inplace_change('/etc/init.d/spark-worker',
                                 'spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT',
                                 '$SPARK_MASTER_URL')
-            host.service_restart('spark-master')
-            host.service_restart('spark-worker')
 
     def inplace_change(self, filename, old_string, new_string):
         # Safely read the input filename using 'with'
@@ -294,27 +322,24 @@ class Spark(object):
         Path(demo_target).chown('ubuntu', 'hadoop')
 
     def start(self):
-        if unitdata.kv().get('spark.uprading', False):
-            return
-
-        # stop services (if they're running) to pick up any config change
-        self.stop()
         # always start the history server, start master/worker if we're standalone
         host.service_start('spark-history-server')
         if hookenv.config()['spark_execution_mode'] == 'standalone':
-            host.service_start('spark-master')
+            if host.service_start('spark-master'):
+                # If the master started, wait 2m for recovery before starting
+                # the worker.
+                hookenv.status_set('maintenance',
+                                   'waiting for spark master recovery')
+                hookenv.log("Waiting 2m to ensure spark master is ALIVE")
+                time.sleep(120)
+            else:
+                hookenv.log("Master did not start")
             host.service_start('spark-worker')
 
     def stop(self):
-        if not unitdata.kv().get('spark.installed', False):
-            return
-        # Only stop services if they're running
-        if utils.jps("HistoryServer"):
-            host.service_stop('spark-history-server')
-        if utils.jps("Master"):
-            host.service_stop('spark-master')
-        if utils.jps("Worker"):
-            host.service_stop('spark-worker')
+        host.service_stop('spark-history-server')
+        host.service_stop('spark-master')
+        host.service_stop('spark-worker')
 
     def open_ports(self):
         for port in self.dist_config.exposed_ports('spark'):
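
To illustrate the percentage handling added above: assuming
host.get_total_ram() reports bytes (hence the /1024/1024 conversion to MB), a
unit with 16 GiB of RAM and driver_memory=50% in standalone mode would end up
with roughly the following in /etc/spark/conf/spark-env.sh:

    export SPARK_DRIVER_MEMORY=8192m
    export SPARK_EXECUTOR_MEMORY=1g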

http://git-wip-us.apache.org/repos/asf/bigtop/blob/c6bd2a2d/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py b/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py
index 99b2101..b6b0ca7 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py
+++ b/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py
@@ -12,6 +12,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import time
+
 from charms.reactive import RelationBase, when, when_not, is_state, set_state, remove_state, when_any
 from charms.layer.apache_bigtop_base import get_fqdn, get_package_version
 from charms.layer.bigtop_spark import Spark
@@ -21,18 +23,9 @@ from charms.reactive.helpers import data_changed
 from jujubigdata import utils
 
 
-def set_deployment_mode_state(state):
-    if is_state('spark.yarn.installed'):
-        remove_state('spark.yarn.installed')
-    if is_state('spark.standalone.installed'):
-        remove_state('spark.standalone.installed')
-    set_state('spark.started')
-    set_state(state)
-    # set app version string for juju status output
-    spark_version = get_package_version('spark-core') or 'unknown'
-    hookenv.application_version_set(spark_version)
-
-
+###############################################################################
+# Status methods
+###############################################################################
 def report_status():
     mode = hookenv.config()['spark_execution_mode']
     if (not is_state('spark.yarn.installed')) and mode.startswith('yarn'):
@@ -45,71 +38,115 @@ def report_status():
     elif mode == 'standalone' and is_state('leadership.is_leader'):
         mode = mode + " - master"
 
-    hookenv.status_set('active', 'ready ({})'.format(mode))
+    if is_state('spark.started'):
+        hookenv.status_set('active', 'ready ({})'.format(mode))
+    else:
+        hookenv.status_set('blocked', 'unable to start spark ({})'.format(mode))
 
 
-def install_spark(hadoop=None, zks=None):
-    spark_master_host = leadership.leader_get('master-fqdn')
-    if not spark_master_host:
-        hookenv.status_set('waiting', 'master not elected yet')
-        return False
+###############################################################################
+# Utility methods
+###############################################################################
+def get_spark_peers():
+    nodes = [(hookenv.local_unit(), hookenv.unit_private_ip())]
+    sparkpeer = RelationBase.from_state('sparkpeers.joined')
+    if sparkpeer:
+        nodes.extend(sorted(sparkpeer.get_nodes()))
+    return nodes
 
+
+def install_spark_standalone(zks, peers):
+    """
+    Called in local/standalone mode after Juju has elected a leader.
+    """
     hosts = {
-        'spark-master': spark_master_host,
+        'spark-master': leadership.leader_get('master-fqdn'),
     }
 
-    if is_state('hadoop.yarn.ready'):
-        rms = hadoop.resourcemanagers()
-        hosts['resourcemanager'] = rms[0]
-
-    if is_state('hadoop.hdfs.ready'):
-        nns = hadoop.namenodes()
-        hosts['namenode'] = nns[0]
+    # If zks have changed and we are not handling a departed spark peer,
+    # give the ensemble time to settle. Otherwise we might try to start
+    # spark master with data from the wrong zk leader. Doing so will cause
+    # spark-master to shutdown:
+    #  https://issues.apache.org/jira/browse/SPARK-15544
+    if (zks and data_changed('zks', zks) and not is_state('sparkpeers.departed')):
+        hookenv.status_set('maintenance',
+                           'waiting for zookeeper ensemble to settle')
+        hookenv.log("Waiting 2m to ensure zk ensemble has settled: {}".format(zks))
+        time.sleep(120)
 
     spark = Spark()
-    spark.configure(hosts, zks, get_spark_peers())
-    return True
-
+    spark.configure(hosts, zks, peers)
+    set_deployment_mode_state('spark.standalone.installed')
 
-@when('config.changed', 'spark.started')
-def reconfigure_spark():
-    config = hookenv.config()
-    mode = config['spark_execution_mode']
-    hookenv.status_set('maintenance',
-                       'changing default execution mode to {}'.format(mode))
 
+def install_spark_yarn():
+    """
+    Called in 'yarn-*' mode after Juju has elected a leader. The
+    'hadoop.yarn.ready' state must be set.
+    """
+    hosts = {
+        'spark-master': leadership.leader_get('master-fqdn'),
+    }
     hadoop = (RelationBase.from_state('hadoop.yarn.ready') or
               RelationBase.from_state('hadoop.hdfs.ready'))
+    rms = hadoop.resourcemanagers()
+    hosts['resourcemanager'] = rms[0]
 
-    zks = None
-    if is_state('zookeeper.ready'):
-        zk = RelationBase.from_state('zookeeper.ready')
-        zks = zk.zookeepers()
+    # Probably don't need to check this since yarn.ready implies hdfs.ready
+    # for us, but it doesn't hurt.
+    if is_state('hadoop.hdfs.ready'):
+        nns = hadoop.namenodes()
+        hosts['namenode'] = nns[0]
 
-    if install_spark(hadoop, zks):
-        report_status()
+    spark = Spark()
+    spark.configure(hosts, zk_units=None, peers=None)
+    set_deployment_mode_state('spark.yarn.installed')
 
 
-# This is a triky call. We want to fire when the leader changes, yarn and hdfs become ready or
-# depart. In the future this should fire when Cassandra or any other storage
-# becomes ready or departs. Since hdfs and yarn do not have a departed state we make sure
-# we fire this method always ('spark.started'). We then build a deployment-matrix
-# and if anything has changed we re-install.
-# 'hadoop.yarn.ready', 'hadoop.hdfs.ready' can be ommited but I like them here for clarity
-@when_any('hadoop.yarn.ready',
-          'hadoop.hdfs.ready', 'master.elected', 'sparkpeers.joined', 'zookeeper.ready')
+def set_deployment_mode_state(state):
+    if is_state('spark.yarn.installed'):
+        remove_state('spark.standalone.installed')
+    if is_state('spark.standalone.installed'):
+        remove_state('spark.yarn.installed')
+    set_state(state)
+    # set app version string for juju status output
+    spark_version = get_package_version('spark-core') or 'unknown'
+    hookenv.application_version_set(spark_version)
+
+
+###############################################################################
+# Reactive methods
+###############################################################################
+@when_any('config.changed', 'master.elected',
+          'hadoop.hdfs.ready', 'hadoop.yarn.ready',
+          'sparkpeers.joined', 'sparkpeers.departed',
+          'zookeeper.ready')
 @when('bigtop.available', 'master.elected')
 def reinstall_spark():
+    """
+    This is tricky. We want to fire on config or leadership changes, or when
+    hadoop, sparkpeers, or zookeepers come and go. In the future this should
+    fire when Cassandra or any other storage comes or goes. We always fire
+    this method (or rather, when bigtop is ready and juju has elected a
+    master). We then build a deployment-matrix and (re)install as things
+    change.
+    """
     spark_master_host = leadership.leader_get('master-fqdn')
-    peers = []
-    zks = []
-    if is_state('zookeeper.ready'):
-        # if ZK is availuable we are in HA. We do not want reconfigurations if a leader fails
-        # HA takes care of this
+    if not spark_master_host:
+        hookenv.status_set('maintenance', 'juju leader not elected yet')
+        return
+
+    mode = hookenv.config()['spark_execution_mode']
+    peers = None
+    zks = None
+
+    # If mode is standalone and ZK is ready, we are in HA. Do not consider
+    # the master_host from juju leadership in our matrix. ZK handles this.
+    if (mode == 'standalone' and is_state('zookeeper.ready')):
         spark_master_host = ''
         zk = RelationBase.from_state('zookeeper.ready')
         zks = zk.zookeepers()
-        # We need reconfigure Spark when in HA and peers change ignore otherwise
+        # peers are only used to set our MASTER_URL in standalone HA mode
         peers = get_spark_peers()
 
     deployment_matrix = {
@@ -120,34 +157,39 @@ def reinstall_spark():
         'peers': peers,
     }
 
-    if not data_changed('deployment_matrix', deployment_matrix):
+    # If neither config nor our matrix is changing, there is nothing to do.
+    if not (is_state('config.changed') or
+            data_changed('deployment_matrix', deployment_matrix)):
         return
 
-    hookenv.status_set('maintenance', 'configuring spark')
-    hadoop = (RelationBase.from_state('hadoop.yarn.ready') or
-              RelationBase.from_state('hadoop.hdfs.ready'))
-    if install_spark(hadoop, zks):
-        if is_state('hadoop.yarn.ready'):
-            set_deployment_mode_state('spark.yarn.installed')
-        else:
-            set_deployment_mode_state('spark.standalone.installed')
-
+    # (Re)install based on our execution mode
+    hookenv.status_set('maintenance', 'configuring spark in {} mode'.format(mode))
+    hookenv.log("Configuring spark with deployment matrix: {}".format(deployment_matrix))
+
+    if mode.startswith('yarn') and is_state('hadoop.yarn.ready'):
+        install_spark_yarn()
+    elif mode.startswith('local') or mode == 'standalone':
+        install_spark_standalone(zks, peers)
+    else:
+        # Something's wrong (probably requested yarn without yarn.ready).
+        remove_state('spark.started')
         report_status()
+        return
 
+    # restart services to pick up possible config changes
+    spark = Spark()
+    spark.stop()
+    spark.start()
 
-def get_spark_peers():
-    nodes = [(hookenv.local_unit(), hookenv.unit_private_ip())]
-    sparkpeer = RelationBase.from_state('sparkpeers.joined')
-    if sparkpeer:
-        nodes.extend(sorted(sparkpeer.get_nodes()))
-    return nodes
+    set_state('spark.started')
+    report_status()
 
 
-@when('leadership.is_leader', 'bigtop.available')
+@when('bigtop.available', 'leadership.is_leader')
 def send_fqdn():
     spark_master_host = get_fqdn()
     leadership.leader_set({'master-fqdn': spark_master_host})
-    hookenv.log("Setting leader to {}".format(spark_master_host))
+    hookenv.log("Setting juju leader to {}".format(spark_master_host))
 
 
 @when('leadership.changed.master-fqdn')