You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bigtop.apache.org by kw...@apache.org on 2017/04/12 20:49:41 UTC
bigtop git commit: BIGTOP-2737: Spark charm doesn't handle HA or
examples well (closes #194)
Repository: bigtop
Updated Branches:
refs/heads/master da2c4292f -> c6bd2a2d9
BIGTOP-2737: Spark charm doesn't handle HA or examples well (closes #194)
Signed-off-by: Kevin W Monroe <ke...@canonical.com>
Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo
Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/c6bd2a2d
Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/c6bd2a2d
Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/c6bd2a2d
Branch: refs/heads/master
Commit: c6bd2a2d96c8e57167939abc65b7844931a26bc7
Parents: da2c429
Author: Kevin W Monroe <ke...@canonical.com>
Authored: Tue Mar 28 20:46:17 2017 +0000
Committer: Kevin W Monroe <ke...@canonical.com>
Committed: Wed Apr 12 15:47:49 2017 -0500
----------------------------------------------------------------------
.../src/charm/spark/layer-spark/actions.yaml | 11 +-
.../charm/spark/layer-spark/actions/pagerank | 135 ++++++++++++-
.../src/charm/spark/layer-spark/actions/sparkpi | 15 +-
.../src/charm/spark/layer-spark/config.yaml | 20 +-
.../lib/charms/layer/bigtop_spark.py | 89 +++++----
.../charm/spark/layer-spark/reactive/spark.py | 188 ++++++++++++-------
6 files changed, 341 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bigtop/blob/c6bd2a2d/bigtop-packages/src/charm/spark/layer-spark/actions.yaml
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/actions.yaml b/bigtop-packages/src/charm/spark/layer-spark/actions.yaml
index 6564b1c..7f0961d 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/actions.yaml
+++ b/bigtop-packages/src/charm/spark/layer-spark/actions.yaml
@@ -1,5 +1,12 @@
+pagerank:
+ description: Calculate PageRank for a sample data set
+ params:
+ iterations:
+ description: Number of iterations for the SparkPageRank job
+ type: string
+ default: "1"
smoke-test:
- description: Verify that Spark is working by calculating pi.
+ description: Verify that Spark is working by calculating pi
sparkpi:
description: Calculate Pi
params:
@@ -19,8 +26,6 @@ logisticregression:
description: Run the Spark Bench LogisticRegression benchmark.
matrixfactorization:
description: Run the Spark Bench MatrixFactorization benchmark.
-pagerank:
- description: Run the Spark Bench PageRank benchmark.
pca:
description: Run the Spark Bench PCA benchmark.
pregeloperation:
http://git-wip-us.apache.org/repos/asf/bigtop/blob/c6bd2a2d/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank b/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank
deleted file mode 120000
index 9e15049..0000000
--- a/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank
+++ /dev/null
@@ -1 +0,0 @@
-sparkbench
\ No newline at end of file
diff --git a/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank b/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank
new file mode 100755
index 0000000..b2d85e6
--- /dev/null
+++ b/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank
@@ -0,0 +1,134 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import subprocess
+import sys
+
+from path import Path
+from time import time
+
+from charmhelpers.contrib.benchmark import Benchmark
+from charmhelpers.core import hookenv
+from charms.reactive import is_state
+from jujubigdata import utils
+
+
+def fail(msg, output):
+ print(msg)
+ hookenv.action_set({'output': output})
+ hookenv.action_fail(msg)
+ sys.exit(1)
+
+
+def main():
+ bench = Benchmark()
+
+ if not is_state('spark.started'):
+ msg = 'Spark is not started yet'
+ fail(msg, 'error')
+
+ # gather params and create dir to store results
+ num_iter = hookenv.action_get('iterations')
+ run = int(time())
+ result_dir = Path('/opt/sparkpagerank-results')
+ result_log = result_dir / '{}.log'.format(run)
+ if not result_dir.exists():
+ result_dir.mkdir()
+ result_dir.chown('ubuntu', 'ubuntu')
+ hookenv.log("values: {} {}".format(num_iter, result_log))
+
+ sample = "/home/ubuntu/SparkBench/PageRank/web-Google.txt"
+ if not os.path.isfile(sample):
+ msg = 'Could not find pagerank sample data'
+ fail('{}: {}'.format(msg, sample), 'error')
+
+ # Benchmark input data is packed into our sparkbench.tgz, which makes
+ # it available on all spark units. In yarn mode, however, the nodemanagers
+ # act as the spark workers and will not have access to this local data.
+ # In yarn mode, copy our input data to hdfs so nodemanagers can access it.
+ mode = hookenv.config()['spark_execution_mode']
+ if mode.startswith('yarn'):
+ if is_state('hadoop.hdfs.ready'):
+ try:
+ utils.run_as('ubuntu',
+ 'hdfs', 'dfs', '-put', '-f', sample, '/user/ubuntu',
+ capture_output=True)
+ except subprocess.CalledProcessError as e:
+ msg = 'Unable to copy pagerank sample data to hdfs'
+ fail('{}: {}'.format(msg, e), 'error')
+ else:
+ sample = "/user/ubuntu/web-Google.txt"
+ else:
+ msg = 'Spark is configured for yarn mode, but HDFS is not ready yet'
+ fail(msg, 'error')
+
+ # find jar location
+ spark_home = "/usr/lib/spark"
+ example_jar_name = "spark-examples.jar"
+ example_jar_path = None
+ for root, dirs, files in os.walk(spark_home):
+ if example_jar_name in files:
+ example_jar_path = os.path.join(root, example_jar_name)
+
+ if not example_jar_path:
+ msg = 'Could not find {}'.format(example_jar_name)
+ fail(msg, 'error')
+
+ print('Calculating PageRank')
+ bench.start()
+ start = int(time())
+
+ with open(result_log, 'w') as log_file:
+ arg_list = [
+ 'spark-submit',
+ '--class',
+ 'org.apache.spark.examples.SparkPageRank',
+ example_jar_path,
+ sample,
+ num_iter,
+ ]
+
+ try:
+ subprocess.check_call(arg_list, stdout=log_file,
+ stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError as e:
+ msg = 'SparkPageRank command failed: {}'.format(' '.join(arg_list))
+ fail('{}: {}'.format(msg, e), 'error')
+
+ stop = int(time())
+ bench.finish()
+
+ duration = stop - start
+ bench.set_composite_score(duration, 'secs')
+ subprocess.check_call(['benchmark-raw', result_log])
+
+ with open(result_log) as log:
+ success = False
+ for line in log.readlines():
+ if 'rank' in line:
+ success = True
+ break
+
+ if not success:
+ msg = 'Spark-submit failed to calculate pagerank'
+ fail(msg, 'error')
+
+ hookenv.action_set({'output': {'status': 'completed'}})
+
+
+if __name__ == '__main__':
+ main()
http://git-wip-us.apache.org/repos/asf/bigtop/blob/c6bd2a2d/bigtop-packages/src/charm/spark/layer-spark/actions/sparkpi
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/actions/sparkpi b/bigtop-packages/src/charm/spark/layer-spark/actions/sparkpi
index 9afceaf..99fded6 100755
--- a/bigtop-packages/src/charm/spark/layer-spark/actions/sparkpi
+++ b/bigtop-packages/src/charm/spark/layer-spark/actions/sparkpi
@@ -14,8 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import os
import sys
-sys.path.append('lib')
from path import Path
from time import time
@@ -55,12 +55,23 @@ def main():
print('calculating pi')
+ # get the examples jar
+ spark_home = "/usr/lib/spark"
+ example_jar_name = "spark-examples.jar"
+ example_jar_path = None
+ for root, dirs, files in os.walk(spark_home):
+ if example_jar_name in files:
+ example_jar_path = os.path.join(root, example_jar_name)
+
+ if not example_jar_path:
+ fail('could not find {}'.format(example_jar_name), 'error')
+
with open(result_log, 'w') as log_file:
arg_list = [
'spark-submit',
'--class',
'org.apache.spark.examples.SparkPi',
- '/usr/lib/spark/lib/spark-examples.jar'
+ example_jar_path
]
if num_partitions:
# This is always blank. TODO: figure out what it was
http://git-wip-us.apache.org/repos/asf/bigtop/blob/c6bd2a2d/bigtop-packages/src/charm/spark/layer-spark/config.yaml
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/config.yaml b/bigtop-packages/src/charm/spark/layer-spark/config.yaml
index 2a88752..b923687 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/config.yaml
+++ b/bigtop-packages/src/charm/spark/layer-spark/config.yaml
@@ -1,10 +1,18 @@
options:
- resources_mirror:
+ driver_memory:
type: string
- default: ''
+ default: '1g'
description: |
- URL used to fetch resources (e.g., Hadoop binaries) instead of the
- location specified in resources.yaml.
+ Specify gigabytes (e.g. 1g) or megabytes (e.g. 1024m). If running
+ in 'local' or 'standalone' mode, you may also specify a percentage
+ of total system memory (e.g. 50%).
+ executor_memory:
+ type: string
+ default: '1g'
+ description: |
+ Specify gigabytes (e.g. 1g) or megabytes (e.g. 1024m). If running
+ in 'local' or 'standalone' mode, you may also specify a percentage
+ of total system memory (e.g. 50%).
spark_bench_enabled:
type: boolean
default: true
@@ -16,7 +24,7 @@ options:
preserved.
spark_bench_ppc64le:
type: string
- default: 'https://s3.amazonaws.com/jujubigdata/ibm/noarch/SparkBench-2.0-20161101.tgz#sha256=2a34150dc3ad4a1469ca09c202f4db4ee995e2932b8a633d8c006d46c1f61e9f'
+ default: 'https://s3.amazonaws.com/jujubigdata/ibm/noarch/SparkBench-2.0-20170403.tgz#sha256=709caec6667dd82e42de25eb8bcd5763ca894e99e5c83c97bdfcf62cb1aa00c8'
description: |
URL (including hash) of a ppc64le tarball of SparkBench. By
default, this points to a pre-built SparkBench binary based on
@@ -24,7 +32,7 @@ options:
'spark_bench_enabled' is 'true'.
spark_bench_x86_64:
type: string
- default: 'https://s3.amazonaws.com/jujubigdata/ibm/noarch/SparkBench-2.0-20161101.tgz#sha256=2a34150dc3ad4a1469ca09c202f4db4ee995e2932b8a633d8c006d46c1f61e9f'
+ default: 'https://s3.amazonaws.com/jujubigdata/ibm/noarch/SparkBench-2.0-20170403.tgz#sha256=709caec6667dd82e42de25eb8bcd5763ca894e99e5c83c97bdfcf62cb1aa00c8'
description: |
URL (including hash) of an x86_64 tarball of SparkBench. By
default, this points to a pre-built SparkBench binary based on
http://git-wip-us.apache.org/repos/asf/bigtop/blob/c6bd2a2d/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
index dc2e373..1be1072 100755
--- a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
+++ b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
+import time
from jujubigdata import utils
from path import Path
@@ -174,7 +175,7 @@ class Spark(object):
Two flags are needed:
- * Namenode exists aka HDFS is there
+ * Namenode exists aka HDFS is ready
* Resource manager exists aka YARN is ready
both flags are inferred from the available hosts.
@@ -189,7 +190,9 @@ class Spark(object):
self.setup()
unitdata.kv().set('spark.bootstrapped', True)
+ mode = hookenv.config()['spark_execution_mode']
master_ip = utils.resolve_private_address(available_hosts['spark-master'])
+ master_url = self.get_master_url(master_ip)
hosts = {
'spark': master_ip,
}
@@ -206,7 +209,7 @@ class Spark(object):
roles = self.get_roles()
override = {
- 'spark::common::master_url': self.get_master_url(master_ip),
+ 'spark::common::master_url': master_url,
'spark::common::event_log_dir': events_log_dir,
'spark::common::history_log_dir': events_log_dir,
}
@@ -220,18 +223,13 @@ class Spark(object):
zk_connect = ",".join(zks)
override['spark::common::zookeeper_connection_string'] = zk_connect
else:
- override['spark::common::zookeeper_connection_string'] = ""
+ override['spark::common::zookeeper_connection_string'] = None
bigtop = Bigtop()
bigtop.render_site_yaml(hosts, roles, override)
bigtop.trigger_puppet()
- # There is a race condition here.
- # The work role will not start the first time we trigger puppet apply.
- # The exception in /var/logs/spark:
- # Exception in thread "main" org.apache.spark.SparkException: Invalid master URL: spark://:7077
- # The master url is not set at the time the worker start the first time.
- # TODO(kjackal): ...do the needed... (investiate,debug,submit patch)
- bigtop.trigger_puppet()
+
+ # Do this after our puppet bits in case puppet overrides needed perms
if 'namenode' not in available_hosts:
# Local event dir (not in HDFS) needs to be 777 so non-spark
# users can write job history there. It needs to be g+s so
@@ -239,22 +237,54 @@ class Spark(object):
# It needs to be +t so users cannot remove files they don't own.
dc.path('spark_events').chmod(0o3777)
- self.patch_worker_master_url(master_ip)
+ self.patch_worker_master_url(master_ip, master_url)
+
+ # handle tuning options that may be set as percentages
+ driver_mem = '1g'
+ req_driver_mem = hookenv.config()['driver_memory']
+ executor_mem = '1g'
+ req_executor_mem = hookenv.config()['executor_memory']
+ if req_driver_mem.endswith('%'):
+ if mode == 'standalone' or mode.startswith('local'):
+ mem_mb = host.get_total_ram() / 1024 / 1024
+ req_percentage = float(req_driver_mem.strip('%')) / 100
+ driver_mem = str(int(mem_mb * req_percentage)) + 'm'
+ else:
+ hookenv.log("driver_memory percentage in non-local mode. Using 1g default.",
+ level=None)
+ else:
+ driver_mem = req_driver_mem
+ if req_executor_mem.endswith('%'):
+ if mode == 'standalone' or mode.startswith('local'):
+ mem_mb = host.get_total_ram() / 1024 / 1024
+ req_percentage = float(req_executor_mem.strip('%')) / 100
+ executor_mem = str(int(mem_mb * req_percentage)) + 'm'
+ else:
+ hookenv.log("executor_memory percentage in non-local mode. Using 1g default.",
+ level=None)
+ else:
+ executor_mem = req_executor_mem
+
+ spark_env = '/etc/spark/conf/spark-env.sh'
+ utils.re_edit_in_place(spark_env, {
+ r'.*SPARK_DRIVER_MEMORY.*': 'export SPARK_DRIVER_MEMORY={}'.format(driver_mem),
+ r'.*SPARK_EXECUTOR_MEMORY.*': 'export SPARK_EXECUTOR_MEMORY={}'.format(executor_mem),
+ }, append_non_matches=True)
+
+ # Install SB (subsequent calls will reconfigure existing install)
# SparkBench looks for the spark master in /etc/environment
with utils.environment_edit_in_place('/etc/environment') as env:
- env['MASTER'] = self.get_master_url(master_ip)
- # Install SB (subsequent calls will reconfigure existing install)
+ env['MASTER'] = master_url
self.install_benchmark()
- def patch_worker_master_url(self, master_ip):
+ def patch_worker_master_url(self, master_ip, master_url):
'''
Patch the worker startup script to use the full master url instead of constructing it.
The master url is placed in the spark-env.sh so that the startup script will use it.
In HA mode the master_ip is set to be the local_ip instead of the one the leader
elects. This requires a restart of the master service.
'''
- master_url = self.get_master_url(master_ip)
zk_units = unitdata.kv().get('zookeeper.units', [])
if master_url.startswith('spark://'):
if zk_units:
@@ -268,8 +298,6 @@ class Spark(object):
self.inplace_change('/etc/init.d/spark-worker',
'spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT',
'$SPARK_MASTER_URL')
- host.service_restart('spark-master')
- host.service_restart('spark-worker')
def inplace_change(self, filename, old_string, new_string):
# Safely read the input filename using 'with'
@@ -294,27 +322,24 @@ class Spark(object):
Path(demo_target).chown('ubuntu', 'hadoop')
def start(self):
- if unitdata.kv().get('spark.uprading', False):
- return
-
- # stop services (if they're running) to pick up any config change
- self.stop()
# always start the history server, start master/worker if we're standalone
host.service_start('spark-history-server')
if hookenv.config()['spark_execution_mode'] == 'standalone':
- host.service_start('spark-master')
+ if host.service_start('spark-master'):
+ # If the master started, wait 2m for recovery before starting
+ # the worker.
+ hookenv.status_set('maintenance',
+ 'waiting for spark master recovery')
+ hookenv.log("Waiting 2m to ensure spark master is ALIVE")
+ time.sleep(120)
+ else:
+ hookenv.log("Master did not start")
host.service_start('spark-worker')
def stop(self):
- if not unitdata.kv().get('spark.installed', False):
- return
- # Only stop services if they're running
- if utils.jps("HistoryServer"):
- host.service_stop('spark-history-server')
- if utils.jps("Master"):
- host.service_stop('spark-master')
- if utils.jps("Worker"):
- host.service_stop('spark-worker')
+ host.service_stop('spark-history-server')
+ host.service_stop('spark-master')
+ host.service_stop('spark-worker')
def open_ports(self):
for port in self.dist_config.exposed_ports('spark'):
http://git-wip-us.apache.org/repos/asf/bigtop/blob/c6bd2a2d/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py b/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py
index 99b2101..b6b0ca7 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py
+++ b/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py
@@ -12,6 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import time
+
from charms.reactive import RelationBase, when, when_not, is_state, set_state, remove_state, when_any
from charms.layer.apache_bigtop_base import get_fqdn, get_package_version
from charms.layer.bigtop_spark import Spark
@@ -21,18 +23,9 @@ from charms.reactive.helpers import data_changed
from jujubigdata import utils
-def set_deployment_mode_state(state):
- if is_state('spark.yarn.installed'):
- remove_state('spark.yarn.installed')
- if is_state('spark.standalone.installed'):
- remove_state('spark.standalone.installed')
- set_state('spark.started')
- set_state(state)
- # set app version string for juju status output
- spark_version = get_package_version('spark-core') or 'unknown'
- hookenv.application_version_set(spark_version)
-
-
+###############################################################################
+# Status methods
+###############################################################################
def report_status():
mode = hookenv.config()['spark_execution_mode']
if (not is_state('spark.yarn.installed')) and mode.startswith('yarn'):
@@ -45,71 +38,115 @@ def report_status():
elif mode == 'standalone' and is_state('leadership.is_leader'):
mode = mode + " - master"
- hookenv.status_set('active', 'ready ({})'.format(mode))
+ if is_state('spark.started'):
+ hookenv.status_set('active', 'ready ({})'.format(mode))
+ else:
+ hookenv.status_set('blocked', 'unable to start spark ({})'.format(mode))
-def install_spark(hadoop=None, zks=None):
- spark_master_host = leadership.leader_get('master-fqdn')
- if not spark_master_host:
- hookenv.status_set('waiting', 'master not elected yet')
- return False
+###############################################################################
+# Utility methods
+###############################################################################
+def get_spark_peers():
+ nodes = [(hookenv.local_unit(), hookenv.unit_private_ip())]
+ sparkpeer = RelationBase.from_state('sparkpeers.joined')
+ if sparkpeer:
+ nodes.extend(sorted(sparkpeer.get_nodes()))
+ return nodes
+
+def install_spark_standalone(zks, peers):
+ """
+ Called in local/standalone mode after Juju has elected a leader.
+ """
hosts = {
- 'spark-master': spark_master_host,
+ 'spark-master': leadership.leader_get('master-fqdn'),
}
- if is_state('hadoop.yarn.ready'):
- rms = hadoop.resourcemanagers()
- hosts['resourcemanager'] = rms[0]
-
- if is_state('hadoop.hdfs.ready'):
- nns = hadoop.namenodes()
- hosts['namenode'] = nns[0]
+ # If zks have changed and we are not handling a departed spark peer,
+ # give the ensemble time to settle. Otherwise we might try to start
+ # spark master with data from the wrong zk leader. Doing so will cause
+ # spark-master to shut down:
+ # https://issues.apache.org/jira/browse/SPARK-15544
+ if (zks and data_changed('zks', zks) and not is_state('sparkpeers.departed')):
+ hookenv.status_set('maintenance',
+ 'waiting for zookeeper ensemble to settle')
+ hookenv.log("Waiting 2m to ensure zk ensemble has settled: {}".format(zks))
+ time.sleep(120)
spark = Spark()
- spark.configure(hosts, zks, get_spark_peers())
- return True
-
+ spark.configure(hosts, zks, peers)
+ set_deployment_mode_state('spark.standalone.installed')
-@when('config.changed', 'spark.started')
-def reconfigure_spark():
- config = hookenv.config()
- mode = config['spark_execution_mode']
- hookenv.status_set('maintenance',
- 'changing default execution mode to {}'.format(mode))
+def install_spark_yarn():
+ """
+ Called in 'yarn-*' mode after Juju has elected a leader. The
+ 'hadoop.yarn.ready' state must be set.
+ """
+ hosts = {
+ 'spark-master': leadership.leader_get('master-fqdn'),
+ }
hadoop = (RelationBase.from_state('hadoop.yarn.ready') or
RelationBase.from_state('hadoop.hdfs.ready'))
+ rms = hadoop.resourcemanagers()
+ hosts['resourcemanager'] = rms[0]
- zks = None
- if is_state('zookeeper.ready'):
- zk = RelationBase.from_state('zookeeper.ready')
- zks = zk.zookeepers()
+ # Probably don't need to check this since yarn.ready implies hdfs.ready
+ # for us, but it doesn't hurt.
+ if is_state('hadoop.hdfs.ready'):
+ nns = hadoop.namenodes()
+ hosts['namenode'] = nns[0]
- if install_spark(hadoop, zks):
- report_status()
+ spark = Spark()
+ spark.configure(hosts, zk_units=None, peers=None)
+ set_deployment_mode_state('spark.yarn.installed')
-# This is a triky call. We want to fire when the leader changes, yarn and hdfs become ready or
-# depart. In the future this should fire when Cassandra or any other storage
-# becomes ready or departs. Since hdfs and yarn do not have a departed state we make sure
-# we fire this method always ('spark.started'). We then build a deployment-matrix
-# and if anything has changed we re-install.
-# 'hadoop.yarn.ready', 'hadoop.hdfs.ready' can be ommited but I like them here for clarity
-@when_any('hadoop.yarn.ready',
- 'hadoop.hdfs.ready', 'master.elected', 'sparkpeers.joined', 'zookeeper.ready')
+def set_deployment_mode_state(state):
+ if is_state('spark.yarn.installed'):
+ remove_state('spark.standalone.installed')
+ if is_state('spark.standalone.installed'):
+ remove_state('spark.yarn.installed')
+ set_state(state)
+ # set app version string for juju status output
+ spark_version = get_package_version('spark-core') or 'unknown'
+ hookenv.application_version_set(spark_version)
+
+
+###############################################################################
+# Reactive methods
+###############################################################################
+@when_any('config.changed', 'master.elected',
+ 'hadoop.hdfs.ready', 'hadoop.yarn.ready',
+ 'sparkpeers.joined', 'sparkpeers.departed',
+ 'zookeeper.ready')
@when('bigtop.available', 'master.elected')
def reinstall_spark():
+ """
+ This is tricky. We want to fire on config or leadership changes, or when
+ hadoop, sparkpeers, or zookeepers come and go. In the future this should
+ fire when Cassandra or any other storage comes or goes. We always fire
+ this method (or rather, when bigtop is ready and juju has elected a
+ master). We then build a deployment-matrix and (re)install as things
+ change.
+ """
spark_master_host = leadership.leader_get('master-fqdn')
- peers = []
- zks = []
- if is_state('zookeeper.ready'):
- # if ZK is availuable we are in HA. We do not want reconfigurations if a leader fails
- # HA takes care of this
+ if not spark_master_host:
+ hookenv.status_set('maintenance', 'juju leader not elected yet')
+ return
+
+ mode = hookenv.config()['spark_execution_mode']
+ peers = None
+ zks = None
+
+ # If mode is standalone and ZK is ready, we are in HA. Do not consider
+ # the master_host from juju leadership in our matrix. ZK handles this.
+ if (mode == 'standalone' and is_state('zookeeper.ready')):
spark_master_host = ''
zk = RelationBase.from_state('zookeeper.ready')
zks = zk.zookeepers()
- # We need reconfigure Spark when in HA and peers change ignore otherwise
+ # peers are only used to set our MASTER_URL in standalone HA mode
peers = get_spark_peers()
deployment_matrix = {
@@ -120,34 +157,39 @@ def reinstall_spark():
'peers': peers,
}
- if not data_changed('deployment_matrix', deployment_matrix):
+ # If neither config nor our matrix is changing, there is nothing to do.
+ if not (is_state('config.changed') or
+ data_changed('deployment_matrix', deployment_matrix)):
return
- hookenv.status_set('maintenance', 'configuring spark')
- hadoop = (RelationBase.from_state('hadoop.yarn.ready') or
- RelationBase.from_state('hadoop.hdfs.ready'))
- if install_spark(hadoop, zks):
- if is_state('hadoop.yarn.ready'):
- set_deployment_mode_state('spark.yarn.installed')
- else:
- set_deployment_mode_state('spark.standalone.installed')
-
+ # (Re)install based on our execution mode
+ hookenv.status_set('maintenance', 'configuring spark in {} mode'.format(mode))
+ hookenv.log("Configuring spark with deployment matrix: {}".format(deployment_matrix))
+
+ if mode.startswith('yarn') and is_state('hadoop.yarn.ready'):
+ install_spark_yarn()
+ elif mode.startswith('local') or mode == 'standalone':
+ install_spark_standalone(zks, peers)
+ else:
+ # Something's wrong (probably requested yarn without yarn.ready).
+ remove_state('spark.started')
report_status()
+ return
+ # restart services to pick up possible config changes
+ spark = Spark()
+ spark.stop()
+ spark.start()
-def get_spark_peers():
- nodes = [(hookenv.local_unit(), hookenv.unit_private_ip())]
- sparkpeer = RelationBase.from_state('sparkpeers.joined')
- if sparkpeer:
- nodes.extend(sorted(sparkpeer.get_nodes()))
- return nodes
+ set_state('spark.started')
+ report_status()
-@when('leadership.is_leader', 'bigtop.available')
+@when('bigtop.available', 'leadership.is_leader')
def send_fqdn():
spark_master_host = get_fqdn()
leadership.leader_set({'master-fqdn': spark_master_host})
- hookenv.log("Setting leader to {}".format(spark_master_host))
+ hookenv.log("Setting juju leader to {}".format(spark_master_host))
@when('leadership.changed.master-fqdn')