You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/08/14 09:42:49 UTC
[kylin] branch master updated: KYLIN-3470 Add cache for execute and
execute_output to speed up list job api
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 0b5d255 KYLIN-3470 Add cache for execute and execute_output to speed up list job api
0b5d255 is described below
commit 0b5d25566cf24441e8371a782a8ac74dfa1dca72
Author: nichunen <ch...@kyligence.io>
AuthorDate: Sun Jul 29 19:34:55 2018 +0800
KYLIN-3470 Add cache for execute and execute_output to speed up list job api
---
.../java/org/apache/kylin/job/JobSearchResult.java | 66 +++
.../org/apache/kylin/job/dao/ExecutableDao.java | 164 +++++-
.../kylin/job/execution/ExecutableManager.java | 23 +
.../kylin/metadata/cachesync/CachedCrudAssist.java | 2 +-
.../kylin/engine/mr/common/JobInfoConverter.java | 34 ++
.../kylin/rest/controller/JobController.java | 2 +-
.../org/apache/kylin/rest/service/JobService.java | 618 +++++++++++++++------
7 files changed, 736 insertions(+), 173 deletions(-)
diff --git a/core-job/src/main/java/org/apache/kylin/job/JobSearchResult.java b/core-job/src/main/java/org/apache/kylin/job/JobSearchResult.java
new file mode 100644
index 0000000..bd4326d
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/JobSearchResult.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+/** Lightweight job search hit (id, job name, cube name, lastModified); its natural order puts larger lastModified first. */
+public class JobSearchResult implements Comparable<JobSearchResult> {
+ private String id;
+
+ private String jobName;
+
+ private String cubeName;
+
+ private long lastModified;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getJobName() {
+ return jobName;
+ }
+
+ public void setJobName(String jobName) {
+ this.jobName = jobName;
+ }
+
+ public String getCubeName() {
+ return cubeName;
+ }
+
+ public void setCubeName(String cubeName) {
+ this.cubeName = cubeName;
+ }
+
+ public long getLastModified() {
+ return lastModified;
+ }
+
+ public void setLastModified(long lastModified) {
+ this.lastModified = lastModified;
+ }
+ /** Compares by lastModified in descending order (arguments are swapped on purpose); throws NullPointerException if o is null. */
+ @Override
+ public int compareTo(JobSearchResult o) {
+ return Long.compare(o.lastModified, this.lastModified);
+ }
+}
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
index 1b34aa1..0cc6c8e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -28,7 +28,11 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.AutoReadWriteLock;
import org.apache.kylin.job.exception.PersistentException;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,9 +59,140 @@ public class ExecutableDao {
private ResourceStore store;
- private ExecutableDao(KylinConfig config) {
+ private CaseInsensitiveStringCache<ExecutablePO> executableDigestMap; // cached ExecutablePO digests, created for the "execute" entity
+
+ private CaseInsensitiveStringCache<ExecutableOutputPO> executableOutputDigestMap; // cached ExecutableOutputPO digests, created for the "execute_output" entity
+
+ private CachedCrudAssist<ExecutablePO> executableDigestCrud; // reloads executableDigestMap entries from ResourceStore.EXECUTE_RESOURCE_ROOT
+
+ private CachedCrudAssist<ExecutableOutputPO> executableOutputDigestCrud; // reloads executableOutputDigestMap entries from ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT
+
+ private AutoReadWriteLock executableDigestMapLock = new AutoReadWriteLock(); // write-locked by JobSyncListener while it updates executableDigestMap
+
+ private AutoReadWriteLock executableOutputDigestMapLock = new AutoReadWriteLock(); // write-locked by JobOutputSyncListener while it updates executableOutputDigestMap
+ /** Opens the resource store, builds and reloads the "execute" and "execute_output" digest caches, then registers broadcast listeners for both; reloadAll() may throw IOException. */
+ private ExecutableDao(KylinConfig config) throws IOException {
logger.info("Using metadata url: " + config);
this.store = ResourceStore.getStore(config);
+ this.executableDigestMap = new CaseInsensitiveStringCache<>(config, "execute");
+ this.executableDigestCrud = new CachedCrudAssist<ExecutablePO>(store, ResourceStore.EXECUTE_RESOURCE_ROOT, "",
+ ExecutablePO.class, executableDigestMap, false) {
+ @Override
+ public ExecutablePO reloadAt(String path) {
+ try {
+ ExecutablePO executablePO = readJobResource(path);
+ if (executablePO == null) {
+ logger.warn("No job found at " + path + ", returning null");
+ executableDigestMap.removeLocal(resourceName(path));
+ return null;
+ }
+
+ // create a digest: copy only uuid, name, lastModified, type and params
+ ExecutablePO digestExecutablePO = new ExecutablePO();
+ digestExecutablePO.setUuid(executablePO.getUuid());
+ digestExecutablePO.setName(executablePO.getName());
+ digestExecutablePO.setLastModified(executablePO.getLastModified());
+ digestExecutablePO.setType(executablePO.getType());
+ digestExecutablePO.setParams(executablePO.getParams());
+ executableDigestMap.putLocal(resourceName(path), digestExecutablePO);
+ return digestExecutablePO;
+ } catch (Exception e) {
+ throw new IllegalStateException("Error loading execute at " + path, e);
+ }
+ }
+ // the digest is returned unchanged; no post-reload initialization is done here
+ @Override
+ protected ExecutablePO initEntityAfterReload(ExecutablePO entity, String resourceName) {
+ return entity;
+ }
+ };
+ this.executableDigestCrud.setCheckCopyOnWrite(true);
+ this.executableDigestCrud.reloadAll();
+ // same set-up for "execute_output"; its reloadAll() skips ids that isTaskExecutableOutput() reports as task outputs
+ this.executableOutputDigestMap = new CaseInsensitiveStringCache<>(config, "execute_output");
+ this.executableOutputDigestCrud = new CachedCrudAssist<ExecutableOutputPO>(store, ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT,
+ "", ExecutableOutputPO.class, executableOutputDigestMap, false) {
+ @Override
+ public void reloadAll() throws IOException {
+ logger.debug("Reloading execute_output from " + ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT);
+ executableOutputDigestMap.clear();
+
+ NavigableSet<String> paths = store.listResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT);
+
+ if (paths != null) {
+ for (String path : paths) {
+ if (!isTaskExecutableOutput(resourceName(path)))
+ reloadAt(path);
+ }
+
+ logger.debug("Loaded " + executableOutputDigestMap.size() + " execute_output digest(s) out of " + paths.size()
+ + " resource");
+ }
+ }
+
+ @Override
+ public ExecutableOutputPO reloadAt(String path) {
+ try {
+ ExecutableOutputPO executableOutputPO = readJobOutputResource(path);
+ if (executableOutputPO == null) {
+ logger.warn("No job output found at " + path + ", returning null");
+ executableOutputDigestMap.removeLocal(resourceName(path));
+ return null;
+ }
+
+ // create a digest: copy only uuid, lastModified and status
+ ExecutableOutputPO digestExecutableOutputPO = new ExecutableOutputPO();
+ digestExecutableOutputPO.setUuid(executableOutputPO.getUuid());
+ digestExecutableOutputPO.setLastModified(executableOutputPO.getLastModified());
+ digestExecutableOutputPO.setStatus(executableOutputPO.getStatus());
+ executableOutputDigestMap.putLocal(resourceName(path), digestExecutableOutputPO);
+ return digestExecutableOutputPO;
+ } catch (Exception e) {
+ throw new IllegalStateException("Error loading execute_output at " + path, e);
+ }
+ }
+ // the digest is returned unchanged; no post-reload initialization is done here
+ @Override
+ protected ExecutableOutputPO initEntityAfterReload(ExecutableOutputPO entity, String resourceName) {
+ return entity;
+ }
+ };
+ this.executableOutputDigestCrud.setCheckCopyOnWrite(true);
+ this.executableOutputDigestCrud.reloadAll();
+ Broadcaster.getInstance(config).registerListener(new JobSyncListener(), "execute");
+ Broadcaster.getInstance(config).registerListener(new JobOutputSyncListener(), "execute_output");
+ }
+ // Ids longer than 36 characters count as task outputs; presumably job ids are 36-char UUIDs and task ids append a suffix — TODO confirm.
+ private boolean isTaskExecutableOutput(String id) {
+ return id.length() > 36;
+ }
+ /** Applies "execute" broadcast events to executableDigestMap under the write lock: DROP removes the local entry, any other event reloads it. */
+ private class JobSyncListener extends Broadcaster.Listener {
+ @Override
+ public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey)
+ throws IOException {
+ try (AutoReadWriteLock.AutoLock writeLock = executableDigestMapLock.lockForWrite()) {
+ if (Broadcaster.Event.DROP != event)
+ executableDigestCrud.reloadQuietly(cacheKey);
+ else
+ executableDigestMap.removeLocal(cacheKey);
+ }
+ }
+ }
+ /** Applies "execute_output" broadcast events to executableOutputDigestMap under the write lock, ignoring keys that isTaskExecutableOutput() flags. */
+ private class JobOutputSyncListener extends Broadcaster.Listener {
+ @Override
+ public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey)
+ throws IOException {
+ try (AutoReadWriteLock.AutoLock writeLock = executableOutputDigestMapLock.lockForWrite()) {
+ if (isTaskExecutableOutput(cacheKey))
+ return;
+ if (Broadcaster.Event.DROP != event)
+ executableOutputDigestCrud.reloadQuietly(cacheKey);
+ else
+ executableOutputDigestMap.removeLocal(cacheKey);
+ }
+ }
}
private String pathOfJob(ExecutablePO job) {
@@ -106,6 +241,15 @@ public class ExecutableDao {
}
}
+ /** Returns the cached job output digests whose lastModified lies in [timeStart, timeEndExclusive). */
+ public List<ExecutableOutputPO> getJobOutputDigests(long timeStart, long timeEndExclusive) {
+ List<ExecutableOutputPO> inRange = Lists.newArrayList();
+ for (ExecutableOutputPO digest : executableOutputDigestMap.values()) {
+ if (timeStart <= digest.getLastModified() && digest.getLastModified() < timeEndExclusive) inRange.add(digest);
+ }
+ return inRange;
+ }
+
public List<ExecutablePO> getJobs() throws PersistentException {
try {
return store.getAllResources(ResourceStore.EXECUTE_RESOURCE_ROOT, ExecutablePO.class, JOB_SERIALIZER);
@@ -124,6 +268,15 @@ public class ExecutableDao {
}
}
+ /** Returns the cached job digests whose lastModified lies in [timeStart, timeEndExclusive). */
+ public List<ExecutablePO> getJobDigests(long timeStart, long timeEndExclusive) {
+ List<ExecutablePO> inRange = Lists.newArrayList();
+ for (ExecutablePO digest : executableDigestMap.values()) {
+ if (timeStart <= digest.getLastModified() && digest.getLastModified() < timeEndExclusive) inRange.add(digest);
+ }
+ return inRange;
+ }
+
public List<String> getJobIds() throws PersistentException {
try {
NavigableSet<String> resources = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
@@ -156,6 +309,7 @@ public class ExecutableDao {
throw new IllegalArgumentException("job id:" + job.getUuid() + " already exists");
}
writeJobResource(pathOfJob(job), job);
+ executableDigestMap.put(job.getId(), job);
return job;
} catch (IOException e) {
logger.error("error save job:" + job.getUuid(), e);
@@ -170,6 +324,7 @@ public class ExecutableDao {
}
final long ts = writeJobResource(pathOfJob(job), job);
job.setLastModified(ts);
+ executableDigestMap.put(job.getId(), job);
return job;
} catch (IOException e) {
logger.error("error update job:" + job.getUuid(), e);
@@ -180,6 +335,7 @@ public class ExecutableDao {
public void deleteJob(String uuid) throws PersistentException {
try {
store.deleteResource(pathOfJob(uuid));
+ executableDigestMap.remove(uuid);
} catch (IOException e) {
logger.error("error delete job:" + uuid, e);
throw new PersistentException(e);
@@ -205,6 +361,8 @@ public class ExecutableDao {
try {
output.setLastModified(0);
writeJobOutputResource(pathOfJobOutput(output.getUuid()), output);
+ if (!isTaskExecutableOutput(output.getUuid()))
+ executableOutputDigestMap.put(output.getUuid(), output);
} catch (IOException e) {
logger.error("error update job output id:" + output.getUuid(), e);
throw new PersistentException(e);
@@ -215,6 +373,8 @@ public class ExecutableDao {
try {
final long ts = writeJobOutputResource(pathOfJobOutput(output.getUuid()), output);
output.setLastModified(ts);
+ if (!isTaskExecutableOutput(output.getUuid()))
+ executableOutputDigestMap.put(output.getUuid(), output);
} catch (IOException e) {
logger.error("error update job output id:" + output.getUuid(), e);
throw new PersistentException(e);
@@ -224,6 +384,8 @@ public class ExecutableDao {
public void deleteJobOutput(String uuid) throws PersistentException {
try {
store.deleteResource(pathOfJobOutput(uuid));
+ if (!isTaskExecutableOutput(uuid))
+ executableOutputDigestMap.remove(uuid);
} catch (IOException e) {
logger.error("error delete job:" + uuid, e);
throw new PersistentException(e);
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index abcb048..d37b3da 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -202,6 +202,16 @@ public class ExecutableManager {
}
}
+ /** Returns the output digests that executableDao.getJobOutputDigests yields for the given time range, keyed by getId(). */
+ public Map<String, ExecutableOutputPO> getAllOutputDigests(long timeStartInMillis, long timeEndInMillis) {
+ HashMap<String, ExecutableOutputPO> digestsById = Maps.newHashMap();
+ // time-range filtering is delegated to the DAO; this method only indexes the digests by id
+ for (ExecutableOutputPO digest : executableDao.getJobOutputDigests(timeStartInMillis, timeEndInMillis)) {
+ digestsById.put(digest.getId(), digest);
+ }
+ return digestsById;
+ }
+
public List<AbstractExecutable> getAllExecutables() {
try {
List<AbstractExecutable> ret = Lists.newArrayList();
@@ -238,6 +248,19 @@ public class ExecutableManager {
}
}
+ /** Parses the job digests in the given time range into executables; a digest that fails with IllegalArgumentException is logged and skipped. */
+ public List<AbstractExecutable> getAllExecutableDigests(long timeStartInMillis, long timeEndInMillis) {
+ List<AbstractExecutable> parsed = Lists.newArrayList();
+ for (ExecutablePO digest : executableDao.getJobDigests(timeStartInMillis, timeEndInMillis)) {
+ try {
+ parsed.add(parseTo(digest));
+ } catch (IllegalArgumentException e) {
+ logger.error("error parsing one executabePO: ", e);
+ }
+ }
+ return parsed;
+ }
+
public List<String> getAllJobIds() {
try {
return executableDao.getJobIds();
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java
index be3d8d4..37e6328 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java
@@ -108,7 +108,7 @@ abstract public class CachedCrudAssist<T extends RootPersistentEntity> {
return resRootPath + "/" + resourceName + resPathSuffix;
}
- private String resourceName(String resourcePath) {
+ protected String resourceName(String resourcePath) {
Preconditions.checkArgument(resourcePath.startsWith(resRootPath));
Preconditions.checkArgument(resourcePath.endsWith(resPathSuffix));
return resourcePath.substring(resRootPath.length() + 1, resourcePath.length() - resPathSuffix.length());
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
index 862d7e5..19fdca8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
@@ -27,11 +27,14 @@ import org.apache.kylin.cube.model.CubeBuildTypeEnum;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.JobSearchResult;
import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.constant.JobStepStatusEnum;
+import org.apache.kylin.job.dao.ExecutableOutputPO;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.CheckpointExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
import org.slf4j.Logger;
@@ -201,4 +204,35 @@ public class JobInfoConverter {
throw new RuntimeException("invalid state:" + state);
}
}
+
+ /** Builds a JobSearchResult (id, name, cube or model name, output lastModified) for a job; returns null when the job or its entry in outputs is missing. */
+ public static JobSearchResult parseToJobSearchResult(DefaultChainedExecutable job, Map<String, ExecutableOutputPO> outputs) {
+ if (job == null) {
+ logger.warn("job is null.");
+ return null;
+ }
+ ExecutableOutputPO output = outputs.get(job.getId());
+ if (output == null) {
+ logger.warn("job output is null.");
+ return null;
+ }
+
+ // Prefer the cube's display name when the cube exists; jobs without a cube name fall back to the "model_name" param.
+ String displayName = CubingExecutableUtil.getCubeName(job.getParams());
+ if (displayName != null) {
+ CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(displayName);
+ if (cube != null) {
+ displayName = cube.getDisplayName();
+ }
+ } else {
+ displayName = job.getParam("model_name");
+ }
+
+ final JobSearchResult result = new JobSearchResult();
+ result.setCubeName(displayName);
+ result.setId(job.getId());
+ result.setJobName(job.getName());
+ result.setLastModified(output.getLastModified());
+ return result;
+ }
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 1bf6ab6..5407649 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -83,7 +83,7 @@ public class JobController extends BasicController {
}
try {
- jobInstanceList = jobService.searchJobs(jobRequest.getCubeName(), jobRequest.getProjectName(), statusList,
+ jobInstanceList = jobService.searchJobsV2(jobRequest.getCubeName(), jobRequest.getProjectName(), statusList,
jobRequest.getLimit(), jobRequest.getOffset(), timeFilter, jobSearchMode);
} catch (Exception e) {
logger.error(e.getLocalizedMessage(), e);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index b1b69f6..2603267 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -18,12 +18,19 @@
package org.apache.kylin.rest.service;
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+
+import javax.annotation.Nullable;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.directory.api.util.Strings;
import org.apache.kylin.common.KylinConfig;
@@ -41,10 +48,12 @@ import org.apache.kylin.engine.mr.LookupSnapshotJobBuilder;
import org.apache.kylin.engine.mr.common.JobInfoConverter;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.JobSearchResult;
import org.apache.kylin.job.Scheduler;
import org.apache.kylin.job.SchedulerFactory;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.constant.JobTimeFilterEnum;
+import org.apache.kylin.job.dao.ExecutableOutputPO;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.job.exception.SchedulerException;
@@ -74,16 +83,12 @@ import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
/**
* @author ysong1
@@ -387,7 +392,8 @@ public class JobService extends BasicService implements InitializingBean {
return optimizeJobInstance;
}
- public JobInstance submitLookupSnapshotJob(CubeInstance cube, String lookupTable, List<String> segmentIDs, String submitter) throws IOException {
+ public JobInstance submitLookupSnapshotJob(CubeInstance cube, String lookupTable, List<String> segmentIDs,
+ String submitter) throws IOException {
LookupSnapshotBuildJob job = new LookupSnapshotJobBuilder(cube, lookupTable, segmentIDs, submitter).build();
getExecutableManager().addJob(job);
@@ -660,17 +666,19 @@ public class JobService extends BasicService implements InitializingBean {
+ SecurityContextHolder.getContext().getAuthentication().getName());
}
+ //******************************** Job search apis for Job controller V1 *******************************************
/**
- * currently only support substring match
- *
- * @return
- */
+ * currently only support substring match
+ *
+ * @return
+ */
public List<JobInstance> searchJobs(final String cubeNameSubstring, final String projectName,
final List<JobStatusEnum> statusList, final Integer limitValue, final Integer offsetValue,
final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) {
Integer limit = (null == limitValue) ? 30 : limitValue;
Integer offset = (null == offsetValue) ? 0 : offsetValue;
- List<JobInstance> jobs = searchJobsByCubeName(cubeNameSubstring, projectName, statusList, timeFilter, jobSearchMode);
+ List<JobInstance> jobs = searchJobsByCubeName(cubeNameSubstring, projectName, statusList, timeFilter,
+ jobSearchMode);
Collections.sort(jobs);
@@ -685,26 +693,18 @@ public class JobService extends BasicService implements InitializingBean {
return jobs.subList(offset, offset + limit);
}
+ /**
+ * it loads all metadata of "execute" and "execute_output", and parses all job instances within the scope of the given filters
+ *
+ * @return List of job instances searched by the method
+ *
+ */
public List<JobInstance> searchJobsByCubeName(final String cubeNameSubstring, final String projectName,
- final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) {
- return searchJobsByCubeName(cubeNameSubstring, projectName, statusList, timeFilter, JobSearchMode.CUBING_ONLY);
- }
-
- public List<JobInstance> searchJobsByCubeName(final String cubeNameSubstring, final String projectName,
- final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, final JobSearchMode jobSearchMode) {
+ final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter,
+ final JobSearchMode jobSearchMode) {
return innerSearchJobs(cubeNameSubstring, null, projectName, statusList, timeFilter, jobSearchMode);
}
- public List<JobInstance> searchJobsByJobName(final String jobName, final String projectName,
- final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) {
- return searchJobsByJobName(jobName, projectName, statusList, timeFilter, JobSearchMode.ALL);
- }
-
- public List<JobInstance> searchJobsByJobName(final String jobName, final String projectName,
- final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) {
- return innerSearchJobs(null, jobName, projectName, statusList, timeFilter, jobSearchMode);
- }
-
public List<JobInstance> innerSearchJobs(final String cubeName, final String jobName, final String projectName,
final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) {
List<JobInstance> result = Lists.newArrayList();
@@ -756,82 +756,90 @@ public class JobService extends BasicService implements InitializingBean {
}));
}
+ /**
+ * loads all metadata of "execute" and returns list of cubing job within the scope of the given filters
+ *
+ * @param allOutputs map of executable output data with type DefaultOutput parsed from ExecutableOutputPO
+ *
+ */
public List<CubingJob> innerSearchCubingJobs(final String cubeName, final String jobName,
final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis,
final Map<String, Output> allOutputs, final boolean nameExactMatch, final String projectName) {
List<CubingJob> results = Lists.newArrayList(
FluentIterable.from(getExecutableManager().getAllExecutables(timeStartInMillis, timeEndInMillis))
.filter(new Predicate<AbstractExecutable>() {
- @Override
- public boolean apply(AbstractExecutable executable) {
- if (executable instanceof CubingJob) {
- if (StringUtils.isEmpty(cubeName)) {
- return true;
+ @Override
+ public boolean apply(AbstractExecutable executable) {
+ if (executable instanceof CubingJob) {
+ if (StringUtils.isEmpty(cubeName)) {
+ return true;
+ }
+ String executableCubeName = CubingExecutableUtil
+ .getCubeName(executable.getParams());
+ if (executableCubeName == null)
+ return true;
+ if (nameExactMatch)
+ return executableCubeName.equalsIgnoreCase(cubeName);
+ else
+ return executableCubeName.toLowerCase().contains(cubeName.toLowerCase());
+ } else {
+ return false;
+ }
}
- String executableCubeName = CubingExecutableUtil.getCubeName(executable.getParams());
- if (executableCubeName == null)
- return true;
- if (nameExactMatch)
- return executableCubeName.equalsIgnoreCase(cubeName);
- else
- return executableCubeName.toLowerCase().contains(cubeName.toLowerCase());
- } else {
- return false;
- }
- }
- }).transform(new Function<AbstractExecutable, CubingJob>() {
- @Override
- public CubingJob apply(AbstractExecutable executable) {
- return (CubingJob) executable;
- }
- }).filter(Predicates.and(new Predicate<CubingJob>() {
- @Override
- public boolean apply(CubingJob executable) {
- if (null == projectName || null == getProjectManager().getProject(projectName)) {
- return true;
- } else {
- return projectName.equalsIgnoreCase(executable.getProjectName());
- }
- }
- }, new Predicate<CubingJob>() {
- @Override
- public boolean apply(CubingJob executable) {
- try {
- Output output = allOutputs.get(executable.getId());
- if (output == null) {
- return false;
+ }).transform(new Function<AbstractExecutable, CubingJob>() {
+ @Override
+ public CubingJob apply(AbstractExecutable executable) {
+ return (CubingJob) executable;
}
+ }).filter(Predicates.and(new Predicate<CubingJob>() {
+ @Override
+ public boolean apply(CubingJob executable) {
+ if (null == projectName || null == getProjectManager().getProject(projectName)) {
+ return true;
+ } else {
+ return projectName.equalsIgnoreCase(executable.getProjectName());
+ }
+ }
+ }, new Predicate<CubingJob>() {
+ @Override
+ public boolean apply(CubingJob executable) {
+ try {
+ Output output = allOutputs.get(executable.getId());
+ if (output == null) {
+ return false;
+ }
- ExecutableState state = output.getState();
- boolean ret = statusList.contains(state);
- return ret;
- } catch (Exception e) {
- throw e;
- }
- }
- }, new Predicate<CubingJob>() {
- @Override
- public boolean apply(@Nullable CubingJob cubeJob) {
- if (cubeJob == null) {
- return false;
- }
-
- if (Strings.isEmpty(jobName)) {
- return true;
- }
-
- if (nameExactMatch) {
- return cubeJob.getName().equalsIgnoreCase(jobName);
- } else {
- return cubeJob.getName().toLowerCase().contains(jobName.toLowerCase());
- }
- }
- })));
+ ExecutableState state = output.getState();
+ boolean ret = statusList.contains(state);
+ return ret;
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+ }, new Predicate<CubingJob>() {
+ @Override
+ public boolean apply(@Nullable CubingJob cubeJob) {
+ if (cubeJob == null) {
+ return false;
+ }
+
+ if (Strings.isEmpty(jobName)) {
+ return true;
+ }
+
+ if (nameExactMatch) {
+ return cubeJob.getName().equalsIgnoreCase(jobName);
+ } else {
+ return cubeJob.getName().toLowerCase().contains(jobName.toLowerCase());
+ }
+ }
+ })));
return results;
}
public List<JobInstance> innerSearchCheckpointJobs(final String cubeName, final String jobName,
final String projectName, final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) {
+ // TODO: use cache of jobs for this method
// prepare time range
Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date());
@@ -855,83 +863,353 @@ public class JobService extends BasicService implements InitializingBean {
public List<CheckpointExecutable> innerSearchCheckpointJobs(final String cubeName, final String jobName,
final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis,
final Map<String, Output> allOutputs, final boolean nameExactMatch, final String projectName) {
- List<CheckpointExecutable> results = Lists
- .newArrayList(
- FluentIterable
- .from(getExecutableManager().getAllExecutables(timeStartInMillis, timeEndInMillis))
- .filter(new Predicate<AbstractExecutable>() {
- @Override
- public boolean apply(AbstractExecutable executable) {
- if (executable instanceof CheckpointExecutable) {
- if (StringUtils.isEmpty(cubeName)) {
- return true;
- }
- String executableCubeName = CubingExecutableUtil
- .getCubeName(executable.getParams());
- if (executableCubeName == null)
- return true;
- if (nameExactMatch)
- return executableCubeName.equalsIgnoreCase(cubeName);
- else
- return executableCubeName.toLowerCase()
- .contains(cubeName.toLowerCase());
- } else {
- return false;
- }
- }
- }).transform(new Function<AbstractExecutable, CheckpointExecutable>() {
- @Override
- public CheckpointExecutable apply(AbstractExecutable executable) {
- return (CheckpointExecutable) executable;
+ List<CheckpointExecutable> results = Lists.newArrayList( // copies the filtered view below into a new list
+ FluentIterable.from(getExecutableManager().getAllExecutables(timeStartInMillis, timeEndInMillis))
+ .filter(new Predicate<AbstractExecutable>() {
+ @Override
+ public boolean apply(AbstractExecutable executable) {
+ if (executable instanceof CheckpointExecutable) { // stage 1: checkpoint jobs only, then the cube name filter
+ if (StringUtils.isEmpty(cubeName)) { // no cube name filter requested
+ return true;
}
- }).filter(Predicates.and(new Predicate<CheckpointExecutable>() {
- @Override
- public boolean apply(CheckpointExecutable executable) {
- if (null == projectName
- || null == getProjectManager().getProject(projectName)) {
- return true;
- } else {
- return projectName.equalsIgnoreCase(executable.getProjectName());
- }
- }
+ String executableCubeName = CubingExecutableUtil
+ .getCubeName(executable.getParams());
+ if (executableCubeName == null) // a job whose params carry no cube name is kept
+ return true;
+ if (nameExactMatch) // "exact" still ignores case
+ return executableCubeName.equalsIgnoreCase(cubeName);
+ else
+ return executableCubeName.toLowerCase().contains(cubeName.toLowerCase());
+ } else {
+ return false;
+ }
+ }
+ }).transform(new Function<AbstractExecutable, CheckpointExecutable>() {
+ @Override
+ public CheckpointExecutable apply(AbstractExecutable executable) {
+ return (CheckpointExecutable) executable; // safe: stage 1 only lets CheckpointExecutable instances through
+ }
+ }).filter(Predicates.and(new Predicate<CheckpointExecutable>() { // stage 2: project, status and job name must all match
+ @Override
+ public boolean apply(CheckpointExecutable executable) {
+ if (null == projectName || null == getProjectManager().getProject(projectName)) { // null or unknown project: no project filtering
+ return true;
+ } else {
+ return projectName.equalsIgnoreCase(executable.getProjectName());
+ }
+ }
+ }, new Predicate<CheckpointExecutable>() {
+ @Override
+ public boolean apply(CheckpointExecutable executable) {
+ try {
+ Output output = allOutputs.get(executable.getId());
+ if (output == null) { // no entry for this job id in allOutputs: drop the job
+ return false;
}
- }, new Predicate<CheckpointExecutable>() {
- @Override
- public boolean apply(CheckpointExecutable executable) {
- try {
- Output output = allOutputs.get(executable.getId());
- if (output == null) {
- return false;
- }
-
- ExecutableState state = output.getState();
- boolean ret = statusList.contains(state);
- return ret;
- } catch (Exception e) {
- throw e;
- }
+
+ ExecutableState state = output.getState();
+ boolean ret = statusList.contains(state);
+ return ret;
+ } catch (Exception e) { // rethrown unchanged, so this try/catch has no effect
+ throw e;
+ }
+ }
+ }, new Predicate<CheckpointExecutable>() {
+ @Override
+ public boolean apply(@Nullable CheckpointExecutable checkpointExecutable) {
+ if (checkpointExecutable == null) {
+ return false;
+ }
+
+ if (Strings.isEmpty(jobName)) { // no job name filter requested
+ return true;
+ }
+
+ if (nameExactMatch) { // "exact" still ignores case
+ return checkpointExecutable.getName().equalsIgnoreCase(jobName);
+ } else { // otherwise a case-insensitive substring match
+ return checkpointExecutable.getName().toLowerCase().contains(jobName.toLowerCase());
+ }
+ }
+ })));
+ return results;
+ }
+ //****************************** Job search apis for Job controller V1 end *****************************************
+
+ //******************************** Job search apis for Job controller V2 *******************************************
+ /**
+ * Searches jobs via {@link #searchJobsByCubeNameV2}, sorts the hits by the natural order of
+ * {@link JobSearchResult} and loads the full {@link JobInstance} only for the requested page.
+ *
+ * @param limitValue page size; {@code null} means 30, a negative value is treated as 0
+ * @param offsetValue index of the first hit to return; {@code null} or a negative value means 0
+ * @return the job instances of the requested page; empty when the offset is past the last hit
+ */
+ public List<JobInstance> searchJobsV2(final String cubeNameSubstring, final String projectName,
+ final List<JobStatusEnum> statusList, final Integer limitValue, final Integer offsetValue,
+ final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) {
+ // Clamp the paging arguments: a negative offset or limit would otherwise make subList() throw.
+ final int limit = (null == limitValue) ? 30 : Math.max(0, limitValue);
+ final int offset = (null == offsetValue) ? 0 : Math.max(0, offsetValue);
+ List<JobSearchResult> jobSearchResultList = searchJobsByCubeNameV2(cubeNameSubstring, projectName, statusList,
+ timeFilter, jobSearchMode);
+
+ Collections.sort(jobSearchResultList);
+
+ if (jobSearchResultList.size() <= offset) {
+ return Collections.emptyList();
+ }
+
+ // size - offset is positive here, so pageSize is in [0, size - offset] and offset + pageSize cannot overflow.
+ final int pageSize = Math.min(limit, jobSearchResultList.size() - offset);
+
+ // Fetch instance data of jobs for the searched job results
+ List<JobInstance> jobInstanceList = new ArrayList<>(pageSize);
+ for (JobSearchResult result : jobSearchResultList.subList(offset, offset + pageSize)) {
+ jobInstanceList.add(getJobInstance(result.getId()));
+ }
+
+ return jobInstanceList;
+ }
+
+ /**
+ * Loads the cached digest metadata of "execute" and "execute_output" and collects the jobs that
+ * pass the given filters. {@code jobSearchMode} selects cubing jobs, checkpoint jobs or both.
+ *
+ * @return the search results of the matching jobs; with mode ALL the cubing jobs come first
+ */
+ public List<JobSearchResult> searchJobsByCubeNameV2(final String cubeNameSubstring, final String projectName,
+ final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter,
+ final JobSearchMode jobSearchMode) {
+ final List<JobSearchResult> matches = Lists.newArrayList();
+ // CUBING_ONLY and any mode not listed below search cubing jobs only.
+ boolean withCubing = true;
+ boolean withCheckpoint = false;
+ switch (jobSearchMode) {
+ case ALL:
+ withCheckpoint = true;
+ break;
+ case CHECKPOINT_ONLY:
+ withCubing = false;
+ withCheckpoint = true;
+ break;
+ default:
+ break;
+ }
+ if (withCubing) {
+ matches.addAll(innerSearchCubingJobsV2(cubeNameSubstring, null, projectName, statusList, timeFilter));
+ }
+ if (withCheckpoint) {
+ matches.addAll(innerSearchCheckpointJobsV2(cubeNameSubstring, null, projectName, statusList, timeFilter));
+ }
+ return matches;
+ }
+
+ /**
+ * Searches cubing jobs and converts every match into a {@link JobSearchResult}.
+ * Calls {@code aclEvaluate.checkIsGlobalAdmin()} when {@code projectName} is null and
+ * {@code aclEvaluate.checkProjectOperationPermission(projectName)} otherwise.
+ * Matches for which the converter returns null are left out.
+ */
+ public List<JobSearchResult> innerSearchCubingJobsV2(final String cubeName, final String jobName,
+ final String projectName, final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) {
+ if (null != projectName) {
+ aclEvaluate.checkProjectOperationPermission(projectName);
+ } else {
+ aclEvaluate.checkIsGlobalAdmin();
+ }
+ // time range: the start is derived from timeFilter by getTimeStartInMillis, the end is unbounded
+ final Calendar now = Calendar.getInstance();
+ now.setTime(new Date());
+ final long windowStart = getTimeStartInMillis(now, timeFilter);
+ final long windowEnd = Long.MAX_VALUE;
+ final Set<ExecutableState> wantedStates = convertStatusEnumToStates(statusList);
+ final Map<String, ExecutableOutputPO> outputDigests = getExecutableManager()
+ .getAllOutputDigests(windowStart, windowEnd);
+ final List<CubingJob> matchedJobs = innerSearchCubingJobsV2(cubeName, jobName, wantedStates, windowStart,
+ windowEnd, outputDigests, false, projectName);
+ final List<JobSearchResult> searchResults = Lists.newArrayList();
+ for (CubingJob matchedJob : matchedJobs) {
+ final JobSearchResult converted = JobInfoConverter.parseToJobSearchResult(matchedJob, outputDigests);
+ if (converted != null) {
+ searchResults.add(converted);
+ }
+ }
+ return searchResults;
+ }
+
+ /**
+ * Searches checkpoint jobs and converts every match into a {@link JobSearchResult}.
+ * Calls {@code aclEvaluate.checkIsGlobalAdmin()} when {@code projectName} is null and
+ * {@code aclEvaluate.checkProjectOperationPermission(projectName)} otherwise.
+ * Matches for which the converter returns null are left out.
+ */
+ public List<JobSearchResult> innerSearchCheckpointJobsV2(final String cubeName, final String jobName,
+ final String projectName, final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) {
+ if (null != projectName) {
+ aclEvaluate.checkProjectOperationPermission(projectName);
+ } else {
+ aclEvaluate.checkIsGlobalAdmin();
+ }
+ // time range: the start is derived from timeFilter by getTimeStartInMillis, the end is unbounded
+ final Calendar now = Calendar.getInstance();
+ now.setTime(new Date());
+ final long windowStart = getTimeStartInMillis(now, timeFilter);
+ final long windowEnd = Long.MAX_VALUE;
+ final Set<ExecutableState> wantedStates = convertStatusEnumToStates(statusList);
+ final Map<String, ExecutableOutputPO> outputDigests = getExecutableManager()
+ .getAllOutputDigests(windowStart, windowEnd);
+ final List<CheckpointExecutable> matchedJobs = innerSearchCheckpointJobsV2(cubeName, jobName, wantedStates,
+ windowStart, windowEnd, outputDigests, false, projectName);
+ final List<JobSearchResult> searchResults = Lists.newArrayList();
+ for (CheckpointExecutable matchedJob : matchedJobs) {
+ final JobSearchResult converted = JobInfoConverter.parseToJobSearchResult(matchedJob, outputDigests);
+ if (converted != null) {
+ searchResults.add(converted);
+ }
+ }
+ return searchResults;
+ }
+
+ /**
+ * Called by searchJobsByCubeNameV2: reads the cached digest metadata of "execute" and returns the
+ * cubing jobs that pass the cube name, project, status and job name filters.
+ *
+ * @param allExecutableOutputPO executable output digests keyed by job id; a job without an entry is skipped
+ * @return the matching cubing jobs
+ */
+ public List<CubingJob> innerSearchCubingJobsV2(final String cubeName, final String jobName,
+ final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis,
+ final Map<String, ExecutableOutputPO> allExecutableOutputPO, final boolean nameExactMatch,
+ final String projectName) {
+ List<CubingJob> results = Lists.newArrayList(
+ FluentIterable.from(getExecutableManager().getAllExecutableDigests(timeStartInMillis, timeEndInMillis))
+ .filter(new Predicate<AbstractExecutable>() {
+ @Override
+ public boolean apply(AbstractExecutable executable) {
+ if (executable instanceof CubingJob) {
+ if (StringUtils.isEmpty(cubeName)) {
+ return true;
}
- }, new Predicate<CheckpointExecutable>() {
- @Override
- public boolean apply(@Nullable CheckpointExecutable checkpointExecutable) {
- if (checkpointExecutable == null) {
- return false;
- }
-
- if (Strings.isEmpty(jobName)) {
- return true;
- }
-
- if (nameExactMatch) {
- return checkpointExecutable.getName().equalsIgnoreCase(jobName);
- } else {
- return checkpointExecutable.getName().toLowerCase()
- .contains(jobName.toLowerCase());
- }
+ String executableCubeName = CubingExecutableUtil
+ .getCubeName(executable.getParams());
+ if (executableCubeName == null)
+ return true;
+ if (nameExactMatch)
+ return executableCubeName.equalsIgnoreCase(cubeName);
+ else
+ return executableCubeName.toLowerCase().contains(cubeName.toLowerCase());
+ } else {
+ return false;
+ }
+ }
+ }).transform(new Function<AbstractExecutable, CubingJob>() {
+ @Override
+ public CubingJob apply(AbstractExecutable executable) {
+ return (CubingJob) executable;
+ }
+ }).filter(Predicates.and(new Predicate<CubingJob>() {
+ @Override
+ public boolean apply(CubingJob executable) {
+ if (null == projectName || null == getProjectManager().getProject(projectName)) {
+ return true;
+ } else {
+ return projectName.equalsIgnoreCase(executable.getProjectName());
+ }
+ }
+ }, new Predicate<CubingJob>() {
+ @Override
+ public boolean apply(CubingJob executable) {
+ ExecutableOutputPO executableOutputPO = allExecutableOutputPO
+ .get(executable.getId());
+ // Executables and outputs are loaded by separate calls, so a job may have no output digest.
+ if (executableOutputPO == null) {
+ return false;
+ }
+ ExecutableState state = ExecutableState.valueOf(executableOutputPO.getStatus());
+ return statusList.contains(state);
+ }
+ }, new Predicate<CubingJob>() {
+ @Override
+ public boolean apply(@Nullable CubingJob cubeJob) {
+ if (cubeJob == null) {
+ return false;
+ }
+
+ if (Strings.isEmpty(jobName)) {
+ return true;
+ }
+
+ if (nameExactMatch) {
+ return cubeJob.getName().equalsIgnoreCase(jobName);
+ } else {
+ return cubeJob.getName().toLowerCase().contains(jobName.toLowerCase());
+ }
+ }
+ })));
+ return results;
+ }
+
+ /**
+ * Reads the cached digest metadata of "execute" and returns the checkpoint jobs that pass the
+ * cube name, project, status and job name filters.
+ *
+ * @param allExecutableOutputPO executable output digests keyed by job id; a job without an entry is skipped
+ * @return the matching checkpoint jobs
+ */
+ public List<CheckpointExecutable> innerSearchCheckpointJobsV2(final String cubeName, final String jobName,
+ final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis,
+ final Map<String, ExecutableOutputPO> allExecutableOutputPO, final boolean nameExactMatch,
+ final String projectName) {
+ List<CheckpointExecutable> results = Lists.newArrayList(
+ FluentIterable.from(getExecutableManager().getAllExecutableDigests(timeStartInMillis, timeEndInMillis))
+ .filter(new Predicate<AbstractExecutable>() {
+ @Override
+ public boolean apply(AbstractExecutable executable) {
+ if (executable instanceof CheckpointExecutable) {
+ if (StringUtils.isEmpty(cubeName)) {
+ return true;
}
- })));
+ String executableCubeName = CubingExecutableUtil
+ .getCubeName(executable.getParams());
+ if (executableCubeName == null)
+ return true;
+ if (nameExactMatch)
+ return executableCubeName.equalsIgnoreCase(cubeName);
+ else
+ return executableCubeName.toLowerCase().contains(cubeName.toLowerCase());
+ } else {
+ return false;
+ }
+ }
+ }).transform(new Function<AbstractExecutable, CheckpointExecutable>() {
+ @Override
+ public CheckpointExecutable apply(AbstractExecutable executable) {
+ return (CheckpointExecutable) executable;
+ }
+ }).filter(Predicates.and(new Predicate<CheckpointExecutable>() {
+ @Override
+ public boolean apply(CheckpointExecutable executable) {
+ if (null == projectName || null == getProjectManager().getProject(projectName)) {
+ return true;
+ } else {
+ return projectName.equalsIgnoreCase(executable.getProjectName());
+ }
+ }
+ }, new Predicate<CheckpointExecutable>() {
+ @Override
+ public boolean apply(CheckpointExecutable executable) {
+ ExecutableOutputPO executableOutputPO = allExecutableOutputPO
+ .get(executable.getId());
+ // Executables and outputs are loaded by separate calls, so a job may have no output digest.
+ if (executableOutputPO == null) {
+ return false;
+ }
+ ExecutableState state = ExecutableState.valueOf(executableOutputPO.getStatus());
+ return statusList.contains(state);
+ }
+ }, new Predicate<CheckpointExecutable>() {
+ @Override
+ public boolean apply(@Nullable CheckpointExecutable checkpointExecutable) {
+ if (checkpointExecutable == null) {
+ return false;
+ }
+
+ if (Strings.isEmpty(jobName)) {
+ return true;
+ }
+
+ if (nameExactMatch) {
+ return checkpointExecutable.getName().equalsIgnoreCase(jobName);
+ } else {
+ return checkpointExecutable.getName().toLowerCase().contains(jobName.toLowerCase());
+ }
+ }
+ })));
return results;
}
+ //****************************** Job search apis for Job controller V2 end *****************************************
+
public List<CubingJob> listJobsByRealizationName(final String realizationName, final String projectName,
final Set<ExecutableState> statusList) {
return innerSearchCubingJobs(realizationName, null, statusList, 0L, Long.MAX_VALUE,