You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/12/15 11:50:00 UTC
[1/2] kylin git commit: KYLIN-1079 add time filter for job history
metastore
Repository: kylin
Updated Branches:
refs/heads/2.x-staging af060b2ff -> 9e02ef1e5
KYLIN-1079 add time filter for job history metastore
Signed-off-by: honma <ho...@ebay.com>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9e02ef1e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9e02ef1e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9e02ef1e
Branch: refs/heads/2.x-staging
Commit: 9e02ef1e52742ca67c93bceaea15df8414fbc308
Parents: 4ab1c14
Author: wangxianbin1987 <wa...@gmail.com>
Authored: Fri Dec 11 15:25:07 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Dec 15 18:56:38 2015 +0800
----------------------------------------------------------------------
.../common/persistence/FileResourceStore.java | 6 ++
.../kylin/common/persistence/ResourceStore.java | 22 ++++++
.../kylin/job/constant/JobTimeFilterEnum.java | 43 ++++++++++++
.../org/apache/kylin/job/dao/ExecutableDao.java | 32 +++++++++
.../kylin/job/manager/ExecutableManager.java | 32 +++++++++
.../kylin/rest/controller/JobController.java | 5 +-
.../kylin/rest/request/JobListRequest.java | 8 +++
.../apache/kylin/rest/service/BasicService.java | 37 ++++++++++
.../apache/kylin/rest/service/JobService.java | 73 ++++++++++++++++++--
.../kylin/storage/hbase/HBaseResourceStore.java | 50 ++++++++++++++
webapp/app/js/controllers/job.js | 4 +-
webapp/app/js/model/jobConfig.js | 7 ++
webapp/app/partials/jobs/jobs.html | 6 ++
13 files changed, 318 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/9e02ef1e/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
index 89e3a1d..49ff441 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
@@ -95,6 +95,12 @@ public class FileResourceStore extends ResourceStore {
}
@Override
+ protected List<RawResource> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis) throws IOException {
+     // The file-based store does not apply the time window: timeStartInMillis and
+     // timeEndInMillis are ignored and the plain key-range lookup result is returned.
+     final List<RawResource> withoutTimeFilter = getAllResources(rangeStart, rangeEnd);
+     return withoutTimeFilter;
+ }
+
+ @Override
protected RawResource getResourceImpl(String resPath) throws IOException {
File f = file(resPath);
if (f.exists() && f.isFile()) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/9e02ef1e/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 848d412..b2a4ce3 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -181,8 +181,30 @@ abstract public class ResourceStore {
}
}
+ /**
+  * Fetches the raw resources for the given key range and time window from the
+  * store-specific lookup and deserializes each of them with {@code serializer}.
+  * Every deserialized entity gets its last-modified value set from the raw
+  * resource's timestamp.
+  *
+  * @return the deserialized entities, or an empty list when the lookup finds nothing
+  * @throws IOException if the lookup or a deserialization fails
+  */
+ final public <T extends RootPersistentEntity> List<T> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis, Class<T> clazz, Serializer<T> serializer) throws IOException {
+     final List<RawResource> raws = getAllResources(rangeStart, rangeEnd, timeStartInMillis, timeEndInMillis);
+     if (raws.isEmpty()) {
+         return Collections.emptyList();
+     }
+     try {
+         final List<T> entities = Lists.newArrayList();
+         for (RawResource raw : raws) {
+             final T entity = serializer.deserialize(new DataInputStream(raw.inputStream));
+             entity.setLastModified(raw.timestamp);
+             entities.add(entity);
+         }
+         return entities;
+     } finally {
+         // Runs on success and on failure alike, so streams that were never
+         // read because an earlier deserialization threw are closed as well.
+         for (RawResource raw : raws) {
+             IOUtils.closeQuietly(raw.inputStream);
+         }
+     }
+ }
+
abstract protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException;
+ /**
+  * Time-aware variant of {@link #getAllResources(String, String)}: takes the same key
+  * range plus a time window expressed in milliseconds.
+  * NOTE(review): how the window is applied is left to the implementation -- in this
+  * change the file-based store ignores it while the HBase store filters on its
+  * timestamp column; confirm the intended contract.
+  */
+ abstract protected List<RawResource> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis) throws IOException;
+
/** returns null if not exists */
abstract protected RawResource getResourceImpl(String resPath) throws IOException;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9e02ef1e/core-job/src/main/java/org/apache/kylin/job/constant/JobTimeFilterEnum.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/JobTimeFilterEnum.java b/core-job/src/main/java/org/apache/kylin/job/constant/JobTimeFilterEnum.java
new file mode 100644
index 0000000..c4787f7
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/JobTimeFilterEnum.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.constant;
+
+/**
+ * Time windows that can be chosen when listing job history. Every constant carries
+ * an integer code, and {@link #getByCode(int)} maps a code back to its constant.
+ */
+public enum JobTimeFilterEnum {
+    LAST_ONE_DAY(0), LAST_ONE_WEEK(1), LAST_ONE_MONTH(2), LAST_ONE_YEAR(3), ALL(4);
+
+    private final int code;
+
+    JobTimeFilterEnum(int code) {
+        this.code = code;
+    }
+
+    /** Returns the integer code of this constant. */
+    public int getCode() {
+        return code;
+    }
+
+    /**
+     * Looks up the constant whose code equals {@code code}.
+     *
+     * @return the matching constant, or {@code null} when no constant uses that code
+     */
+    public static JobTimeFilterEnum getByCode(int code) {
+        JobTimeFilterEnum match = null;
+        final JobTimeFilterEnum[] candidates = values();
+        for (int i = 0; i < candidates.length && match == null; i++) {
+            if (candidates[i].code == code) {
+                match = candidates[i];
+            }
+        }
+        return match;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9e02ef1e/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
index 18e36b4..4b1336f 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -108,6 +108,22 @@ public class ExecutableDao {
}
}
+ /**
+  * Loads the persisted job outputs, restricted to the given time window.
+  * The key range handed to the store runs from the smallest to the largest path
+  * listed under {@code ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT}.
+  *
+  * @param timeStartInMillis start of the window, passed through to the resource store
+  * @param timeEndInMillis end of the window, passed through to the resource store
+  * @return the outputs returned by the store; an empty list when nothing is listed under the root
+  * @throws PersistentException if the resource store fails with an IOException
+  */
+ public List<ExecutableOutputPO> getJobOutputs(long timeStartInMillis, long timeEndInMillis) throws PersistentException {
+     try {
+         List<String> resources = store.listResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT);
+         if (resources == null || resources.isEmpty()) {
+             return Collections.emptyList();
+         }
+         // sorted so that the first and last entries bound the whole key range
+         Collections.sort(resources);
+         String rangeStart = resources.get(0);
+         String rangeEnd = resources.get(resources.size() - 1);
+         return store.getAllResources(rangeStart, rangeEnd, timeStartInMillis, timeEndInMillis, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER);
+     } catch (IOException e) {
+         // this method reads job outputs, not jobs -- the message now says so
+         logger.error("error get all job outputs:", e);
+         throw new PersistentException(e);
+     }
+ }
+
public List<ExecutablePO> getJobs() throws PersistentException {
try {
final List<String> jobIds = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
@@ -124,6 +140,22 @@ public class ExecutableDao {
}
}
+ /**
+  * Loads the persisted jobs, restricted to the given time window. The key range
+  * handed to the store runs from the smallest to the largest path listed under
+  * {@code ResourceStore.EXECUTE_RESOURCE_ROOT}.
+  *
+  * @return the jobs returned by the store; an empty list when nothing is listed under the root
+  * @throws PersistentException if the resource store fails with an IOException
+  */
+ public List<ExecutablePO> getJobs(long timeStartInMillis, long timeEndInMillis) throws PersistentException {
+     try {
+         final List<String> sortedIds = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
+         final boolean nothingStored = (sortedIds == null) || sortedIds.isEmpty();
+         if (nothingStored) {
+             return Collections.emptyList();
+         }
+         Collections.sort(sortedIds);
+         final int lastIndex = sortedIds.size() - 1;
+         return store.getAllResources(sortedIds.get(0), sortedIds.get(lastIndex), timeStartInMillis, timeEndInMillis, ExecutablePO.class, JOB_SERIALIZER);
+     } catch (IOException e) {
+         logger.error("error get all Jobs:", e);
+         throw new PersistentException(e);
+     }
+ }
+
public List<String> getJobIds() throws PersistentException {
try {
ArrayList<String> resources = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
http://git-wip-us.apache.org/repos/asf/kylin/blob/9e02ef1e/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
index d73fd26..3effbe7 100644
--- a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
@@ -148,6 +148,20 @@ public class ExecutableManager {
}
}
+ /**
+  * Reads the job outputs for the given time window from the DAO and returns them
+  * parsed and keyed by output id.
+  *
+  * @throws RuntimeException wrapping the PersistentException raised by the DAO
+  */
+ public Map<String, Output> getAllOutputs(long timeStartInMillis, long timeEndInMillis) {
+     try {
+         final Map<String, Output> outputsById = Maps.newHashMap();
+         for (ExecutableOutputPO persisted : executableDao.getJobOutputs(timeStartInMillis, timeEndInMillis)) {
+             final String id = persisted.getId();
+             outputsById.put(id, parseOutput(persisted));
+         }
+         return outputsById;
+     } catch (PersistentException e) {
+         logger.error("fail to get all job output:", e);
+         throw new RuntimeException(e);
+     }
+ }
+
public List<AbstractExecutable> getAllExecutables() {
try {
List<AbstractExecutable> ret = Lists.newArrayList();
@@ -166,6 +180,24 @@ public class ExecutableManager {
}
}
+ /**
+  * Reads the jobs for the given time window from the DAO and converts each one
+  * with {@code parseTo}. A job whose conversion throws IllegalArgumentException
+  * is logged and left out of the result.
+  *
+  * @throws RuntimeException wrapping the PersistentException raised by the DAO
+  */
+ public List<AbstractExecutable> getAllExecutables(long timeStartInMillis, long timeEndInMillis) {
+     try {
+         final List<ExecutablePO> persistedJobs = executableDao.getJobs(timeStartInMillis, timeEndInMillis);
+         final List<AbstractExecutable> executables = Lists.newArrayList();
+         for (ExecutablePO persisted : persistedJobs) {
+             try {
+                 executables.add(parseTo(persisted));
+             } catch (IllegalArgumentException e) {
+                 logger.error("error parsing one executabePO: ", e);
+             }
+         }
+         return executables;
+     } catch (PersistentException e) {
+         logger.error("error get All Jobs", e);
+         throw new RuntimeException(e);
+     }
+ }
+
public List<String> getAllJobIds() {
try {
return executableDao.getJobIds();
http://git-wip-us.apache.org/repos/asf/kylin/blob/9e02ef1e/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
index f6323ed..5c835ac 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -29,6 +29,7 @@ import java.util.TimeZone;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.constant.JobStatusEnum;
+import org.apache.kylin.job.constant.JobTimeFilterEnum;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.job.lock.JobLock;
@@ -118,8 +119,10 @@ public class JobController extends BasicController implements InitializingBean {
}
}
+ JobTimeFilterEnum timeFilter = JobTimeFilterEnum.getByCode(jobRequest.getTimeFilter());
+
try {
- jobInstanceList = jobService.listAllJobs(jobRequest.getCubeName(), jobRequest.getProjectName(), statusList, jobRequest.getLimit(), jobRequest.getOffset());
+ jobInstanceList = jobService.listAllJobs(jobRequest.getCubeName(), jobRequest.getProjectName(), statusList, jobRequest.getLimit(), jobRequest.getOffset(), timeFilter);
} catch (Exception e) {
logger.error(e.getLocalizedMessage(), e);
throw new InternalErrorException(e);
http://git-wip-us.apache.org/repos/asf/kylin/blob/9e02ef1e/server/src/main/java/org/apache/kylin/rest/request/JobListRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/JobListRequest.java b/server/src/main/java/org/apache/kylin/rest/request/JobListRequest.java
index 9ce8e3a..51160d2 100644
--- a/server/src/main/java/org/apache/kylin/rest/request/JobListRequest.java
+++ b/server/src/main/java/org/apache/kylin/rest/request/JobListRequest.java
@@ -31,6 +31,7 @@ public class JobListRequest {
private String projectName;
private Integer offset;
private Integer limit;
+ private Integer timeFilter;
public JobListRequest() {
}
@@ -75,4 +76,11 @@ public class JobListRequest {
this.limit = limit;
}
+ /**
+  * Returns the time filter code carried by this request, or null when none was set.
+  * NOTE(review): JobController hands this value to JobTimeFilterEnum.getByCode(int),
+  * which unboxes it -- confirm that callers always supply a value.
+  */
+ public Integer getTimeFilter() {
+ return timeFilter;
+ }
+
+ /**
+  * Sets the time filter code of this request. The web UI sends the values 0 to 4
+  * listed in jobConfig.timeFilter.
+  */
+ public void setTimeFilter(Integer timeFilter) {
+ this.timeFilter = timeFilter;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9e02ef1e/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
index 9135dfa..20fff47 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -136,6 +136,43 @@ public abstract class BasicService {
return results;
}
+ /**
+  * Lists the cubing jobs returned by the executable manager for the given time
+  * window that also match the cube, project and status criteria.
+  *
+  * @param cubeName cube to match, ignoring case; null accepts every cube
+  * @param projectName project whose cube realizations are accepted; null or an
+  *        unknown project name disables the project check
+  * @param statusList states a job must be in to be kept
+  * @param timeStartInMillis start of the window, passed to the executable manager
+  * @param timeEndInMillis end of the window, passed to the executable manager
+  * @param allOutputs job outputs keyed by job id, used to read each job's state
+  * @return the matching cubing jobs
+  */
+ protected List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName, final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis, final Map<String, Output> allOutputs) {
+     // resolve the project once up front instead of twice for every job
+     final ProjectInstance project = (null == projectName) ? null : getProjectManager().getProject(projectName);
+     List<CubingJob> results = Lists.newArrayList(FluentIterable.from(getExecutableManager().getAllExecutables(timeStartInMillis, timeEndInMillis)).filter(new Predicate<AbstractExecutable>() {
+         @Override
+         public boolean apply(AbstractExecutable executable) {
+             if (executable instanceof CubingJob) {
+                 if (cubeName == null) {
+                     return true;
+                 }
+                 return ((CubingJob) executable).getCubeName().equalsIgnoreCase(cubeName);
+             } else {
+                 return false;
+             }
+         }
+     }).transform(new Function<AbstractExecutable, CubingJob>() {
+         @Override
+         public CubingJob apply(AbstractExecutable executable) {
+             return (CubingJob) executable;
+         }
+     }).filter(new Predicate<CubingJob>() {
+         @Override
+         public boolean apply(CubingJob executable) {
+             if (null == project) {
+                 return true;
+             }
+             return project.containsRealization(RealizationType.CUBE, executable.getCubeName());
+         }
+     }).filter(new Predicate<CubingJob>() {
+         @Override
+         public boolean apply(CubingJob executable) {
+             // Jobs and outputs are fetched by separate time-filtered lookups, so a job
+             // may have no entry in allOutputs. Its state is then unknown: leave it out
+             // rather than fail the whole listing with a NullPointerException.
+             Output output = allOutputs.get(executable.getId());
+             return output != null && statusList.contains(output.getState());
+         }
+     }));
+     return results;
+ }
+
protected List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName, final Set<ExecutableState> statusList) {
return listAllCubingJobs(cubeName, projectName, statusList, getExecutableManager().getAllOutputs());
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9e02ef1e/server/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
index 1386656..a19e4f1 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -19,11 +19,7 @@
package org.apache.kylin.rest.service;
import java.io.IOException;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.cube.CubeInstance;
@@ -38,6 +34,7 @@ import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.constant.JobStepStatusEnum;
+import org.apache.kylin.job.constant.JobTimeFilterEnum;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
@@ -69,6 +66,32 @@ public class JobService extends BasicService {
@Autowired
private AccessService accessService;
+ /**
+  * Returns one page of the jobs matching the given criteria and time filter,
+  * taken from the sorted list of all matches.
+  *
+  * @param limitValue page size; 30 when null
+  * @param offsetValue index of the first job of the page; 0 when null
+  * @return the requested page, or an empty list when the offset is at or past the end
+  */
+ public List<JobInstance> listAllJobs(final String cubeName, final String projectName, final List<JobStatusEnum> statusList, final Integer limitValue, final Integer offsetValue, final JobTimeFilterEnum timeFilter) throws IOException, JobException {
+     final int pageSize = (limitValue != null) ? limitValue : 30;
+     final int firstIndex = (offsetValue != null) ? offsetValue : 0;
+
+     final List<JobInstance> sortedJobs = listAllJobs(cubeName, projectName, statusList, timeFilter);
+     Collections.sort(sortedJobs);
+
+     final int total = sortedJobs.size();
+     if (total <= firstIndex) {
+         return Collections.emptyList();
+     }
+
+     // a short last page ends at the end of the list, a full page after pageSize jobs
+     final int remaining = total - firstIndex;
+     final int pageEnd = (remaining < pageSize) ? total : firstIndex + pageSize;
+     return sortedJobs.subList(firstIndex, pageEnd);
+ }
+
+ /**
+  * Lists the jobs matching the given criteria whose time falls in the window that
+  * ends now and starts as far back as {@code timeFilter} dictates.
+  */
+ public List<JobInstance> listAllJobs(final String cubeName, final String projectName, final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) {
+     final long windowEnd = System.currentTimeMillis();
+     // the calendar starts at the window end and is moved back by getTimeStartInMillis
+     final Calendar windowCalendar = Calendar.getInstance();
+     windowCalendar.setTimeInMillis(windowEnd);
+     final long windowStart = getTimeStartInMillis(windowCalendar, timeFilter);
+     return listCubeJobInstance(cubeName, projectName, statusList, windowStart, windowEnd);
+ }
+
+ @Deprecated
public List<JobInstance> listAllJobs(final String cubeName, final String projectName, final List<JobStatusEnum> statusList, final Integer limitValue, final Integer offsetValue) throws IOException, JobException {
Integer limit = (null == limitValue) ? 30 : limitValue;
Integer offset = (null == offsetValue) ? 0 : offsetValue;
@@ -90,6 +113,25 @@ public class JobService extends BasicService {
return listCubeJobInstance(cubeName, projectName, statusList);
}
+ /**
+  * Builds the job instances for all cubing jobs in the given time window that
+  * match the cube, project and status criteria. A null or empty status list
+  * accepts every executable state.
+  */
+ private List<JobInstance> listCubeJobInstance(final String cubeName, final String projectName, List<JobStatusEnum> statusList, final long timeStartInMillis, final long timeEndInMillis) {
+     final Set<ExecutableState> wantedStates;
+     if (statusList != null && !statusList.isEmpty()) {
+         wantedStates = Sets.newHashSet();
+         for (JobStatusEnum requested : statusList) {
+             wantedStates.add(parseToExecutableState(requested));
+         }
+     } else {
+         wantedStates = EnumSet.allOf(ExecutableState.class);
+     }
+
+     // the same outputs map serves both the status filtering and the conversion below
+     final Map<String, Output> allOutputs = getExecutableManager().getAllOutputs(timeStartInMillis, timeEndInMillis);
+     final List<CubingJob> matchingJobs = listAllCubingJobs(cubeName, projectName, wantedStates, timeStartInMillis, timeEndInMillis, allOutputs);
+
+     final List<JobInstance> instances = Lists.newArrayList();
+     for (CubingJob cubingJob : matchingJobs) {
+         instances.add(parseToJobInstance(cubingJob, allOutputs));
+     }
+     return instances;
+ }
+
private List<JobInstance> listCubeJobInstance(final String cubeName, final String projectName, List<JobStatusEnum> statusList) {
Set<ExecutableState> states;
if (statusList == null || statusList.isEmpty()) {
@@ -109,6 +151,27 @@ public class JobService extends BasicService {
}));
}
+ /**
+  * Computes the start of the job-history time window by moving {@code calendar}
+  * back by the span the filter stands for.
+  *
+  * @param calendar positioned at the end of the window; it is modified in place
+  * @param timeFilter the window to apply; must not be null
+  * @return the calendar's time in milliseconds after moving it back, or 0 for ALL
+  * @throws IllegalArgumentException if {@code timeFilter} is null or not a handled constant
+  */
+ private long getTimeStartInMillis(Calendar calendar, JobTimeFilterEnum timeFilter) {
+     if (timeFilter == null) {
+         // JobTimeFilterEnum.getByCode returns null for an unknown code, and switching on
+         // null raises a bare NullPointerException; report it like any other illegal filter.
+         throw new IllegalArgumentException("illegal timeFilter for job history:" + timeFilter);
+     }
+     switch (timeFilter) {
+     case LAST_ONE_DAY:
+         calendar.add(Calendar.DAY_OF_MONTH, -1);
+         return calendar.getTimeInMillis();
+     case LAST_ONE_WEEK:
+         calendar.add(Calendar.WEEK_OF_MONTH, -1);
+         return calendar.getTimeInMillis();
+     case LAST_ONE_MONTH:
+         calendar.add(Calendar.MONTH, -1);
+         return calendar.getTimeInMillis();
+     case LAST_ONE_YEAR:
+         calendar.add(Calendar.YEAR, -1);
+         return calendar.getTimeInMillis();
+     case ALL:
+         return 0;
+     default:
+         throw new IllegalArgumentException("illegal timeFilter for job history:" + timeFilter);
+     }
+ }
+
private ExecutableState parseToExecutableState(JobStatusEnum status) {
switch (status) {
case DISCARDED:
http://git-wip-us.apache.org/repos/asf/kylin/blob/9e02ef1e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 6f638c4..72192a5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -40,7 +40,10 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
@@ -175,6 +178,53 @@ public class HBaseResourceStore extends ResourceStore {
return result;
}
+ /**
+  * Scans the all-in-one metadata table from {@code rangeStart} up to the end row
+  * derived from {@code rangeEnd}, keeps the rows accepted by the time filter built
+  * from the two time bounds, and returns one raw resource per remaining row.
+  *
+  * @return the raw resources; the caller is responsible for closing their streams
+  * @throws IOException if the scan fails; streams opened so far are closed first
+  */
+ @Override
+ protected List<RawResource> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis) throws IOException {
+     byte[] startRow = Bytes.toBytes(rangeStart);
+     // NOTE(review): plusZero presumably makes rangeEnd itself part of the scan -- confirm
+     byte[] endRow = plusZero(Bytes.toBytes(rangeEnd));
+
+     Scan scan = new Scan(startRow, endRow);
+     scan.addColumn(B_FAMILY, B_COLUMN_TS);
+     scan.addColumn(B_FAMILY, B_COLUMN);
+     scan.setFilter(generateTimeFilterList(timeStartInMillis, timeEndInMillis));
+
+     HTableInterface table = getConnection().getTable(getAllInOneTableName());
+     List<RawResource> result = Lists.newArrayList();
+     ResultScanner scanner = null;
+     try {
+         scanner = table.getScanner(scan);
+         for (Result r : scanner) {
+             result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r)));
+         }
+     } catch (IOException e) {
+         // do not leak the streams collected before the failure
+         for (RawResource rawResource : result) {
+             IOUtils.closeQuietly(rawResource.inputStream);
+         }
+         throw e;
+     } finally {
+         // the scanner was never closed before; release it ahead of the table
+         IOUtils.closeQuietly(scanner);
+         IOUtils.closeQuietly(table);
+     }
+     return result;
+ }
+
+ /**
+  * Builds the filter for the time window: two comparisons against the timestamp
+  * column (B_FAMILY / B_COLUMN_TS), both of which must pass. The start bound uses
+  * GREATER and the end bound uses LESS_OR_EQUAL.
+  */
+ private FilterList generateTimeFilterList(long timeStartInMillis, long timeEndInMillis) {
+     final SingleColumnValueFilter afterStart = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.GREATER, Bytes.toBytes(timeStartInMillis));
+     final SingleColumnValueFilter notAfterEnd = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.LESS_OR_EQUAL, Bytes.toBytes(timeEndInMillis));
+
+     final FilterList bothBounds = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+     bothBounds.addFilter(afterStart);
+     bothBounds.addFilter(notAfterEnd);
+     return bothBounds;
+ }
+
private InputStream getInputStream(String resPath, Result r) throws IOException {
if (r == null) {
return null;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9e02ef1e/webapp/app/js/controllers/job.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/job.js b/webapp/app/js/controllers/job.js
index 4838529..c859c12 100644
--- a/webapp/app/js/controllers/job.js
+++ b/webapp/app/js/controllers/job.js
@@ -27,6 +27,7 @@ KylinApp
$scope.cubeName = null;
//$scope.projects = [];
$scope.action = {};
+ $scope.timeFilter = jobConfig.timeFilter[1];
$scope.status = [];
$scope.toggleSelection = function toggleSelection(current) {
@@ -61,7 +62,8 @@ KylinApp
projectName: $scope.state.projectName,
status: statusIds,
offset: offset,
- limit: limit
+ limit: limit,
+ timeFilter: $scope.timeFilter.value
};
$scope.state.loading = true;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9e02ef1e/webapp/app/js/model/jobConfig.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/model/jobConfig.js b/webapp/app/js/model/jobConfig.js
index 2e74845..af7044c 100644
--- a/webapp/app/js/model/jobConfig.js
+++ b/webapp/app/js/model/jobConfig.js
@@ -25,6 +25,13 @@ KylinApp.constant('jobConfig', {
{name: 'ERROR', value: 8},
{name: 'DISCARDED', value: 16}
],
+ timeFilter: [
+ {name: 'LAST ONE DAY', value: 0},
+ {name: 'LAST ONE WEEK', value: 1},
+ {name: 'LAST ONE MONTH', value: 2},
+ {name: 'LAST ONE YEAR', value: 3},
+ {name: 'ALL', value: 4},
+ ],
theaditems: [
{attr: 'name', name: 'Job Name'},
{attr: 'related_cube', name: 'Cube'},
http://git-wip-us.apache.org/repos/asf/kylin/blob/9e02ef1e/webapp/app/partials/jobs/jobs.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/jobs/jobs.html b/webapp/app/partials/jobs/jobs.html
index fca5f88..124a1f6 100644
--- a/webapp/app/partials/jobs/jobs.html
+++ b/webapp/app/partials/jobs/jobs.html
@@ -49,6 +49,12 @@
<div class="col-xs-10">
<!--STATUS-->
<div class="pull-right">
+ <!--Job History Time Filter-->
+ <label>Jobs in:
+ <select data-ng-model="timeFilter"
+ data-ng-options="s.name for s in jobConfig.timeFilter">
+ </select>
+ </label>
<label ng-repeat="s in jobConfig.allStatus" class="checkbox-inline" >
<input type="checkbox"
value="{{s.name}}"
[2/2] kylin git commit: minor improvements
Posted by ma...@apache.org.
minor improvements
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4ab1c141
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4ab1c141
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4ab1c141
Branch: refs/heads/2.x-staging
Commit: 4ab1c1411943e1ae241e3453fa4fe0572c4c8e8d
Parents: af060b2
Author: honma <ho...@ebay.com>
Authored: Tue Dec 15 16:56:44 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Dec 15 18:56:38 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/metadata/model/FunctionDesc.java | 14 +++++---------
.../metadata/datatype/BigDecimalSerializerTest.java | 4 +---
.../org/apache/kylin/invertedindex/model/IIDesc.java | 4 ++--
.../org/apache/kylin/query/routing/RoutingRule.java | 3 ---
.../storage/hbase/cube/v1/CubeTupleConverter.java | 3 +--
5 files changed, 9 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4ab1c141/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index 39fe6b3..36c8722 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -73,17 +73,13 @@ public class FunctionDesc {
}
parameter.setColRefs(colRefs);
-
- // make sure sum/max/min returns the exact type as its input
- if ((isSum() || isMax() || isMin()) && colRefs.size() > 0) {
- setReturnType(colRefs.get(0).getDatatype());
- }
+
}
-
+
public MeasureType<?> getMeasureType() {
if (isDimensionAsMetric)
return null;
-
+
if (measureType == null) {
measureType = MeasureTypeFactory.create(getExpression(), getReturnDataType());
}
@@ -93,7 +89,7 @@ public class FunctionDesc {
public boolean needRewrite() {
if (isDimensionAsMetric)
return false;
-
+
return getMeasureType().needRewrite();
}
@@ -186,7 +182,7 @@ public class FunctionDesc {
}
return count;
}
-
+
public String getReturnType() {
return returnType;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4ab1c141/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/BigDecimalSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/BigDecimalSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/BigDecimalSerializerTest.java
index f920ee7..dcdc3d4 100644
--- a/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/BigDecimalSerializerTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/BigDecimalSerializerTest.java
@@ -1,12 +1,10 @@
package org.apache.kylin.metadata.datatype;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
-import org.apache.kylin.metadata.datatype.BigDecimalSerializer;
-import org.apache.kylin.metadata.datatype.DataType;
import org.junit.BeforeClass;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/kylin/blob/4ab1c141/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
index 452e3a3..cf57519 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
@@ -94,7 +94,7 @@ public class IIDesc extends RootPersistentEntity {
@JsonProperty("storage_type")
private int storageType = IStorageAware.ID_HBASE;
-
+
@JsonProperty("signature")
private String signature;
@@ -230,7 +230,7 @@ public class IIDesc extends RootPersistentEntity {
p1.setColRefs(ImmutableList.of(new TblColRef(columnDesc)));
f1.setParameter(p1);
f1.setReturnType(returnType);
- if (f1.getReturnDataType().isIntegerFamily()) {
+ if (f1.isSum() && f1.getReturnDataType().isIntegerFamily()) {
f1.setReturnType("bigint");
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4ab1c141/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
index 715f6d1..cb42412 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
@@ -36,9 +36,6 @@ public abstract class RoutingRule {
private static final Logger logger = LoggerFactory.getLogger(QueryRouter.class);
private static List<RoutingRule> rules = Lists.newLinkedList();
- //TODO: two rules are left out:
- //1. simple query use II prior to cube
- //2. exact match prior to week match
static {
rules.add(new RemoveUncapableRealizationsRule());
rules.add(new RealizationSortRule());
http://git-wip-us.apache.org/repos/asf/kylin/blob/4ab1c141/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
index 3b90dfa..b43a616 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
@@ -63,7 +63,6 @@ public class CubeTupleConverter {
for (int i = 0; i < dimCols.size(); i++) {
TblColRef col = dimCols.get(i);
dimensionTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
- measureTypes.add(null);
}
// pre-calculate metrics index mapping to tuple
@@ -157,7 +156,7 @@ public class CubeTupleConverter {
int[] measureIdx = metricsMeasureIdx[i];
int[] tupleIdx = metricsTupleIdx[i];
for (int j = 0; j < measureIdx.length; j++) {
- if (measureTypes.get(dimensionValues.size() + j) != null) {
+ if (measureTypes.get(j) != null) {
tuple.setMeasureValue(tupleIdx[j], measureValues[measureIdx[j]]);
}
}