You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/11/23 05:53:06 UTC
[04/18] kylin git commit: APACHE-KYLIN-2731: Introduce checkpoint executable
APACHE-KYLIN-2731: Introduce checkpoint executable
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/fbfbee41
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/fbfbee41
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/fbfbee41
Branch: refs/heads/ci-dong
Commit: fbfbee4190481e1dd8522ec92eb6124c9f9a3c3f
Parents: e83a2e5
Author: Zhong <nj...@apache.org>
Authored: Fri Aug 25 11:35:26 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Thu Nov 23 13:31:34 2017 +0800
----------------------------------------------------------------------
.../kylin/cube/model/CubeBuildTypeEnum.java | 7 +-
.../kylin/job/execution/AbstractExecutable.java | 5 +
.../job/execution/CheckpointExecutable.java | 78 +++++++++
.../engine/mr/common/JobInfoConverter.java | 57 +++++--
.../apache/kylin/rest/service/JobService.java | 167 ++++++++++++++++++-
5 files changed, 298 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/fbfbee41/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java
index e3ae214..6a14025 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java
@@ -35,5 +35,10 @@ public enum CubeBuildTypeEnum {
/**
* refresh segments
*/
- REFRESH
+ REFRESH,
+
+ /**
+ * checkpoint for set of other jobs
+ */
+ CHECKPOINT
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/fbfbee41/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index d36f598..30b6421 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -384,6 +384,11 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
return getDuration(getStartTime(), getEndTime(), getInterruptTime());
}
+ public boolean isReady() {
+ final Output output = getManager().getOutput(id);
+ return output.getState() == ExecutableState.READY;
+ }
+
/*
* discarded is triggered by JobService, the Scheduler is not awake of that
*
http://git-wip-us.apache.org/repos/asf/kylin/blob/fbfbee41/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
new file mode 100644
index 0000000..9864400
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.execution;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class CheckpointExecutable extends DefaultChainedExecutable {
+
+ private static final Logger logger = LoggerFactory.getLogger(CheckpointExecutable.class);
+
+ private static final String DEPLOY_ENV_NAME = "envName";
+ private static final String PROJECT_INSTANCE_NAME = "projectName";
+
+ private final List<AbstractExecutable> subTasksForCheck = Lists.newArrayList();
+
+ public void addTaskForCheck(AbstractExecutable executable) {
+ this.subTasksForCheck.add(executable);
+ }
+
+ public void addTaskListForCheck(List<AbstractExecutable> executableList) {
+ this.subTasksForCheck.addAll(executableList);
+ }
+
+ public List<AbstractExecutable> getSubTasksForCheck() {
+ return subTasksForCheck;
+ }
+
+ @Override
+ public boolean isReady() {
+ if (!super.isReady()) {
+ return false;
+ }
+ for (Executable task : subTasksForCheck) {
+ final Output output = getManager().getOutput(task.getId());
+ if (output.getState() != ExecutableState.SUCCEED) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public String getDeployEnvName() {
+ return getParam(DEPLOY_ENV_NAME);
+ }
+
+ public void setDeployEnvName(String name) {
+ setParam(DEPLOY_ENV_NAME, name);
+ }
+
+ public String getProjectName() {
+ return getParam(PROJECT_INSTANCE_NAME);
+ }
+
+ public void setProjectName(String name) {
+ setParam(PROJECT_INSTANCE_NAME, name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/fbfbee41/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
index 9b8400c..3098c15 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
@@ -28,6 +28,7 @@ import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.constant.JobStepStatusEnum;
import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.CheckpointExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
import org.slf4j.Logger;
@@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory;
public class JobInfoConverter {
private static final Logger logger = LoggerFactory.getLogger(JobInfoConverter.class);
- public static JobInstance parseToJobInstanceQuietly(AbstractExecutable job, Map<String, Output> outputs) {
+ public static JobInstance parseToJobInstanceQuietly(CubingJob job, Map<String, Output> outputs) {
try {
return parseToJobInstance(job, outputs);
} catch (Exception e) {
@@ -45,26 +46,29 @@ public class JobInfoConverter {
}
}
- public static JobInstance parseToJobInstance(AbstractExecutable job, Map<String, Output> outputs) {
- if (job == null) {
- logger.warn("job is null.");
+ public static JobInstance parseToJobInstanceQuietly(CheckpointExecutable job, Map<String, Output> outputs) {
+ try {
+ return parseToJobInstance(job, outputs);
+ } catch (Exception e) {
+ logger.error("Failed to parse job instance: uuid={}", job, e);
return null;
}
+ }
- if (!(job instanceof CubingJob)) {
- logger.warn("illegal job type, id:" + job.getId());
+ public static JobInstance parseToJobInstance(CubingJob job, Map<String, Output> outputs) {
+ if (job == null) {
+ logger.warn("job is null.");
return null;
}
- CubingJob cubeJob = (CubingJob) job;
Output output = outputs.get(job.getId());
final JobInstance result = new JobInstance();
result.setName(job.getName());
- result.setRelatedCube(CubingExecutableUtil.getCubeName(cubeJob.getParams()));
- result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams()));
+ result.setRelatedCube(CubingExecutableUtil.getCubeName(job.getParams()));
+ result.setRelatedSegment(CubingExecutableUtil.getSegmentId(job.getParams()));
result.setLastModified(output.getLastModified());
- result.setSubmitter(cubeJob.getSubmitter());
- result.setUuid(cubeJob.getId());
+ result.setSubmitter(job.getSubmitter());
+ result.setUuid(job.getId());
result.setType(CubeBuildTypeEnum.BUILD);
result.setStatus(parseToJobStatus(output.getState()));
result.setMrWaiting(AbstractExecutable.getExtraInfoAsLong(output, CubingJob.MAP_REDUCE_WAIT_TIME, 0L) / 1000);
@@ -73,8 +77,35 @@ public class JobInfoConverter {
result.setExecInterruptTime(AbstractExecutable.getInterruptTime(output));
result.setDuration(AbstractExecutable.getDuration(result.getExecStartTime(), result.getExecEndTime(),
result.getExecInterruptTime()) / 1000);
- for (int i = 0; i < cubeJob.getTasks().size(); ++i) {
- AbstractExecutable task = cubeJob.getTasks().get(i);
+ for (int i = 0; i < job.getTasks().size(); ++i) {
+ AbstractExecutable task = job.getTasks().get(i);
+ result.addStep(parseToJobStep(task, i, outputs.get(task.getId())));
+ }
+ return result;
+ }
+
+ public static JobInstance parseToJobInstance(CheckpointExecutable job, Map<String, Output> outputs) {
+ if (job == null) {
+ logger.warn("job is null.");
+ return null;
+ }
+
+ Output output = outputs.get(job.getId());
+ final JobInstance result = new JobInstance();
+ result.setName(job.getName());
+ result.setRelatedCube(CubingExecutableUtil.getCubeName(job.getParams()));
+ result.setLastModified(output.getLastModified());
+ result.setSubmitter(job.getSubmitter());
+ result.setUuid(job.getId());
+ result.setType(CubeBuildTypeEnum.CHECKPOINT);
+ result.setStatus(parseToJobStatus(output.getState()));
+ result.setExecStartTime(AbstractExecutable.getStartTime(output));
+ result.setExecEndTime(AbstractExecutable.getEndTime(output));
+ result.setExecInterruptTime(AbstractExecutable.getInterruptTime(output));
+ result.setDuration(AbstractExecutable.getDuration(result.getExecStartTime(), result.getExecEndTime(),
+ result.getExecInterruptTime()) / 1000);
+ for (int i = 0; i < job.getTasks().size(); ++i) {
+ AbstractExecutable task = job.getTasks().get(i);
result.addStep(parseToJobStep(task, i, outputs.get(task.getId())));
}
return result;
http://git-wip-us.apache.org/repos/asf/kylin/blob/fbfbee41/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index d27b39a..529f3b8 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -50,6 +50,7 @@ import org.apache.kylin.job.constant.JobTimeFilterEnum;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.SchedulerException;
import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.CheckpointExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
@@ -317,6 +318,33 @@ public class JobService extends BasicService implements InitializingBean {
return result;
}
+ protected JobInstance getCheckpointJobInstance(AbstractExecutable job) {
+ Message msg = MsgPicker.getMsg();
+
+ if (job == null) {
+ return null;
+ }
+ if (!(job instanceof CheckpointExecutable)) {
+ throw new BadRequestException(String.format(msg.getILLEGAL_JOB_TYPE(), job.getId()));
+ }
+
+ CheckpointExecutable checkpointExecutable = (CheckpointExecutable) job;
+ final JobInstance result = new JobInstance();
+ result.setName(job.getName());
+ result.setRelatedCube(CubingExecutableUtil.getCubeName(job.getParams()));
+ result.setLastModified(job.getLastModified());
+ result.setSubmitter(job.getSubmitter());
+ result.setUuid(job.getId());
+ result.setType(CubeBuildTypeEnum.CHECKPOINT);
+ result.setStatus(JobInfoConverter.parseToJobStatus(job.getStatus()));
+ result.setDuration(job.getDuration() / 1000);
+ for (int i = 0; i < checkpointExecutable.getTasks().size(); ++i) {
+ AbstractExecutable task = checkpointExecutable.getTasks().get(i);
+ result.addStep(JobInfoConverter.parseToJobStep(task, i, getExecutableManager().getOutput(task.getId())));
+ }
+ return result;
+ }
+
public void resumeJob(JobInstance job) {
aclEvaluate.checkProjectOperationPermission(job);
getExecutableManager().resumeJob(job.getId());
@@ -373,6 +401,7 @@ public class JobService extends BasicService implements InitializingBean {
Integer limit = (null == limitValue) ? 30 : limitValue;
Integer offset = (null == offsetValue) ? 0 : offsetValue;
List<JobInstance> jobs = searchJobsByCubeName(cubeNameSubstring, projectName, statusList, timeFilter);
+
Collections.sort(jobs);
if (jobs.size() <= offset) {
@@ -388,12 +417,40 @@ public class JobService extends BasicService implements InitializingBean {
public List<JobInstance> searchJobsByCubeName(final String cubeNameSubstring, final String projectName,
final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) {
- return innerSearchCubingJobs(cubeNameSubstring, null, projectName, statusList, timeFilter);
+ return searchJobsByCubeName(cubeNameSubstring, projectName, statusList, timeFilter, JobSearchMode.ALL);
+ }
+
+ public List<JobInstance> searchJobsByCubeName(final String cubeNameSubstring, final String projectName,
+ final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) {
+ return innerSearchJobs(cubeNameSubstring, null, projectName, statusList, timeFilter, jobSearchMode);
}
public List<JobInstance> searchJobsByJobName(final String jobName, final String projectName,
final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) {
- return innerSearchCubingJobs(null, jobName, projectName, statusList, timeFilter);
+ return searchJobsByJobName(jobName, projectName, statusList, timeFilter, JobSearchMode.ALL);
+ }
+
+ public List<JobInstance> searchJobsByJobName(final String jobName, final String projectName,
+ final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) {
+ return innerSearchJobs(null, jobName, projectName, statusList, timeFilter, jobSearchMode);
+ }
+
+ public List<JobInstance> innerSearchJobs(final String cubeName, final String jobName, final String projectName,
+ final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) {
+ List<JobInstance> result = Lists.newArrayList();
+ switch (jobSearchMode) {
+ case CUBING_ONLY:
+ result.addAll(innerSearchCubingJobs(cubeName, jobName, projectName, statusList, timeFilter));
+ break;
+ case CHECKPOINT_ONLY:
+ result.addAll(innerSearchCheckpointJobs(cubeName, jobName, projectName, statusList, timeFilter));
+ break;
+ case ALL:
+ default:
+ result.addAll(innerSearchCubingJobs(cubeName, jobName, projectName, statusList, timeFilter));
+ result.addAll(innerSearchCheckpointJobs(cubeName, jobName, projectName, statusList, timeFilter));
+ }
+ return result;
}
public List<JobInstance> innerSearchCubingJobs(final String cubeName, final String jobName,
@@ -503,6 +560,109 @@ public class JobService extends BasicService implements InitializingBean {
return results;
}
+ public List<JobInstance> innerSearchCheckpointJobs(final String cubeName, final String jobName,
+ final String projectName, final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) {
+ // prepare time range
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTime(new Date());
+ long timeStartInMillis = getTimeStartInMillis(calendar, timeFilter);
+ long timeEndInMillis = Long.MAX_VALUE;
+ Set<ExecutableState> states = convertStatusEnumToStates(statusList);
+ final Map<String, Output> allOutputs = getExecutableManager().getAllOutputs(timeStartInMillis, timeEndInMillis);
+
+ return Lists
+ .newArrayList(FluentIterable
+ .from(innerSearchCheckpointJobs(cubeName, jobName, states, timeStartInMillis, timeEndInMillis,
+ allOutputs, false, projectName))
+ .transform(new Function<CheckpointExecutable, JobInstance>() {
+ @Override
+ public JobInstance apply(CheckpointExecutable checkpointExecutable) {
+ return JobInfoConverter.parseToJobInstanceQuietly(checkpointExecutable, allOutputs);
+ }
+ }));
+ }
+
+ public List<CheckpointExecutable> innerSearchCheckpointJobs(final String cubeName, final String jobName,
+ final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis,
+ final Map<String, Output> allOutputs, final boolean nameExactMatch, final String projectName) {
+ List<CheckpointExecutable> results = Lists
+ .newArrayList(
+ FluentIterable
+ .from(getExecutableManager().getAllAbstractExecutables(timeStartInMillis,
+ timeEndInMillis, CheckpointExecutable.class))
+ .filter(new Predicate<AbstractExecutable>() {
+ @Override
+ public boolean apply(AbstractExecutable executable) {
+ if (executable instanceof CheckpointExecutable) {
+ if (StringUtils.isEmpty(cubeName)) {
+ return true;
+ }
+ String executableCubeName = CubingExecutableUtil
+ .getCubeName(executable.getParams());
+ if (executableCubeName == null)
+ return true;
+ if (nameExactMatch)
+ return executableCubeName.equalsIgnoreCase(cubeName);
+ else
+ return executableCubeName.toLowerCase()
+ .contains(cubeName.toLowerCase());
+ } else {
+ return false;
+ }
+ }
+ }).transform(new Function<AbstractExecutable, CheckpointExecutable>() {
+ @Override
+ public CheckpointExecutable apply(AbstractExecutable executable) {
+ return (CheckpointExecutable) executable;
+ }
+ }).filter(Predicates.and(new Predicate<CheckpointExecutable>() {
+ @Override
+ public boolean apply(CheckpointExecutable executable) {
+ if (null == projectName
+ || null == getProjectManager().getProject(projectName)) {
+ return true;
+ } else {
+ return projectName.equalsIgnoreCase(executable.getProjectName());
+ }
+ }
+ }, new Predicate<CheckpointExecutable>() {
+ @Override
+ public boolean apply(CheckpointExecutable executable) {
+ try {
+ Output output = allOutputs.get(executable.getId());
+ if (output == null) {
+ return false;
+ }
+
+ ExecutableState state = output.getState();
+ boolean ret = statusList.contains(state);
+ return ret;
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+ }, new Predicate<CheckpointExecutable>() {
+ @Override
+ public boolean apply(@Nullable CheckpointExecutable checkpointExecutable) {
+ if (checkpointExecutable == null) {
+ return false;
+ }
+
+ if (Strings.isEmpty(jobName)) {
+ return true;
+ }
+
+ if (nameExactMatch) {
+ return checkpointExecutable.getName().equalsIgnoreCase(jobName);
+ } else {
+ return checkpointExecutable.getName().toLowerCase()
+ .contains(jobName.toLowerCase());
+ }
+ }
+ })));
+ return results;
+ }
+
public List<CubingJob> listJobsByRealizationName(final String realizationName, final String projectName,
final Set<ExecutableState> statusList) {
return innerSearchCubingJobs(realizationName, null, statusList, 0L, Long.MAX_VALUE,
@@ -513,4 +673,7 @@ public class JobService extends BasicService implements InitializingBean {
return listJobsByRealizationName(realizationName, projectName, EnumSet.allOf(ExecutableState.class));
}
+ public enum JobSearchMode {
+ CUBING_ONLY, CHECKPOINT_ONLY, ALL
+ }
}