You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/11/23 05:53:06 UTC

[04/18] kylin git commit: APACHE-KYLIN-2731: Introduce checkpoint executable

APACHE-KYLIN-2731: Introduce checkpoint executable


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/fbfbee41
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/fbfbee41
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/fbfbee41

Branch: refs/heads/ci-dong
Commit: fbfbee4190481e1dd8522ec92eb6124c9f9a3c3f
Parents: e83a2e5
Author: Zhong <nj...@apache.org>
Authored: Fri Aug 25 11:35:26 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Thu Nov 23 13:31:34 2017 +0800

----------------------------------------------------------------------
 .../kylin/cube/model/CubeBuildTypeEnum.java     |   7 +-
 .../kylin/job/execution/AbstractExecutable.java |   5 +
 .../job/execution/CheckpointExecutable.java     |  78 +++++++++
 .../engine/mr/common/JobInfoConverter.java      |  57 +++++--
 .../apache/kylin/rest/service/JobService.java   | 167 ++++++++++++++++++-
 5 files changed, 298 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/fbfbee41/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java
index e3ae214..6a14025 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java
@@ -35,5 +35,10 @@ public enum CubeBuildTypeEnum {
     /**
      * refresh segments
      */
-    REFRESH
+    REFRESH,
+
+    /**
+     * checkpoint for set of other jobs
+     */
+    CHECKPOINT
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/fbfbee41/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index d36f598..30b6421 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -384,6 +384,11 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         return getDuration(getStartTime(), getEndTime(), getInterruptTime());
     }
 
