You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by lu...@apache.org on 2015/01/07 15:46:42 UTC
[21/51] [partial] incubator-kylin git commit: migrate repo from
github.com to apache git
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/engine/JobEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/engine/JobEngine.java b/job/src/main/java/com/kylinolap/job/engine/JobEngine.java
new file mode 100644
index 0000000..b86fe36
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/engine/JobEngine.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.engine;
+
+/**
+ * @author George Song (ysong1), xduo
+ *
+ */
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.kylinolap.common.KylinConfig;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.math.stat.descriptive.rank.Percentile;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.quartz.Scheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.job.JobInstance;
+import com.kylinolap.job.JobInstance.JobStep;
+import com.kylinolap.job.constant.JobConstants;
+import com.kylinolap.job.exception.JobException;
+
+/**
+ * Runs the Kylin job scheduler on the process that holds this engine's ZooKeeper lock.
+ * <p>
+ * {@link #start(int)} blocks until an {@link InterProcessMutex} on
+ * {@code /kylin/job_engine/lock/<engineID>} is acquired, then starts a Quartz scheduler that
+ * periodically runs the job fetcher. Instances are cached per {@link JobEngineConfig}. The static
+ * {@link #JOB_DURATION} map is the data source for the job-step duration metrics below.
+ */
+public class JobEngine implements ConnectionStateListener {
+
+    private static final Logger log = LoggerFactory.getLogger(JobEngine.class);
+
+    private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
+    private static final ConcurrentHashMap<JobEngineConfig, JobEngine> CACHE = new ConcurrentHashMap<JobEngineConfig, JobEngine>();
+
+    private final String engineID;
+    private final JobEngineConfig engineConfig;
+    private final QuatzScheduler scheduler;
+    private final CuratorFramework zkClient;
+    private InterProcessMutex sharedLock;
+    private int daemonJobIntervalInSeconds;
+
+    /**
+     * Returns the engine cached for {@code engineCfg}, creating it on first use.
+     * <p>
+     * Synchronized so that concurrent callers cannot each construct an engine. Each construction
+     * starts a ZooKeeper client and registers a shutdown hook, and without the lock the callers
+     * could end up holding different instances.
+     *
+     * @param engineID identifier used in this engine's ZooKeeper lock path; only used when a new
+     *            engine is created
+     * @param engineCfg configuration, also used as the cache key
+     * @return the engine cached for {@code engineCfg}
+     * @throws JobException if the Quartz scheduler cannot be created
+     */
+    public static synchronized JobEngine getInstance(String engineID, JobEngineConfig engineCfg) throws JobException {
+        JobEngine engine = CACHE.get(engineCfg);
+        if (engine == null) {
+            engine = new JobEngine(engineID, engineCfg);
+            CACHE.put(engineCfg, engine);
+        }
+        return engine;
+    }
+
+    /**
+     * Creates the scheduler, starts a ZooKeeper client and registers a shutdown hook that releases
+     * the lock.
+     *
+     * @throws IllegalArgumentException if the ZooKeeper connection string is null or empty
+     * @throws JobException if the Quartz scheduler cannot be created
+     */
+    private JobEngine(String engineID, JobEngineConfig context) throws JobException {
+        String zkString = context.getZookeeperString();
+        if (zkString == null || zkString.equals("")) {
+            throw new IllegalArgumentException("Zookeeper connection string is null or empty");
+        }
+        log.info("Using metadata url: " + context.getConfig());
+
+        this.engineID = engineID;
+        this.engineConfig = context;
+        this.scheduler = new QuatzScheduler();
+
+        // exponential back-off starting at 1000 ms, at most 3 retries
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        this.zkClient = CuratorFrameworkFactory.newClient(zkString, retryPolicy);
+        this.zkClient.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                log.debug("Releasing job engine lock");
+                releaseLock();
+            }
+        });
+    }
+
+    /** ZooKeeper node used as this engine's inter-process lock. */
+    private String getLockPath() {
+        return ZOOKEEPER_LOCK_PATH + "/" + this.engineID;
+    }
+
+    /**
+     * Releases the shared lock if this process holds it. Then, while the ZooKeeper client is still
+     * started, deletes this engine's lock node together with its children.
+     *
+     * @throws RuntimeException wrapping any ZooKeeper failure
+     */
+    private void releaseLock() {
+        String lockPath = getLockPath();
+        try {
+            if (sharedLock != null && sharedLock.isAcquiredInThisProcess()) {
+                sharedLock.release();
+            }
+            if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
+                // NOTE(review): deletingChildrenIfNeeded() may also remove lock nodes of other
+                // processes still waiting on this mutex - confirm that is intended.
+                if (zkClient.checkExists().forPath(lockPath) != null) {
+                    zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath);
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Blocks until the shared lock is acquired, records this engine's ID on the lock node and starts
+     * the scheduler with the given fetch interval.
+     *
+     * @param daemonJobIntervalInSeconds interval between job-fetcher runs
+     */
+    public void start(int daemonJobIntervalInSeconds) throws Exception {
+        this.daemonJobIntervalInSeconds = daemonJobIntervalInSeconds;
+        String lockPath = getLockPath();
+
+        sharedLock = new InterProcessMutex(zkClient, lockPath);
+        log.info("Trying to obtain the shared lock...");
+        // the current thread blocks here until the lock is acquired
+        sharedLock.acquire();
+
+        log.info("Obtained the shared lock. Starting job scheduler...");
+        // read back by getPrimaryEngineID()
+        zkClient.setData().forPath(lockPath, Bytes.toBytes(this.engineID));
+        startScheduler();
+    }
+
+    /** Creates the job log directory, starts Quartz and schedules the periodic job fetcher. */
+    private void startScheduler() throws JobException, IOException {
+        String logDir = KylinConfig.getInstanceFromEnv().getKylinJobLogDir();
+        new File(logDir).mkdirs();
+
+        log.info("Starting scheduler.");
+        this.scheduler.start();
+        this.scheduler.scheduleFetcher(this.daemonJobIntervalInSeconds, this.engineConfig);
+    }
+
+    /** Starts the engine with {@link JobConstants#DEFAULT_SCHEDULER_INTERVAL_SECONDS}. */
+    public void start() throws Exception {
+        start(JobConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS);
+    }
+
+    /**
+     * Puts the scheduler into standby and releases the shared lock.
+     * <p>
+     * The scheduler is stopped first so that no new work is picked up after another engine could
+     * obtain the lock. The lock is released even if stopping the scheduler fails.
+     */
+    public void stop() throws JobException {
+        try {
+            this.scheduler.stop();
+        } finally {
+            releaseLock();
+        }
+    }
+
+    /**
+     * Releases the shared lock when the ZooKeeper connection is suspended or lost.
+     * <p>
+     * NOTE(review): this class never registers itself via
+     * {@code zkClient.getConnectionStateListenable().addListener(this)}, so this callback fires only
+     * if other code registers it. Confirm.
+     */
+    @Override
+    public void stateChanged(CuratorFramework client, ConnectionState newState) {
+        if ((newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST)) {
+            releaseLock();
+        }
+    }
+
+    /** Kills the given running step of {@code jobInstance}. */
+    public void interruptJob(JobInstance jobInstance, JobStep jobStep) throws IOException, JobException {
+        this.scheduler.interrupt(jobInstance, jobStep);
+    }
+
+    public Scheduler getScheduler() {
+        return this.scheduler.getScheduler();
+    }
+
+    // Job engine metrics related methods
+
+    // <StepID, Duration Seconds>
+    public static ConcurrentHashMap<String, Double> JOB_DURATION = new ConcurrentHashMap<String, Double>();
+
+    public int getNumberOfJobStepsExecuted() {
+        return JOB_DURATION.size();
+    }
+
+    /**
+     * Returns the engine ID stored on this engine's lock node, or {@code ""} when the node has no
+     * data.
+     */
+    public String getPrimaryEngineID() throws Exception {
+        byte[] data = zkClient.getData().forPath(getLockPath());
+        return data == null ? "" : Bytes.toString(data);
+    }
+
+    /** Shortest recorded step duration, or 0 when none has been recorded. */
+    public double getMinJobStepDuration() {
+        double[] all = getJobStepDuration();
+        if (all.length == 0) {
+            return 0;
+        }
+        Arrays.sort(all);
+        return all[0];
+    }
+
+    /**
+     * Snapshots the recorded step durations as a primitive array.
+     * <p>
+     * A zero-length array is passed to {@code toArray} on purpose. If the map shrank between sizing
+     * and copying, a presized array would receive a trailing {@code null}, and
+     * {@link ArrayUtils#toPrimitive(Double[])} rejects nulls with a {@link NullPointerException}.
+     */
+    private double[] getJobStepDuration() {
+        Collection<Double> values = JOB_DURATION.values();
+        return ArrayUtils.toPrimitive(values.toArray(new Double[0]));
+    }
+
+    /** Longest recorded step duration, or 0 when none has been recorded. */
+    public double getMaxJobStepDuration() {
+        double[] all = getJobStepDuration();
+        if (all.length == 0) {
+            return 0;
+        }
+        Arrays.sort(all);
+        return all[all.length - 1];
+    }
+
+    /**
+     * Returns the given percentile of the recorded step durations.
+     *
+     * @param percentile percentile to compute, as accepted by commons-math {@link Percentile}
+     */
+    public double getPercentileJobStepDuration(double percentile) {
+        Percentile p = new Percentile(percentile);
+        return p.evaluate(getJobStepDuration());
+    }
+
+    public Integer getScheduledJobsSzie() {
+        return scheduler.getScheduledJobs();
+    }
+
+    public int getEngineThreadPoolSize() {
+        return scheduler.getThreadPoolSize();
+    }
+
+    public int getNumberOfIdleSlots() {
+        return scheduler.getIdleSlots();
+    }
+
+    public int getNumberOfJobStepsRunning() {
+        return scheduler.getRunningJobs();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/engine/JobEngineConfig.java b/job/src/main/java/com/kylinolap/job/engine/JobEngineConfig.java
new file mode 100644
index 0000000..5527cf6
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/engine/JobEngineConfig.java
@@ -0,0 +1,254 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.engine;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.job.tools.OptionsHelper;
+import com.kylinolap.metadata.model.cube.CubeDesc.CubeCapacity;
+
+/**
+ * Job-engine view of a {@link KylinConfig}. It exposes the job-related settings and locates the
+ * Hadoop job configuration file for a cube capacity.
+ * <p>
+ * Equality and hash code are delegated to the wrapped {@link KylinConfig}.
+ *
+ * @author ysong1
+ */
+public class JobEngineConfig {
+    private static final Logger logger = LoggerFactory.getLogger(JobEngineConfig.class);
+    public static String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf";
+
+    // there should be no setters
+    private final KylinConfig config;
+
+    public JobEngineConfig(KylinConfig config) {
+        this.config = config;
+    }
+
+    /**
+     * Locates one Hadoop job configuration file and returns it as a file URL.
+     * <p>
+     * Search order:
+     * <ol>
+     * <li>the directory named by the {@code KylinConfig.KYLIN_CONF} system property, falling back
+     * to the environment variable of the same name;</li>
+     * <li>{@code /etc/kylin/};</li>
+     * <li>the classpath.</li>
+     * </ol>
+     * A classpath hit is copied to a temporary file, which is deleted when the JVM exits.
+     *
+     * @param capacity cube capacity whose lower-cased name becomes the file-name suffix
+     * @param appendSuffix {@code true} to look for {@code kylin_job_conf_<capacity>.xml},
+     *            {@code false} for {@code kylin_job_conf.xml}
+     * @return the file URL of the configuration file, or {@code ""} when none is found
+     * @throws IOException if a classpath resource cannot be copied to a temporary file
+     */
+    private String getHadoopJobConfFilePath(CubeCapacity capacity, boolean appendSuffix) throws IOException {
+        String hadoopJobConfFile;
+        if (appendSuffix) {
+            // Locale.ROOT keeps the file name stable under locales such as Turkish, where
+            // "I".toLowerCase() is not "i"
+            hadoopJobConfFile = HADOOP_JOB_CONF_FILENAME + "_" + capacity.toString().toLowerCase(java.util.Locale.ROOT) + ".xml";
+        } else {
+            hadoopJobConfFile = HADOOP_JOB_CONF_FILENAME + ".xml";
+        }
+
+        String path = System.getProperty(KylinConfig.KYLIN_CONF);
+        if (path == null) {
+            path = System.getenv(KylinConfig.KYLIN_CONF);
+        }
+        if (path != null) {
+            path = path + File.separator + hadoopJobConfFile;
+        }
+
+        if (null == path || !new File(path).exists()) {
+            File defaultFilePath = new File("/etc/kylin/" + hadoopJobConfFile);
+
+            if (defaultFilePath.exists()) {
+                path = defaultFilePath.getAbsolutePath();
+            } else {
+                logger.debug("Search conf file " + hadoopJobConfFile + " from classpath ...");
+                InputStream is = JobEngineConfig.class.getClassLoader().getResourceAsStream(hadoopJobConfFile);
+                if (is == null) {
+                    logger.debug("Can't get " + hadoopJobConfFile + " from classpath");
+                    logger.debug("No " + hadoopJobConfFile + " file were found");
+                } else {
+                    File tmp = File.createTempFile(HADOOP_JOB_CONF_FILENAME, ".xml");
+                    // the copy is only needed while this JVM is running
+                    tmp.deleteOnExit();
+                    inputStreamToFile(is, tmp);
+                    path = tmp.getAbsolutePath();
+                }
+            }
+        }
+
+        if (null == path || !new File(path).exists()) {
+            return "";
+        }
+
+        return OptionsHelper.convertToFileURL(path);
+    }
+
+    /**
+     * Returns the file URL of the Hadoop job configuration for {@code capaticy}. The
+     * capacity-specific file is preferred, with the generic {@code kylin_job_conf.xml} as fallback.
+     *
+     * @return the chosen file URL, or {@code ""} if neither file is found
+     * @throws IOException if a classpath resource cannot be copied to a temporary file
+     */
+    public String getHadoopJobConfFilePath(CubeCapacity capaticy) throws IOException {
+        String path = getHadoopJobConfFilePath(capaticy, true);
+        if (StringUtils.isEmpty(path)) {
+            path = getHadoopJobConfFilePath(capaticy, false);
+        }
+        if (StringUtils.isEmpty(path)) {
+            return "";
+        }
+        logger.info("Chosen job conf is : " + path);
+        return path;
+    }
+
+    /**
+     * Copies {@code ins} into {@code file}, closing both streams even when the copy fails.
+     */
+    private void inputStreamToFile(InputStream ins, File file) throws IOException {
+        try {
+            OutputStream os = new FileOutputStream(file);
+            try {
+                byte[] buffer = new byte[8192];
+                int bytesRead;
+                while ((bytesRead = ins.read(buffer, 0, buffer.length)) != -1) {
+                    os.write(buffer, 0, bytesRead);
+                }
+            } finally {
+                os.close();
+            }
+        } finally {
+            ins.close();
+        }
+    }
+
+    public KylinConfig getConfig() {
+        return config;
+    }
+
+    public String getHdfsWorkingDirectory() {
+        return config.getHdfsWorkingDirectory();
+    }
+
+    /** @return the kylinJobJarPath */
+    public String getKylinJobJarPath() {
+        return config.getKylinJobJarPath();
+    }
+
+    /** @return the runAsRemoteCommand */
+    public boolean isRunAsRemoteCommand() {
+        return config.getRunAsRemoteCommand();
+    }
+
+    /** @return the zookeeperString */
+    public String getZookeeperString() {
+        return config.getZookeeperString();
+    }
+
+    /** @return the remoteHadoopCliHostname */
+    public String getRemoteHadoopCliHostname() {
+        return config.getRemoteHadoopCliHostname();
+    }
+
+    /** @return the remoteHadoopCliUsername */
+    public String getRemoteHadoopCliUsername() {
+        return config.getRemoteHadoopCliUsername();
+    }
+
+    /** @return the remoteHadoopCliPassword */
+    public String getRemoteHadoopCliPassword() {
+        return config.getRemoteHadoopCliPassword();
+    }
+
+    public String getMapReduceCmdExtraArgs() {
+        return config.getMapReduceCmdExtraArgs();
+    }
+
+    /** @return the yarnStatusServiceUrl */
+    public String getYarnStatusServiceUrl() {
+        return config.getYarnStatusServiceUrl();
+    }
+
+    /** @return the maxConcurrentJobLimit */
+    public int getMaxConcurrentJobLimit() {
+        return config.getMaxConcurrentJobLimit();
+    }
+
+    /** @return the timeZone */
+    public String getTimeZone() {
+        return config.getTimeZone();
+    }
+
+    /** @return the adminDls */
+    public String getAdminDls() {
+        return config.getAdminDls();
+    }
+
+    /** @return the jobStepTimeout */
+    public long getJobStepTimeout() {
+        return config.getJobStepTimeout();
+    }
+
+    /** @return the asyncJobCheckInterval, read from the YARN status check interval (seconds) */
+    public int getAsyncJobCheckInterval() {
+        return config.getYarnStatusCheckIntervalSeconds();
+    }
+
+    /** @return the flatTableByHive */
+    public boolean isFlatTableByHive() {
+        return config.getFlatTableByHive();
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((config == null) ? 0 : config.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        JobEngineConfig other = (JobEngineConfig) obj;
+        if (config == null) {
+            return other.config == null;
+        }
+        return config.equals(other.config);
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/engine/JobFetcher.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/engine/JobFetcher.java b/job/src/main/java/com/kylinolap/job/engine/JobFetcher.java
new file mode 100644
index 0000000..2389f27
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/engine/JobFetcher.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.engine;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.quartz.Job;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.util.StringSplitter;
+import com.kylinolap.job.JobDAO;
+import com.kylinolap.job.JobInstance;
+import com.kylinolap.job.constant.JobConstants;
+import com.kylinolap.job.constant.JobStatusEnum;
+import com.kylinolap.job.flow.JobFlow;
+
+/**
+ * @author ysong1, xduo
+ *
+ */
+public class JobFetcher implements Job {
+
+ private static final Logger log = LoggerFactory.getLogger(JobFetcher.class);
+
+ public static final int JOB_THRESHOLD = 10;
+
+ @Override
+ public void execute(JobExecutionContext context) throws JobExecutionException {
+
+ JobEngineConfig engineConfig = (JobEngineConfig) context.getJobDetail().getJobDataMap().get(JobConstants.PROP_ENGINE_CONTEXT);
+
+ JobDAO jobDAO = JobDAO.getInstance(engineConfig.getConfig());
+
+ try {
+ // get all pending jobs
+ List<JobInstance> pendingJobList = jobDAO.listAllJobs(JobStatusEnum.PENDING);
+
+ log.debug(pendingJobList.size() + " pending jobs");
+ int leftJobs = JOB_THRESHOLD;
+ Random rand = new Random();
+ int maxConcurrentJobCount = engineConfig.getMaxConcurrentJobLimit();
+
+ for (JobInstance jobInstance : pendingJobList) {
+ @SuppressWarnings("unchecked")
+ ConcurrentHashMap<String, JobFlow> jobFlows = (ConcurrentHashMap<String, JobFlow>) context.getScheduler().getContext().get(JobConstants.PROP_JOB_RUNTIME_FLOWS);
+
+ if (jobFlows.size() >= maxConcurrentJobCount) {
+ // If too many job instances in current job context, just
+ // wait.
+ break;
+ }
+
+ try {
+ // there should be only 1 job for a certain job running
+ boolean cubeHasRunningJob = false;
+ for (String s : jobFlows.keySet()) {
+ String[] tmp = StringSplitter.split(s, ".");
+ String cubename = tmp[0];
+ String jobid = tmp[1];
+ if (cubename.equals(jobInstance.getRelatedCube())) {
+ log.info("There is already a job of cube " + jobInstance.getRelatedCube() + " running, job uuid is " + jobid);
+ cubeHasRunningJob = true;
+ break;
+ }
+ }
+
+ if (cubeHasRunningJob == false && jobFlows.containsKey(JobInstance.getJobIdentity(jobInstance)) == false) {
+ // create job flow
+ JobFlow jobFlow = new JobFlow(jobInstance, engineConfig);
+ jobFlows.put(JobInstance.getJobIdentity(jobInstance), jobFlow);
+
+ // schedule the 1st step
+ Trigger trigger = TriggerBuilder.newTrigger().startNow().build();
+ JobDetail firstStep = jobFlow.getFirst();
+ context.getScheduler().scheduleJob(firstStep, trigger);
+
+ log.info("Job " + jobInstance.getUuid() + " has been scheduled with the first step " + firstStep.getKey().toString());
+ }
+ } catch (Exception e) {
+ log.error("Failed to trigger the job detail", e);
+ }
+
+ if (--leftJobs < 0) {
+ log.info("Too many pending jobs!");
+ break;
+ }
+ long ms = Math.abs(rand.nextLong() % 10L);
+ Thread.sleep(ms * 1000L);
+ }
+ } catch (Throwable t) {
+ log.error(t.getMessage());
+ throw new JobExecutionException(t);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/engine/QuatzScheduler.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/engine/QuatzScheduler.java b/job/src/main/java/com/kylinolap/job/engine/QuatzScheduler.java
new file mode 100644
index 0000000..f080c37
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/engine/QuatzScheduler.java
@@ -0,0 +1,193 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.engine;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.SimpleScheduleBuilder;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+import org.quartz.UnableToInterruptJobException;
+import org.quartz.impl.StdSchedulerFactory;
+import org.quartz.impl.matchers.GroupMatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.job.JobInstance;
+import com.kylinolap.job.JobInstance.JobStep;
+import com.kylinolap.job.cmd.IJobCommand;
+import com.kylinolap.job.constant.JobConstants;
+import com.kylinolap.job.exception.JobException;
+import com.kylinolap.job.flow.JobFlow;
+import com.kylinolap.job.flow.JobFlowListener;
+
+/**
+ * Wrapper around a Quartz {@link Scheduler}. It owns the global job-flow listener and the shared
+ * map of running job flows, and exposes a few scheduler metrics.
+ * <p>
+ * The scheduler is created in standby mode and only runs jobs after {@link #start()}.
+ *
+ * @author xduo
+ */
+public class QuatzScheduler {
+
+    private static final Logger log = LoggerFactory.getLogger(QuatzScheduler.class);
+
+    private Scheduler scheduler;
+    private JobFlowListener globalJobListener;
+
+    /**
+     * Creates a standby Quartz scheduler with one worker thread per available processor.
+     * <p>
+     * The global listener is attached to the cube job group, and an empty running-flow map is
+     * registered in the scheduler context.
+     *
+     * @throws JobException if Quartz cannot be initialised
+     */
+    public QuatzScheduler() throws JobException {
+        this.globalJobListener = new JobFlowListener(JobConstants.GLOBAL_LISTENER_NAME);
+        StdSchedulerFactory sf = new StdSchedulerFactory();
+        Properties schedulerProperties = new Properties();
+        int numberOfProcessors = Runtime.getRuntime().availableProcessors();
+        schedulerProperties.setProperty("org.quartz.threadPool.threadCount", String.valueOf(numberOfProcessors));
+        schedulerProperties.setProperty("org.quartz.scheduler.skipUpdateCheck", "true");
+
+        try {
+            sf.initialize(schedulerProperties);
+            this.scheduler = sf.getScheduler();
+            this.scheduler.getListenerManager().addJobListener(this.globalJobListener, GroupMatcher.jobGroupEquals(JobConstants.CUBE_JOB_GROUP_NAME));
+
+            // cubename.jobUuid -> job flow
+            this.scheduler.getContext().put(JobConstants.PROP_JOB_RUNTIME_FLOWS, new ConcurrentHashMap<String, JobFlow>());
+
+            // put the scheduler in standby mode first
+            this.scheduler.standby();
+        } catch (SchedulerException e) {
+            throw new JobException(e);
+        }
+    }
+
+    public void start() throws JobException {
+        try {
+            this.scheduler.start();
+        } catch (SchedulerException e) {
+            throw new JobException(e);
+        }
+    }
+
+    /**
+     * Schedules {@link JobFetcher} to run now and then every {@code intervalInSeconds} seconds,
+     * forever.
+     *
+     * @param engineConfig passed to the fetcher via its job data map
+     */
+    public void scheduleFetcher(int intervalInSeconds, JobEngineConfig engineConfig) throws JobException {
+        JobDetail job = JobBuilder.newJob(JobFetcher.class).withIdentity(JobFetcher.class.getCanonicalName(), JobConstants.DAEMON_JOB_GROUP_NAME).build();
+        job.getJobDataMap().put(JobConstants.PROP_ENGINE_CONTEXT, engineConfig);
+
+        Trigger trigger = TriggerBuilder.newTrigger().startNow().withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(intervalInSeconds).repeatForever()).build();
+
+        try {
+            this.scheduler.scheduleJob(job, trigger);
+        } catch (SchedulerException e) {
+            throw new JobException(e);
+        }
+    }
+
+    /**
+     * Cancels a running job step and forgets its job flow.
+     * <p>
+     * The steps are:
+     * <ol>
+     * <li>cancel the step's command executor, if any;</li>
+     * <li>re-store the step's {@link JobDetail} with {@code PROP_JOB_KILLED} set to {@code true};</li>
+     * <li>remove the job's flow from the running-flow map.</li>
+     * </ol>
+     *
+     * @return {@code true} once the step has been marked as killed
+     * @throws JobException if the step is not registered with the scheduler, or a scheduler call
+     *             fails
+     */
+    public boolean interrupt(JobInstance jobInstance, JobStep jobStep) throws JobException, IOException {
+        JobKey jobKey = new JobKey(JobInstance.getStepIdentity(jobInstance, jobStep), JobConstants.CUBE_JOB_GROUP_NAME);
+
+        try {
+            JobDetail jobDetail = this.scheduler.getJobDetail(jobKey);
+            if (jobDetail == null) {
+                // previously this dereferenced null and escaped as a NullPointerException
+                throw new JobException("Job step " + jobKey + " is not registered with the scheduler");
+            }
+
+            IJobCommand iJobStepCmd = (IJobCommand) jobDetail.getJobDataMap().get(JobConstants.PROP_JOB_CMD_EXECUTOR);
+            if (null != iJobStepCmd) {
+                iJobStepCmd.cancel();
+            }
+
+            jobDetail.getJobDataMap().put(JobConstants.PROP_JOB_KILLED, true);
+            this.scheduler.addJob(jobDetail, true, true);
+
+            @SuppressWarnings("unchecked")
+            ConcurrentHashMap<String, JobFlow> jobFlows = (ConcurrentHashMap<String, JobFlow>) this.scheduler.getContext().get(JobConstants.PROP_JOB_RUNTIME_FLOWS);
+            jobFlows.remove(JobInstance.getJobIdentity(jobInstance));
+            return true;
+        } catch (SchedulerException e) {
+            // also covers UnableToInterruptJobException, a SchedulerException subclass
+            log.error(e.getLocalizedMessage(), e);
+            throw new JobException(e);
+        }
+    }
+
+    /** Puts the scheduler back into standby; it is not shut down. */
+    public void stop() throws JobException {
+        try {
+            this.scheduler.standby();
+        } catch (SchedulerException e) {
+            throw new JobException(e);
+        }
+    }
+
+    public Scheduler getScheduler() {
+        return this.scheduler;
+    }
+
+    // metrics; each returns 0 when the scheduler cannot be queried
+
+    public int getThreadPoolSize() {
+        try {
+            return scheduler.getMetaData().getThreadPoolSize();
+        } catch (SchedulerException e) {
+            log.error("Can't get scheduler metadata!", e);
+            return 0;
+        }
+    }
+
+    public int getRunningJobs() {
+        try {
+            return this.scheduler.getCurrentlyExecutingJobs().size();
+        } catch (SchedulerException e) {
+            log.error("Can't get scheduler metadata!", e);
+            return 0;
+        }
+    }
+
+    public int getIdleSlots() {
+        try {
+            return this.scheduler.getMetaData().getThreadPoolSize() - this.scheduler.getCurrentlyExecutingJobs().size();
+        } catch (SchedulerException e) {
+            log.error("Can't get scheduler metadata!", e);
+            return 0;
+        }
+    }
+
+    /** Counts job keys across all job groups (partial count if a lookup fails). */
+    public int getScheduledJobs() {
+        int scheduledJobCount = 0;
+        try {
+            for (String groupName : scheduler.getJobGroupNames()) {
+                scheduledJobCount += scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName)).size();
+            }
+        } catch (SchedulerException e) {
+            log.error("Can't get scheduler metadata!", e);
+        }
+        return scheduledJobCount;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/exception/InvalidJobInstanceException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/exception/InvalidJobInstanceException.java b/job/src/main/java/com/kylinolap/job/exception/InvalidJobInstanceException.java
new file mode 100644
index 0000000..1b404cb
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/exception/InvalidJobInstanceException.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.exception;
+
+/**
+ * Checked exception for an invalid job instance. It carries only a detail message.
+ *
+ * @author ysong1
+ */
+public class InvalidJobInstanceException extends Exception {
+
+    /**
+     * @param message detail message, returned by {@link #getMessage()}
+     */
+    public InvalidJobInstanceException(String message) {
+        super(message);
+    }
+
+    private static final long serialVersionUID = 2045169570038227895L;
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/exception/InvalidJobStatusException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/exception/InvalidJobStatusException.java b/job/src/main/java/com/kylinolap/job/exception/InvalidJobStatusException.java
new file mode 100644
index 0000000..d1050d0
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/exception/InvalidJobStatusException.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.exception;
+
+/**
+ * Checked exception for an invalid job status. It carries only a detail message.
+ *
+ * @author xduo
+ */
+public class InvalidJobStatusException extends Exception {
+
+    private static final long serialVersionUID = -8549756520626114000L;
+
+    /**
+     * @param string detail message, returned by {@link #getMessage()}; the previous constructor
+     *            dropped it
+     */
+    public InvalidJobStatusException(String string) {
+        super(string);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/exception/JobException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/exception/JobException.java b/job/src/main/java/com/kylinolap/job/exception/JobException.java
new file mode 100644
index 0000000..6c4bc6a
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/exception/JobException.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.exception;
+
+/**
+ * General checked exception used by the job engine, for example to wrap Quartz
+ * {@code SchedulerException}s. It offers the four standard {@link Exception} constructors.
+ *
+ * @author xduo
+ */
+public class JobException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Creates an exception with neither a detail message nor a cause. */
+    public JobException() {
+        super();
+    }
+
+    /** Creates an exception with the given detail message and no cause. */
+    public JobException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception wrapping {@code cause}. When {@code cause} is non-null, the detail
+     * message is {@code cause.toString()}.
+     */
+    public JobException(Throwable cause) {
+        super(cause);
+    }
+
+    /** Creates an exception with both a detail message and a cause. */
+    public JobException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/flow/AsyncJobFlowNode.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/flow/AsyncJobFlowNode.java b/job/src/main/java/com/kylinolap/job/flow/AsyncJobFlowNode.java
new file mode 100644
index 0000000..1b8777e
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/flow/AsyncJobFlowNode.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.flow;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.quartz.DateBuilder;
+import org.quartz.DateBuilder.IntervalUnit;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.job.JobDAO;
+import com.kylinolap.job.JobInstance;
+import com.kylinolap.job.JobInstance.JobStep;
+import com.kylinolap.job.cmd.ICommandOutput;
+import com.kylinolap.job.cmd.JobCommandFactory;
+import com.kylinolap.job.constant.JobConstants;
+import com.kylinolap.job.constant.JobStepStatusEnum;
+import com.kylinolap.job.engine.JobEngineConfig;
+import com.kylinolap.job.exception.JobException;
+
+/**
+ * @author xduo
+ *
+ */
+public class AsyncJobFlowNode extends JobFlowNode {
+
+ @Override
+ public void execute(JobExecutionContext context) throws JobExecutionException {
+ this.currentJobDetail = context.getJobDetail();
+ JobDataMap data = this.currentJobDetail.getJobDataMap();
+ JobFlow jobFlow = (JobFlow) data.get(JobConstants.PROP_JOB_FLOW);
+ JobEngineConfig engineConfig = jobFlow.getJobengineConfig();
+ KylinConfig config = engineConfig.getConfig();
+ String jobInstanceID = data.getString(JobConstants.PROP_JOBINSTANCE_UUID);
+ int jobStepID = data.getInt(JobConstants.PROP_JOBSTEP_SEQ_ID);
+ ICommandOutput output = (ICommandOutput) data.get(JobConstants.PROP_JOB_CMD_OUTPUT);
+
+ try {
+ if (data.getBoolean(JobConstants.PROP_JOB_KILLED)) {
+ log.info(this.currentJobDetail.getKey() + " is killed");
+ return;
+ }
+
+ if (output == null) {
+ JobInstance jobInstance = updateJobStep(jobInstanceID, jobStepID, config, JobStepStatusEnum.RUNNING, System.currentTimeMillis(), null, null);
+
+ String command = data.getString(JobConstants.PROP_COMMAND);
+ jobCmd = JobCommandFactory.getJobCommand(command, jobInstance, jobStepID, engineConfig);
+ output = jobCmd.execute();
+ data.put(JobConstants.PROP_JOB_CMD_OUTPUT, output);
+ data.put(JobConstants.PROP_JOB_CMD_EXECUTOR, jobCmd);
+ context.getScheduler().addJob(this.currentJobDetail, true, true);
+
+ JobStepStatusEnum stepStatus = output.getStatus();
+ updateJobStep(jobInstanceID, jobStepID, config, stepStatus, null, stepStatus.isComplete() ? System.currentTimeMillis() : null, output.getOutput());
+
+ context.setResult(output.getExitCode());
+ scheduleStatusChecker(context);
+ log.debug("Start async job " + currentJobDetail.getKey());
+ } else {
+ JobInstance jobInstance = JobDAO.getInstance(engineConfig.getConfig()).getJob(jobInstanceID);
+ JobStep jobStep = jobInstance.getSteps().get(jobStepID);
+
+ log.debug("Start to check hadoop job status of " + currentJobDetail.getKey());
+ JobStepStatusEnum stepStatus = output.getStatus();
+
+ if ((System.currentTimeMillis() - jobStep.getExecStartTime()) / 1000 >= engineConfig.getJobStepTimeout()) {
+ throw new JobException("Job step " + jobStep.getName() + " timeout.");
+ }
+
+ updateJobStep(jobInstance.getUuid(), jobStepID, config, stepStatus, null, stepStatus.isComplete() ? System.currentTimeMillis() : null, output.getOutput());
+
+ if (!stepStatus.isComplete()) {
+ scheduleStatusChecker(context);
+ }
+
+ context.setResult(0);
+ log.debug("Status of async job " + currentJobDetail.getKey() + ":" + stepStatus);
+ }
+ } catch (Throwable t) {
+ handleException(jobInstanceID, jobStepID, config, t);
+ }
+
+ }
+
+ private void scheduleStatusChecker(JobExecutionContext context) throws SchedulerException {
+ JobDataMap jobDataMap = this.currentJobDetail.getJobDataMap();
+ JobFlow jobFlow = (JobFlow) jobDataMap.get(JobConstants.PROP_JOB_FLOW);
+ JobEngineConfig engineConfig = jobFlow.getJobengineConfig();
+ int interval = engineConfig.getAsyncJobCheckInterval();
+ log.debug("Trigger a status check job in " + interval + " seconds for job " + currentJobDetail.getKey());
+
+ Trigger trigger = TriggerBuilder.newTrigger().startAt(DateBuilder.futureDate(interval, IntervalUnit.SECOND)).build();
+ Set<Trigger> triggers = new HashSet<Trigger>();
+ triggers.add(trigger);
+ context.getScheduler().scheduleJob(currentJobDetail, triggers, true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/flow/JobFlow.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/flow/JobFlow.java b/job/src/main/java/com/kylinolap/job/flow/JobFlow.java
new file mode 100644
index 0000000..9a58107
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/flow/JobFlow.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.flow;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.job.JobInstance;
+import com.kylinolap.job.JobInstance.JobStep;
+import com.kylinolap.job.constant.JobConstants;
+import com.kylinolap.job.constant.JobStepCmdTypeEnum;
+import com.kylinolap.job.engine.JobEngineConfig;
+
+/**
+ * @author xduo
+ */
+public class JobFlow {
+
+ private static Logger log = LoggerFactory.getLogger(JobFlow.class);
+
+ private final List<JobDetail> flowNodes;
+ private final JobInstance jobInstance;
+ private final JobEngineConfig engineConfig;
+
+ public JobFlow(JobInstance job, JobEngineConfig context) {
+ this.engineConfig = context;
+ this.jobInstance = job;
+ // validate job instance
+ List<JobStep> sortedSteps = jobInstance.getSteps();
+
+ if (sortedSteps == null || sortedSteps.size() == 0) {
+ throw new IllegalStateException("Steps of job " + jobInstance.getUuid() + " is null or empty!");
+ }
+
+ // sort the steps by step_sequenceID
+ Collections.sort(sortedSteps);
+ // find the 1st runnable job
+ int firstStepIndex = findFirstStep(sortedSteps);
+
+ log.info("Job " + jobInstance.getUuid() + " will be started at step " + firstStepIndex + " (sequence number)");
+
+ flowNodes = new LinkedList<JobDetail>();
+ for (int i = firstStepIndex; i < sortedSteps.size(); i++) {
+ JobDetail node = createJobFlowNode(jobInstance, i);
+ flowNodes.add(node);
+ }
+ }
+
+ public JobInstance getJobInstance() {
+ return jobInstance;
+ }
+
+ public JobEngineConfig getJobengineConfig() {
+ return engineConfig;
+ }
+
+ public JobDetail getFirst() {
+ if (flowNodes.isEmpty()) {
+ return null;
+ }
+ return flowNodes.get(0);
+ }
+
+ public JobDetail getNext(JobDetail jobFlowNode) {
+ int targetIndex = -1;
+ for (int index = 0; index < flowNodes.size(); index++) {
+ if (flowNodes.get(index).equals(jobFlowNode)) {
+ targetIndex = index;
+ }
+ }
+
+ if (targetIndex != -1 && flowNodes.size() > targetIndex + 1) {
+ return flowNodes.get(targetIndex + 1);
+ }
+ return null;
+ }
+
+ private int findFirstStep(List<JobStep> stepList) {
+ int firstJobIndex = 0;
+ for (int i = 0; i < stepList.size(); i++) {
+ JobStep currentStep = stepList.get(i);
+ if (currentStep.getStatus().isRunable() == false) {
+ continue;
+ } else {
+ firstJobIndex = i;
+ break;
+ }
+ }
+ return firstJobIndex;
+ }
+
+ private JobDetail createJobFlowNode(final JobInstance jobInstance, final int stepSeqId) {
+ JobStep step = jobInstance.getSteps().get(stepSeqId);
+
+ if (jobInstance.getName() == null || step.getName() == null) {
+ throw new IllegalArgumentException("JobInstance name or JobStep name cannot be null!");
+ }
+
+ // submit job the different groups based on isRunAsync property
+ JobDetail jobFlowNode = JobBuilder.newJob(step.isRunAsync() ? AsyncJobFlowNode.class : JobFlowNode.class).withIdentity(JobInstance.getStepIdentity(jobInstance, step), JobConstants.CUBE_JOB_GROUP_NAME).storeDurably().build();
+
+ // add job flow to node
+ jobFlowNode.getJobDataMap().put(JobConstants.PROP_JOB_FLOW, this);
+
+ // add command to flow node
+ String execCmd = (step.getCmdType() == JobStepCmdTypeEnum.SHELL_CMD || step.getCmdType() == JobStepCmdTypeEnum.SHELL_CMD_HADOOP) ? wrapExecCmd(jobInstance, step.getExecCmd(), String.valueOf(step.getSequenceID())) : step.getExecCmd();
+ jobFlowNode.getJobDataMap().put(JobConstants.PROP_COMMAND, execCmd);
+
+ // add job instance and step sequenceID to flow node
+ jobFlowNode.getJobDataMap().put(JobConstants.PROP_JOBINSTANCE_UUID, jobInstance.getUuid());
+ jobFlowNode.getJobDataMap().put(JobConstants.PROP_JOBSTEP_SEQ_ID, step.getSequenceID());
+
+ // add async flag to flow node
+ jobFlowNode.getJobDataMap().put(JobConstants.PROP_JOB_ASYNC, step.isRunAsync());
+
+ return jobFlowNode;
+ }
+
+ private String wrapExecCmd(JobInstance job, String cmd, String suffix) {
+ if (StringUtils.isBlank(cmd))
+ return cmd;
+
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ try {
+ FileUtils.forceMkdir(new File(config.getKylinJobLogDir()));
+ } catch (IOException e) {
+ throw new RuntimeException("Create log dir " + config.getKylinJobLogDir() + " failed.", e);
+ }
+ String log = config.getKylinJobLogDir() + "/" + job.getUuid() + "_" + suffix + ".log";
+
+ String mkLogDir = "mkdir -p " + config.getKylinJobLogDir();
+ return mkLogDir + ";" + "set -o pipefail; " + cmd + " 2>&1 | tee " + log;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java b/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java
new file mode 100644
index 0000000..8274d44
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java
@@ -0,0 +1,419 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.flow;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.kylinolap.cube.CubeSegmentStatusEnum;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.JobListener;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.util.MailService;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.CubeSegment;
+import com.kylinolap.cube.exception.CubeIntegrityException;
+import com.kylinolap.job.JobDAO;
+import com.kylinolap.job.JobInstance;
+import com.kylinolap.job.JobInstance.JobStep;
+import com.kylinolap.job.constant.JobConstants;
+import com.kylinolap.job.constant.JobStatusEnum;
+import com.kylinolap.job.constant.JobStepStatusEnum;
+import com.kylinolap.job.engine.JobEngineConfig;
+
+/**
+ * Quartz {@link JobListener} that reacts to the end of each job-flow step.
+ * <p>
+ * Depending on the step status it updates the Kylin job instance and the cube
+ * segment metadata, e-mails the configured recipients, and (after a successful
+ * step) chains the next node of the {@link JobFlow}. Once the whole Kylin job is
+ * complete, it removes the Quartz job and drops the flow from the scheduler's
+ * runtime-flow map.
+ *
+ * @author George Song (ysong1), xduo
+ */
+public class JobFlowListener implements JobListener {
+
+    private static final Logger log = LoggerFactory.getLogger(JobFlowListener.class);
+
+    private final String name;
+
+    /**
+     * @param name listener name registered with Quartz
+     * @throws IllegalArgumentException if {@code name} is null
+     */
+    public JobFlowListener(String name) {
+        if (name == null) {
+            throw new IllegalArgumentException("Listener name cannot be null!");
+        }
+        this.name = name;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
+        log.info(context.getJobDetail().getKey() + " was executed.");
+        JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
+        JobFlow jobFlow = (JobFlow) jobDataMap.get(JobConstants.PROP_JOB_FLOW);
+        JobEngineConfig engineConfig = jobFlow.getJobengineConfig();
+        String jobUuid = jobDataMap.getString(JobConstants.PROP_JOBINSTANCE_UUID);
+        int stepSeqID = jobDataMap.getInt(JobConstants.PROP_JOBSTEP_SEQ_ID);
+        KylinConfig config = engineConfig.getConfig();
+
+        JobInstance jobInstance = null;
+        try {
+            jobInstance = JobDAO.getInstance(config).getJob(jobUuid);
+            JobStep jobStep = jobInstance.getSteps().get(stepSeqID);
+            CubeInstance cube = CubeManager.getInstance(config).getCube(jobInstance.getRelatedCube());
+
+            log.info(context.getJobDetail().getKey() + " status: " + jobStep.getStatus());
+            switch (jobStep.getStatus()) {
+            case FINISHED:
+                // Ensure we are using the latest metadata
+                CubeManager.getInstance(config).loadCubeCache(cube);
+                updateKylinJobOnSuccess(jobInstance, stepSeqID, engineConfig);
+                updateCubeSegmentInfoOnSucceed(jobInstance, engineConfig);
+                notifyUsers(jobInstance, engineConfig);
+                scheduleNextJob(context, jobInstance);
+                break;
+            case ERROR:
+                updateKylinJobStatus(jobInstance, stepSeqID, engineConfig);
+                notifyUsers(jobInstance, engineConfig);
+                break;
+            case DISCARDED:
+                // Ensure we are using the latest metadata
+                CubeManager.getInstance(config).loadCubeCache(cube);
+                updateCubeSegmentInfoOnDiscard(jobInstance, engineConfig);
+                notifyUsers(jobInstance, engineConfig);
+                break;
+            default:
+                break;
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            handleException(jobUuid, stepSeqID, config, e);
+        } finally {
+            if (null != jobInstance && jobInstance.getStatus().isComplete()) {
+                cleanUpCompletedJob(context, jobInstance);
+            }
+        }
+    }
+
+    /** No-op: nothing to prepare before a step runs. */
+    @Override
+    public void jobToBeExecuted(JobExecutionContext context) {
+    }
+
+    /** No-op: vetoed executions need no bookkeeping here. */
+    @Override
+    public void jobExecutionVetoed(JobExecutionContext context) {
+    }
+
+    /**
+     * Deletes the finished Quartz job and removes its flow from the scheduler's
+     * runtime-flow map, if that map is present.
+     */
+    private void cleanUpCompletedJob(JobExecutionContext context, JobInstance jobInstance) {
+        try {
+            context.getScheduler().deleteJob(context.getJobDetail().getKey());
+            @SuppressWarnings("unchecked")
+            ConcurrentHashMap<String, JobFlow> jobFlows = (ConcurrentHashMap<String, JobFlow>) context.getScheduler().getContext().get(JobConstants.PROP_JOB_RUNTIME_FLOWS);
+            if (jobFlows != null) {
+                jobFlows.remove(JobInstance.getJobIdentity(jobInstance));
+            }
+        } catch (SchedulerException e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Schedules the next node of the flow to start immediately, then deletes the
+     * node that just finished.
+     *
+     * @param context     context of the node that just finished
+     * @param jobInstance the Kylin job the node belongs to
+     * @throws RuntimeException wrapping a {@link SchedulerException} raised while
+     *                          deleting the finished node
+     */
+    protected void scheduleNextJob(JobExecutionContext context, JobInstance jobInstance) {
+        try {
+            // schedule next job
+            JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
+            JobFlow jobFlow = (JobFlow) jobDataMap.get(JobConstants.PROP_JOB_FLOW);
+            JobDetail nextJob = jobFlow.getNext(context.getJobDetail());
+            if (nextJob != null) {
+                try {
+                    Trigger trigger = TriggerBuilder.newTrigger().startNow().build();
+                    log.debug("Job " + context.getJobDetail().getKey() + " will now chain to Job " + nextJob.getKey());
+                    context.getScheduler().scheduleJob(nextJob, trigger);
+                } catch (SchedulerException se) {
+                    log.error("Error encountered during chaining to Job " + nextJob.getKey(), se);
+                }
+            }
+
+            context.getScheduler().deleteJob(context.getJobDetail().getKey());
+        } catch (SchedulerException e) {
+            log.error(e.getLocalizedMessage(), e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Records the time spent by a failed step on the job instance and persists it.
+     */
+    private void updateKylinJobStatus(JobInstance jobInstance, int stepId, JobEngineConfig engineConfig) {
+        validate(jobInstance);
+        List<JobStep> steps = jobInstance.getSteps();
+        Collections.sort(steps);
+
+        JobStep jobStep = jobInstance.getSteps().get(stepId);
+        addStepTimes(jobInstance, jobStep);
+
+        saveJobInstance(jobInstance, engineConfig);
+    }
+
+    /**
+     * Records the time spent by a successful step on the job instance, sets the
+     * job's start time (and end time once the whole job is FINISHED), and persists it.
+     */
+    private void updateKylinJobOnSuccess(JobInstance jobInstance, int stepId, JobEngineConfig engineConfig) {
+        validate(jobInstance);
+        List<JobStep> steps = jobInstance.getSteps();
+        Collections.sort(steps);
+
+        JobStep jobStep = jobInstance.getSteps().get(stepId);
+        jobInstance.setExecStartTime(steps.get(0).getExecStartTime());
+
+        addStepTimes(jobInstance, jobStep);
+        if (jobInstance.getStatus().equals(JobStatusEnum.FINISHED)) {
+            jobInstance.setExecEndTime(steps.get(steps.size() - 1).getExecEndTime());
+        }
+
+        saveJobInstance(jobInstance, engineConfig);
+    }
+
+    /**
+     * Adds the step's run time (end minus start, in whole seconds, negative
+     * treated as 0) to the job duration and its wait time to the job's MR waiting time.
+     */
+    private void addStepTimes(JobInstance jobInstance, JobStep jobStep) {
+        long duration = jobStep.getExecEndTime() - jobStep.getExecStartTime();
+        jobInstance.setDuration(jobInstance.getDuration() + (duration > 0 ? duration / 1000 : 0));
+        jobInstance.setMrWaiting(jobInstance.getMrWaiting() + jobStep.getExecWaitTime());
+    }
+
+    /** Persists the job instance; an I/O failure is logged, not rethrown. */
+    private void saveJobInstance(JobInstance jobInstance, JobEngineConfig engineConfig) {
+        try {
+            JobDAO.getInstance(engineConfig.getConfig()).updateJobInstance(jobInstance);
+        } catch (IOException e) {
+            log.error(e.getLocalizedMessage(), e);
+        }
+    }
+
+    private void updateCubeSegmentInfoOnDiscard(JobInstance jobInstance, JobEngineConfig engineConfig) throws IOException, CubeIntegrityException {
+        CubeManager cubeMgr = CubeManager.getInstance(engineConfig.getConfig());
+        CubeInstance cubeInstance = cubeMgr.getCube(jobInstance.getRelatedCube());
+        cubeMgr.updateSegmentOnJobDiscard(cubeInstance, jobInstance.getRelatedSegment());
+    }
+
+    /**
+     * Once the whole job is FINISHED, collects the cube size (KB, from the
+     * HFile-conversion step) and source record count/size, then updates the
+     * segment through {@link CubeManager#updateSegmentOnJobSucceed}.
+     *
+     * @throws RuntimeException if a present step lacks the expected info value
+     */
+    private void updateCubeSegmentInfoOnSucceed(JobInstance jobInstance, JobEngineConfig engineConfig) throws CubeIntegrityException, IOException {
+        if (!jobInstance.getStatus().equals(JobStatusEnum.FINISHED)) {
+            return;
+        }
+        validate(jobInstance);
+
+        log.info("Updating cube segment " + jobInstance.getRelatedSegment() + " for cube " + jobInstance.getRelatedCube());
+
+        long cubeSize = 0;
+        JobStep convertToHFileStep = jobInstance.findStep(JobConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
+        if (null != convertToHFileStep) {
+            String cubeSizeString = convertToHFileStep.getInfo(JobInstance.HDFS_BYTES_WRITTEN);
+            if (cubeSizeString == null || cubeSizeString.equals("")) {
+                throw new RuntimeException("Can't get cube segment size.");
+            }
+            cubeSize = Long.parseLong(cubeSizeString) / 1024;
+        } else {
+            log.info("No step with name '" + JobConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE + "' is found");
+        }
+
+        CubeManager cubeMgr = CubeManager.getInstance(engineConfig.getConfig());
+        CubeInstance cubeInstance = cubeMgr.getCube(jobInstance.getRelatedCube());
+        CubeSegment newSegment = cubeInstance.getSegmentById(jobInstance.getUuid());
+
+        long sourceCount = 0;
+        long sourceSize = 0;
+        switch (jobInstance.getType()) {
+        case BUILD:
+            JobStep baseCuboidStep = jobInstance.findStep(JobConstants.STEP_NAME_BUILD_BASE_CUBOID);
+            if (null != baseCuboidStep) {
+                String sourceRecordsCount = baseCuboidStep.getInfo(JobInstance.SOURCE_RECORDS_COUNT);
+                if (sourceRecordsCount == null || sourceRecordsCount.equals("")) {
+                    throw new RuntimeException("Can't get cube source record count.");
+                }
+                sourceCount = Long.parseLong(sourceRecordsCount);
+            } else {
+                log.info("No step with name '" + JobConstants.STEP_NAME_BUILD_BASE_CUBOID + "' is found");
+            }
+
+            JobStep createFlatTableStep = jobInstance.findStep(JobConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+            if (null != createFlatTableStep) {
+                String sourceRecordsSize = createFlatTableStep.getInfo(JobInstance.SOURCE_RECORDS_SIZE);
+                if (sourceRecordsSize == null || sourceRecordsSize.equals("")) {
+                    throw new RuntimeException("Can't get cube source record size.");
+                }
+                sourceSize = Long.parseLong(sourceRecordsSize);
+            } else {
+                log.info("No step with name '" + JobConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE + "' is found");
+            }
+
+            // a build that is merged right away also accounts for the READY segments
+            if (cubeInstance.needMergeImmediatelyAfterBuild(newSegment)) {
+                for (CubeSegment seg : cubeInstance.getSegment(CubeSegmentStatusEnum.READY)) {
+                    sourceCount += seg.getSourceRecords();
+                    sourceSize += seg.getSourceRecordsSize();
+                }
+            }
+            break;
+        case MERGE:
+            for (CubeSegment seg : cubeInstance.getMergingSegments()) {
+                sourceCount += seg.getSourceRecords();
+                sourceSize += seg.getSourceRecordsSize();
+            }
+            break;
+        }
+
+        cubeMgr.updateSegmentOnJobSucceed(cubeInstance, jobInstance.getType(), jobInstance.getRelatedSegment(), jobInstance.getUuid(), jobInstance.getExecEndTime(), cubeSize, sourceCount, sourceSize);
+        log.info("Update cube segment succeed" + jobInstance.getRelatedSegment() + " for cube " + jobInstance.getRelatedCube());
+    }
+
+    private void validate(JobInstance jobInstance) {
+        List<JobStep> steps = jobInstance.getSteps();
+        if (steps == null || steps.size() == 0) {
+            throw new RuntimeException("Steps of job " + jobInstance.getUuid() + " is null or empty!");
+        }
+    }
+
+    /**
+     * Marks the step ERROR with the current time as its end time, then appends the
+     * exception's full stack trace to the step's stored output. I/O failures are logged.
+     */
+    private void handleException(String jobInstanceUuid, int jobInstanceStepSeqId, KylinConfig config, Throwable t) {
+        log.error(t.getLocalizedMessage(), t);
+        String exceptionMsg = "Failed with Exception:" + ExceptionUtils.getFullStackTrace(t);
+        try {
+            JobDAO dao = JobDAO.getInstance(config);
+            JobInstance jobInstance = dao.getJob(jobInstanceUuid);
+            JobStep failedStep = jobInstance.getSteps().get(jobInstanceStepSeqId);
+            failedStep.setStatus(JobStepStatusEnum.ERROR);
+            failedStep.setExecEndTime(System.currentTimeMillis());
+            dao.updateJobInstance(jobInstance);
+
+            String output = dao.getJobOutput(jobInstanceUuid, jobInstanceStepSeqId).getOutput();
+            output = output + "\n" + exceptionMsg;
+            dao.saveJobOutput(jobInstanceUuid, jobInstanceStepSeqId, output);
+        } catch (IOException e1) {
+            log.error(e1.getLocalizedMessage(), e1);
+        }
+    }
+
+    /**
+     * Sends the notification e-mail for a job that ended FINISHED, ERROR or
+     * DISCARDED; other job statuses send nothing.
+     * <p>
+     * Recipients are the cube descriptor's notify list plus the comma-separated
+     * admin distribution lists from the engine config. For an ERROR job the output
+     * of the (last) failed step is included in the mail body.
+     *
+     * @param jobInstance  the job to report on
+     * @param engineConfig engine config providing the Kylin config and admin DLs
+     */
+    protected void notifyUsers(JobInstance jobInstance, JobEngineConfig engineConfig) {
+        KylinConfig config = engineConfig.getConfig();
+        String cubeName = jobInstance.getRelatedCube();
+        CubeInstance cubeInstance = CubeManager.getInstance(config).getCube(cubeName);
+        String finalStatus = null;
+        String content = JobConstants.NOTIFY_EMAIL_TEMPLATE;
+        String logMsg = "";
+
+        switch (jobInstance.getStatus()) {
+        case FINISHED:
+            finalStatus = "SUCCESS";
+            break;
+        case ERROR:
+            for (JobStep step : jobInstance.getSteps()) {
+                if (step.getStatus() == JobStepStatusEnum.ERROR) {
+                    try {
+                        logMsg = JobDAO.getInstance(config).getJobOutput(step).getOutput();
+                    } catch (IOException e) {
+                        log.error(e.getLocalizedMessage(), e);
+                    }
+                }
+            }
+            finalStatus = "FAILED";
+            break;
+        case DISCARDED:
+            finalStatus = "DISCARDED";
+            break;
+        default:
+            break;
+        }
+
+        if (null == finalStatus) {
+            return;
+        }
+
+        try {
+            InetAddress inetAddress = InetAddress.getLocalHost();
+            content = fillPlaceholder(content, "job_engine", inetAddress.getCanonicalHostName());
+        } catch (UnknownHostException e) {
+            log.error(e.getLocalizedMessage(), e);
+        }
+
+        content = fillPlaceholder(content, "job_name", jobInstance.getName());
+        content = fillPlaceholder(content, "result", finalStatus);
+        content = fillPlaceholder(content, "cube_name", cubeName);
+        content = fillPlaceholder(content, "start_time", new Date(jobInstance.getExecStartTime()).toString());
+        // duration and MR waiting are accumulated in seconds; report whole minutes
+        content = fillPlaceholder(content, "duration", jobInstance.getDuration() / 60 + "mins");
+        content = fillPlaceholder(content, "mr_waiting", jobInstance.getMrWaiting() / 60 + "mins");
+        content = fillPlaceholder(content, "last_update_time", new Date(jobInstance.getLastModified()).toString());
+        content = fillPlaceholder(content, "submitter", jobInstance.getSubmitter());
+        content = fillPlaceholder(content, "error_log", logMsg);
+
+        MailService mailService = new MailService();
+        try {
+            List<String> users = new ArrayList<String>();
+
+            if (null != cubeInstance.getDescriptor().getNotifyList()) {
+                users.addAll(cubeInstance.getDescriptor().getNotifyList());
+            }
+
+            String adminDls = engineConfig.getAdminDls();
+            if (null != adminDls) {
+                for (String adminDl : adminDls.split(",")) {
+                    // tolerate "a@x, b@x" and stray commas
+                    String address = adminDl.trim();
+                    if (!address.isEmpty()) {
+                        users.add(address);
+                    }
+                }
+            }
+
+            log.info("prepare to send email to:" + users);
+            log.info("job name:" + jobInstance.getName());
+            log.info("submitter:" + jobInstance.getSubmitter());
+
+            if (users.size() > 0) {
+                log.info("notify list:" + users);
+                mailService.sendMail(users, "[" + finalStatus + "] - [Kylin Cube Build Job]-" + cubeName, content);
+                log.info("notified users:" + users);
+            }
+        } catch (IOException e) {
+            log.error(e.getLocalizedMessage(), e);
+        }
+    }
+
+    /**
+     * Replaces every literal {@code ${key}} in {@code template} with {@code value}.
+     * <p>
+     * Uses literal (non-regex) replacement: values such as stack traces often contain
+     * {@code $} or {@code \}, which {@code String.replaceAll} would treat as group
+     * references and reject or mangle. A null value is rendered as an empty string.
+     */
+    private static String fillPlaceholder(String template, String key, String value) {
+        return template.replace("${" + key + "}", value == null ? "" : value);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/flow/JobFlowNode.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/flow/JobFlowNode.java b/job/src/main/java/com/kylinolap/job/flow/JobFlowNode.java
new file mode 100644
index 0000000..e539121
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/flow/JobFlowNode.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.flow;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.quartz.InterruptableJob;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.UnableToInterruptJobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.job.JobDAO;
+import com.kylinolap.job.JobInstance;
+import com.kylinolap.job.JobInstance.JobStep;
+import com.kylinolap.job.cmd.ICommandOutput;
+import com.kylinolap.job.cmd.IJobCommand;
+import com.kylinolap.job.cmd.JobCommandFactory;
+import com.kylinolap.job.constant.JobConstants;
+import com.kylinolap.job.constant.JobStepStatusEnum;
+import com.kylinolap.job.engine.JobEngine;
+import com.kylinolap.job.engine.JobEngineConfig;
+
+/**
+ * Quartz job that runs one synchronous step of a {@link JobFlow}.
+ * <p>
+ * It marks the step RUNNING, builds the step's command, stores the command
+ * executor in the job data map, runs the command to completion, and records the
+ * resulting status, end time and output on the step. Subclasses reuse
+ * {@link #updateJobStep} and {@link #handleException}.
+ *
+ * @author xduo
+ */
+public class JobFlowNode implements InterruptableJob {
+
+    protected static final Logger log = LoggerFactory.getLogger(JobFlowNode.class);
+
+    protected JobDetail currentJobDetail;
+    protected IJobCommand jobCmd;
+
+    @Override
+    public void execute(JobExecutionContext context) throws JobExecutionException {
+        currentJobDetail = context.getJobDetail();
+        final JobDataMap jobData = currentJobDetail.getJobDataMap();
+        final JobEngineConfig engineConfig = ((JobFlow) jobData.get(JobConstants.PROP_JOB_FLOW)).getJobengineConfig();
+        final String instanceUuid = jobData.getString(JobConstants.PROP_JOBINSTANCE_UUID);
+        final int stepSeq = jobData.getInt(JobConstants.PROP_JOBSTEP_SEQ_ID);
+        final String command = jobData.getString(JobConstants.PROP_COMMAND);
+        final KylinConfig kylinConfig = engineConfig.getConfig();
+
+        try {
+            JobInstance instance = updateJobStep(instanceUuid, stepSeq, kylinConfig, JobStepStatusEnum.RUNNING, System.currentTimeMillis(), null, null);
+
+            jobCmd = JobCommandFactory.getJobCommand(command, instance, stepSeq, engineConfig);
+            jobData.put(JobConstants.PROP_JOB_CMD_EXECUTOR, jobCmd);
+            // re-store the detail (replace=true); presumably so the executor can be
+            // looked up from the scheduler while the command runs -- confirm with JobEngine
+            context.getScheduler().addJob(currentJobDetail, true, true);
+
+            ICommandOutput result = jobCmd.execute();
+
+            // a killed step keeps whatever state the kill left it in
+            if (!jobData.getBoolean(JobConstants.PROP_JOB_KILLED)) {
+                int exitCode = result.getExitCode();
+                updateJobStep(instanceUuid, stepSeq, kylinConfig, result.getStatus(), null, System.currentTimeMillis(), result.getOutput());
+                context.setResult(exitCode);
+
+                log.info("Job status for " + context.getJobDetail().getKey() + " has been updated.");
+                log.info("cmd:" + command);
+                log.info("output:" + result.getOutput());
+                log.info("exitCode:" + exitCode);
+            }
+        } catch (Throwable failure) {
+            handleException(instanceUuid, stepSeq, kylinConfig, failure);
+        }
+    }
+
+    /** Interruption is not supported; this is intentionally a no-op. */
+    @Override
+    public void interrupt() throws UnableToInterruptJobException {
+    }
+
+    /**
+     * Applies the given changes to one step of a job instance and persists them.
+     * <p>
+     * Null arguments are left untouched. A non-null {@code output} is saved through
+     * {@link JobDAO#saveJobOutput}. When the step leaves WAITING for RUNNING or
+     * FINISHED, its wait time is set to the seconds elapsed since its start time.
+     * I/O errors while saving output or the instance are logged, not rethrown.
+     * When an end time is given, the step's duration in seconds is also recorded in
+     * {@code JobEngine.JOB_DURATION}.
+     *
+     * @return the job instance as loaded (and possibly modified) by this call
+     * @throws IOException if the job instance cannot be loaded
+     */
+    protected JobInstance updateJobStep(String jobInstanceUuid, int jobInstanceStepSeqId, KylinConfig config, JobStepStatusEnum newStatus, Long execStartTime, Long execEndTime, String output) throws IOException {
+        JobDAO dao = JobDAO.getInstance(config);
+        JobInstance jobInstance = dao.getJob(jobInstanceUuid);
+        JobStep step = null;
+
+        try {
+            step = jobInstance.getSteps().get(jobInstanceStepSeqId);
+            JobStepStatusEnum previousStatus = step.getStatus();
+            boolean dirty = false;
+
+            if (execStartTime != null) {
+                step.setExecStartTime(execStartTime);
+                dirty = true;
+            }
+            if (execEndTime != null) {
+                step.setExecEndTime(execEndTime);
+                dirty = true;
+            }
+            if (output != null) {
+                dao.saveJobOutput(step, output);
+                dirty = true;
+            }
+
+            boolean leavingWaiting = previousStatus == JobStepStatusEnum.WAITING
+                    && (newStatus == JobStepStatusEnum.RUNNING || newStatus == JobStepStatusEnum.FINISHED);
+            if (leavingWaiting) {
+                step.setExecWaitTime((System.currentTimeMillis() - step.getExecStartTime()) / 1000);
+                dirty = true;
+            }
+            if (newStatus != null) {
+                step.setStatus(newStatus);
+                dirty = true;
+            }
+
+            if (dirty) {
+                dao.updateJobInstance(jobInstance);
+            }
+        } catch (IOException e) {
+            log.error(e.getLocalizedMessage(), e);
+        }
+
+        if (execEndTime != null) {
+            String durationKey = JobInstance.getStepIdentity(jobInstance, step) + " - " + String.valueOf(step.getExecStartTime());
+            double seconds = (double) (step.getExecEndTime() - step.getExecStartTime()) / 1000;
+            JobEngine.JOB_DURATION.put(durationKey, seconds);
+        }
+
+        return jobInstance;
+    }
+
+    /**
+     * Marks the step ERROR with the current time as its end time, persists the job
+     * instance, then appends the failure's full stack trace to the step's stored
+     * output. I/O failures during this bookkeeping are logged, not rethrown.
+     */
+    protected void handleException(String jobInstanceUuid, int jobInstanceStepSeqId, KylinConfig config, Throwable t) {
+        log.error(t.getLocalizedMessage(), t);
+        final String failureText = "Failed with Exception:" + ExceptionUtils.getFullStackTrace(t);
+        try {
+            JobDAO dao = JobDAO.getInstance(config);
+            JobInstance jobInstance = dao.getJob(jobInstanceUuid);
+            JobStep failedStep = jobInstance.getSteps().get(jobInstanceStepSeqId);
+            failedStep.setStatus(JobStepStatusEnum.ERROR);
+            failedStep.setExecEndTime(System.currentTimeMillis());
+            dao.updateJobInstance(jobInstance);
+
+            String previousOutput = dao.getJobOutput(jobInstanceUuid, jobInstanceStepSeqId).getOutput();
+            dao.saveJobOutput(jobInstanceUuid, jobInstanceStepSeqId, previousOutput + "\n" + failureText);
+        } catch (IOException e1) {
+            log.error(e1.getLocalizedMessage(), e1);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java
new file mode 100644
index 0000000..4cd59a9
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop;
+
+/**
+ * @author George Song (ysong1)
+ *
+ */
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.persistence.ResourceStore;
+import com.kylinolap.common.util.StringSplitter;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeSegment;
+import com.kylinolap.job.JobInstance;
+import com.kylinolap.job.exception.JobException;
+import com.kylinolap.job.tools.OptionsHelper;
+import com.kylinolap.metadata.model.schema.TableDesc;
+
+@SuppressWarnings("static-access")
+public abstract class AbstractHadoopJob extends Configured implements Tool {
+ protected static final Logger log = LoggerFactory.getLogger(AbstractHadoopJob.class);
+
+ protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Job name. For exmaple, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create("jobname");
+ protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create("cubename");
+ protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name)").create("segmentname");
+ protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Hive table name.").create("tablename");
+ protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
+ protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName("inputformat").hasArg().isRequired(false).withDescription("Input format").create("inputformat");
+ protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("inputdelim").hasArg().isRequired(false).withDescription("Input delimeter").create("inputdelim");
+ protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output");
+ protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level");
+ protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("input");
+ protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename");
+ protected static final Option OPTION_KEY_COLUMN_PERCENTAGE = OptionBuilder.withArgName("rowkey column percentage").hasArg().isRequired(true).withDescription("Percentage of row key columns").create("columnpercentage");
+ protected static final Option OPTION_KEY_SPLIT_NUMBER = OptionBuilder.withArgName("key split number").hasArg().isRequired(true).withDescription("Number of key split range").create("splitnumber");
+
+ protected String name;
+ protected String description;
+ protected boolean isAsync = false;
+ protected OptionsHelper optionsHelper = new OptionsHelper();
+
+ protected Job job;
+
+ protected void parseOptions(Options options, String[] args) throws ParseException {
+ optionsHelper.parseOptions(options, args);
+ }
+
+ public void printUsage(Options options) {
+ optionsHelper.printUsage(getClass().getSimpleName(), options);
+ }
+
+ public Option[] getOptions() {
+ return optionsHelper.getOptions();
+ }
+
+ public String getOptionsAsString() {
+ return optionsHelper.getOptionsAsString();
+ }
+
+ protected String getOptionValue(Option option) {
+ return optionsHelper.getOptionValue(option);
+ }
+
+ protected boolean hasOption(Option option) {
+ return optionsHelper.hasOption(option);
+ }
+
+ protected int waitForCompletion(Job job) throws IOException, InterruptedException, ClassNotFoundException {
+ int retVal = 0;
+ long start = System.nanoTime();
+
+ if (isAsync) {
+ job.submit();
+ } else {
+ job.waitForCompletion(true);
+ retVal = job.isSuccessful() ? 0 : 1;
+ }
+
+ log.debug("Job '" + job.getJobName() + "' finished " + (job.isSuccessful() ? "successfully in " : "with failures. Time taken ") + StringUtils.formatTime((System.nanoTime() - start) / 1000000L));
+
+ return retVal;
+ }
+
+ protected static void runJob(Tool job, String[] args) {
+ try {
+ int exitCode = ToolRunner.run(job, args);
+ System.exit(exitCode);
+ } catch (Exception e) {
+ e.printStackTrace(System.err);
+ System.exit(5);
+ }
+ }
+
+ public void addInputDirs(String input, Job job) throws IOException {
+ for (String inp : StringSplitter.split(input, ",")) {
+ inp = inp.trim();
+ if (inp.endsWith("/*")) {
+ inp = inp.substring(0, inp.length() - 2);
+ FileSystem fs = FileSystem.get(job.getConfiguration());
+ Path path = new Path(inp);
+ FileStatus[] fileStatuses = fs.listStatus(path);
+ boolean hasDir = false;
+ for (FileStatus stat : fileStatuses) {
+ if (stat.isDirectory()) {
+ hasDir = true;
+ addInputDirs(stat.getPath().toString(), job);
+ }
+ }
+ if (fileStatuses.length > 0 && !hasDir) {
+ addInputDirs(path.toString(), job);
+ }
+ } else {
+ System.out.println("Add input " + inp);
+ FileInputFormat.addInputPath(job, new Path(inp));
+ }
+ }
+ }
+
+ protected void attachKylinPropsAndMetadata(CubeInstance cube, Configuration conf) throws IOException {
+ File tmp = File.createTempFile("kylin_job_meta", "");
+ tmp.delete(); // we need a directory, so delete the file first
+
+ File metaDir = new File(tmp, "meta");
+ metaDir.mkdirs();
+ metaDir.getParentFile().deleteOnExit();
+
+ // write kylin.properties
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ File kylinPropsFile = new File(metaDir, "kylin.properties");
+ kylinConfig.writeProperties(kylinPropsFile);
+
+ // write cube / cube_desc / dict / table
+ ArrayList<String> dumpList = new ArrayList<String>();
+ dumpList.add(cube.getResourcePath());
+ dumpList.add(cube.getDescriptor().getResourcePath());
+ if (cube.isInvertedIndex()) {
+ dumpList.add(cube.getInvertedIndexDesc().getResourcePath());
+ }
+ for (TableDesc table : cube.getDescriptor().listTables()) {
+ dumpList.add(table.getResourcePath());
+ }
+
+ for (CubeSegment segment : cube.getSegments()) {
+ dumpList.addAll(segment.getDictionaryPaths());
+ }
+
+ dumpResources(kylinConfig, metaDir, dumpList);
+
+ // hadoop distributed cache
+ conf.set("tmpfiles", "file:///" + OptionsHelper.convertToFileURL(metaDir.getAbsolutePath()));
+ }
+
+ private void dumpResources(KylinConfig kylinConfig, File metaDir, ArrayList<String> dumpList) throws IOException {
+ ResourceStore from = ResourceStore.getStore(kylinConfig);
+ KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
+ ResourceStore to = ResourceStore.getStore(localConfig);
+ for (String path : dumpList) {
+ InputStream in = from.getResource(path);
+ if (in == null)
+ throw new IllegalStateException("No resource found at -- " + path);
+ long ts = from.getResourceTimestamp(path);
+ to.putResource(path, in, ts);
+ log.info("Dumped resource " + path + " to " + metaDir.getAbsolutePath());
+ }
+ }
+
+ protected void deletePath(Configuration conf, Path path) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ }
+ }
+
+ protected double getTotalMapInputMB() throws ClassNotFoundException, IOException, InterruptedException, JobException {
+ if (job == null) {
+ throw new JobException("Job is null");
+ }
+
+ long mapInputBytes = 0;
+ InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
+ for (InputSplit split : input.getSplits(job)) {
+ mapInputBytes += split.getLength();
+ }
+ if (mapInputBytes == 0) {
+ throw new IllegalArgumentException("Map input splits are 0 bytes, something is wrong!");
+ }
+ double totalMapInputMB = (double) mapInputBytes / 1024 / 1024;
+ return totalMapInputMB;
+ }
+
+ protected int getMapInputSplitCount() throws ClassNotFoundException, JobException, IOException, InterruptedException {
+ if (job == null) {
+ throw new JobException("Job is null");
+ }
+ InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
+ return input.getSplits(job).size();
+ }
+
+ public static KylinConfig loadKylinPropsAndMetadata(Configuration conf) throws IOException {
+ File metaDir = new File("meta");
+ System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
+ System.out.println("The absolute path for meta dir is " + metaDir.getAbsolutePath());
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ kylinConfig.setMetadataUrl(metaDir.getCanonicalPath());
+ return kylinConfig;
+ }
+
+ public void kill() throws JobException {
+ if (job != null) {
+ try {
+ job.killJob();
+ } catch (IOException e) {
+ throw new JobException(e);
+ }
+ }
+ }
+
+ public Map<String, String> getInfo() throws JobException {
+ if (job != null) {
+ Map<String, String> status = new HashMap<String, String>();
+ if (null != job.getJobID()) {
+ status.put(JobInstance.MR_JOB_ID, job.getJobID().toString());
+ }
+ if (null != job.getTrackingURL()) {
+ status.put(JobInstance.YARN_APP_URL, job.getTrackingURL().toString());
+ }
+
+ return status;
+ } else {
+ throw new JobException("Job is null");
+ }
+ }
+
+ public Counters getCounters() throws JobException {
+ if (job != null) {
+ try {
+ return job.getCounters();
+ } catch (IOException e) {
+ throw new JobException(e);
+ }
+ } else {
+ throw new JobException("Job is null");
+ }
+ }
+
+ public void setAsync(boolean isAsync) {
+ this.isAsync = isAsync;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java
new file mode 100644
index 0000000..e25ec36
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.cardinality;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import com.kylinolap.common.hll.HyperLogLogPlusCounter;
+import com.kylinolap.cube.kv.RowConstants;
+
+/**
+ * Mapper that estimates the cardinality of each column of delimited text rows.
+ * <p>
+ * Each line is split using the delimiter configured under
+ * {@link HiveColumnCardinalityJob#KEY_INPUT_DELIM} (default {@value #DEFAULT_DELIM}). As with
+ * {@link StringTokenizer}, every character of that string acts as a separator. Non-empty values
+ * are fed into one {@link HyperLogLogPlusCounter} per 1-based column position. The counters'
+ * registers are emitted once, in {@link #cleanup(Context)}, keyed by column position.
+ *
+ * @author Jack
+ *
+ */
+public class ColumnCardinalityMapper<T> extends Mapper<T, Text, IntWritable, BytesWritable> {
+
+    private Map<Integer, HyperLogLogPlusCounter> hllcMap = new HashMap<Integer, HyperLogLogPlusCounter>();
+    public static final String DEFAULT_DELIM = ",";
+
+    /** Separator characters; resolved from the job configuration on the first record. */
+    private String delim;
+
+    @Override
+    public void map(T key, Text value, Context context) throws IOException, InterruptedException {
+        if (delim == null) {
+            String configured = context.getConfiguration().get(HiveColumnCardinalityJob.KEY_INPUT_DELIM);
+            delim = (configured == null) ? DEFAULT_DELIM : configured;
+        }
+        // returnDelims=true makes separators visible as tokens. An empty field (two adjacent
+        // separators) must still advance the column position; otherwise every later value on
+        // the line would be counted against the wrong column.
+        StringTokenizer tokenizer = new StringTokenizer(value.toString(), delim, true);
+        int column = 1;
+        while (tokenizer.hasMoreTokens()) {
+            String token = tokenizer.nextToken();
+            if (isDelimiter(token)) {
+                column++;
+            } else {
+                getHllc(column).add(Bytes.toBytes(token));
+            }
+        }
+    }
+
+    /**
+     * Returns true if {@code token}, as returned by a delimiter-returning tokenizer, is a
+     * separator. Value tokens never contain separator characters, so they cannot occur inside
+     * {@link #delim}.
+     */
+    private boolean isDelimiter(String token) {
+        return delim.indexOf(token) >= 0;
+    }
+
+    /** Returns the counter for column {@code key}, creating it on first use. */
+    private HyperLogLogPlusCounter getHllc(Integer key) {
+        HyperLogLogPlusCounter hllc = hllcMap.get(key);
+        if (hllc == null) {
+            hllc = new HyperLogLogPlusCounter();
+            hllcMap.put(key, hllc);
+        }
+        return hllc;
+    }
+
+    /** Emits (column position, serialized registers) for every column seen by this task. */
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        for (Map.Entry<Integer, HyperLogLogPlusCounter> entry : hllcMap.entrySet()) {
+            ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+            buf.clear();
+            entry.getValue().writeRegisters(buf);
+            buf.flip();
+            // Emit only the bytes actually written [0, limit), not the whole backing array.
+            context.write(new IntWritable(entry.getKey()), new BytesWritable(buf.array(), buf.limit()));
+        }
+    }
+
+}