You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/03/23 07:59:17 UTC
[28/50] [abbrv] kylin git commit: KYLIN-1249 A client library to help
automatic cube
http://git-wip-us.apache.org/repos/asf/kylin/blob/8d1b1941/tools/kylin_client_tool/scheduler/workers/cube.py
----------------------------------------------------------------------
diff --git a/tools/kylin_client_tool/scheduler/workers/cube.py b/tools/kylin_client_tool/scheduler/workers/cube.py
new file mode 100644
index 0000000..b18be6e
--- /dev/null
+++ b/tools/kylin_client_tool/scheduler/workers/cube.py
@@ -0,0 +1,122 @@
# -*- coding: utf-8 -*-
__author__ = 'Huang, Hua'

import time
import datetime
import calendar
from jobs.build import CubeBuildJob
from jobs.cube import CubeJob
from models.request import JobBuildRequest
from models.job import JobInstance, CubeJobStatus
from settings.settings import KYLIN_JOB_MAX_COCURRENT, KYLIN_JOB_MAX_RETRY


class CubeWorker:
    """Schedule and track Kylin cube build jobs, one job per cube.

    All state lives in class attributes and every method is a staticmethod,
    so the class is used as a process-wide singleton rather than instantiated.

    Values stored in ``job_instance_dict`` (keyed by cube name):

    * ``None``        -- no build has been scheduled yet for the cube;
    * a JobInstance   -- the most recently scheduled / refreshed build job;
    * ``0``           -- retries exhausted, the cube is given up on.
    """
    # cube name -> None | JobInstance | 0 (see class docstring)
    job_instance_dict = {}
    # cube name -> retries already made; missing (read as -1) before the first
    # attempt, 0 after the first attempt, +1 for every further attempt.
    job_retry_dict = {}
    # Not used in this module; presumably assigned by the code that registers
    # the periodic jobs -- verify against the caller.
    scheduler = None
    run_cube_job_id = None
    check_cube_job_id = None

    def __init__(self):
        pass

    @staticmethod
    def _retries_exhausted(cube_name):
        """Return True once ``cube_name`` has used up KYLIN_JOB_MAX_RETRY retries."""
        return CubeWorker.job_retry_dict.get(cube_name, -1) >= KYLIN_JOB_MAX_RETRY

    @staticmethod
    def _end_time_ms(endtime):
        """Return the build end time in epoch milliseconds.

        :param endtime: ``"%Y-%m-%d"`` string, interpreted as midnight UTC of
            that day, or ``None`` for the current UTC time.
        """
        if endtime is None:
            now = datetime.datetime.utcnow()
            return calendar.timegm(now.utctimetuple()) * 1000
        # calendar.timegm interprets the tuple as UTC directly; the previous
        # time.mktime(...) - time.timezone was off by one hour under DST,
        # because mktime applies time.altzone then.
        day = datetime.datetime.strptime(endtime, "%Y-%m-%d")
        return calendar.timegm(day.timetuple()) * 1000

    @staticmethod
    def _cancel_error_jobs(cube_name):
        """Cancel every job of ``cube_name`` listed with CubeJob.ERROR_JOB_STATUS."""
        error_job_list = CubeJob.get_cube_job(cube_name, CubeJob.ERROR_JOB_STATUS)
        for error_job in error_job_list or []:
            CubeBuildJob.cancel_job(error_job.uuid)
            print("cancel an error job with uuid= %s for cube= %s" % (error_job.uuid, cube_name))

    @staticmethod
    def run_cube_job(endtime):
        """Schedule builds for cubes that are not started yet or whose job errored.

        At most KYLIN_JOB_MAX_COCURRENT jobs run at once. A cube is retried at
        most KYLIN_JOB_MAX_RETRY times, after which it is marked ``0``.

        :param endtime: ``"%Y-%m-%d"`` segment end date (midnight UTC), or
            ``None`` to build up to now.
        :returns: True when every cube is finished (see all_finished),
            otherwise False.
        """
        if CubeWorker.all_finished():
            return True

        # get_current_running_job() returns None for an empty dict; normalize.
        running_job_list = CubeWorker.get_current_running_job() or []
        print("current running %d jobs" % len(running_job_list))

        max_allow = KYLIN_JOB_MAX_COCURRENT - len(running_job_list)
        if max_allow <= 0:
            print("will not schedule new jobs this time because running job number >= "
                  "the max concurrent job number %s" % KYLIN_JOB_MAX_COCURRENT)
            return False

        # Only values of existing keys are reassigned below, so iterating the
        # dict while updating it is safe.
        for cube_name in CubeWorker.job_instance_dict:
            if max_allow <= 0:
                break

            job_instance = CubeWorker.job_instance_dict[cube_name]
            needs_build = job_instance is None or (
                isinstance(job_instance, JobInstance)
                and job_instance.get_status() == CubeJobStatus.ERROR)
            if not needs_build:
                continue

            try_cnt = CubeWorker.job_retry_dict.get(cube_name, -1)
            if CubeWorker._retries_exhausted(cube_name):
                # Already tried KYLIN_JOB_MAX_RETRY times: give up on this cube.
                CubeWorker.job_instance_dict[cube_name] = 0
                continue

            # Cancel the errored build segment(s) before rebuilding.
            CubeWorker._cancel_error_jobs(cube_name)

            build_request = JobBuildRequest()
            build_request.endTime = CubeWorker._end_time_ms(endtime)
            current_job_instance = CubeBuildJob.rebuild_cube(cube_name, build_request)

            if current_job_instance:
                print("schedule a cube build job for cube = %s" % cube_name)
                CubeWorker.job_instance_dict[cube_name] = current_job_instance
                max_allow -= 1
            # Count every attempt, successful or not, so a cube whose rebuild
            # request keeps failing is eventually given up on.
            CubeWorker.job_retry_dict[cube_name] = try_cnt + 1

        return False

    @staticmethod
    def check_cube_job():
        """Refresh RUNNING jobs from the server and print every cube's status."""
        for cube_name in CubeWorker.job_instance_dict:
            job_instance = CubeWorker.job_instance_dict[cube_name]
            # NOTE(review): only RUNNING jobs are refreshed. If get_status() can
            # report other in-flight states (e.g. pending), those never update -- confirm.
            if (isinstance(job_instance, JobInstance) and job_instance.uuid
                    and job_instance.get_status() == CubeJobStatus.RUNNING):
                current_job_instance = CubeBuildJob.get_job(job_instance.uuid)
                if current_job_instance:
                    CubeWorker.job_instance_dict[cube_name] = current_job_instance
                    job_instance = current_job_instance

            if job_instance is None:
                print("status of cube = %s is NOT STARTED YET" % cube_name)
            elif isinstance(job_instance, JobInstance):
                print("status of cube = %s is %s at %d/%d" % (
                    cube_name, job_instance.get_status(),
                    job_instance.get_current_step(), len(job_instance.steps)))

    @staticmethod
    def get_current_running_job():
        """Return the tracked JobInstances whose status is RUNNING.

        Returns ``None`` (not an empty list) when no cube is tracked at all.
        """
        if not CubeWorker.job_instance_dict:
            return None

        return [job_instance for job_instance in CubeWorker.job_instance_dict.values()
                if job_instance and isinstance(job_instance, JobInstance)
                and job_instance.get_status() == CubeJobStatus.RUNNING]

    @staticmethod
    def all_finished():
        """Return True when no tracked cube still needs work.

        A cube is unfinished when it is not scheduled yet (``None``), when its
        job is RUNNING, or when its job is in ERROR and retries remain (so
        run_cube_job gets the chance to retry it). Given-up cubes (``0``) and
        jobs in any other status count as finished. An empty dict is finished.
        """
        for cube_name, job_instance in CubeWorker.job_instance_dict.items():
            if job_instance is None:
                return False
            if not isinstance(job_instance, JobInstance):
                # 0 sentinel: retries exhausted.
                continue
            status = job_instance.get_status()
            if status == CubeJobStatus.RUNNING:
                return False
            if status == CubeJobStatus.ERROR and not CubeWorker._retries_exhausted(cube_name):
                return False

        return True
