You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/02/10 07:48:14 UTC
[21/54] [abbrv] [partial] incubator-kylin git commit: cleanup for
migration from github.com
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/engine/JobEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/engine/JobEngine.java b/job/src/main/java/com/kylinolap/job/engine/JobEngine.java
deleted file mode 100644
index b86fe36..0000000
--- a/job/src/main/java/com/kylinolap/job/engine/JobEngine.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.engine;
-
-/**
- * @author George Song (ysong1), xduo
- *
- */
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-
-import com.kylinolap.common.KylinConfig;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.math.stat.descriptive.rank.Percentile;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.quartz.Scheduler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.JobInstance.JobStep;
-import com.kylinolap.job.constant.JobConstants;
-import com.kylinolap.job.exception.JobException;
-
-/**
- * Job engine bound to one {@link JobEngineConfig}.
- *
- * An engine owns a Curator client, acquires a ZooKeeper mutex located at
- * {@code /kylin/job_engine/lock/<engineID>} before it starts its scheduler,
- * and exposes a handful of job-step metrics based on {@link #JOB_DURATION}.
- */
-public class JobEngine implements ConnectionStateListener {
-
-    private static final Logger log = LoggerFactory.getLogger(JobEngine.class);
-
-    private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
-
-    // One engine per configuration; all access goes through the synchronized getInstance().
-    private static final ConcurrentHashMap<JobEngineConfig, JobEngine> CACHE = new ConcurrentHashMap<JobEngineConfig, JobEngine>();
-
-    // Job engine metrics: <StepID, Duration Seconds>
-    public static ConcurrentHashMap<String, Double> JOB_DURATION = new ConcurrentHashMap<String, Double>();
-
-    private final String engineID;
-    private final JobEngineConfig engineConfig;
-    private final QuatzScheduler scheduler;
-    private final CuratorFramework zkClient;
-    private InterProcessMutex sharedLock;
-    private int daemonJobIntervalInSeconds;
-
-    /**
-     * Returns the engine cached for {@code engineCfg}, creating it on first use.
-     *
-     * The method is synchronized so that two concurrent first calls cannot each
-     * construct an engine (and start a ZooKeeper client) while only one of them
-     * ends up in the cache.
-     *
-     * @param engineID  id given to a newly created engine; not applied when an
-     *                  engine for this configuration is already cached
-     * @param engineCfg configuration the engine is keyed on
-     * @return the single cached engine for {@code engineCfg}
-     * @throws JobException if the engine's scheduler cannot be created
-     */
-    public static synchronized JobEngine getInstance(String engineID, JobEngineConfig engineCfg) throws JobException {
-        JobEngine engine = CACHE.get(engineCfg);
-        if (engine == null) {
-            engine = new JobEngine(engineID, engineCfg);
-            CACHE.put(engineCfg, engine);
-        }
-        return engine;
-    }
-
-    private JobEngine(String engineID, JobEngineConfig context) throws JobException {
-        String zkString = context.getZookeeperString();
-        if (zkString == null || zkString.equals("")) {
-            throw new IllegalArgumentException("Zookeeper connection string is null or empty");
-        }
-        log.info("Using metadata url: " + context.getConfig());
-
-        this.engineID = engineID;
-        this.engineConfig = context;
-        this.scheduler = new QuatzScheduler();
-
-        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
-        this.zkClient = CuratorFrameworkFactory.newClient(zkString, retryPolicy);
-        this.zkClient.start();
-
-        // Give up the ZooKeeper lock when the JVM goes down.
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                log.debug("Releasing job engine lock on JVM shutdown");
-                releaseLock();
-            }
-        });
-    }
-
-    /** @return the ZooKeeper node used both as mutex path and as holder of the engine id */
-    private String lockPath() {
-        return ZOOKEEPER_LOCK_PATH + "/" + this.engineID;
-    }
-
-    /**
-     * Releases the mutex if this process holds it and, while the client is
-     * started, deletes the lock node (including children) if it exists.
-     * Any failure is rethrown wrapped in a {@link RuntimeException}.
-     */
-    private void releaseLock() {
-        try {
-            if (sharedLock != null && sharedLock.isAcquiredInThisProcess()) {
-                sharedLock.release();
-            }
-            if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
-                if (zkClient.checkExists().forPath(lockPath()) != null) {
-                    zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath());
-                }
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Acquires the shared lock, writes the engine id into the lock node and
-     * starts the scheduler with the given fetch interval.
-     *
-     * @param daemonJobIntervalInSeconds interval handed to the job fetcher schedule
-     * @throws Exception on ZooKeeper or scheduler failures
-     */
-    public void start(int daemonJobIntervalInSeconds) throws Exception {
-        this.daemonJobIntervalInSeconds = daemonJobIntervalInSeconds;
-
-        sharedLock = new InterProcessMutex(zkClient, lockPath());
-        log.info("Trying to obtain the shared lock...");
-        // current thread will be blocked until the lock is got
-        sharedLock.acquire();
-
-        log.info("Obtained the shared lock. Starting job scheduler...");
-        zkClient.setData().forPath(lockPath(), Bytes.toBytes(this.engineID));
-        startScheduler();
-    }
-
-    /** Ensures the job log directory exists, then starts the scheduler and its fetcher. */
-    private void startScheduler() throws JobException, IOException {
-        String logDir = KylinConfig.getInstanceFromEnv().getKylinJobLogDir();
-        new File(logDir).mkdirs();
-
-        log.info("Starting scheduler.");
-        this.scheduler.start();
-        this.scheduler.scheduleFetcher(this.daemonJobIntervalInSeconds, this.engineConfig);
-    }
-
-    /** Starts the engine with {@link JobConstants#DEFAULT_SCHEDULER_INTERVAL_SECONDS}. */
-    public void start() throws Exception {
-        start(JobConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS);
-    }
-
-    /**
-     * Releases the lock and stops the scheduler.
-     *
-     * @throws JobException if the scheduler cannot be stopped
-     */
-    public void stop() throws JobException {
-        try {
-            releaseLock();
-        } finally {
-            // The scheduler must be stopped even when releasing the lock failed.
-            this.scheduler.stop();
-        }
-    }
-
-    /** Releases the lock as soon as the ZooKeeper connection is suspended or lost. */
-    @Override
-    public void stateChanged(CuratorFramework client, ConnectionState newState) {
-        if ((newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST)) {
-            releaseLock();
-        }
-    }
-
-    /** Asks the scheduler to interrupt (kill) the given running step. */
-    public void interruptJob(JobInstance jobInstance, JobStep jobStep) throws IOException, JobException {
-        this.scheduler.interrupt(jobInstance, jobStep);
-    }
-
-    public Scheduler getScheduler() {
-        return this.scheduler.getScheduler();
-    }
-
-    // Job engine metrics related methods
-
-    /** @return number of entries recorded in {@link #JOB_DURATION} */
-    public int getNumberOfJobStepsExecuted() {
-        return JOB_DURATION.size();
-    }
-
-    /**
-     * @return the content of this engine's lock node as a string, or an empty
-     *         string when the node holds no data
-     * @throws Exception on ZooKeeper failures
-     */
-    public String getPrimaryEngineID() throws Exception {
-        byte[] data = zkClient.getData().forPath(lockPath());
-        return data == null ? "" : Bytes.toString(data);
-    }
-
-    /** @return the smallest recorded step duration, or 0 when nothing is recorded */
-    public double getMinJobStepDuration() {
-        double[] all = getJobStepDuration();
-        Arrays.sort(all);
-        return all.length > 0 ? all[0] : 0;
-    }
-
-    /**
-     * Snapshot of all recorded durations. A zero-length seed array is passed to
-     * {@code toArray} so the result is sized by the collection itself; a
-     * pre-sized array could keep trailing {@code null}s if entries disappear
-     * concurrently, which would make the unboxing fail.
-     */
-    private double[] getJobStepDuration() {
-        Collection<Double> values = JOB_DURATION.values();
-        Double[] all = values.toArray(new Double[0]);
-        return ArrayUtils.toPrimitive(all);
-    }
-
-    /** @return the largest recorded step duration, or 0 when nothing is recorded */
-    public double getMaxJobStepDuration() {
-        double[] all = getJobStepDuration();
-        Arrays.sort(all);
-        // A single recorded duration is a valid maximum, hence "> 0" and not "> 1".
-        return all.length > 0 ? all[all.length - 1] : 0;
-    }
-
-    /**
-     * @param percentile percentile passed to {@link Percentile}
-     * @return the percentile estimate over all recorded step durations
-     */
-    public double getPercentileJobStepDuration(double percentile) {
-        Percentile p = new Percentile(percentile);
-        return p.evaluate(getJobStepDuration());
-    }
-
-    public Integer getScheduledJobsSzie() {
-        return scheduler.getScheduledJobs();
-    }
-
-    public int getEngineThreadPoolSize() {
-        return scheduler.getThreadPoolSize();
-    }
-
-    public int getNumberOfIdleSlots() {
-        return scheduler.getIdleSlots();
-    }
-
-    public int getNumberOfJobStepsRunning() {
-        return scheduler.getRunningJobs();
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/engine/JobEngineConfig.java b/job/src/main/java/com/kylinolap/job/engine/JobEngineConfig.java
deleted file mode 100644
index 5527cf6..0000000
--- a/job/src/main/java/com/kylinolap/job/engine/JobEngineConfig.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.engine;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.job.tools.OptionsHelper;
-import com.kylinolap.metadata.model.cube.CubeDesc.CubeCapacity;
-
-/**
- * Read-only view over the job-engine related settings of a {@link KylinConfig},
- * plus lookup of the Hadoop job configuration file. Two instances are equal
- * when their wrapped configs are equal.
- *
- * @author ysong1
- */
-public class JobEngineConfig {
-    private static final Logger logger = LoggerFactory.getLogger(JobEngineConfig.class);
-    public static String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf";
-
-    // there should be no setters
-    private final KylinConfig config;
-
-    public JobEngineConfig(KylinConfig config) {
-        this.config = config;
-    }
-
-    /**
-     * Locates one candidate job configuration file.
-     *
-     * Search order: the directory named by the {@code KylinConfig.KYLIN_CONF}
-     * system property (falling back to the environment variable of the same
-     * name), then {@code /etc/kylin/}, then the classpath (copied to a temp file).
-     *
-     * @param capaticy     cube capacity whose lower-cased name is used as file suffix
-     * @param appendSuffix whether to look for the capacity-specific file name
-     * @return file URL of the configuration file, or an empty string if none exists
-     * @throws IOException if the classpath resource cannot be copied to a temp file
-     */
-    private String getHadoopJobConfFilePath(CubeCapacity capaticy, boolean appendSuffix) throws IOException {
-        String hadoopJobConfFile;
-        if (appendSuffix) {
-            // Locale.ROOT keeps the file name stable regardless of the JVM default locale.
-            hadoopJobConfFile = HADOOP_JOB_CONF_FILENAME + "_" + capaticy.toString().toLowerCase(java.util.Locale.ROOT) + ".xml";
-        } else {
-            hadoopJobConfFile = HADOOP_JOB_CONF_FILENAME + ".xml";
-        }
-
-        String path = System.getProperty(KylinConfig.KYLIN_CONF);
-        if (path == null) {
-            path = System.getenv(KylinConfig.KYLIN_CONF);
-        }
-        if (path != null) {
-            path = path + File.separator + hadoopJobConfFile;
-        }
-
-        if (null == path || !new File(path).exists()) {
-            File defaultFilePath = new File("/etc/kylin/" + hadoopJobConfFile);
-
-            if (defaultFilePath.exists()) {
-                path = defaultFilePath.getAbsolutePath();
-            } else {
-                logger.debug("Search conf file " + hadoopJobConfFile + " from classpath ...");
-                InputStream is = JobEngineConfig.class.getClassLoader().getResourceAsStream(hadoopJobConfFile);
-                if (is == null) {
-                    logger.debug("Can't get " + hadoopJobConfFile + " from classpath");
-                    logger.debug("No " + hadoopJobConfFile + " file were found");
-                } else {
-                    File tmp = File.createTempFile(HADOOP_JOB_CONF_FILENAME, ".xml");
-                    inputStreamToFile(is, tmp);
-                    path = tmp.getAbsolutePath();
-                }
-            }
-        }
-
-        if (null == path || !new File(path).exists()) {
-            return "";
-        }
-
-        return OptionsHelper.convertToFileURL(path);
-    }
-
-    /**
-     * Picks the job configuration file for a cube capacity: the
-     * capacity-specific file is preferred, the generic one is the fallback.
-     *
-     * @param capaticy cube capacity
-     * @return file URL of the chosen configuration, or an empty string if neither exists
-     * @throws IOException if a classpath resource cannot be copied to a temp file
-     */
-    public String getHadoopJobConfFilePath(CubeCapacity capaticy) throws IOException {
-        String path = getHadoopJobConfFilePath(capaticy, true);
-        if (StringUtils.isEmpty(path)) {
-            path = getHadoopJobConfFilePath(capaticy, false);
-        }
-        if (StringUtils.isEmpty(path)) {
-            return "";
-        }
-        logger.info("Chosen job conf is : " + path);
-        return path;
-    }
-
-    /**
-     * Copies {@code ins} into {@code file}. Both streams are closed even when
-     * reading or writing fails.
-     */
-    private void inputStreamToFile(InputStream ins, File file) throws IOException {
-        try {
-            OutputStream os = new FileOutputStream(file);
-            try {
-                byte[] buffer = new byte[8192];
-                int bytesRead;
-                while ((bytesRead = ins.read(buffer, 0, buffer.length)) != -1) {
-                    os.write(buffer, 0, bytesRead);
-                }
-            } finally {
-                os.close();
-            }
-        } finally {
-            ins.close();
-        }
-    }
-
-    public KylinConfig getConfig() {
-        return config;
-    }
-
-    public String getHdfsWorkingDirectory() {
-        return config.getHdfsWorkingDirectory();
-    }
-
-    /**
-     * @return the kylinJobJarPath
-     */
-    public String getKylinJobJarPath() {
-        return config.getKylinJobJarPath();
-    }
-
-    /**
-     * @return the runAsRemoteCommand
-     */
-    public boolean isRunAsRemoteCommand() {
-        return config.getRunAsRemoteCommand();
-    }
-
-    /**
-     * @return the zookeeperString
-     */
-    public String getZookeeperString() {
-        return config.getZookeeperString();
-    }
-
-    /**
-     * @return the remoteHadoopCliHostname
-     */
-    public String getRemoteHadoopCliHostname() {
-        return config.getRemoteHadoopCliHostname();
-    }
-
-    /**
-     * @return the remoteHadoopCliUsername
-     */
-    public String getRemoteHadoopCliUsername() {
-        return config.getRemoteHadoopCliUsername();
-    }
-
-    /**
-     * @return the remoteHadoopCliPassword
-     */
-    public String getRemoteHadoopCliPassword() {
-        return config.getRemoteHadoopCliPassword();
-    }
-
-    public String getMapReduceCmdExtraArgs() {
-        return config.getMapReduceCmdExtraArgs();
-    }
-
-    /**
-     * @return the yarnStatusServiceUrl
-     */
-    public String getYarnStatusServiceUrl() {
-        return config.getYarnStatusServiceUrl();
-    }
-
-    /**
-     * @return the maxConcurrentJobLimit
-     */
-    public int getMaxConcurrentJobLimit() {
-        return config.getMaxConcurrentJobLimit();
-    }
-
-    /**
-     * @return the timeZone
-     */
-    public String getTimeZone() {
-        return config.getTimeZone();
-    }
-
-    /**
-     * @return the adminDls
-     */
-    public String getAdminDls() {
-        return config.getAdminDls();
-    }
-
-    /**
-     * @return the jobStepTimeout
-     */
-    public long getJobStepTimeout() {
-        return config.getJobStepTimeout();
-    }
-
-    /**
-     * @return the asyncJobCheckInterval, read from the config's
-     *         yarn status check interval (seconds)
-     */
-    public int getAsyncJobCheckInterval() {
-        return config.getYarnStatusCheckIntervalSeconds();
-    }
-
-    /**
-     * @return the flatTableByHive
-     */
-    public boolean isFlatTableByHive() {
-        return config.getFlatTableByHive();
-    }
-
-    /*
-     * (non-Javadoc)
-     *
-     * @see java.lang.Object#hashCode()
-     */
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        return prime + ((config == null) ? 0 : config.hashCode());
-    }
-
-    /*
-     * (non-Javadoc)
-     *
-     * @see java.lang.Object#equals(java.lang.Object)
-     */
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        JobEngineConfig other = (JobEngineConfig) obj;
-        if (config == null) {
-            return other.config == null;
-        }
-        return config.equals(other.config);
-    }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/engine/JobFetcher.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/engine/JobFetcher.java b/job/src/main/java/com/kylinolap/job/engine/JobFetcher.java
deleted file mode 100644
index 2389f27..0000000
--- a/job/src/main/java/com/kylinolap/job/engine/JobFetcher.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.engine;
-
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.quartz.Job;
-import org.quartz.JobDetail;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.Trigger;
-import org.quartz.TriggerBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.util.StringSplitter;
-import com.kylinolap.job.JobDAO;
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.constant.JobConstants;
-import com.kylinolap.job.constant.JobStatusEnum;
-import com.kylinolap.job.flow.JobFlow;
-
-/**
- * @author ysong1, xduo
- *
- */
-public class JobFetcher implements Job {
-
- private static final Logger log = LoggerFactory.getLogger(JobFetcher.class);
-
- public static final int JOB_THRESHOLD = 10;
-
- @Override
- public void execute(JobExecutionContext context) throws JobExecutionException {
-
- JobEngineConfig engineConfig = (JobEngineConfig) context.getJobDetail().getJobDataMap().get(JobConstants.PROP_ENGINE_CONTEXT);
-
- JobDAO jobDAO = JobDAO.getInstance(engineConfig.getConfig());
-
- try {
- // get all pending jobs
- List<JobInstance> pendingJobList = jobDAO.listAllJobs(JobStatusEnum.PENDING);
-
- log.debug(pendingJobList.size() + " pending jobs");
- int leftJobs = JOB_THRESHOLD;
- Random rand = new Random();
- int maxConcurrentJobCount = engineConfig.getMaxConcurrentJobLimit();
-
- for (JobInstance jobInstance : pendingJobList) {
- @SuppressWarnings("unchecked")
- ConcurrentHashMap<String, JobFlow> jobFlows = (ConcurrentHashMap<String, JobFlow>) context.getScheduler().getContext().get(JobConstants.PROP_JOB_RUNTIME_FLOWS);
-
- if (jobFlows.size() >= maxConcurrentJobCount) {
- // If too many job instances in current job context, just
- // wait.
- break;
- }
-
- try {
- // there should be only 1 job for a certain job running
- boolean cubeHasRunningJob = false;
- for (String s : jobFlows.keySet()) {
- String[] tmp = StringSplitter.split(s, ".");
- String cubename = tmp[0];
- String jobid = tmp[1];
- if (cubename.equals(jobInstance.getRelatedCube())) {
- log.info("There is already a job of cube " + jobInstance.getRelatedCube() + " running, job uuid is " + jobid);
- cubeHasRunningJob = true;
- break;
- }
- }
-
- if (cubeHasRunningJob == false && jobFlows.containsKey(JobInstance.getJobIdentity(jobInstance)) == false) {
- // create job flow
- JobFlow jobFlow = new JobFlow(jobInstance, engineConfig);
- jobFlows.put(JobInstance.getJobIdentity(jobInstance), jobFlow);
-
- // schedule the 1st step
- Trigger trigger = TriggerBuilder.newTrigger().startNow().build();
- JobDetail firstStep = jobFlow.getFirst();
- context.getScheduler().scheduleJob(firstStep, trigger);
-
- log.info("Job " + jobInstance.getUuid() + " has been scheduled with the first step " + firstStep.getKey().toString());
- }
- } catch (Exception e) {
- log.error("Failed to trigger the job detail", e);
- }
-
- if (--leftJobs < 0) {
- log.info("Too many pending jobs!");
- break;
- }
- long ms = Math.abs(rand.nextLong() % 10L);
- Thread.sleep(ms * 1000L);
- }
- } catch (Throwable t) {
- log.error(t.getMessage());
- throw new JobExecutionException(t);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/engine/QuatzScheduler.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/engine/QuatzScheduler.java b/job/src/main/java/com/kylinolap/job/engine/QuatzScheduler.java
deleted file mode 100644
index f080c37..0000000
--- a/job/src/main/java/com/kylinolap/job/engine/QuatzScheduler.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.engine;
-
-import java.io.IOException;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.quartz.JobBuilder;
-import org.quartz.JobDetail;
-import org.quartz.JobKey;
-import org.quartz.Scheduler;
-import org.quartz.SchedulerException;
-import org.quartz.SimpleScheduleBuilder;
-import org.quartz.Trigger;
-import org.quartz.TriggerBuilder;
-import org.quartz.UnableToInterruptJobException;
-import org.quartz.impl.StdSchedulerFactory;
-import org.quartz.impl.matchers.GroupMatcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.JobInstance.JobStep;
-import com.kylinolap.job.cmd.IJobCommand;
-import com.kylinolap.job.constant.JobConstants;
-import com.kylinolap.job.exception.JobException;
-import com.kylinolap.job.flow.JobFlow;
-import com.kylinolap.job.flow.JobFlowListener;
-
-/**
- * Thin wrapper around a Quartz {@link Scheduler}: creates it in standby mode,
- * registers the global job-flow listener and the runtime flow map, and offers
- * start/stop, fetcher scheduling, step interruption and a few metrics.
- *
- * @author xduo
- *
- */
-public class QuatzScheduler {
-
-    private static final Logger log = LoggerFactory.getLogger(QuatzScheduler.class);
-
-    private Scheduler scheduler;
-    private JobFlowListener globalJobListener;
-
-    /**
-     * Builds the scheduler with one worker thread per available processor and
-     * leaves it in standby mode until {@link #start()} is called.
-     *
-     * @throws JobException if Quartz fails to initialize
-     */
-    public QuatzScheduler() throws JobException {
-        this.globalJobListener = new JobFlowListener(JobConstants.GLOBAL_LISTENER_NAME);
-        StdSchedulerFactory sf = new StdSchedulerFactory();
-        Properties schedulerProperties = new Properties();
-        int numberOfProcessors = Runtime.getRuntime().availableProcessors();
-        schedulerProperties.setProperty("org.quartz.threadPool.threadCount", String.valueOf(numberOfProcessors));
-        schedulerProperties.setProperty("org.quartz.scheduler.skipUpdateCheck", "true");
-
-        try {
-            sf.initialize(schedulerProperties);
-            this.scheduler = sf.getScheduler();
-            this.scheduler.getListenerManager().addJobListener(this.globalJobListener, GroupMatcher.jobGroupEquals(JobConstants.CUBE_JOB_GROUP_NAME));
-
-            // cubename.jobUuid -> job flow
-            this.scheduler.getContext().put(JobConstants.PROP_JOB_RUNTIME_FLOWS, new ConcurrentHashMap<String, JobFlow>());
-
-            // put the scheduler in standby mode first
-            this.scheduler.standby();
-        } catch (SchedulerException e) {
-            throw new JobException(e);
-        }
-    }
-
-    /** Takes the scheduler out of standby. */
-    public void start() throws JobException {
-        try {
-            this.scheduler.start();
-        } catch (SchedulerException e) {
-            throw new JobException(e);
-        }
-    }
-
-    /**
-     * Schedules the {@link JobFetcher} to run now and then forever at the given interval.
-     *
-     * @param intervalInSeconds seconds between two fetcher runs
-     * @param engineConfig      stored in the job data map under {@code PROP_ENGINE_CONTEXT}
-     */
-    public void scheduleFetcher(int intervalInSeconds, JobEngineConfig engineConfig) throws JobException {
-        JobDetail job = JobBuilder.newJob(JobFetcher.class).withIdentity(JobFetcher.class.getCanonicalName(), JobConstants.DAEMON_JOB_GROUP_NAME).build();
-        job.getJobDataMap().put(JobConstants.PROP_ENGINE_CONTEXT, engineConfig);
-
-        Trigger trigger = TriggerBuilder.newTrigger().startNow().withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(intervalInSeconds).repeatForever()).build();
-
-        try {
-            this.scheduler.scheduleJob(job, trigger);
-        } catch (SchedulerException e) {
-            throw new JobException(e);
-        }
-    }
-
-    /**
-     * Cancels the command of the given step (if one is attached), flags the
-     * stored job detail as killed and removes the job's flow from the runtime
-     * flow map. When the scheduler has no job detail for the step, only the
-     * flow entry is removed.
-     *
-     * @return {@code true} if a job detail was found and flagged as killed,
-     *         {@code false} if the step was unknown to the scheduler
-     * @throws JobException on scheduler failures
-     */
-    public boolean interrupt(JobInstance jobInstance, JobStep jobStep) throws JobException, IOException {
-        JobKey jobKey = new JobKey(JobInstance.getStepIdentity(jobInstance, jobStep), JobConstants.CUBE_JOB_GROUP_NAME);
-
-        boolean res = false;
-        try {
-            JobDetail jobDetail = this.scheduler.getJobDetail(jobKey);
-
-            if (jobDetail == null) {
-                // the job store has no entry for this key; avoid dereferencing null below
-                log.warn("No job detail found for " + jobKey + ", nothing to cancel");
-            } else {
-                IJobCommand iJobStepCmd = (IJobCommand) jobDetail.getJobDataMap().get(JobConstants.PROP_JOB_CMD_EXECUTOR);
-                if (null != iJobStepCmd) {
-                    iJobStepCmd.cancel();
-                }
-
-                jobDetail.getJobDataMap().put(JobConstants.PROP_JOB_KILLED, true);
-                this.scheduler.addJob(jobDetail, true, true);
-                res = true;
-            }
-
-            @SuppressWarnings("unchecked")
-            ConcurrentHashMap<String, JobFlow> jobFlows = (ConcurrentHashMap<String, JobFlow>) this.scheduler.getContext().get(JobConstants.PROP_JOB_RUNTIME_FLOWS);
-            jobFlows.remove(JobInstance.getJobIdentity(jobInstance));
-        } catch (SchedulerException e) {
-            // also covers UnableToInterruptJobException, which is a SchedulerException
-            log.error(e.getLocalizedMessage(), e);
-            throw new JobException(e);
-        }
-        return res;
-    }
-
-    /** Puts the scheduler back into standby; it is not shut down. */
-    public void stop() throws JobException {
-        try {
-            this.scheduler.standby();
-        } catch (SchedulerException e) {
-            throw new JobException(e);
-        }
-    }
-
-    public Scheduler getScheduler() {
-        return this.scheduler;
-    }
-
-    // // metrics
-
-    /** @return the scheduler's thread pool size, or 0 if the metadata is unavailable */
-    public int getThreadPoolSize() {
-        try {
-            return scheduler.getMetaData().getThreadPoolSize();
-        } catch (SchedulerException e) {
-            log.error("Can't get scheduler metadata!", e);
-            return 0;
-        }
-    }
-
-    /** @return number of currently executing jobs, or 0 if it cannot be determined */
-    public int getRunningJobs() {
-        try {
-            return this.scheduler.getCurrentlyExecutingJobs().size();
-        } catch (SchedulerException e) {
-            log.error("Can't get scheduler metadata!", e);
-            return 0;
-        }
-    }
-
-    /** @return thread pool size minus currently executing jobs, or 0 if it cannot be determined */
-    public int getIdleSlots() {
-        try {
-            return this.scheduler.getMetaData().getThreadPoolSize() - this.scheduler.getCurrentlyExecutingJobs().size();
-        } catch (SchedulerException e) {
-            log.error("Can't get scheduler metadata!", e);
-            return 0;
-        }
-    }
-
-    /**
-     * @return number of job keys summed over all job groups; a partial count
-     *         (possibly 0) if the scheduler fails midway
-     */
-    public int getScheduledJobs() {
-        int jobKeyCount = 0;
-        try {
-            for (String groupName : scheduler.getJobGroupNames()) {
-                jobKeyCount += scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName)).size();
-            }
-        } catch (SchedulerException e) {
-            log.error("Can't get scheduler metadata!", e);
-        }
-        return jobKeyCount;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/exception/InvalidJobInstanceException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/exception/InvalidJobInstanceException.java b/job/src/main/java/com/kylinolap/job/exception/InvalidJobInstanceException.java
deleted file mode 100644
index 1b404cb..0000000
--- a/job/src/main/java/com/kylinolap/job/exception/InvalidJobInstanceException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.exception;
-
-/**
- * Checked exception that carries a detail message about an invalid job instance.
- *
- * @author ysong1
- *
- */
-public class InvalidJobInstanceException extends Exception {
-
-    private static final long serialVersionUID = 2045169570038227895L;
-
-    /**
-     * @param reason detail message, later available through {@link #getMessage()}
-     */
-    public InvalidJobInstanceException(final String reason) {
-        super(reason);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/exception/InvalidJobStatusException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/exception/InvalidJobStatusException.java b/job/src/main/java/com/kylinolap/job/exception/InvalidJobStatusException.java
deleted file mode 100644
index d1050d0..0000000
--- a/job/src/main/java/com/kylinolap/job/exception/InvalidJobStatusException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.exception;
-
-/**
- * Checked exception that carries a detail message about an invalid job status.
- *
- * @author xduo
- *
- */
-public class InvalidJobStatusException extends Exception {
-
-    private static final long serialVersionUID = -8549756520626114000L;
-
-    /**
-     * @param string detail message; it is handed to {@link Exception} so that
-     *               {@link #getMessage()} returns it instead of {@code null}
-     */
-    public InvalidJobStatusException(String string) {
-        super(string);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/exception/JobException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/exception/JobException.java b/job/src/main/java/com/kylinolap/job/exception/JobException.java
deleted file mode 100644
index 6c4bc6a..0000000
--- a/job/src/main/java/com/kylinolap/job/exception/JobException.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.exception;
-
-/**
- * General checked exception of the job module; mirrors the four standard
- * {@link Exception} constructors.
- *
- * @author xduo
- *
- */
-public class JobException extends Exception {
-
-    private static final long serialVersionUID = 1L;
-
-    /** Creates an exception without message or cause. */
-    public JobException() {
-        super();
-    }
-
-    /**
-     * @param detail the detail message
-     */
-    public JobException(final String detail) {
-        super(detail);
-    }
-
-    /**
-     * @param origin the underlying cause
-     */
-    public JobException(final Throwable origin) {
-        super(origin);
-    }
-
-    /**
-     * @param detail the detail message
-     * @param origin the underlying cause
-     */
-    public JobException(final String detail, final Throwable origin) {
-        super(detail, origin);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/flow/AsyncJobFlowNode.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/flow/AsyncJobFlowNode.java b/job/src/main/java/com/kylinolap/job/flow/AsyncJobFlowNode.java
deleted file mode 100644
index 1b8777e..0000000
--- a/job/src/main/java/com/kylinolap/job/flow/AsyncJobFlowNode.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.flow;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.quartz.DateBuilder;
-import org.quartz.DateBuilder.IntervalUnit;
-import org.quartz.JobDataMap;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.SchedulerException;
-import org.quartz.Trigger;
-import org.quartz.TriggerBuilder;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.job.JobDAO;
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.JobInstance.JobStep;
-import com.kylinolap.job.cmd.ICommandOutput;
-import com.kylinolap.job.cmd.JobCommandFactory;
-import com.kylinolap.job.constant.JobConstants;
-import com.kylinolap.job.constant.JobStepStatusEnum;
-import com.kylinolap.job.engine.JobEngineConfig;
-import com.kylinolap.job.exception.JobException;
-
-/**
- * Flow node for a job step whose command keeps running after it has been
- * submitted. The first execution submits the command; every later execution
- * of the same job detail only polls the stored command output for its status.
- *
- * @author xduo
- *
- */
-public class AsyncJobFlowNode extends JobFlowNode {
-
-    /**
-     * Quartz entry point. Submits the command when the job data map holds no
-     * command output yet, otherwise checks the status of the command that was
-     * submitted earlier. Any throwable raised while doing so is routed to
-     * {@code handleException}.
-     *
-     * @param context the Quartz execution context of this node
-     */
-    @Override
-    public void execute(JobExecutionContext context) throws JobExecutionException {
-        this.currentJobDetail = context.getJobDetail();
-        final JobDataMap dataMap = this.currentJobDetail.getJobDataMap();
-        final JobEngineConfig engineConfig = ((JobFlow) dataMap.get(JobConstants.PROP_JOB_FLOW)).getJobengineConfig();
-        final KylinConfig kylinConfig = engineConfig.getConfig();
-        final String jobUuid = dataMap.getString(JobConstants.PROP_JOBINSTANCE_UUID);
-        final int stepSeqId = dataMap.getInt(JobConstants.PROP_JOBSTEP_SEQ_ID);
-        final ICommandOutput storedOutput = (ICommandOutput) dataMap.get(JobConstants.PROP_JOB_CMD_OUTPUT);
-
-        try {
-            if (dataMap.getBoolean(JobConstants.PROP_JOB_KILLED)) {
-                log.info(this.currentJobDetail.getKey() + " is killed");
-                return;
-            }
-
-            if (storedOutput != null) {
-                pollSubmittedCommand(context, storedOutput, engineConfig, kylinConfig, jobUuid, stepSeqId);
-            } else {
-                submitCommand(context, dataMap, engineConfig, kylinConfig, jobUuid, stepSeqId);
-            }
-        } catch (Throwable t) {
-            handleException(jobUuid, stepSeqId, kylinConfig, t);
-        }
-
-    }
-
-    /**
-     * First execution: marks the step RUNNING, runs the command, stores its
-     * output handle and executor in the job data map, records the reported
-     * status and schedules a follow-up status check.
-     */
-    private void submitCommand(JobExecutionContext context, JobDataMap dataMap, JobEngineConfig engineConfig, KylinConfig kylinConfig, String jobUuid, int stepSeqId) throws Exception {
-        JobInstance jobInstance = updateJobStep(jobUuid, stepSeqId, kylinConfig, JobStepStatusEnum.RUNNING, System.currentTimeMillis(), null, null);
-
-        String command = dataMap.getString(JobConstants.PROP_COMMAND);
-        jobCmd = JobCommandFactory.getJobCommand(command, jobInstance, stepSeqId, engineConfig);
-        ICommandOutput output = jobCmd.execute();
-        dataMap.put(JobConstants.PROP_JOB_CMD_OUTPUT, output);
-        dataMap.put(JobConstants.PROP_JOB_CMD_EXECUTOR, jobCmd);
-        // re-register the job detail so the modified data map is kept by the scheduler
-        context.getScheduler().addJob(this.currentJobDetail, true, true);
-
-        JobStepStatusEnum status = output.getStatus();
-        // an end time is recorded only once the step reports completion
-        Long finishedAt = status.isComplete() ? Long.valueOf(System.currentTimeMillis()) : null;
-        updateJobStep(jobUuid, stepSeqId, kylinConfig, status, null, finishedAt, output.getOutput());
-
-        context.setResult(output.getExitCode());
-        scheduleStatusChecker(context);
-        log.debug("Start async job " + currentJobDetail.getKey());
-    }
-
-    /**
-     * Later executions: reads the current status from the stored output,
-     * fails the step when it has exceeded the configured step timeout,
-     * records the status and re-schedules itself while the step is incomplete.
-     */
-    private void pollSubmittedCommand(JobExecutionContext context, ICommandOutput output, JobEngineConfig engineConfig, KylinConfig kylinConfig, String jobUuid, int stepSeqId) throws Exception {
-        JobInstance jobInstance = JobDAO.getInstance(engineConfig.getConfig()).getJob(jobUuid);
-        JobStep jobStep = jobInstance.getSteps().get(stepSeqId);
-
-        log.debug("Start to check hadoop job status of " + currentJobDetail.getKey());
-        JobStepStatusEnum status = output.getStatus();
-
-        long elapsedSeconds = (System.currentTimeMillis() - jobStep.getExecStartTime()) / 1000;
-        if (elapsedSeconds >= engineConfig.getJobStepTimeout()) {
-            throw new JobException("Job step " + jobStep.getName() + " timeout.");
-        }
-
-        Long finishedAt = status.isComplete() ? Long.valueOf(System.currentTimeMillis()) : null;
-        updateJobStep(jobInstance.getUuid(), stepSeqId, kylinConfig, status, null, finishedAt, output.getOutput());
-
-        if (!status.isComplete()) {
-            scheduleStatusChecker(context);
-        }
-
-        context.setResult(0);
-        log.debug("Status of async job " + currentJobDetail.getKey() + ":" + status);
-    }
-
-    /**
-     * Schedules one more execution of the current job detail after the
-     * configured async check interval (in seconds).
-     *
-     * @param context the Quartz execution context providing the scheduler
-     * @throws SchedulerException if the scheduler rejects the trigger
-     */
-    private void scheduleStatusChecker(JobExecutionContext context) throws SchedulerException {
-        JobFlow flow = (JobFlow) this.currentJobDetail.getJobDataMap().get(JobConstants.PROP_JOB_FLOW);
-        int interval = flow.getJobengineConfig().getAsyncJobCheckInterval();
-        log.debug("Trigger a status check job in " + interval + " seconds for job " + currentJobDetail.getKey());
-
-        Set<Trigger> triggers = new HashSet<Trigger>();
-        triggers.add(TriggerBuilder.newTrigger().startAt(DateBuilder.futureDate(interval, IntervalUnit.SECOND)).build());
-        context.getScheduler().scheduleJob(currentJobDetail, triggers, true);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/flow/JobFlow.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/flow/JobFlow.java b/job/src/main/java/com/kylinolap/job/flow/JobFlow.java
deleted file mode 100644
index 9a58107..0000000
--- a/job/src/main/java/com/kylinolap/job/flow/JobFlow.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.flow;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.quartz.JobBuilder;
-import org.quartz.JobDetail;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.JobInstance.JobStep;
-import com.kylinolap.job.constant.JobConstants;
-import com.kylinolap.job.constant.JobStepCmdTypeEnum;
-import com.kylinolap.job.engine.JobEngineConfig;
-
-/**
- * Ordered chain of Quartz {@link JobDetail}s built from the steps of one
- * {@link JobInstance}. The chain starts at the first step whose status is
- * runnable and contains every step after it.
- *
- * @author xduo
- */
-public class JobFlow {
-
-    private static final Logger log = LoggerFactory.getLogger(JobFlow.class);
-
-    private final List<JobDetail> flowNodes;
-    private final JobInstance jobInstance;
-    private final JobEngineConfig engineConfig;
-
-    /**
-     * Builds the flow for a job. Note that the step list of {@code job} is
-     * sorted in place.
-     *
-     * @param job the job instance whose steps become flow nodes
-     * @param context the engine configuration handed to every node
-     * @throws IllegalStateException if the job has no steps
-     * @throws IllegalArgumentException if the job name or a step name is null
-     */
-    public JobFlow(JobInstance job, JobEngineConfig context) {
-        this.engineConfig = context;
-        this.jobInstance = job;
-        // validate job instance
-        List<JobStep> sortedSteps = jobInstance.getSteps();
-
-        if (sortedSteps == null || sortedSteps.isEmpty()) {
-            throw new IllegalStateException("Steps of job " + jobInstance.getUuid() + " is null or empty!");
-        }
-
-        // sort the steps by step_sequenceID
-        Collections.sort(sortedSteps);
-        // find the 1st runnable job
-        int firstStepIndex = findFirstStep(sortedSteps);
-
-        log.info("Job " + jobInstance.getUuid() + " will be started at step " + firstStepIndex + " (sequence number)");
-
-        flowNodes = new LinkedList<JobDetail>();
-        for (int i = firstStepIndex; i < sortedSteps.size(); i++) {
-            flowNodes.add(createJobFlowNode(jobInstance, i));
-        }
-    }
-
-    /**
-     * @return the job instance this flow was built from
-     */
-    public JobInstance getJobInstance() {
-        return jobInstance;
-    }
-
-    /**
-     * @return the engine configuration passed to the constructor
-     */
-    public JobEngineConfig getJobengineConfig() {
-        return engineConfig;
-    }
-
-    /**
-     * @return the first node of the flow, or {@code null} if the flow is empty
-     */
-    public JobDetail getFirst() {
-        if (flowNodes.isEmpty()) {
-            return null;
-        }
-        return flowNodes.get(0);
-    }
-
-    /**
-     * Returns the node that follows the given one.
-     *
-     * @param jobFlowNode the node whose successor is wanted
-     * @return the successor, or {@code null} if the node is not part of this
-     *         flow or is its last node
-     */
-    public JobDetail getNext(JobDetail jobFlowNode) {
-        // Walk the list once with its iterator; positional get() inside a
-        // loop over a LinkedList would make the scan quadratic. As before,
-        // the last matching node wins.
-        int targetIndex = -1;
-        int index = 0;
-        for (JobDetail node : flowNodes) {
-            if (node.equals(jobFlowNode)) {
-                targetIndex = index;
-            }
-            index++;
-        }
-
-        if (targetIndex != -1 && flowNodes.size() > targetIndex + 1) {
-            return flowNodes.get(targetIndex + 1);
-        }
-        return null;
-    }
-
-    /**
-     * Finds the index of the first step whose status is runnable.
-     *
-     * @param stepList the steps, already sorted
-     * @return the index of the first runnable step, or 0 when no step is
-     *         runnable
-     */
-    private int findFirstStep(List<JobStep> stepList) {
-        // NOTE(review): falling back to 0 restarts the flow from the first
-        // step when nothing is runnable - confirm this is intended.
-        for (int i = 0; i < stepList.size(); i++) {
-            if (stepList.get(i).getStatus().isRunable()) {
-                return i;
-            }
-        }
-        return 0;
-    }
-
-    /**
-     * Creates the durable Quartz job detail for one step and fills its data
-     * map with the flow, the command, the job uuid, the step sequence id and
-     * the async flag.
-     *
-     * @param jobInstance the owning job
-     * @param stepSeqId index of the step in the job's step list
-     * @return the job detail representing the step
-     */
-    private JobDetail createJobFlowNode(final JobInstance jobInstance, final int stepSeqId) {
-        JobStep step = jobInstance.getSteps().get(stepSeqId);
-
-        if (jobInstance.getName() == null || step.getName() == null) {
-            throw new IllegalArgumentException("JobInstance name or JobStep name cannot be null!");
-        }
-
-        // async steps are run by AsyncJobFlowNode, all others by JobFlowNode
-        JobDetail jobFlowNode = JobBuilder.newJob(step.isRunAsync() ? AsyncJobFlowNode.class : JobFlowNode.class).withIdentity(JobInstance.getStepIdentity(jobInstance, step), JobConstants.CUBE_JOB_GROUP_NAME).storeDurably().build();
-
-        // add job flow to node
-        jobFlowNode.getJobDataMap().put(JobConstants.PROP_JOB_FLOW, this);
-
-        // add command to flow node; shell commands are wrapped so that their
-        // output is also written to a per-step log file
-        JobStepCmdTypeEnum cmdType = step.getCmdType();
-        boolean isShellCmd = cmdType == JobStepCmdTypeEnum.SHELL_CMD || cmdType == JobStepCmdTypeEnum.SHELL_CMD_HADOOP;
-        String execCmd = isShellCmd ? wrapExecCmd(jobInstance, step.getExecCmd(), String.valueOf(step.getSequenceID())) : step.getExecCmd();
-        jobFlowNode.getJobDataMap().put(JobConstants.PROP_COMMAND, execCmd);
-
-        // add job instance and step sequenceID to flow node
-        jobFlowNode.getJobDataMap().put(JobConstants.PROP_JOBINSTANCE_UUID, jobInstance.getUuid());
-        jobFlowNode.getJobDataMap().put(JobConstants.PROP_JOBSTEP_SEQ_ID, step.getSequenceID());
-
-        // add async flag to flow node
-        jobFlowNode.getJobDataMap().put(JobConstants.PROP_JOB_ASYNC, step.isRunAsync());
-
-        return jobFlowNode;
-    }
-
-    /**
-     * Wraps a shell command so that its stdout and stderr are copied with
-     * {@code tee} into {@code <job log dir>/<job uuid>_<suffix>.log}. The
-     * log directory is created locally first. Blank commands are returned
-     * unchanged.
-     *
-     * @param job the job the command belongs to
-     * @param cmd the raw command
-     * @param suffix suffix of the log file name
-     * @return the wrapped command, or {@code cmd} itself if it is blank
-     * @throws RuntimeException if the log directory cannot be created
-     */
-    private String wrapExecCmd(JobInstance job, String cmd, String suffix) {
-        if (StringUtils.isBlank(cmd)) {
-            return cmd;
-        }
-
-        String logDir = KylinConfig.getInstanceFromEnv().getKylinJobLogDir();
-        try {
-            FileUtils.forceMkdir(new File(logDir));
-        } catch (IOException e) {
-            throw new RuntimeException("Create log dir " + logDir + " failed.", e);
-        }
-        String logFile = logDir + "/" + job.getUuid() + "_" + suffix + ".log";
-
-        // pipefail keeps the exit code of cmd instead of that of tee
-        return "mkdir -p " + logDir + ";" + "set -o pipefail; " + cmd + " 2>&1 | tee " + logFile;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java b/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java
deleted file mode 100644
index 8274d44..0000000
--- a/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java
+++ /dev/null
@@ -1,419 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.flow;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import com.kylinolap.cube.CubeSegmentStatusEnum;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.quartz.JobDataMap;
-import org.quartz.JobDetail;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.JobListener;
-import org.quartz.SchedulerException;
-import org.quartz.Trigger;
-import org.quartz.TriggerBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.util.MailService;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.cube.exception.CubeIntegrityException;
-import com.kylinolap.job.JobDAO;
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.JobInstance.JobStep;
-import com.kylinolap.job.constant.JobConstants;
-import com.kylinolap.job.constant.JobStatusEnum;
-import com.kylinolap.job.constant.JobStepStatusEnum;
-import com.kylinolap.job.engine.JobEngineConfig;
-
-/**
- * Handle kylin job and cube change update.
- *
- * @author George Song (ysong1), xduo
- *
- */
-public class JobFlowListener implements JobListener {
-
- private static Logger log = LoggerFactory.getLogger(JobFlowListener.class);
-
- private String name;
-
- public JobFlowListener(String name) {
- if (name == null) {
- throw new IllegalArgumentException("Listener name cannot be null!");
- }
- this.name = name;
- }
-
- public String getName() {
- return name;
- }
-
- @Override
- public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
- log.info(context.getJobDetail().getKey() + " was executed.");
- JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
- JobFlow jobFlow = (JobFlow) jobDataMap.get(JobConstants.PROP_JOB_FLOW);
- JobEngineConfig engineConfig = jobFlow.getJobengineConfig();
- String jobUuid = jobDataMap.getString(JobConstants.PROP_JOBINSTANCE_UUID);
- int stepSeqID = jobDataMap.getInt(JobConstants.PROP_JOBSTEP_SEQ_ID);
- KylinConfig config = engineConfig.getConfig();
-
- JobInstance jobInstance = null;
- JobStep jobStep = null;
- try {
- jobInstance = JobDAO.getInstance(config).getJob(jobUuid);
- jobStep = jobInstance.getSteps().get(stepSeqID);
- CubeInstance cube = CubeManager.getInstance(config).getCube(jobInstance.getRelatedCube());
-
- log.info(context.getJobDetail().getKey() + " status: " + jobStep.getStatus());
- switch (jobStep.getStatus()) {
- case FINISHED:
- // Ensure we are using the latest metadata
- CubeManager.getInstance(config).loadCubeCache(cube);
- updateKylinJobOnSuccess(jobInstance, stepSeqID, engineConfig);
- updateCubeSegmentInfoOnSucceed(jobInstance, engineConfig);
- notifyUsers(jobInstance, engineConfig);
- scheduleNextJob(context, jobInstance);
- break;
- case ERROR:
- updateKylinJobStatus(jobInstance, stepSeqID, engineConfig);
- notifyUsers(jobInstance, engineConfig);
- break;
- case DISCARDED:
- // Ensure we are using the latest metadata
- CubeManager.getInstance(config).loadCubeCache(cube);
- updateCubeSegmentInfoOnDiscard(jobInstance, engineConfig);
- notifyUsers(jobInstance, engineConfig);
- break;
- default:
- break;
- }
- } catch (Exception e) {
- log.error(e.getMessage(), e);
- handleException(jobUuid, stepSeqID, config, e);
- } finally {
- if (null != jobInstance && jobInstance.getStatus().isComplete()) {
- try {
- context.getScheduler().deleteJob(context.getJobDetail().getKey());
- @SuppressWarnings("unchecked")
- ConcurrentHashMap<String, JobFlow> jobFlows = (ConcurrentHashMap<String, JobFlow>) context.getScheduler().getContext().get(JobConstants.PROP_JOB_RUNTIME_FLOWS);
- jobFlows.remove(JobInstance.getJobIdentity(jobInstance));
- } catch (SchedulerException e) {
- log.error(e.getMessage(), e);
- }
- }
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.quartz.JobListener#jobToBeExecuted(org.quartz.JobExecutionContext)
- */
- @Override
- public void jobToBeExecuted(JobExecutionContext context) {
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.quartz.JobListener#jobExecutionVetoed(org.quartz.JobExecutionContext)
- */
- @Override
- public void jobExecutionVetoed(JobExecutionContext context) {
- }
-
- /**
- * @param context
- * @param jobInstance
- */
- protected void scheduleNextJob(JobExecutionContext context, JobInstance jobInstance) {
- try {
- // schedule next job
- JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
- JobFlow jobFlow = (JobFlow) jobDataMap.get(JobConstants.PROP_JOB_FLOW);
- JobDetail nextJob = (JobDetail) jobFlow.getNext(context.getJobDetail());
- if (nextJob != null) {
- try {
- Trigger trigger = TriggerBuilder.newTrigger().startNow().build();
- log.debug("Job " + context.getJobDetail().getKey() + " will now chain to Job " + nextJob.getKey() + "");
-
- context.getScheduler().scheduleJob(nextJob, trigger);
-
- } catch (SchedulerException se) {
- log.error("Error encountered during chaining to Job " + nextJob.getKey() + "", se);
- }
- }
-
- context.getScheduler().deleteJob(context.getJobDetail().getKey());
- } catch (SchedulerException e) {
- log.error(e.getLocalizedMessage(), e);
- throw new RuntimeException(e);
- }
- }
-
- /**
- * @param jobInstance
- * @param stepId
- */
- private void updateKylinJobStatus(JobInstance jobInstance, int stepId, JobEngineConfig engineConfig) {
- validate(jobInstance);
- List<JobStep> steps = jobInstance.getSteps();
- Collections.sort(steps);
-
- JobStep jobStep = jobInstance.getSteps().get(stepId);
-
- long duration = jobStep.getExecEndTime() - jobStep.getExecStartTime();
- jobInstance.setDuration(jobInstance.getDuration() + (duration > 0 ? duration : 0) / 1000);
- jobInstance.setMrWaiting(jobInstance.getMrWaiting() + jobStep.getExecWaitTime());
-
- try {
- JobDAO.getInstance(engineConfig.getConfig()).updateJobInstance(jobInstance);
- } catch (IOException e) {
- e.printStackTrace();
- log.error(e.getLocalizedMessage(), e);
- }
- }
-
- private void updateKylinJobOnSuccess(JobInstance jobInstance, int stepId, JobEngineConfig engineConfig) {
- validate(jobInstance);
- List<JobStep> steps = jobInstance.getSteps();
- Collections.sort(steps);
-
- JobStep jobStep = jobInstance.getSteps().get(stepId);
- jobInstance.setExecStartTime(steps.get(0).getExecStartTime());
-
- long duration = jobStep.getExecEndTime() - jobStep.getExecStartTime();
- jobInstance.setDuration(jobInstance.getDuration() + (duration > 0 ? duration / 1000 : 0));
- jobInstance.setMrWaiting(jobInstance.getMrWaiting() + jobStep.getExecWaitTime());
- if (jobInstance.getStatus().equals(JobStatusEnum.FINISHED)) {
- jobInstance.setExecEndTime(steps.get(steps.size() - 1).getExecEndTime());
- }
-
- try {
- JobDAO.getInstance(engineConfig.getConfig()).updateJobInstance(jobInstance);
- } catch (IOException e) {
- e.printStackTrace();
- log.error(e.getLocalizedMessage(), e);
- }
- }
-
- private void updateCubeSegmentInfoOnDiscard(JobInstance jobInstance, JobEngineConfig engineConfig) throws IOException, CubeIntegrityException {
- CubeManager cubeMgr = CubeManager.getInstance(engineConfig.getConfig());
- CubeInstance cubeInstance = cubeMgr.getCube(jobInstance.getRelatedCube());
- cubeMgr.updateSegmentOnJobDiscard(cubeInstance, jobInstance.getRelatedSegment());
- }
-
- private void updateCubeSegmentInfoOnSucceed(JobInstance jobInstance, JobEngineConfig engineConfig) throws CubeIntegrityException, IOException {
- if (jobInstance.getStatus().equals(JobStatusEnum.FINISHED)) {
- validate(jobInstance);
-
- log.info("Updating cube segment " + jobInstance.getRelatedSegment() + " for cube " + jobInstance.getRelatedCube());
-
- long cubeSize = 0;
- JobStep convertToHFileStep = jobInstance.findStep(JobConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
- if (null != convertToHFileStep) {
- String cubeSizeString = convertToHFileStep.getInfo(JobInstance.HDFS_BYTES_WRITTEN);
- if (cubeSizeString == null || cubeSizeString.equals("")) {
- throw new RuntimeException("Can't get cube segment size.");
- }
- cubeSize = Long.parseLong(cubeSizeString) / 1024;
- } else {
- log.info("No step with name '" + JobConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE + "' is found");
- }
-
- CubeManager cubeMgr = CubeManager.getInstance(engineConfig.getConfig());
- CubeInstance cubeInstance = cubeMgr.getCube(jobInstance.getRelatedCube());
- CubeSegment newSegment = cubeInstance.getSegmentById(jobInstance.getUuid());
-
- long sourceCount = 0;
- long sourceSize = 0;
- switch (jobInstance.getType()) {
- case BUILD:
- JobStep baseCuboidStep = jobInstance.findStep(JobConstants.STEP_NAME_BUILD_BASE_CUBOID);
- if (null != baseCuboidStep) {
- String sourceRecordsCount = baseCuboidStep.getInfo(JobInstance.SOURCE_RECORDS_COUNT);
- if (sourceRecordsCount == null || sourceRecordsCount.equals("")) {
- throw new RuntimeException("Can't get cube source record count.");
- }
- sourceCount = Long.parseLong(sourceRecordsCount);
- } else {
- log.info("No step with name '" + JobConstants.STEP_NAME_BUILD_BASE_CUBOID + "' is found");
- }
-
- JobStep createFlatTableStep = jobInstance.findStep(JobConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
- if (null != createFlatTableStep) {
- String sourceRecordsSize = createFlatTableStep.getInfo(JobInstance.SOURCE_RECORDS_SIZE);
- if (sourceRecordsSize == null || sourceRecordsSize.equals("")) {
- throw new RuntimeException("Can't get cube source record size.");
- }
- sourceSize = Long.parseLong(sourceRecordsSize);
- } else {
- log.info("No step with name '" + JobConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE + "' is found");
- }
-
- if (cubeInstance.needMergeImmediatelyAfterBuild(newSegment)) {
- for (CubeSegment seg : cubeInstance.getSegment(CubeSegmentStatusEnum.READY)) {
- sourceCount += seg.getSourceRecords();
- sourceSize += seg.getSourceRecordsSize();
- }
- }
- break;
- case MERGE:
- for (CubeSegment seg : cubeInstance.getMergingSegments()) {
- sourceCount += seg.getSourceRecords();
- sourceSize += seg.getSourceRecordsSize();
- }
- break;
- }
-
- cubeMgr.updateSegmentOnJobSucceed(cubeInstance, jobInstance.getType(), jobInstance.getRelatedSegment(), jobInstance.getUuid(), jobInstance.getExecEndTime(), cubeSize, sourceCount, sourceSize);
- log.info("Update cube segment succeed" + jobInstance.getRelatedSegment() + " for cube " + jobInstance.getRelatedCube());
- }
- }
-
- private void validate(JobInstance jobInstance) {
- List<JobStep> steps = jobInstance.getSteps();
- if (steps == null || steps.size() == 0) {
- throw new RuntimeException("Steps of job " + jobInstance.getUuid() + " is null or empty!");
- }
- }
-
- private void handleException(String jobInstanceUuid, int jobInstanceStepSeqId, KylinConfig config, Throwable t) {
- log.error(t.getLocalizedMessage(), t);
- String exceptionMsg = "Failed with Exception:" + ExceptionUtils.getFullStackTrace(t);
- try {
- JobInstance jobInstance = JobDAO.getInstance(config).getJob(jobInstanceUuid);
- jobInstance.getSteps().get(jobInstanceStepSeqId).setStatus(JobStepStatusEnum.ERROR);
- // String output =
- // jobInstance.getSteps().get(jobInstanceStepSeqId).getCmdOutput();
- // jobInstance.getSteps().get(jobInstanceStepSeqId).setCmdOutput(output
- // + "\n" + exceptionMsg);
- jobInstance.getSteps().get(jobInstanceStepSeqId).setExecEndTime(System.currentTimeMillis());
- JobDAO.getInstance(config).updateJobInstance(jobInstance);
-
- String output = JobDAO.getInstance(config).getJobOutput(jobInstanceUuid, jobInstanceStepSeqId).getOutput();
- output = output + "\n" + exceptionMsg;
- JobDAO.getInstance(config).saveJobOutput(jobInstanceUuid, jobInstanceStepSeqId, output);
- } catch (IOException e1) {
- log.error(e1.getLocalizedMessage(), e1);
- }
- }
-
- /**
- * @param jobInstance
- */
- protected void notifyUsers(JobInstance jobInstance, JobEngineConfig engineConfig) {
- KylinConfig config = engineConfig.getConfig();
- String cubeName = jobInstance.getRelatedCube();
- CubeInstance cubeInstance = CubeManager.getInstance(config).getCube(cubeName);
- String finalStatus = null;
- String content = JobConstants.NOTIFY_EMAIL_TEMPLATE;
- String logMsg = "";
-
- switch (jobInstance.getStatus()) {
- case FINISHED:
- finalStatus = "SUCCESS";
- break;
- case ERROR:
- for (JobStep step : jobInstance.getSteps()) {
- if (step.getStatus() == JobStepStatusEnum.ERROR) {
- try {
- logMsg = JobDAO.getInstance(config).getJobOutput(step).getOutput();
- } catch (IOException e) {
- log.error(e.getLocalizedMessage(), e);
- }
- }
- }
- finalStatus = "FAILED";
- break;
- case DISCARDED:
- finalStatus = "DISCARDED";
- default:
- break;
- }
-
- if (null == finalStatus) {
- return;
- }
-
- try {
- InetAddress inetAddress = InetAddress.getLocalHost();
- content = content.replaceAll("\\$\\{job_engine\\}", inetAddress.getCanonicalHostName());
- } catch (UnknownHostException e) {
- log.error(e.getLocalizedMessage(), e);
- }
-
- content = content.replaceAll("\\$\\{job_name\\}", jobInstance.getName());
- content = content.replaceAll("\\$\\{result\\}", finalStatus);
- content = content.replaceAll("\\$\\{cube_name\\}", cubeName);
- content = content.replaceAll("\\$\\{start_time\\}", new Date(jobInstance.getExecStartTime()).toString());
- content = content.replaceAll("\\$\\{duration\\}", jobInstance.getDuration() / 60 + "mins");
- content = content.replaceAll("\\$\\{mr_waiting\\}", jobInstance.getMrWaiting() / 60 + "mins");
- content = content.replaceAll("\\$\\{last_update_time\\}", new Date(jobInstance.getLastModified()).toString());
- content = content.replaceAll("\\$\\{submitter\\}", jobInstance.getSubmitter());
- content = content.replaceAll("\\$\\{error_log\\}", logMsg);
-
-
- MailService mailService = new MailService();
- try {
- List<String> users = new ArrayList<String>();
-
- if (null != cubeInstance.getDescriptor().getNotifyList()) {
- users.addAll(cubeInstance.getDescriptor().getNotifyList());
- }
-
- if (null != engineConfig.getAdminDls()) {
- String[] adminDls = engineConfig.getAdminDls().split(",");
-
- for (String adminDl : adminDls) {
- users.add(adminDl);
- }
- }
-
- log.info("prepare to send email to:"+users);
-
- log.info("job name:"+jobInstance.getName());
-
- log.info("submitter:"+jobInstance.getSubmitter());
-
- if (users.size() > 0) {
- log.info("notify list:"+users);
- mailService.sendMail(users, "["+ finalStatus + "] - [Kylin Cube Build Job]-" + cubeName, content);
- log.info("notified users:"+users);
- }
- } catch (IOException e) {
- log.error(e.getLocalizedMessage(), e);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/flow/JobFlowNode.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/flow/JobFlowNode.java b/job/src/main/java/com/kylinolap/job/flow/JobFlowNode.java
deleted file mode 100644
index e539121..0000000
--- a/job/src/main/java/com/kylinolap/job/flow/JobFlowNode.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.flow;
-
-import java.io.IOException;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.quartz.InterruptableJob;
-import org.quartz.JobDataMap;
-import org.quartz.JobDetail;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.UnableToInterruptJobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.job.JobDAO;
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.JobInstance.JobStep;
-import com.kylinolap.job.cmd.ICommandOutput;
-import com.kylinolap.job.cmd.IJobCommand;
-import com.kylinolap.job.cmd.JobCommandFactory;
-import com.kylinolap.job.constant.JobConstants;
-import com.kylinolap.job.constant.JobStepStatusEnum;
-import com.kylinolap.job.engine.JobEngine;
-import com.kylinolap.job.engine.JobEngineConfig;
-
-/**
- * Quartz job that runs the command of a single job step and records the
- * step's status, timing and output through {@link JobDAO}.
- *
- * @author xduo
- *
- */
-public class JobFlowNode implements InterruptableJob {
-
-    protected static final Logger log = LoggerFactory.getLogger(JobFlowNode.class);
-
-    protected JobDetail currentJobDetail;
-    protected IJobCommand jobCmd;
-
-    /**
-     * Marks the step RUNNING, runs its command and stores the resulting
-     * status, end time and output. Nothing is recorded after the command
-     * returns if the job data map flags the job as killed. Any throwable
-     * raised inside is routed to {@link #handleException}.
-     *
-     * @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
-     */
-    @Override
-    public void execute(JobExecutionContext context) throws JobExecutionException {
-        this.currentJobDetail = context.getJobDetail();
-        final JobDataMap dataMap = this.currentJobDetail.getJobDataMap();
-        final JobEngineConfig engineConfig = ((JobFlow) dataMap.get(JobConstants.PROP_JOB_FLOW)).getJobengineConfig();
-        final String jobUuid = dataMap.getString(JobConstants.PROP_JOBINSTANCE_UUID);
-        final int stepSeqId = dataMap.getInt(JobConstants.PROP_JOBSTEP_SEQ_ID);
-        final String command = dataMap.getString(JobConstants.PROP_COMMAND);
-        final KylinConfig kylinConfig = engineConfig.getConfig();
-
-        try {
-            JobInstance jobInstance = updateJobStep(jobUuid, stepSeqId, kylinConfig, JobStepStatusEnum.RUNNING, System.currentTimeMillis(), null, null);
-
-            jobCmd = JobCommandFactory.getJobCommand(command, jobInstance, stepSeqId, engineConfig);
-            dataMap.put(JobConstants.PROP_JOB_CMD_EXECUTOR, jobCmd);
-            // re-register the job detail so the modified data map is kept by the scheduler
-            context.getScheduler().addJob(this.currentJobDetail, true, true);
-
-            ICommandOutput result = jobCmd.execute();
-
-            if (dataMap.getBoolean(JobConstants.PROP_JOB_KILLED)) {
-                return;
-            }
-
-            int exitCode = result.getExitCode();
-            updateJobStep(jobUuid, stepSeqId, kylinConfig, result.getStatus(), null, System.currentTimeMillis(), result.getOutput());
-            context.setResult(exitCode);
-
-            log.info("Job status for " + context.getJobDetail().getKey() + " has been updated.");
-            log.info("cmd:" + command);
-            log.info("output:" + result.getOutput());
-            log.info("exitCode:" + exitCode);
-
-        } catch (Throwable t) {
-            handleException(jobUuid, stepSeqId, kylinConfig, t);
-        }
-    }
-
-    /**
-     * No-op: this implementation takes no action on an interrupt request.
-     */
-    @Override
-    public void interrupt() throws UnableToInterruptJobException {
-    }
-
-    /**
-     * Loads the job, applies the non-null arguments to the given step and
-     * persists the job if anything changed. I/O failures while saving are
-     * logged and not rethrown. When an end time is supplied, the step
-     * duration in seconds is also put into {@code JobEngine.JOB_DURATION}.
-     *
-     * @param jobInstanceUuid uuid of the job to load
-     * @param jobInstanceStepSeqId index of the step in the job's step list
-     * @param config Kylin configuration used to obtain the {@link JobDAO}
-     * @param newStatus status to set, or null to leave the status unchanged
-     * @param execStartTime start time to set, or null to leave it unchanged
-     * @param execEndTime end time to set, or null to leave it unchanged
-     * @param output step output to save, or null to save nothing
-     * @return the loaded job instance carrying the applied changes
-     * @throws IOException if the job cannot be loaded
-     */
-    protected JobInstance updateJobStep(String jobInstanceUuid, int jobInstanceStepSeqId, KylinConfig config, JobStepStatusEnum newStatus, Long execStartTime, Long execEndTime, String output) throws IOException {
-        JobInstance job = JobDAO.getInstance(config).getJob(jobInstanceUuid);
-        JobStep step = null;
-
-        try {
-            step = job.getSteps().get(jobInstanceStepSeqId);
-            final JobStepStatusEnum previousStatus = step.getStatus();
-            boolean dirty = false;
-
-            if (execStartTime != null) {
-                step.setExecStartTime(execStartTime);
-                dirty = true;
-            }
-            if (execEndTime != null) {
-                step.setExecEndTime(execEndTime);
-                dirty = true;
-            }
-            if (output != null) {
-                JobDAO.getInstance(config).saveJobOutput(step, output);
-                dirty = true;
-            }
-
-            // a step leaving WAITING for RUNNING or FINISHED gets its wait
-            // time (seconds since its start time) recorded
-            boolean leavesWaiting = previousStatus == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.RUNNING || newStatus == JobStepStatusEnum.FINISHED);
-            if (leavesWaiting) {
-                step.setExecWaitTime((System.currentTimeMillis() - step.getExecStartTime()) / 1000);
-                dirty = true;
-            }
-            if (newStatus != null) {
-                step.setStatus(newStatus);
-                dirty = true;
-            }
-
-            if (dirty) {
-                JobDAO.getInstance(config).updateJobInstance(job);
-            }
-        } catch (IOException e) {
-            log.error(e.getLocalizedMessage(), e);
-        }
-
-        if (execEndTime != null) {
-            String durationKey = JobInstance.getStepIdentity(job, step) + " - " + String.valueOf(step.getExecStartTime());
-            double durationSeconds = (double) (step.getExecEndTime() - step.getExecStartTime()) / 1000;
-            JobEngine.JOB_DURATION.put(durationKey, durationSeconds);
-        }
-
-        return job;
-    }
-
-    /**
-     * Logs the throwable, marks the step as ERROR with the current time as
-     * its end time, persists the job and appends the stack trace to the
-     * stored step output. I/O failures while doing so are logged.
-     *
-     * @param jobInstanceUuid uuid of the job that failed
-     * @param jobInstanceStepSeqId index of the failed step
-     * @param config Kylin configuration used to obtain the {@link JobDAO}
-     * @param t the failure to record
-     */
-    protected void handleException(String jobInstanceUuid, int jobInstanceStepSeqId, KylinConfig config, Throwable t) {
-        log.error(t.getLocalizedMessage(), t);
-        final String stackTraceText = "Failed with Exception:" + ExceptionUtils.getFullStackTrace(t);
-        try {
-            JobDAO jobDao = JobDAO.getInstance(config);
-            JobInstance job = jobDao.getJob(jobInstanceUuid);
-            JobStep failedStep = job.getSteps().get(jobInstanceStepSeqId);
-            failedStep.setStatus(JobStepStatusEnum.ERROR);
-            failedStep.setExecEndTime(System.currentTimeMillis());
-            jobDao.updateJobInstance(job);
-
-            String previousOutput = jobDao.getJobOutput(jobInstanceUuid, jobInstanceStepSeqId).getOutput();
-            jobDao.saveJobOutput(jobInstanceUuid, jobInstanceStepSeqId, previousOutput + "\n" + stackTraceText);
-        } catch (IOException e1) {
-            log.error(e1.getLocalizedMessage(), e1);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java
deleted file mode 100644
index 4cd59a9..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop;
-
-/**
- * @author George Song (ysong1)
- *
- */
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.persistence.ResourceStore;
-import com.kylinolap.common.util.StringSplitter;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.exception.JobException;
-import com.kylinolap.job.tools.OptionsHelper;
-import com.kylinolap.metadata.model.schema.TableDesc;
-
-/**
- * Base class for Hadoop {@link Tool} implementations in this package.
- *
- * It declares the command-line options shared by the jobs, delegates option parsing
- * to an {@link OptionsHelper}, and provides helpers for submitting a {@link Job},
- * registering input paths, shipping Kylin properties/metadata with a job, and reading
- * back job information and counters.
- */
-@SuppressWarnings("static-access")
-public abstract class AbstractHadoopJob extends Configured implements Tool {
-    protected static final Logger log = LoggerFactory.getLogger(AbstractHadoopJob.class);
-
-    protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Job name. For exmaple, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create("jobname");
-    protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create("cubename");
-    protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name)").create("segmentname");
-    protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Hive table name.").create("tablename");
-    protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
-    protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName("inputformat").hasArg().isRequired(false).withDescription("Input format").create("inputformat");
-    protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("inputdelim").hasArg().isRequired(false).withDescription("Input delimeter").create("inputdelim");
-    protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output");
-    protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level");
-    // NOTE(review): shares the "input" option name with OPTION_INPUT_PATH - presumably intentional, confirm
-    protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("input");
-    protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename");
-    protected static final Option OPTION_KEY_COLUMN_PERCENTAGE = OptionBuilder.withArgName("rowkey column percentage").hasArg().isRequired(true).withDescription("Percentage of row key columns").create("columnpercentage");
-    protected static final Option OPTION_KEY_SPLIT_NUMBER = OptionBuilder.withArgName("key split number").hasArg().isRequired(true).withDescription("Number of key split range").create("splitnumber");
-
-    protected String name;
-    protected String description;
-    /** When true, {@link #waitForCompletion(Job)} only submits the job and returns immediately. */
-    protected boolean isAsync = false;
-    protected OptionsHelper optionsHelper = new OptionsHelper();
-
-    /** The Hadoop job this tool operates on; the query/kill methods below treat {@code null} as "no job". */
-    protected Job job;
-
-    /** Parses {@code args} against {@code options} via the options helper. */
-    protected void parseOptions(Options options, String[] args) throws ParseException {
-        optionsHelper.parseOptions(options, args);
-    }
-
-    /** Prints usage for {@code options}, labelled with this class's simple name. */
-    public void printUsage(Options options) {
-        optionsHelper.printUsage(getClass().getSimpleName(), options);
-    }
-
-    /** @return the options held by the options helper */
-    public Option[] getOptions() {
-        return optionsHelper.getOptions();
-    }
-
-    /** @return the options helper's string rendering of the options */
-    public String getOptionsAsString() {
-        return optionsHelper.getOptionsAsString();
-    }
-
-    /** @return the value the options helper reports for {@code option} */
-    protected String getOptionValue(Option option) {
-        return optionsHelper.getOptionValue(option);
-    }
-
-    /** @return whether the options helper reports {@code option} as present */
-    protected boolean hasOption(Option option) {
-        return optionsHelper.hasOption(option);
-    }
-
-    /**
-     * Runs the given job, either fire-and-forget or blocking, depending on {@link #isAsync}.
-     *
-     * @param job the job to submit
-     * @return 0 when the job was submitted asynchronously or completed successfully,
-     *         1 when a synchronously awaited job failed
-     */
-    protected int waitForCompletion(Job job) throws IOException, InterruptedException, ClassNotFoundException {
-        if (isAsync) {
-            job.submit();
-            // The job has only been handed over at this point; its outcome is not known yet,
-            // so do not query or report success/failure here.
-            log.debug("Job '" + job.getJobName() + "' submitted asynchronously");
-            return 0;
-        }
-
-        long start = System.nanoTime();
-        job.waitForCompletion(true);
-        boolean succeeded = job.isSuccessful();
-
-        log.debug("Job '" + job.getJobName() + "' finished " + (succeeded ? "successfully in " : "with failures. Time taken ") + StringUtils.formatTime((System.nanoTime() - start) / 1000000L));
-
-        return succeeded ? 0 : 1;
-    }
-
-    /**
-     * Runs {@code job} through {@link ToolRunner} and terminates the JVM with its exit code,
-     * or with exit code 5 if running the tool throws.
-     */
-    protected static void runJob(Tool job, String[] args) {
-        try {
-            int exitCode = ToolRunner.run(job, args);
-            System.exit(exitCode);
-        } catch (Exception e) {
-            e.printStackTrace(System.err);
-            System.exit(5);
-        }
-    }
-
-    /**
-     * Adds the comma-separated paths in {@code input} as input paths of {@code job}.
-     *
-     * A path ending in {@code /*} is expanded: every sub-directory is added recursively;
-     * if the directory has entries but none is a directory, the directory itself is added.
-     */
-    public void addInputDirs(String input, Job job) throws IOException {
-        for (String inp : StringSplitter.split(input, ",")) {
-            inp = inp.trim();
-            if (inp.endsWith("/*")) {
-                inp = inp.substring(0, inp.length() - 2);
-                FileSystem fs = FileSystem.get(job.getConfiguration());
-                Path path = new Path(inp);
-                FileStatus[] fileStatuses = fs.listStatus(path);
-                boolean hasDir = false;
-                for (FileStatus stat : fileStatuses) {
-                    if (stat.isDirectory()) {
-                        hasDir = true;
-                        addInputDirs(stat.getPath().toString(), job);
-                    }
-                }
-                if (fileStatuses.length > 0 && !hasDir) {
-                    addInputDirs(path.toString(), job);
-                }
-            } else {
-                System.out.println("Add input " + inp);
-                FileInputFormat.addInputPath(job, new Path(inp));
-            }
-        }
-    }
-
-    /**
-     * Dumps kylin.properties and the metadata resources of {@code cube} (cube, descriptor,
-     * inverted-index descriptor if any, tables, segment dictionaries) into a local temporary
-     * {@code meta} directory and registers that directory under the {@code tmpfiles} key of
-     * {@code conf}.
-     *
-     * @throws IOException if the directory cannot be created or a resource cannot be written
-     */
-    protected void attachKylinPropsAndMetadata(CubeInstance cube, Configuration conf) throws IOException {
-        File tmp = File.createTempFile("kylin_job_meta", "");
-        tmp.delete(); // we need a directory, so delete the file first
-
-        File metaDir = new File(tmp, "meta");
-        // fail here with a clear message rather than later with an unrelated file error
-        if (!metaDir.mkdirs() && !metaDir.isDirectory()) {
-            throw new IOException("Failed to create job metadata directory " + metaDir.getAbsolutePath());
-        }
-        // NOTE(review): deleteOnExit uses File.delete semantics, which only remove an empty directory - confirm cleanup is handled elsewhere
-        metaDir.getParentFile().deleteOnExit();
-
-        // write kylin.properties
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        File kylinPropsFile = new File(metaDir, "kylin.properties");
-        kylinConfig.writeProperties(kylinPropsFile);
-
-        // write cube / cube_desc / dict / table
-        ArrayList<String> dumpList = new ArrayList<String>();
-        dumpList.add(cube.getResourcePath());
-        dumpList.add(cube.getDescriptor().getResourcePath());
-        if (cube.isInvertedIndex()) {
-            dumpList.add(cube.getInvertedIndexDesc().getResourcePath());
-        }
-        for (TableDesc table : cube.getDescriptor().listTables()) {
-            dumpList.add(table.getResourcePath());
-        }
-
-        for (CubeSegment segment : cube.getSegments()) {
-            dumpList.addAll(segment.getDictionaryPaths());
-        }
-
-        dumpResources(kylinConfig, metaDir, dumpList);
-
-        // hadoop distributed cache
-        conf.set("tmpfiles", "file:///" + OptionsHelper.convertToFileURL(metaDir.getAbsolutePath()));
-    }
-
-    /**
-     * Copies every resource path in {@code dumpList} from the store of {@code kylinConfig}
-     * into a store rooted at {@code metaDir}, keeping each resource's timestamp.
-     *
-     * @throws IllegalStateException if a listed resource does not exist in the source store
-     */
-    private void dumpResources(KylinConfig kylinConfig, File metaDir, ArrayList<String> dumpList) throws IOException {
-        ResourceStore from = ResourceStore.getStore(kylinConfig);
-        KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
-        ResourceStore to = ResourceStore.getStore(localConfig);
-        for (String path : dumpList) {
-            InputStream in = from.getResource(path);
-            if (in == null) {
-                throw new IllegalStateException("No resource found at -- " + path);
-            }
-            try {
-                long ts = from.getResourceTimestamp(path);
-                to.putResource(path, in, ts);
-            } finally {
-                // always release the source stream, also when the copy fails
-                in.close();
-            }
-            log.info("Dumped resource " + path + " to " + metaDir.getAbsolutePath());
-        }
-    }
-
-    /** Recursively deletes {@code path} on the file system of {@code conf} if it exists. */
-    protected void deletePath(Configuration conf, Path path) throws IOException {
-        FileSystem fs = FileSystem.get(conf);
-        if (fs.exists(path)) {
-            fs.delete(path, true);
-        }
-    }
-
-    /**
-     * Sums the lengths of all input splits of {@link #job}.
-     *
-     * @return total split length in megabytes (bytes / 1024 / 1024)
-     * @throws JobException             if {@link #job} is null
-     * @throws IllegalArgumentException if the splits add up to 0 bytes
-     */
-    protected double getTotalMapInputMB() throws ClassNotFoundException, IOException, InterruptedException, JobException {
-        if (job == null) {
-            throw new JobException("Job is null");
-        }
-
-        long mapInputBytes = 0;
-        InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
-        for (InputSplit split : input.getSplits(job)) {
-            mapInputBytes += split.getLength();
-        }
-        if (mapInputBytes == 0) {
-            throw new IllegalArgumentException("Map input splits are 0 bytes, something is wrong!");
-        }
-        double totalMapInputMB = (double) mapInputBytes / 1024 / 1024;
-        return totalMapInputMB;
-    }
-
-    /**
-     * @return the number of input splits the job's input format produces
-     * @throws JobException if {@link #job} is null
-     */
-    protected int getMapInputSplitCount() throws ClassNotFoundException, JobException, IOException, InterruptedException {
-        if (job == null) {
-            throw new JobException("Job is null");
-        }
-        InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
-        return input.getSplits(job).size();
-    }
-
-    /**
-     * Points the {@code KylinConfig.KYLIN_CONF} system property at the relative {@code meta}
-     * directory, loads the configuration from the environment and sets its metadata URL
-     * to that directory. The {@code conf} argument is not read.
-     */
-    public static KylinConfig loadKylinPropsAndMetadata(Configuration conf) throws IOException {
-        File metaDir = new File("meta");
-        System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
-        System.out.println("The absolute path for meta dir is " + metaDir.getAbsolutePath());
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        kylinConfig.setMetadataUrl(metaDir.getCanonicalPath());
-        return kylinConfig;
-    }
-
-    /**
-     * Kills {@link #job} if one is set; does nothing otherwise.
-     *
-     * @throws JobException wrapping an {@link IOException} from the kill request
-     */
-    public void kill() throws JobException {
-        if (job != null) {
-            try {
-                job.killJob();
-            } catch (IOException e) {
-                throw new JobException(e);
-            }
-        }
-    }
-
-    /**
-     * @return a map holding the job id under {@code JobInstance.MR_JOB_ID} and the tracking
-     *         URL under {@code JobInstance.YARN_APP_URL}, each only when non-null
-     * @throws JobException if {@link #job} is null
-     */
-    public Map<String, String> getInfo() throws JobException {
-        if (job != null) {
-            Map<String, String> status = new HashMap<String, String>();
-            if (null != job.getJobID()) {
-                status.put(JobInstance.MR_JOB_ID, job.getJobID().toString());
-            }
-            if (null != job.getTrackingURL()) {
-                status.put(JobInstance.YARN_APP_URL, job.getTrackingURL().toString());
-            }
-
-            return status;
-        } else {
-            throw new JobException("Job is null");
-        }
-    }
-
-    /**
-     * @return the counters of {@link #job}
-     * @throws JobException if {@link #job} is null or fetching the counters fails with an {@link IOException}
-     */
-    public Counters getCounters() throws JobException {
-        if (job != null) {
-            try {
-                return job.getCounters();
-            } catch (IOException e) {
-                throw new JobException(e);
-            }
-        } else {
-            throw new JobException("Job is null");
-        }
-    }
-
-    /** Sets whether {@link #waitForCompletion(Job)} submits without waiting. */
-    public void setAsync(boolean isAsync) {
-        this.isAsync = isAsync;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java
deleted file mode 100644
index e25ec36..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.cardinality;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import com.kylinolap.common.hll.HyperLogLogPlusCounter;
-import com.kylinolap.cube.kv.RowConstants;
-
-/**
- * Mapper that splits every input line on the configured delimiter characters and adds
- * each non-empty field to a {@link HyperLogLogPlusCounter} kept per column position
- * (1-based). The counters are accumulated in memory across all {@code map} calls and
- * written out once in {@link #cleanup}, keyed by column position.
- *
- * @author Jack
- *
- */
-public class ColumnCardinalityMapper<T> extends Mapper<T, Text, IntWritable, BytesWritable> {
-
-    private Map<Integer, HyperLogLogPlusCounter> hllcMap = new HashMap<Integer, HyperLogLogPlusCounter>();
-    public static final String DEFAULT_DELIM = ",";
-
-    /**
-     * Feeds the fields of one line into the per-column counters.
-     *
-     * The delimiter set is read from {@code HiveColumnCardinalityJob.KEY_INPUT_DELIM},
-     * falling back to {@link #DEFAULT_DELIM}; as with {@link StringTokenizer}, every
-     * character of that string acts as a delimiter.
-     */
-    @Override
-    public void map(T key, Text value, Context context) throws IOException, InterruptedException {
-        String delim = context.getConfiguration().get(HiveColumnCardinalityJob.KEY_INPUT_DELIM);
-        if (delim == null) {
-            delim = DEFAULT_DELIM;
-        }
-        // Ask the tokenizer to return the delimiters too: without them, consecutive
-        // delimiters collapse and an empty field would shift all later values into
-        // the wrong column counter.
-        StringTokenizer tokenizer = new StringTokenizer(value.toString(), delim, true);
-        int column = 1;
-        while (tokenizer.hasMoreTokens()) {
-            String token = tokenizer.nextToken();
-            // a field token never contains a delimiter, so checking its first code point is enough
-            if (delim.indexOf(token.codePointAt(0)) >= 0) {
-                column++;
-            } else {
-                getHllc(column).add(Bytes.toBytes(token));
-            }
-        }
-    }
-
-    /** Returns the counter for the given column, creating it on first use. */
-    private HyperLogLogPlusCounter getHllc(Integer key) {
-        HyperLogLogPlusCounter hllc = hllcMap.get(key);
-        if (hllc == null) {
-            hllc = new HyperLogLogPlusCounter();
-            hllcMap.put(key, hllc);
-        }
-        return hllc;
-    }
-
-    /** Serializes each column's counter registers and emits them keyed by column position. */
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        for (Map.Entry<Integer, HyperLogLogPlusCounter> entry : hllcMap.entrySet()) {
-            ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-            entry.getValue().writeRegisters(buf);
-            buf.flip();
-            // after flip() the limit is the number of bytes written; emit only those
-            // instead of the whole backing array
-            context.write(new IntWritable(entry.getKey()), new BytesWritable(buf.array(), buf.limit()));
-        }
-    }
-
-}