+    public boolean isReady() {
+        final Output output = getManager().getOutput(id);
+        return output.getState() == ExecutableState.READY;
+    }
+
     /*
     * discarded is triggered by JobService, the Scheduler is not awake of that
     *

http://git-wip-us.apache.org/repos/asf/kylin/blob/fbfbee41/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
new file mode 100644
index 0000000..9864400
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.execution;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class CheckpointExecutable extends DefaultChainedExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(CheckpointExecutable.class);
+
+    private static final String DEPLOY_ENV_NAME = "envName";
+    private static final String PROJECT_INSTANCE_NAME = "projectName";
+
+    private final List<AbstractExecutable> subTasksForCheck = Lists.newArrayList();
+
+    public void addTaskForCheck(AbstractExecutable executable) {
+        this.subTasksForCheck.add(executable);
+    }
+
+    public void addTaskListForCheck(List<AbstractExecutable> executableList) {
+        this.subTasksForCheck.addAll(executableList);
+    }
+
+    public List<AbstractExecutable> getSubTasksForCheck() {
+        return subTasksForCheck;
+    }
+
+    @Override
+    public boolean isReady() {
+        if (!super.isReady()) {
+            return false;
+        }
+        for (Executable task : subTasksForCheck) {
+            final Output output = getManager().getOutput(task.getId());
+            if (output.getState() != ExecutableState.SUCCEED) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public String getDeployEnvName() {
+        return getParam(DEPLOY_ENV_NAME);
+    }
+
+    public void setDeployEnvName(String name) {
+        setParam(DEPLOY_ENV_NAME, name);
+    }
+
+    public String getProjectName() {
+        return getParam(PROJECT_INSTANCE_NAME);
+    }
+
+    public void setProjectName(String name) {
+        setParam(PROJECT_INSTANCE_NAME, name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/fbfbee41/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
index 9b8400c..3098c15 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
@@ -28,6 +28,7 @@ import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobStepStatusEnum;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.CheckpointExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
 import org.slf4j.Logger;
@@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory;
 public class JobInfoConverter {
     private static final Logger logger = LoggerFactory.getLogger(JobInfoConverter.class);
 
-    public static JobInstance parseToJobInstanceQuietly(AbstractExecutable job, Map<String, Output> outputs) {
+    public static JobInstance parseToJobInstanceQuietly(CubingJob job, Map<String, Output> outputs) {
         try {
             return parseToJobInstance(job, outputs);
         } catch (Exception e) {
@@ -45,26 +46,29 @@ public class JobInfoConverter {
         }
     }
 
-    public static JobInstance parseToJobInstance(AbstractExecutable job, Map<String, Output> outputs) {
-        if (job == null) {
-            logger.warn("job is null.");
+    public static JobInstance parseToJobInstanceQuietly(CheckpointExecutable job, Map<String, Output> outputs) {
+        try {
+            return parseToJobInstance(job, outputs);
+        } catch (Exception e) {
+            logger.error("Failed to parse job instance: uuid={}", job, e);
             return null;
         }
+    }
 
-        if (!(job instanceof CubingJob)) {
-            logger.warn("illegal job type, id:" + job.getId());
+    public static JobInstance parseToJobInstance(CubingJob job, Map<String, Output> outputs) {
+        if (job == null) {
+            logger.warn("job is null.");
             return null;
         }
 
-        CubingJob cubeJob = (CubingJob) job;
         Output output = outputs.get(job.getId());
         final JobInstance result = new JobInstance();
         result.setName(job.getName());
-        result.setRelatedCube(CubingExecutableUtil.getCubeName(cubeJob.getParams()));
-        result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams()));
+        result.setRelatedCube(CubingExecutableUtil.getCubeName(job.getParams()));
+        result.setRelatedSegment(CubingExecutableUtil.getSegmentId(job.getParams()));
         result.setLastModified(output.getLastModified());
-        result.setSubmitter(cubeJob.getSubmitter());
-        result.setUuid(cubeJob.getId());
+        result.setSubmitter(job.getSubmitter());
+        result.setUuid(job.getId());
         result.setType(CubeBuildTypeEnum.BUILD);
         result.setStatus(parseToJobStatus(output.getState()));
         result.setMrWaiting(AbstractExecutable.getExtraInfoAsLong(output, CubingJob.MAP_REDUCE_WAIT_TIME, 0L) / 1000);
@@ -73,8 +77,35 @@ public class JobInfoConverter {
         result.setExecInterruptTime(AbstractExecutable.getInterruptTime(output));
         result.setDuration(AbstractExecutable.getDuration(result.getExecStartTime(), result.getExecEndTime(),
                 result.getExecInterruptTime()) / 1000);
-        for (int i = 0; i < cubeJob.getTasks().size(); ++i) {
-            AbstractExecutable task = cubeJob.getTasks().get(i);
+        for (int i = 0; i < job.getTasks().size(); ++i) {
+            AbstractExecutable task = job.getTasks().get(i);
+            result.addStep(parseToJobStep(task, i, outputs.get(task.getId())));
+        }
+        return result;
+    }
+
+    public static JobInstance parseToJobInstance(CheckpointExecutable job, Map<String, Output> outputs) {
+        if (job == null) {
+            logger.warn("job is null.");
+            return null;
+        }
+
+        Output output = outputs.get(job.getId());
+        final JobInstance result = new JobInstance();
+        result.setName(job.getName());
+        result.setRelatedCube(CubingExecutableUtil.getCubeName(job.getParams()));
+        result.setLastModified(output.getLastModified());
+        result.setSubmitter(job.getSubmitter());
+        result.setUuid(job.getId());
+        result.setType(CubeBuildTypeEnum.CHECKPOINT);
+        result.setStatus(parseToJobStatus(output.getState()));
+        result.setExecStartTime(AbstractExecutable.getStartTime(output));
+        result.setExecEndTime(AbstractExecutable.getEndTime(output));
+        result.setExecInterruptTime(AbstractExecutable.getInterruptTime(output));
+        result.setDuration(AbstractExecutable.getDuration(result.getExecStartTime(), result.getExecEndTime(),
+                result.getExecInterruptTime()) / 1000);
+        for (int i = 0; i < job.getTasks().size(); ++i) {
+            AbstractExecutable task = job.getTasks().get(i);
             result.addStep(parseToJobStep(task, i, outputs.get(task.getId())));
         }
         return result;

http://git-wip-us.apache.org/repos/asf/kylin/blob/fbfbee41/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index d27b39a..529f3b8 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -50,6 +50,7 @@ import org.apache.kylin.job.constant.JobTimeFilterEnum;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.exception.SchedulerException;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.CheckpointExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
@@ -317,6 +318,33 @@ public class JobService extends BasicService implements InitializingBean {
         return result;
     }
 
+    protected JobInstance getCheckpointJobInstance(AbstractExecutable job) {
+        Message msg = MsgPicker.getMsg();
+
+        if (job == null) {
+            return null;
+        }
+        if (!(job instanceof CheckpointExecutable)) {
+            throw new BadRequestException(String.format(msg.getILLEGAL_JOB_TYPE(), job.getId()));
+        }
+
+        CheckpointExecutable checkpointExecutable = (CheckpointExecutable) job;
+        final JobInstance result = new JobInstance();
+        result.setName(job.getName());
+        result.setRelatedCube(CubingExecutableUtil.getCubeName(job.getParams()));
+        result.setLastModified(job.getLastModified());
+        result.setSubmitter(job.getSubmitter());
+        result.setUuid(job.getId());
+        result.setType(CubeBuildTypeEnum.CHECKPOINT);
+        result.setStatus(JobInfoConverter.parseToJobStatus(job.getStatus()));
+        result.setDuration(job.getDuration() / 1000);
+        for (int i = 0; i < checkpointExecutable.getTasks().size(); ++i) {
+            AbstractExecutable task = checkpointExecutable.getTasks().get(i);
+            result.addStep(JobInfoConverter.parseToJobStep(task, i, getExecutableManager().getOutput(task.getId())));
+        }
+        return result;
+    }
+
     public void resumeJob(JobInstance job) {
         aclEvaluate.checkProjectOperationPermission(job);
         getExecutableManager().resumeJob(job.getId());
@@ -373,6 +401,7 @@ public class JobService extends BasicService implements InitializingBean {
         Integer limit = (null == limitValue) ? 30 : limitValue;
         Integer offset = (null == offsetValue) ? 0 : offsetValue;
         List<JobInstance> jobs = searchJobsByCubeName(cubeNameSubstring, projectName, statusList, timeFilter);
+
         Collections.sort(jobs);
 
         if (jobs.size() <= offset) {
@@ -388,12 +417,40 @@ public class JobService extends BasicService implements InitializingBean {
 
     public List<JobInstance> searchJobsByCubeName(final String cubeNameSubstring, final String projectName,
             final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) {
-        return innerSearchCubingJobs(cubeNameSubstring, null, projectName, statusList, timeFilter);
+        return searchJobsByCubeName(cubeNameSubstring, projectName, statusList, timeFilter, JobSearchMode.ALL);
+    }
+
+    public List<JobInstance> searchJobsByCubeName(final String cubeNameSubstring, final String projectName,
+            final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) {
+        return innerSearchJobs(cubeNameSubstring, null, projectName, statusList, timeFilter, jobSearchMode);
     }
 
     public List<JobInstance> searchJobsByJobName(final String jobName, final String projectName,
             final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) {
-        return innerSearchCubingJobs(null, jobName, projectName, statusList, timeFilter);
+        return searchJobsByJobName(jobName, projectName, statusList, timeFilter, JobSearchMode.ALL);
+    }
+
+    public List<JobInstance> searchJobsByJobName(final String jobName, final String projectName,
+            final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) {
+        return innerSearchJobs(null, jobName, projectName, statusList, timeFilter, jobSearchMode);
+    }
+
+    public List<JobInstance> innerSearchJobs(final String cubeName, final String jobName, final String projectName,
+            final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) {
+        List<JobInstance> result = Lists.newArrayList();
+        switch (jobSearchMode) {
+        case CUBING_ONLY:
+            result.addAll(innerSearchCubingJobs(cubeName, jobName, projectName, statusList, timeFilter));
+            break;
+        case CHECKPOINT_ONLY:
+            result.addAll(innerSearchCheckpointJobs(cubeName, jobName, projectName, statusList, timeFilter));
+            break;
+        case ALL:
+        default:
+            result.addAll(innerSearchCubingJobs(cubeName, jobName, projectName, statusList, timeFilter));
+            result.addAll(innerSearchCheckpointJobs(cubeName, jobName, projectName, statusList, timeFilter));
+        }
+        return result;
     }
 
     public List<JobInstance> innerSearchCubingJobs(final String cubeName, final String jobName,
@@ -503,6 +560,109 @@ public class JobService extends BasicService implements InitializingBean {
         return results;
     }
 
+    public List<JobInstance> innerSearchCheckpointJobs(final String cubeName, final String jobName,
+            final String projectName, final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) {
+        // prepare time range
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTime(new Date());
+        long timeStartInMillis = getTimeStartInMillis(calendar, timeFilter);
+        long timeEndInMillis = Long.MAX_VALUE;
+        Set<ExecutableState> states = convertStatusEnumToStates(statusList);
+        final Map<String, Output> allOutputs = getExecutableManager().getAllOutputs(timeStartInMillis, timeEndInMillis);
+
+        return Lists
+                .newArrayList(FluentIterable
+                        .from(innerSearchCheckpointJobs(cubeName, jobName, states, timeStartInMillis, timeEndInMillis,
+                                allOutputs, false, projectName))
+                        .transform(new Function<CheckpointExecutable, JobInstance>() {
+                            @Override
+                            public JobInstance apply(CheckpointExecutable checkpointExecutable) {
+                                return JobInfoConverter.parseToJobInstanceQuietly(checkpointExecutable, allOutputs);
+                            }
+                        }));
+    }
+
+    public List<CheckpointExecutable> innerSearchCheckpointJobs(final String cubeName, final String jobName,
+            final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis,
+            final Map<String, Output> allOutputs, final boolean nameExactMatch, final String projectName) {
+        List<CheckpointExecutable> results = Lists
+                .newArrayList(
+                        FluentIterable
+                                .from(getExecutableManager().getAllAbstractExecutables(timeStartInMillis,
+                                        timeEndInMillis, CheckpointExecutable.class))
+                                .filter(new Predicate<AbstractExecutable>() {
+                                    @Override
+                                    public boolean apply(AbstractExecutable executable) {
+                                        if (executable instanceof CheckpointExecutable) {
+                                            if (StringUtils.isEmpty(cubeName)) {
+                                                return true;
+                                            }
+                                            String executableCubeName = CubingExecutableUtil
+                                                    .getCubeName(executable.getParams());
+                                            if (executableCubeName == null)
+                                                return true;
+                                            if (nameExactMatch)
+                                                return executableCubeName.equalsIgnoreCase(cubeName);
+                                            else
+                                                return executableCubeName.toLowerCase()
+                                                        .contains(cubeName.toLowerCase());
+                                        } else {
+                                            return false;
+                                        }
+                                    }
+                                }).transform(new Function<AbstractExecutable, CheckpointExecutable>() {
+                                    @Override
+                                    public CheckpointExecutable apply(AbstractExecutable executable) {
+                                        return (CheckpointExecutable) executable;
+                                    }
+                                }).filter(Predicates.and(new Predicate<CheckpointExecutable>() {
+                                    @Override
+                                    public boolean apply(CheckpointExecutable executable) {
+                                        if (null == projectName
+                                                || null == getProjectManager().getProject(projectName)) {
+                                            return true;
+                                        } else {
+                                            return projectName.equalsIgnoreCase(executable.getProjectName());
+                                        }
+                                    }
+                                }, new Predicate<CheckpointExecutable>() {
+                                    @Override
+                                    public boolean apply(CheckpointExecutable executable) {
+                                        try {
+                                            Output output = allOutputs.get(executable.getId());
+                                            if (output == null) {
+                                                return false;
+                                            }
+
+                                            ExecutableState state = output.getState();
+                                            boolean ret = statusList.contains(state);
+                                            return ret;
+                                        } catch (Exception e) {
+                                            throw e;
+                                        }
+                                    }
+                                }, new Predicate<CheckpointExecutable>() {
+                                    @Override
+                                    public boolean apply(@Nullable CheckpointExecutable checkpointExecutable) {
+                                        if (checkpointExecutable == null) {
+                                            return false;
+                                        }
+
+                                        if (Strings.isEmpty(jobName)) {
+                                            return true;
+                                        }
+
+                                        if (nameExactMatch) {
+                                            return checkpointExecutable.getName().equalsIgnoreCase(jobName);
+                                        } else {
+                                            return checkpointExecutable.getName().toLowerCase()
+                                                    .contains(jobName.toLowerCase());
+                                        }
+                                    }
+                                })));
+        return results;
+    }
+
     public List<CubingJob> listJobsByRealizationName(final String realizationName, final String projectName,
             final Set<ExecutableState> statusList) {
         return innerSearchCubingJobs(realizationName, null, statusList, 0L, Long.MAX_VALUE,
@@ -513,4 +673,7 @@ public class JobService extends BasicService implements InitializingBean {
         return listJobsByRealizationName(realizationName, projectName, EnumSet.allOf(ExecutableState.class));
     }
 
+    public enum JobSearchMode {
+        CUBING_ONLY, CHECKPOINT_ONLY, ALL
+    }
 }