http://git-wip-us.apache.org/repos/asf/kylin/blob/8d1b1941/tools/kylin_client_tool/settings/__init__.py
----------------------------------------------------------------------
diff --git a/tools/kylin_client_tool/settings/__init__.py b/tools/kylin_client_tool/settings/__init__.py
new file mode 100644
index 0000000..1b249ac
--- /dev/null
+++ b/tools/kylin_client_tool/settings/__init__.py
@@ -0,0 +1,2 @@
+# -*- coding: utf-8 -*-
+__author__ = 'Huang, Hua'
http://git-wip-us.apache.org/repos/asf/kylin/blob/8d1b1941/tools/kylin_client_tool/settings/settings.py
----------------------------------------------------------------------
diff --git a/tools/kylin_client_tool/settings/settings.py b/tools/kylin_client_tool/settings/settings.py
new file mode 100644
index 0000000..3e7dcfd
--- /dev/null
+++ b/tools/kylin_client_tool/settings/settings.py
@@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
__author__ = 'Huang, Hua'

import os

# Directory two levels above this file (the kylin_client_tool directory).
# abspath keeps it valid when __file__ is relative, where dirname(dirname())
# could otherwise collapse to "" and tie PID_PATH to the current directory.
WORKING_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
PID_PATH = os.path.join(WORKING_DIR, 'kylin_client_tool.pid')

