You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by go...@apache.org on 2017/03/07 03:14:36 UTC
incubator-slider git commit: SLIDER-1174 Support Tensorflow on Slider
(Yang Wang aka. fly_in_gis via gourksaha)
Repository: incubator-slider
Updated Branches:
refs/heads/develop 134ef53f9 -> 5bf14692d
SLIDER-1174 Support Tensorflow on Slider (Yang Wang aka. fly_in_gis via gourksaha)
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/5bf14692
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/5bf14692
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/5bf14692
Branch: refs/heads/develop
Commit: 5bf14692d83705392de594e9a199ec0e632fc0d2
Parents: 134ef53
Author: Gour Saha <go...@apache.org>
Authored: Mon Mar 6 19:13:56 2017 -0800
Committer: Gour Saha <go...@apache.org>
Committed: Mon Mar 6 19:13:56 2017 -0800
----------------------------------------------------------------------
app-packages/tensorflow/README.md | 56 +++++++
app-packages/tensorflow/appConfig.default.json | 18 +++
app-packages/tensorflow/metainfo.json | 85 ++++++++++
.../tensorflow/package/files/yarn_bootstrap.py | 95 +++++++++++
app-packages/tensorflow/package/files/ymnist.py | 88 ++++++++++
.../tensorflow/package/scripts/functions.py | 160 +++++++++++++++++++
.../tensorflow/package/scripts/params.py | 54 +++++++
.../tensorflow/package/scripts/tensorflow.py | 51 ++++++
.../package/scripts/tensorflow_service.py | 109 +++++++++++++
app-packages/tensorflow/resources.default.json | 38 +++++
.../tensorflow/ytensorflow/config.default.json | 29 ++++
app-packages/tensorflow/ytensorflow/ytensorflow | 65 ++++++++
.../tensorflow/ytensorflow/ytensorflow.py | 108 +++++++++++++
13 files changed, 956 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/README.md
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/README.md b/app-packages/tensorflow/README.md
new file mode 100644
index 0000000..b6dc8f5
--- /dev/null
+++ b/app-packages/tensorflow/README.md
@@ -0,0 +1,56 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+# Environments
+* Redhat 7
+* Linux Kernel 3.10.0
+* Docker 1.12.3
+* Hadoop 2.6.5
+* Docker image ytensorflow:0.2.1, [Dockerfile](https://github.com/tensorflow/ecosystem/blob/master/docker/Dockerfile.hdfs)
+
+# Use slider to run a tensorflow cluster
+1. Make sure slider could work well, see the [Slider Start](https://slider.incubator.apache.org/docs/getting_started.html)
+2. Download app-packages/tensorflow to $SLIDER_HOME/app-packages/tensorflow
+3. Put your tensorflow scripts under app-packages/tensorflow/package/files
+4. Set "site.global.hadoop.conf", "site.global.user.scripts.entry", "site.global.user.data.dir", "site.global.user.checkpoint.dir" according to your situation in "appConfig.default.json"
+5. Set resource in resources.default.json if you need
+6. As is often the case, there is no need to update metainfo.json
+7. Start your tensorflow cluster
+```
+cd $SLIDER_HOME/app-packages/tensorflow
+slider create [app-name] --appdef . --template appConfig.default.json --resources resources.default.json
+```
+
+# Use ytensorflow to run a tensorflow cluster
+## Introduction
+ytensorflow(tensorflow on YARN admin client), is used to submit and manage tensorflow cluster on YARN. It aims to make submit more easier.
+## Command
+```
+ytensorflow cluster -start ./config.json -files ./mnist.py
+ytensorflow cluster -stop <appName>
+ytensorflow cluster -status <appName>
+ytensorflow version
+```
+
+# User scripts requirements
+The following arguments will be generated by the framework and passed to user script. You should use them in the right positon, just as the "mnist.py"
+* job_name, worker or ps
+* task_index
+* ps_hosts
+* worker_hosts
+* data_dir, directory where the user data is stored
+* ckp_dir, directory for storing the checkpoints
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/appConfig.default.json
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/appConfig.default.json b/app-packages/tensorflow/appConfig.default.json
new file mode 100644
index 0000000..5482254
--- /dev/null
+++ b/app-packages/tensorflow/appConfig.default.json
@@ -0,0 +1,18 @@
+{
+ "schema": "http://example.org/specification/v2.0.0",
+ "metadata": {
+ },
+ "global": {
+ "site.global.hadoop.conf": "/etc/hadoop/conf",
+ "site.global.yarn.cgroup.root": "/hadoop-yarn",
+ "site.global.user.scripts.entry": "ymnist.py",
+ "site.global.user.checkpoint.prefix": "hdfs://hdpdev/user/${USER_NAME}/.slider/cluster",
+ "site.global.user.name": "${USER_NAME}",
+ "site.global.zookeeper.quorum": "${ZK_HOST}",
+ "site.global.docker.image": "ytensorflow:0.2.1",
+ "site.global.ps.port": "${ps.ALLOCATED_PORT}{PER_CONTAINER}",
+ "site.global.chiefworker.port": "${chiefworker.ALLOCATED_PORT}{PER_CONTAINER}",
+ "site.global.worker.port": "${worker.ALLOCATED_PORT}{PER_CONTAINER}",
+ "site.global.tensorboard.port": "${tensorboard.ALLOCATED_PORT}{PER_CONTAINER}"
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/metainfo.json
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/metainfo.json b/app-packages/tensorflow/metainfo.json
new file mode 100644
index 0000000..631146b
--- /dev/null
+++ b/app-packages/tensorflow/metainfo.json
@@ -0,0 +1,85 @@
+{
+ "schemaVersion": "2.1",
+ "application": {
+ "name": "tensorflow",
+ "version": "0.1.1",
+ "exportGroups": [
+ {
+ "name": "ps",
+ "exports": [
+ {
+ "name": "host_port",
+ "value": "${ps_HOST}:${site.global.ps.port}"
+ }
+ ]
+ },
+ {
+ "name": "chiefworker",
+ "exports": [
+ {
+ "name": "host_port",
+ "value": "${chiefworker_HOST}:${site.global.chiefworker.port}"
+ }
+ ]
+ },
+ {
+ "name": "worker",
+ "exports": [
+ {
+ "name": "host_port",
+ "value": "${worker_HOST}:${site.global.worker.port}"
+ }
+ ]
+ },
+ {
+ "name": "tensorboard",
+ "exports": [
+ {
+ "name": "url",
+ "value": "http://${tensorboard_HOST}:${site.global.tensorboard.port}"
+ }
+ ]
+ }
+ ],
+ "components": [
+ {
+ "name": "ps",
+ "compExports": "ps-host_port",
+ "commandScript": {
+ "script": "scripts/tensorflow.py",
+ "scriptType": "PYTHON"
+ }
+ },
+ {
+ "name": "chiefworker",
+ "compExports": "chiefworker-host_port",
+ "commandScript": {
+ "script": "scripts/tensorflow.py",
+ "scriptType": "PYTHON"
+ }
+ },
+ {
+ "name": "worker",
+ "compExports": "worker-host_port",
+ "commandScript": {
+ "script": "scripts/tensorflow.py",
+ "scriptType": "PYTHON"
+ }
+ },
+ {
+ "name": "tensorboard",
+ "compExports": "tensorboard-url",
+ "commandScript": {
+ "script": "scripts/tensorflow.py",
+ "scriptType": "PYTHON"
+ }
+ }
+ ],
+ "packages": [
+ {
+ "type": "folder",
+ "name": "files"
+ }
+ ]
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/package/files/yarn_bootstrap.py
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/package/files/yarn_bootstrap.py b/app-packages/tensorflow/package/files/yarn_bootstrap.py
new file mode 100644
index 0000000..8587049
--- /dev/null
+++ b/app-packages/tensorflow/package/files/yarn_bootstrap.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import time
+from abc import abstractmethod, ABCMeta
+import tensorflow as tf
+
+flags = tf.app.flags
+# Flags for configuring the task
+flags.DEFINE_string("job_name", None, "job name: worker or ps")
+flags.DEFINE_integer("task_index", 0, "Worker task index, should be >= 0")
+flags.DEFINE_string("ps_hosts", "", "Comma-separated list of hostname:port pairs")
+flags.DEFINE_string("worker_hosts", "", "Comma-separated list of hostname:port pairs")
+flags.DEFINE_string("ckp_dir", None, "Directory for storing the checkpoints")
+flags.DEFINE_string("work_dir", "/tmp/tf_on_yarn", "Work directory")
+
+FLAGS = flags.FLAGS
+
+class YarnBootstrap(object):
+ def __init__(self):
+ pass
+
+ __metaclass__ = ABCMeta
+
+ @abstractmethod
+ def worker_do(self, server, cluster_spec, task_id):
+ pass
+
+ @abstractmethod
+ def ps_do(self, server, cluster_spec, task_id):
+ pass
+
+ def device_and_server(self):
+ # If FLAGS.job_name is not set, we're running single-machine TensorFlow.
+ # Don't set a device.
+ if FLAGS.job_name is None:
+ print("Running single-machine training")
+ return (None, "", "")
+
+ # Otherwise we're running distributed TensorFlow.
+ print("Running distributed training")
+ if FLAGS.task_index is None or FLAGS.task_index == "":
+ raise ValueError("Must specify an explicit `task_index`")
+ if FLAGS.ps_hosts is None or FLAGS.ps_hosts == "":
+ raise ValueError("Must specify an explicit `ps_hosts`")
+ if FLAGS.worker_hosts is None or FLAGS.worker_hosts == "":
+ raise ValueError("Must specify an explicit `worker_hosts`")
+
+ cluster_spec = tf.train.ClusterSpec({
+ "ps": FLAGS.ps_hosts.split(","),
+ "worker": FLAGS.worker_hosts.split(","),
+ })
+ server = tf.train.Server(
+ cluster_spec, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
+ time.sleep(60)
+ if FLAGS.job_name == "ps":
+ self.ps_do(server, cluster_spec, FLAGS.task_index)
+ server.join()
+
+ worker_device = "/job:worker/task:{}".format(FLAGS.task_index)
+ return (
+ tf.train.replica_device_setter(
+ worker_device=worker_device,
+ cluster=cluster_spec),
+ server, cluster_spec
+ )
+
+ def start(self, unused_args):
+ if FLAGS.ckp_dir is None or FLAGS.ckp_dir == "":
+ raise ValueError("Must specify an explicit `ckp_dir`")
+ device, server, cluster_spec = self.device_and_server()
+ with tf.device(device):
+ self.worker_do(server, cluster_spec, FLAGS.task_index)
+
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/package/files/ymnist.py
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/package/files/ymnist.py b/app-packages/tensorflow/package/files/ymnist.py
new file mode 100644
index 0000000..e938b05
--- /dev/null
+++ b/app-packages/tensorflow/package/files/ymnist.py
@@ -0,0 +1,88 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from __future__ import division
+from __future__ import print_function
+
+import os
+
+import tensorflow as tf
+from tensorflow.examples.tutorials.mnist import mnist
+from tensorflow.python.training import training_util
+from yarn_bootstrap import *
+
+FLAGS = tf.app.flags.FLAGS
+
+class Ymnist(YarnBootstrap):
+ def worker_do(self, server, cluster_spec, task_id):
+ print("Checkpoint dir: " + FLAGS.ckp_dir)
+ images, labels = self.inputs(100)
+ logits = mnist.inference(images, 128, 128)
+ loss = mnist.loss(logits, labels)
+ train_op = mnist.training(loss, 0.01)
+ target = "" if server == "" else server.target
+ with tf.train.MonitoredTrainingSession(
+ master=target,
+ is_chief=(task_id == 0),
+ checkpoint_dir=FLAGS.ckp_dir) as sess:
+ step = 0
+ while not sess.should_stop() and step < 1000000:
+ sess.run(train_op)
+ step = training_util.global_step(sess, training_util.get_global_step(sess.graph))
+ print("Global step " + str(step))
+
+ def ps_do(self, server, cluster_spec, task_id):
+ print("Starting ps " + str(task_id))
+
+ def read_and_decode(self, filename_queue):
+ reader = tf.TFRecordReader()
+ _, serialized_example = reader.read(filename_queue)
+ features = tf.parse_single_example(
+ serialized_example,
+ # Defaults are not specified since both keys are required.
+ features={
+ 'image_raw': tf.FixedLenFeature([], tf.string),
+ 'label': tf.FixedLenFeature([], tf.int64),
+ })
+ image = tf.decode_raw(features['image_raw'], tf.uint8)
+ image.set_shape([mnist.IMAGE_PIXELS])
+ # Convert from [0, 255] -> [-0.5, 0.5] floats.
+ image = tf.cast(image, tf.float32) * (1. / 255) - 0.5
+ # Convert label from a scalar uint8 tensor to an int32 scalar.
+ label = tf.cast(features['label'], tf.int32)
+ return image, label
+
+ def inputs(self, batch_size):
+ filename = os.path.join("hdfs://hdpdev/user/danrtsey.wy/mnist-data", "train.tfrecords")
+ with tf.name_scope('input'):
+ filename_queue = tf.train.string_input_producer([filename])
+ image, label = self.read_and_decode(filename_queue)
+ images, sparse_labels = tf.train.shuffle_batch(
+ [image, label], batch_size=batch_size, num_threads=2,
+ capacity=1000 + 3 * batch_size,
+ # Ensures a minimum amount of shuffling of examples.
+ min_after_dequeue=1000)
+ return images, sparse_labels
+
+def main(unused_argv):
+ Ymnist().start(unused_args=unused_argv)
+
+if __name__ == "__main__":
+ tf.app.run()
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/package/scripts/functions.py
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/package/scripts/functions.py b/app-packages/tensorflow/package/scripts/functions.py
new file mode 100644
index 0000000..60fdbd4
--- /dev/null
+++ b/app-packages/tensorflow/package/scripts/functions.py
@@ -0,0 +1,160 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import json
+import urllib2
+import socket
+from kazoo.client import KazooClient
+from resource_management import *
+
+
+def get_am_rest_base():
+ am_rest_base = ""
+ zk = KazooClient(hosts=format("{registry_zk}"))
+ zk.start()
+ path = format("/registry/users/{user_name}/services/org-apache-slider/{service_name}")
+ data, stat = zk.get(path)
+ app_registry = json.loads(data)
+ for item in app_registry['external']:
+ if item['api'] == 'classpath:org.apache.slider.client.rest':
+ am_rest_base = item['addresses'][0]['uri']
+ return am_rest_base
+
+
+def get_allocated_resources():
+ resources_rest_url = get_am_rest_base() + '/ws/v1/slider/application/live/resources'
+ resources = json.loads(urllib2.urlopen(resources_rest_url).read())
+ mem_ps = int(resources['components']['ps']['yarn.memory'])
+ vcore_ps = int(resources['components']['ps']['yarn.vcores'])
+ mem_chiefworker = int(resources['components']['chiefworker']['yarn.memory'])
+ vcore_chiefworker = int(resources['components']['chiefworker']['yarn.vcores'])
+ mem_worker = int(resources['components']['worker']['yarn.memory'])
+ vcore_worker = int(resources['components']['worker']['yarn.vcores'])
+ mem_tb = int(resources['components']['tensorboard']['yarn.memory'])
+ vcore_tb = int(resources['components']['tensorboard']['yarn.vcores'])
+ dict = {"mem.ps": mem_ps, "vcore.ps": vcore_ps,
+ "mem.chiefworker": mem_chiefworker, "vcore_chiefworker": vcore_chiefworker,
+ "mem.worker": mem_worker, "vcore.worker": vcore_worker,
+ "mem.tensorboard": mem_tb, "vcore.tensorboard": vcore_tb}
+ return dict
+
+def get_allocated_instances_num():
+ resources_rest_url = get_am_rest_base() + '/ws/v1/slider/application/live/resources'
+ resources = json.loads(urllib2.urlopen(resources_rest_url).read())
+ n = int(resources['components']['ps']['yarn.component.instances'])
+ cw = int(resources['components']['chiefworker']['yarn.component.instances'])
+ w = int(resources['components']['worker']['yarn.component.instances'])
+ return n, cw + w
+
+def get_launched_instances():
+ try:
+ exports_rest_url = get_am_rest_base() + '/ws/v1/slider/publisher/exports'
+ # get launched ps list
+ exports_ps = json.loads(urllib2.urlopen(exports_rest_url + '/ps').read())
+ ps_list = []
+ for item in exports_ps['entries']['host_port']:
+ ps_list.append(item['value'])
+ # get launched chief worker
+ exports_chiefworker = json.loads(urllib2.urlopen(exports_rest_url + '/chiefworker').read())
+ chiefworker_list = []
+ for item in exports_chiefworker['entries']['host_port']:
+ chiefworker_list.append(item['value'])
+ # get launched worker list
+ exports_worker = json.loads(urllib2.urlopen(exports_rest_url + '/worker').read())
+ worker_list = []
+ for item in exports_worker['entries']['host_port']:
+ worker_list.append(item['value'])
+ except:
+ return ([], [])
+ else:
+ return (ps_list, chiefworker_list + worker_list)
+
+def get_application_id(container_id):
+ ss = container_id.split('_')
+ return "application_" + ss[-4] + "_" + ss[-3]
+
+def is_port_active(host, port, retry=3):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ for i in range(0, 3):
+ try:
+ sock.connect((host, port))
+ sock.close()
+ return True
+ except Exception, e:
+ sock.close()
+ return False
+
+
+def get_workers():
+ running = []
+ finished = []
+ comps_url = get_am_rest_base() + format(
+ "/ws/v1/slider/registry/users/{user_name}/services/org-apache-slider/{service_name}/components")
+ comps = json.loads(urllib2.urlopen(comps_url).read())
+ for node in comps['nodes']:
+ comp_url = comps_url + "/" + node
+ comp = json.loads(urllib2.urlopen(comp_url).read())
+ if 'worker' in comp['service']['description']:
+ if comp['service'].has_key('status') and comp['service']['status'] == 'finished':
+ finished.append(node)
+ else:
+ running.append(node)
+ return running, finished
+
+
+def set_container_status(containerid, status='finished'):
+ comp_url = get_am_rest_base() \
+ + format("/ws/v1/slider/registry/users/{user_name}/services/org-apache-slider/{service_name}/components/") \
+ + containerid.replace('_', '-')
+ comp = json.loads(urllib2.urlopen(comp_url).read())
+ if not comp['service'].has_key('status') or comp['service']['status'] != 'finished':
+ comp['service']['status'] = status
+ zk = KazooClient(hosts=format("{registry_zk}"))
+ zk.start()
+ path = format("/registry/users/{user_name}/services/org-apache-slider/{service_name}/components/") \
+ + containerid.replace('_', '-')
+ zk.set(path, json.dumps(comp['service']))
+
+
+def set_retry_num(containerid):
+ comp_url = get_am_rest_base() \
+ + format("/ws/v1/slider/registry/users/{user_name}/services/org-apache-slider/{service_name}/components/") \
+ + containerid.replace('_', '-')
+ comp = json.loads(urllib2.urlopen(comp_url).read())
+ if not comp['service'].has_key('retry'):
+ comp['service']['retry'] = 1
+ else:
+ comp['service']['retry'] = int(comp['service']['retry']) + 1
+ zk = KazooClient(hosts=format("{registry_zk}"))
+ zk.start()
+ path = format("/registry/users/{user_name}/services/org-apache-slider/{service_name}/components/") \
+ + containerid.replace('_', '-')
+ zk.set(path, json.dumps(comp['service']))
+ return comp['service']['retry']
+
+
+def stop_cluster():
+ # use restAPI to stop cluster
+ stop_url = get_am_rest_base() + '/ws/v1/slider/application/action/stop'
+ # To be compatible with hadoop-2.6.*, make a trick, use GET method to stop
+ # This will be replaced by POST, https://issues.apache.org/jira/browse/YARN-2031
+ # And actionStopGet should be implemented in ApplicationResource.java
+ res = urllib2.urlopen(stop_url).read()
+ print res
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/package/scripts/params.py b/app-packages/tensorflow/package/scripts/params.py
new file mode 100644
index 0000000..ed7c6ef
--- /dev/null
+++ b/app-packages/tensorflow/package/scripts/params.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management import *
+
+# server configurations
+config = Script.get_config()
+
+hadoop_conf = config['configurations']['global']['hadoop.conf']
+yarn_cg_root = config['configurations']['global']['yarn.cgroup.root']
+
+user_name = config['configurations']['global']['user.name']
+registry_zk = config['configurations']['global']['zookeeper.quorum']
+
+user_scripts_entry = config['configurations']['global']['user.scripts.entry']
+user_checkpoint_prefix = config['configurations']['global']['user.checkpoint.prefix']
+
+docker_image = config['configurations']['global']['docker.image']
+
+app_root = config['configurations']['global']['app_root']
+app_log_dir = config['configurations']['global']['app_log_dir']
+pid_file = config['configurations']['global']['pid_file']
+
+container_id = config['configurations']['global']['app_container_id']
+
+ps_port = config['configurations']['global']['ps.port']
+chiefworker_port = config['configurations']['global']['chiefworker.port']
+worker_port = config['configurations']['global']['worker.port']
+tensorboard_port = config['configurations']['global']['tensorboard.port']
+ports_dict = {"port.ps": ps_port,
+ "port.chiefworker": chiefworker_port,
+ "port.worker": worker_port,
+ "port.tensorboard": tensorboard_port}
+
+componentName = config['componentName']
+service_name = config['serviceName']
+hostname = config['hostname']
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/package/scripts/tensorflow.py
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/package/scripts/tensorflow.py b/app-packages/tensorflow/package/scripts/tensorflow.py
new file mode 100644
index 0000000..d41998b
--- /dev/null
+++ b/app-packages/tensorflow/package/scripts/tensorflow.py
@@ -0,0 +1,51 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management import *
+from tensorflow_service import tensorflow_service
+
+
+class Tensorflow(Script):
+ def install(self, env):
+ self.install_packages(env)
+ pass
+
+ def configure(self, env):
+ import params
+ env.set_params(params)
+
+ def start(self, env):
+ import params
+ env.set_params(params)
+ tensorflow_service(action='start')
+
+ def stop(self, env):
+ import params
+ env.set_params(params)
+ tensorflow_service(action='stop')
+
+ def status(self, env):
+ import params
+ env.set_params(params)
+ tensorflow_service(action='status')
+
+
+if __name__ == "__main__":
+ Tensorflow().execute()
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/package/scripts/tensorflow_service.py
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/package/scripts/tensorflow_service.py b/app-packages/tensorflow/package/scripts/tensorflow_service.py
new file mode 100644
index 0000000..8fe265d
--- /dev/null
+++ b/app-packages/tensorflow/package/scripts/tensorflow_service.py
@@ -0,0 +1,109 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import os
+import time
+from resource_management import *
+
+def tensorflow_service(action='start'): # 'start' or 'stop' or 'status'
+ import params
+ import functions
+ container_id = format("{container_id}")
+ application_id = functions.get_application_id(container_id)
+ componentName = format("{componentName}")
+
+ if action == 'start':
+ checkpoint_dir = format("{user_checkpoint_prefix}/{service_name}/checkpoints")
+ mem = functions.get_allocated_resources()['mem.' + componentName]
+ allocated_port = params.ports_dict['port.' + componentName]
+ # Always launch role tensorboard
+ if componentName == "tensorboard":
+ daemon_cmd = format("/usr/bin/docker run -d -u $(id -u yarn) --cgroup-parent={yarn_cg_root}/{container_id} -m {mem}m " \
+ "-v {hadoop_conf}:/usr/local/hadoop/etc/hadoop " \
+ "-v /etc/passwd:/etc/passwd -v /etc/group:/etc/group " \
+ "-p {allocated_port}:{allocated_port} --name={container_id} ytensorflow:0.2.1 " \
+ "/bin/bash -c 'tensorboard --logdir={checkpoint_dir} --port={allocated_port}'")
+ Execute(daemon_cmd)
+ else:
+ # Waiting for all ps/worker to be exported
+ num_ps, num_worker = functions.get_allocated_instances_num()
+ num_allocated = num_ps + num_worker
+ ps_list, worker_list = functions.get_launched_instances()
+ num_launched = len(ps_list) + len(worker_list)
+ while num_launched < num_allocated:
+ print format("Waiting for all ports({num_launched}/{num_allocated}) to be exported")
+ time.sleep(5)
+ ps_list, worker_list = functions.get_launched_instances()
+ num_launched = len(ps_list) + len(worker_list)
+ # Generate parameters
+ ps_hosts = ",".join(ps_list)
+ worker_hosts = ",".join(worker_list)
+ task_index = (ps_list.index(format("{hostname}:{allocated_port}"))) if (componentName == 'ps') else (
+ worker_list.index(format("{hostname}:{allocated_port}")))
+ job_name = "worker" if (componentName == 'chiefworker') else componentName
+ # Build clusterSpec and command
+ daemon_cmd = format("/usr/bin/docker run -d -u $(id -u yarn) --cgroup-parent={yarn_cg_root}/{container_id} -m {mem}m " \
+ "-v {hadoop_conf}:/usr/local/hadoop/etc/hadoop " \
+ "-v /etc/passwd:/etc/passwd -v /etc/group:/etc/group " \
+ "-v {app_root}:{app_root} -v {app_log_dir}:{app_log_dir} " \
+ "-p {allocated_port}:{allocated_port} --name={container_id} {docker_image} " \
+ "/bin/bash -c 'export HADOOP_USER_NAME={user_name}; /usr/bin/python {app_root}/{user_scripts_entry} " \
+ "--ps_hosts={ps_hosts} --worker_hosts={worker_hosts} --job_name={job_name} --task_index={task_index} " \
+ "--ckp_dir={checkpoint_dir} --work_dir={app_root} >>{app_log_dir}/tensorflow.out 2>>{app_log_dir}/tensorflow.err'")
+ Execute(daemon_cmd)
+ elif action == 'stop':
+ cmd = format("/usr/bin/docker stop {container_id}")
+ op_test = format("/usr/bin/docker ps | grep {container_id} >/dev/null 2>&1")
+ Execute(cmd,
+ tries=5,
+ try_sleep=10,
+ wait_for_finish=True,
+ only_if=op_test
+ )
+ elif action == 'status':
+ cmd_status = "/usr/bin/docker inspect -f '{{.State.Running}}' %s" % container_id
+ running = os.popen(cmd_status).read().strip('\n')
+ if running == 'true':
+ print "Component instance is running..."
+ # Role tensorboard will watch all workers' status
+ if componentName == "tensorboard":
+ running, finished = functions.get_workers()
+ print "Running tensorflow workers(%s) : %s \nFinished tensorflow workers(%s) : %s" \
+ % (len(running), ','.join(running), len(finished), ','.join(finished))
+ # All worker has finished successfully, going to stop cluster...
+ num_ps, num_worker = functions.get_allocated_instances_num()
+ if len(finished) == num_worker:
+ functions.stop_cluster()
+ else:
+ cmd_exit = "/usr/bin/docker inspect -f '{{.State.ExitCode}}' %s" % container_id
+ exit_code = int(os.popen(cmd_exit).read().strip('\n'))
+ if exit_code != 0:
+ retry = functions.set_retry_num(container_id)
+ if retry <= 5:
+ # Remove failed docker container
+ cmd_rm = format("/usr/bin/docker rm -f {container_id}")
+ Execute(cmd_rm)
+ # restart user tensorflow script
+ tensorflow_service(action='start')
+ else:
+ raise ComponentIsNotRunning()
+ else:
+ print "Component instance has finished successfully"
+ functions.set_container_status(container_id)
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/resources.default.json
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/resources.default.json b/app-packages/tensorflow/resources.default.json
new file mode 100644
index 0000000..b0ef16a
--- /dev/null
+++ b/app-packages/tensorflow/resources.default.json
@@ -0,0 +1,38 @@
+{
+ "schema" : "http://example.org/specification/v2.0.0",
+ "metadata" : {
+ },
+ "global" : {
+ },
+ "components": {
+ "slider-appmaster": {
+ },
+ "ps": {
+ "yarn.role.priority": "1",
+ "yarn.component.instances": "2",
+ "yarn.memory": "2048",
+ "yarn.vcores": "1",
+ "yarn.container.failure.threshold": "0"
+ },
+ "chiefworker": {
+ "yarn.role.priority": "2",
+ "yarn.component.instances": "1",
+ "yarn.memory": "4096",
+ "yarn.vcores": "1",
+ "yarn.container.failure.threshold": "0"
+ },
+ "worker": {
+ "yarn.role.priority": "3",
+ "yarn.component.instances": "5",
+ "yarn.memory": "2048",
+ "yarn.vcores": "1",
+ "yarn.container.failure.threshold": "0"
+ },
+ "tensorboard": {
+ "yarn.role.priority": "4",
+ "yarn.component.instances": "1",
+ "yarn.memory": "4096",
+ "yarn.vcores": "1"
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/ytensorflow/config.default.json
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/ytensorflow/config.default.json b/app-packages/tensorflow/ytensorflow/config.default.json
new file mode 100644
index 0000000..eff4368
--- /dev/null
+++ b/app-packages/tensorflow/ytensorflow/config.default.json
@@ -0,0 +1,29 @@
+{
+ "schema" : "http://example.org/specification/v2.0.0",
+ "commandConfig": {
+ "app.name": "tfdocker1"
+ },
+ "appConfig" : {
+ "global": {
+ "user.scripts.entry": "ymnist.py"
+ }
+ },
+ "resources": {
+ "components": {
+ "ps": {
+ "yarn.component.instances": "2",
+ "yarn.memory": "2048",
+ "yarn.vcores": "1"
+ },
+ "chiefworker": {
+ "yarn.memory": "4096",
+ "yarn.vcores": "1"
+ },
+ "worker": {
+ "yarn.component.instances": "4",
+ "yarn.memory": "2048",
+ "yarn.vcores": "1"
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/ytensorflow/ytensorflow
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/ytensorflow/ytensorflow b/app-packages/tensorflow/ytensorflow/ytensorflow
new file mode 100644
index 0000000..2e3c689
--- /dev/null
+++ b/app-packages/tensorflow/ytensorflow/ytensorflow
@@ -0,0 +1,65 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# this is the wrapper shell script to invoke the ytensorflow.py script in the same folder
+
+bin=`S=\`readlink "$0"\`; [ -z "$S" ] && S=$0; dirname $S`
+bin=`cd "$bin"; pwd`
+
+function print_usage(){
+ echo "Usage: ytensorflow COMMAND"
+ echo " where COMMAND is one of:"
+ echo " cluster cluster management"
+ echo " version print the version"
+ echo ""
+ echo "Most commands print help when invoked w/o parameters."
+}
+
+if [ $# = 0 ]; then
+ print_usage
+ exit
+fi
+
+COMMAND=$1
+case $COMMAND in
+ # usage flags
+ --help|-help|-h)
+ print_usage
+ exit
+ ;;
+
+ # tensorflow cluster management
+ cluster)
+ [ ! -d "$HOME/.slider" ] && { mkdir "$HOME/.slider"; }
+ path_tmp="$HOME/.slider/tensorflow.$(date +'%s')"
+ cp -r "$bin/.." $path_tmp
+ /usr/bin/python $bin/ytensorflow.py $@ $path_tmp
+ [ $? -eq 0 ] && rm -rf $path_tmp
+ exit
+ ;;
+
+ version)
+ /usr/bin/python $bin/ytensorflow.py $@
+ exit
+ ;;
+
+ *)
+ echo "Error: No command named $COMMAND was found."
+ exit
+ ;;
+esac
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/ytensorflow/ytensorflow.py
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/ytensorflow/ytensorflow.py b/app-packages/tensorflow/ytensorflow/ytensorflow.py
new file mode 100644
index 0000000..c0597c4
--- /dev/null
+++ b/app-packages/tensorflow/ytensorflow/ytensorflow.py
@@ -0,0 +1,108 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import os
+import sys
+import json
+
+
+def main():
+ """
+ ytensorflow main method
+ :return: exit code of the process
+ """
+ returncode = 1
+ slider_home = sys.path[0] + '/../../..'
+ slider = slider_home + '/bin/slider' if os.path.exists(slider_home + '/bin/slider') else 'slider'
+ args = sys.argv[1:]
+ if args[0] == 'version':
+ print get_version()
+ returncode = 0
+ elif args[0] == 'cluster':
+ if args[1] == '-start':
+ app_name = parse_conf(args[2])
+ if 4 < len(args):
+ cp_files(args[args.index('-files') + 1])
+ cmd = "%s status %s || %s destroy %s --force && %s create %s --appdef %s --resources %s --template %s" \
+ %(slider, app_name,
+ slider, app_name,
+ slider, app_name, sys.argv[-1], sys.argv[-1] + '/resources.json', sys.argv[-1] + '/appConfig.json')
+ if args[1] == '-stop':
+ app_name = args[2]
+ cmd = slider + " stop " + app_name
+ if args[1] == '-status':
+ app_name = args[2]
+ cmd = slider + " status " + app_name
+ print cmd
+ returncode = os.system(cmd)
+ return returncode
+
+def get_version():
+ root_path = sys.path[0] + '/..'
+ with open(root_path + "/metainfo.json", 'r') as f:
+ metainfo = json.load(f)
+ name = metainfo['application']['name']
+ version = metainfo['application']['version']
+ return name + " on Slider " + version
+
+def cp_files(files):
+ os.system('cp -r ' + files + ' ' + sys.argv[-1] + '/package/files/')
+
+def parse_conf(config_file):
+ root_path = sys.path[0] + '/..'
+ tmp_path = sys.argv[-1]
+ app_name = 'default_app_name'
+ with open(config_file, 'r') as f:
+ data = json.load(f)
+ for k in data:
+ if k == 'commandConfig':
+ app_name = data[k]['app.name']
+ if k == 'appConfig':
+ # override appconfig.default.json
+ with open(root_path + '/appConfig.default.json', 'r') as f:
+ data_app = json.load(f)
+ for kk,vv in data['appConfig']['global'].items():
+ data_app['global']["site.global." + kk] = vv
+ with open(tmp_path + '/appConfig.json', 'w') as f:
+ json.dump(data_app, f)
+ if k == 'resources':
+ # override resources.default.json
+ with open(root_path + '/resources.default.json', 'r') as f:
+ data_res = json.load(f)
+ for kk,vv in data['resources']['components']['ps'].items():
+ data_res['components']['ps'][kk] = vv
+ for kk,vv in data['resources']['components']['chiefworker'].items():
+ data_res['components']['chiefworker'][kk] = vv
+ for kk,vv in data['resources']['components']['worker'].items():
+ data_res['components']['worker'][kk] = vv
+ with open(tmp_path + '/resources.json', 'w') as f:
+ json.dump(data_res, f)
+ return app_name
+
+if __name__ == '__main__':
+ """
+ Entry point
+ """
+ try:
+ returncode = main()
+ except Exception as e:
+ print "Exception: %s " % str(e)
+ returncode = -1
+ sys.exit(returncode)