# Kylin REST connection settings. Each may be overridden by an environment
# variable of the same name, so credentials and host need not be edited in
# source; the defaults are the previously hard-coded values.
KYLIN_USER = os.environ.get('KYLIN_USER', 'ADMIN')
KYLIN_PASSWORD = os.environ.get('KYLIN_PASSWORD', 'KYLIN')
KYLIN_REST_HOST = os.environ.get('KYLIN_REST_HOST', 'http://123.103.21.35')
KYLIN_REST_PORT = int(os.environ.get('KYLIN_REST_PORT', 7070))
KYLIN_REST_PATH_PREFIX = os.environ.get('KYLIN_REST_PATH_PREFIX', '/kylin/api')
# Maximum number of cube build jobs scheduled to run at the same time.
KYLIN_JOB_MAX_COCURRENT = int(os.environ.get('KYLIN_JOB_MAX_COCURRENT', 3))
# Number of retries for a cube whose build job failed.
KYLIN_JOB_MAX_RETRY = int(os.environ.get('KYLIN_JOB_MAX_RETRY', 1))
http://git-wip-us.apache.org/repos/asf/kylin/blob/8d1b1941/tools/kylin_client_tool/setup-mac.sh
----------------------------------------------------------------------
diff --git a/tools/kylin_client_tool/setup-mac.sh b/tools/kylin_client_tool/setup-mac.sh
new file mode 100644
index 0000000..3ec78d8
--- /dev/null
+++ b/tools/kylin_client_tool/setup-mac.sh
@@ -0,0 +1,20 @@
#!/bin/bash
# Install kylin_client_tool's Python dependencies (requests, APScheduler) from
# source tarballs on macOS. A tarball already present in the current
# directory is reused; a missing one is downloaded with curl. The script stops
# at the first failing step instead of continuing with a missing archive.
set -e

# install_from_source NAME VERSION URL
#   Download NAME-VERSION.tar.gz from URL if it is not present, unpack it and
#   run "python setup.py install" inside the unpacked directory.
install_from_source() {
    local pkg="$1-$2"
    if [ ! -f "$pkg.tar.gz" ]
    then
        echo "$1 binary file not found, downloading"
        # -f: fail on HTTP errors instead of saving the error page as the archive;
        # -L: follow HTTP redirects.
        # A partial file is removed so the next run downloads it again.
        curl -fL -O "$3" || { rm -f "$pkg.tar.gz"; echo "download $1 failed"; exit 1; }
    fi
    tar -zxvf "$pkg.tar.gz"
    # Subshell keeps the caller's working directory unchanged.
    (cd "$pkg" && python setup.py install)
}

install_from_source requests 2.9.1 https://pypi.python.org/packages/source/r/requests/requests-2.9.1.tar.gz
install_from_source APScheduler 3.0.5 https://pypi.python.org/packages/source/A/APScheduler/APScheduler-3.0.5.tar.gz
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/8d1b1941/tools/kylin_client_tool/setup.sh
----------------------------------------------------------------------
diff --git a/tools/kylin_client_tool/setup.sh b/tools/kylin_client_tool/setup.sh
new file mode 100644
index 0000000..bb7be9c
--- /dev/null
+++ b/tools/kylin_client_tool/setup.sh
@@ -0,0 +1,20 @@
#!/bin/bash
# Install kylin_client_tool's Python dependencies (requests, APScheduler) from
# source tarballs. A tarball already present in the current directory is
# reused; a missing one is downloaded with wget. The script stops at the
# first failing step instead of continuing with a missing archive.
set -e

# install_from_source NAME VERSION URL
#   Download NAME-VERSION.tar.gz from URL if it is not present, unpack it and
#   run "python setup.py install" inside the unpacked directory.
install_from_source() {
    local pkg="$1-$2"
    if [ ! -f "$pkg.tar.gz" ]
    then
        echo "$1 binary file not found, downloading"
        # A partial file is removed so the next run downloads it again.
        wget "$3" || { rm -f "$pkg.tar.gz"; echo "download $1 failed"; exit 1; }
    fi
    tar -zxvf "$pkg.tar.gz"
    # Subshell keeps the caller's working directory unchanged.
    (cd "$pkg" && python setup.py install)
}

install_from_source requests 2.9.1 https://pypi.python.org/packages/source/r/requests/requests-2.9.1.tar.gz
install_from_source APScheduler 3.0.5 https://pypi.python.org/packages/source/A/APScheduler/APScheduler-3.0.5.tar.gz
\ No newline at end of file