You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/03/20 11:43:42 UTC
[1/2] kylin git commit: KYLIN-1505 Combine guava filters with
Predicates.and
Repository: kylin
Updated Branches:
refs/heads/master 801fb83b2 -> 6df837fa7
KYLIN-1505 Combine guava filters with Predicates.and
Signed-off-by: Yang Li <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/340338b7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/340338b7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/340338b7
Branch: refs/heads/master
Commit: 340338b73be7303cdceed22ecf8a4f01daf4c306
Parents: 801fb83
Author: Hao Chen <ha...@apache.org>
Authored: Sun Mar 20 15:50:26 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Mar 20 15:50:26 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/rest/service/BasicService.java | 7 +++--
.../apache/kylin/rest/service/JobService.java | 29 +++++++++-----------
2 files changed, 17 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/340338b7/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
index 5fea710..7696c59 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -45,6 +45,7 @@ import org.apache.kylin.storage.hybrid.HybridManager;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists;
@@ -122,7 +123,7 @@ public abstract class BasicService {
public CubingJob apply(AbstractExecutable executable) {
return (CubingJob) executable;
}
- }).filter(new Predicate<CubingJob>() {
+ }).filter(Predicates.and(new Predicate<CubingJob>() {
@Override
public boolean apply(CubingJob executable) {
if (null == projectName || null == getProjectManager().getProject(projectName)) {
@@ -132,12 +133,12 @@ public abstract class BasicService {
return project.containsRealization(RealizationType.CUBE, CubingExecutableUtil.getCubeName(executable.getParams()));
}
}
- }).filter(new Predicate<CubingJob>() {
+ }, new Predicate<CubingJob>() {
@Override
public boolean apply(CubingJob executable) {
return statusList.contains(allOutputs.get(executable.getId()).getState());
}
- }));
+ })));
return results;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/340338b7/server/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
index d0c29a4..1a35d0d 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -122,15 +122,7 @@ public class JobService extends BasicService {
}
private List<JobInstance> listCubeJobInstance(final String cubeName, final String projectName, List<JobStatusEnum> statusList, final long timeStartInMillis, final long timeEndInMillis) {
- Set<ExecutableState> states;
- if (statusList == null || statusList.isEmpty()) {
- states = EnumSet.allOf(ExecutableState.class);
- } else {
- states = Sets.newHashSet();
- for (JobStatusEnum status : statusList) {
- states.add(parseToExecutableState(status));
- }
- }
+ Set<ExecutableState> states = convertStatusEnumToStates(statusList);
final Map<String, Output> allOutputs = getExecutableManager().getAllOutputs(timeStartInMillis, timeEndInMillis);
return Lists.newArrayList(FluentIterable.from(listAllCubingJobs(cubeName, projectName, states, timeStartInMillis, timeEndInMillis, allOutputs)).transform(new Function<CubingJob, JobInstance>() {
@Override
@@ -141,6 +133,17 @@ public class JobService extends BasicService {
}
private List<JobInstance> listCubeJobInstance(final String cubeName, final String projectName, List<JobStatusEnum> statusList) {
+ Set<ExecutableState> states = convertStatusEnumToStates(statusList);
+ final Map<String, Output> allOutputs = getExecutableManager().getAllOutputs();
+ return Lists.newArrayList(FluentIterable.from(listAllCubingJobs(cubeName, projectName, states, allOutputs)).transform(new Function<CubingJob, JobInstance>() {
+ @Override
+ public JobInstance apply(CubingJob cubingJob) {
+ return parseToJobInstance(cubingJob, allOutputs);
+ }
+ }));
+ }
+
+ private Set<ExecutableState> convertStatusEnumToStates(List<JobStatusEnum> statusList) {
Set<ExecutableState> states;
if (statusList == null || statusList.isEmpty()) {
states = EnumSet.allOf(ExecutableState.class);
@@ -150,13 +153,7 @@ public class JobService extends BasicService {
states.add(parseToExecutableState(status));
}
}
- final Map<String, Output> allOutputs = getExecutableManager().getAllOutputs();
- return Lists.newArrayList(FluentIterable.from(listAllCubingJobs(cubeName, projectName, states, allOutputs)).transform(new Function<CubingJob, JobInstance>() {
- @Override
- public JobInstance apply(CubingJob cubingJob) {
- return parseToJobInstance(cubingJob, allOutputs);
- }
- }));
+ return states;
}
private long getTimeStartInMillis(Calendar calendar, JobTimeFilterEnum timeFilter) {
[2/2] kylin git commit: KYLIN-1506 Refactor resource interface for
timeseries-based data like jobs for better performance (with Hao Chen
)
Posted by li...@apache.org.
KYLIN-1506 Refactor resource interface for timeseries-based data like jobs
for better performance (with Hao Chen <ha...@apache.org>)
This closes #31
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6df837fa
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6df837fa
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6df837fa
Branch: refs/heads/master
Commit: 6df837fa7abbeba0edd13e099150dc1590e31761
Parents: 340338b
Author: Yang Li <li...@apache.org>
Authored: Sun Mar 20 18:41:48 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Mar 20 18:41:48 2016 +0800
----------------------------------------------------------------------
.../common/persistence/FileResourceStore.java | 44 ++---
.../kylin/common/persistence/ResourceStore.java | 49 +++--
.../kylin/common/persistence/ResourceTool.java | 3 +-
.../persistence/LocalFileResourceStoreTest.java | 131 +------------
.../common/persistence/ResourceStoreTest.java | 189 +++++++++++++++++++
.../org/apache/kylin/cube/CubeManagerTest.java | 18 +-
.../apache/kylin/dict/DictionaryManager.java | 18 +-
.../org/apache/kylin/job/dao/ExecutableDao.java | 36 +---
.../storage/hbase/ITHBaseResourceStoreTest.java | 132 +------------
.../kylin/storage/hbase/HBaseResourceStore.java | 121 ++++++------
10 files changed, 323 insertions(+), 418 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/6df837fa/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
index 1ab659f..c1c62fb 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
@@ -24,13 +24,12 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,13 +50,13 @@ public class FileResourceStore extends ResourceStore {
}
@Override
- protected NavigableSet<String> listResourcesImpl(String resPath) throws IOException {
- String[] names = file(resPath).list();
+ protected NavigableSet<String> listResourcesImpl(String folderPath) throws IOException {
+ String[] names = file(folderPath).list();
if (names == null) // not a directory
return null;
TreeSet<String> r = new TreeSet<>();
- String prefix = resPath.endsWith("/") ? resPath : resPath + "/";
+ String prefix = folderPath.endsWith("/") ? folderPath : folderPath + "/";
for (String n : names) {
r.add(prefix + n);
}
@@ -67,39 +66,32 @@ public class FileResourceStore extends ResourceStore {
@Override
protected boolean existsImpl(String resPath) throws IOException {
File f = file(resPath);
- return f.exists() && f.isFile(); // directory is not considered a
- // resource
+ return f.exists() && f.isFile(); // directory is not considered a resource
}
@Override
- protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException {
- List<RawResource> result = Lists.newArrayList();
+ protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) throws IOException {
+ NavigableSet<String> resources = listResources(folderPath);
+ if (resources == null)
+ return Collections.emptyList();
+
+ List<RawResource> result = Lists.newArrayListWithCapacity(resources.size());
try {
- String commonPrefix = StringUtils.getCommonPrefix(rangeEnd, rangeStart);
- commonPrefix = commonPrefix.substring(0, commonPrefix.lastIndexOf("/") + 1);
- final NavigableSet<String> resources = listResourcesImpl(commonPrefix);
- for (String resource : resources) {
- if (resource.compareTo(rangeStart) >= 0 && resource.compareTo(rangeEnd) <= 0) {
- if (existsImpl(resource)) {
- result.add(getResourceImpl(resource));
- }
+ for (String res : resources) {
+ long ts = getResourceTimestampImpl(res);
+ if (timeStart <= ts && ts < timeEndExclusive) {
+ RawResource resource = getResourceImpl(res);
+ if (resource != null) // can be null if is a sub-folder
+ result.add(resource);
}
}
- return result;
} catch (IOException ex) {
for (RawResource rawResource : result) {
IOUtils.closeQuietly(rawResource.inputStream);
}
throw ex;
- } catch (Exception ex) {
- throw new UnsupportedOperationException(ex);
}
- }
-
- @Override
- protected List<RawResource> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis) throws IOException {
- //just ignore time filter
- return getAllResources(rangeStart, rangeEnd);
+ return result;
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/6df837fa/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 4e9e904..746527d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -114,19 +114,17 @@ abstract public class ResourceStore {
}
/**
- * return a list of child resources & folders under given path, return null
- * if given path is not a folder
+ * List resources and sub-folders under a given folder, return null if given path is not a folder
*/
- final public NavigableSet<String> listResources(String resPath) throws IOException {
- resPath = norm(resPath);
- return listResourcesImpl(resPath);
+ final public NavigableSet<String> listResources(String folderPath) throws IOException {
+ String path = norm(folderPath);
+ return listResourcesImpl(path);
}
- abstract protected NavigableSet<String> listResourcesImpl(String resPath) throws IOException;
+ abstract protected NavigableSet<String> listResourcesImpl(String folderPath) throws IOException;
/**
- * return true if a resource exists, return false in case of folder or
- * non-exist
+ * Return true if a resource exists, return false in case of folder or non-exist
*/
final public boolean exists(String resPath) throws IOException {
return existsImpl(norm(resPath));
@@ -135,7 +133,7 @@ abstract public class ResourceStore {
abstract protected boolean existsImpl(String resPath) throws IOException;
/**
- * read a resource, return null in case of not found
+ * Read a resource, return null in case of not found or is a folder
*/
final public <T extends RootPersistentEntity> T getResource(String resPath, Class<T> clz, Serializer<T> serializer) throws IOException {
resPath = norm(resPath);
@@ -162,16 +160,22 @@ abstract public class ResourceStore {
return getResourceTimestampImpl(norm(resPath));
}
- final public <T extends RootPersistentEntity> List<T> getAllResources(String rangeStart, String rangeEnd, Class<T> clazz, Serializer<T> serializer) throws IOException {
- return getAllResources(rangeStart, rangeEnd, -1L, -1L, clazz, serializer);
+ /**
+ * Read all resources under a folder. Return empty list if folder not exist.
+ */
+ final public <T extends RootPersistentEntity> List<T> getAllResources(String folderPath, Class<T> clazz, Serializer<T> serializer) throws IOException {
+ return getAllResources(folderPath, Long.MIN_VALUE, Long.MAX_VALUE, clazz, serializer);
}
- final public <T extends RootPersistentEntity> List<T> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis, Class<T> clazz, Serializer<T> serializer) throws IOException {
- final List<RawResource> allResources = getAllResources(rangeStart, rangeEnd, timeStartInMillis, timeEndInMillis);
- if (allResources.isEmpty()) {
+ /**
+ * Read all resources under a folder having last modified time between given range. Return empty list if folder not exist.
+ */
+ final public <T extends RootPersistentEntity> List<T> getAllResources(String folderPath, long timeStart, long timeEndExclusive, Class<T> clazz, Serializer<T> serializer) throws IOException {
+ final List<RawResource> allResources = getAllResourcesImpl(folderPath, timeStart, timeEndExclusive);
+ if (allResources == null || allResources.isEmpty()) {
return Collections.emptyList();
}
- List<T> result = Lists.newArrayList();
+ List<T> result = Lists.newArrayListWithCapacity(allResources.size());
try {
for (RawResource rawResource : allResources) {
final T element = serializer.deserialize(new DataInputStream(rawResource.inputStream));
@@ -181,14 +185,13 @@ abstract public class ResourceStore {
return result;
} finally {
for (RawResource rawResource : allResources) {
- IOUtils.closeQuietly(rawResource.inputStream);
+ if (rawResource != null)
+ IOUtils.closeQuietly(rawResource.inputStream);
}
}
}
- abstract protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException;
-
- abstract protected List<RawResource> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis) throws IOException;
+ abstract protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) throws IOException;
/** returns null if not exists */
abstract protected RawResource getResourceImpl(String resPath) throws IOException;
@@ -211,11 +214,17 @@ abstract public class ResourceStore {
* check & set, overwrite a resource
*/
final public <T extends RootPersistentEntity> long putResource(String resPath, T obj, Serializer<T> serializer) throws IOException {
+ return putResource(resPath, obj, System.currentTimeMillis(), serializer);
+ }
+
+ /**
+ * check & set, overwrite a resource
+ */
+ final public <T extends RootPersistentEntity> long putResource(String resPath, T obj, long newTS, Serializer<T> serializer) throws IOException {
resPath = norm(resPath);
//logger.debug("Saving resource " + resPath + " (Store " + kylinConfig.getMetadataUrl() + ")");
long oldTS = obj.getLastModified();
- long newTS = System.currentTimeMillis();
obj.setLastModified(newTS);
try {
http://git-wip-us.apache.org/repos/asf/kylin/blob/6df837fa/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
index 56f855c..c0f3fd9 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
@@ -22,7 +22,6 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
@@ -169,7 +168,7 @@ public class ResourceTool {
resetR(store, "/");
}
- private static void resetR(ResourceStore store, String path) throws IOException {
+ public static void resetR(ResourceStore store, String path) throws IOException {
NavigableSet<String> children = store.listResources(path);
if (children == null) { // path is a resource (not a folder)
if (matchFilter(path)) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/6df837fa/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java
index 7ba5329..cc6143d 100644
--- a/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java
@@ -18,18 +18,6 @@
package org.apache.kylin.common.persistence;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.NavigableSet;
-
-import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.junit.After;
@@ -50,123 +38,8 @@ public class LocalFileResourceStoreTest extends LocalFileMetadataTestCase {
@Test
public void testFileStore() throws Exception {
- testAStore(ResourceStore.getStore(KylinConfig.getInstanceFromEnv()));
- }
-
- void testAStore(ResourceStore store) throws IOException {
- String dir1 = "/cube";
- String path1 = "/cube/_test.json";
- StringEntity content1 = new StringEntity("anything");
- String dir2 = "/table";
- String path2 = "/table/_test.json";
- StringEntity content2 = new StringEntity("something");
-
- // cleanup legacy if any
- store.deleteResource(path1);
- store.deleteResource(path2);
-
- StringEntity t;
-
- // put/get
- store.putResource(path1, content1, StringEntity.serializer);
- assertTrue(store.exists(path1));
- t = store.getResource(path1, StringEntity.class, StringEntity.serializer);
- assertEquals(content1, t);
-
- store.putResource(path2, content2, StringEntity.serializer);
- assertTrue(store.exists(path2));
- t = store.getResource(path2, StringEntity.class, StringEntity.serializer);
- assertEquals(content2, t);
-
- // overwrite
- t.str = "new string";
- store.putResource(path2, t, StringEntity.serializer);
-
- // write conflict
- try {
- t.setLastModified(t.lastModified - 1);
- store.putResource(path2, t, StringEntity.serializer);
- fail("write conflict should trigger IllegalStateException");
- } catch (IllegalStateException e) {
- // expected
- }
-
- // list
- NavigableSet<String> list;
-
- list = store.listResources(dir1);
- assertTrue(list.contains(path1));
- assertTrue(list.contains(path2) == false);
-
- list = store.listResources(dir2);
- assertTrue(list.contains(path2));
- assertTrue(list.contains(path1) == false);
-
- list = store.listResources("/");
- assertTrue(list.contains(dir1));
- assertTrue(list.contains(dir2));
- assertTrue(list.contains(path1) == false);
- assertTrue(list.contains(path2) == false);
-
- list = store.listResources(path1);
- assertNull(list);
- list = store.listResources(path2);
- assertNull(list);
-
- // delete/exist
- store.deleteResource(path1);
- assertTrue(store.exists(path1) == false);
- list = store.listResources(dir1);
- assertTrue(list == null || list.contains(path1) == false);
-
- store.deleteResource(path2);
- assertTrue(store.exists(path2) == false);
- list = store.listResources(dir2);
- assertTrue(list == null || list.contains(path2) == false);
- }
-
- public static class StringEntity extends RootPersistentEntity {
-
- static final Serializer<StringEntity> serializer = new Serializer<StringEntity>() {
- @Override
- public void serialize(StringEntity obj, DataOutputStream out) throws IOException {
- out.writeUTF(obj.str);
- }
-
- @Override
- public StringEntity deserialize(DataInputStream in) throws IOException {
- String str = in.readUTF();
- return new StringEntity(str);
- }
- };
-
- String str;
-
- public StringEntity(String str) {
- this.str = str;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = super.hashCode();
- result = prime * result + ((str == null) ? 0 : str.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this)
- return true;
- if (!(obj instanceof StringEntity))
- return false;
- return StringUtils.equals(this.str, ((StringEntity) obj).str);
- }
-
- @Override
- public String toString() {
- return str;
- }
+ ResourceStore store = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
+ ResourceStoreTest.testAStore(store);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6df837fa/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
new file mode 100644
index 0000000..69fc6d8
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.persistence;
+
+import static org.junit.Assert.*;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Be called by LocalFileResourceStoreTest and ITHBaseResourceStoreTest.
+ */
+public class ResourceStoreTest {
+
+ public static void testAStore(ResourceStore store) throws IOException {
+ testBasics(store);
+ testGetAllResources(store);
+ }
+
+ private static void testGetAllResources(ResourceStore store) throws IOException {
+ final String folder = "/testFolder";
+ List<StringEntity> result;
+
+ // reset any leftover garbage
+ ResourceTool.resetR(store, folder);
+
+ store.putResource(folder + "/res1", new StringEntity("data1"), 1000, StringEntity.serializer);
+ store.putResource(folder + "/res2", new StringEntity("data2"), 2000, StringEntity.serializer);
+ store.putResource(folder + "/sub/res3", new StringEntity("data3"), 3000, StringEntity.serializer);
+ store.putResource(folder + "/res4", new StringEntity("data4"), 4000, StringEntity.serializer);
+
+ result = store.getAllResources(folder, StringEntity.class, StringEntity.serializer);
+ assertEntity(result.get(0), "data1", 1000);
+ assertEntity(result.get(1), "data2", 2000);
+ assertEntity(result.get(2), "data4", 4000);
+ assertEquals(3, result.size());
+
+ result = store.getAllResources(folder, 2000, 4000, StringEntity.class, StringEntity.serializer);
+ assertEntity(result.get(0), "data2", 2000);
+ assertEquals(1, result.size());
+
+ ResourceTool.resetR(store, folder);
+ }
+
+ private static void assertEntity(StringEntity entity, String data, int ts) {
+ assertEquals(data, entity.str);
+ assertEquals(ts, entity.lastModified);
+ }
+
+ private static void testBasics(ResourceStore store) throws IOException {
+ String dir1 = "/cube";
+ String path1 = "/cube/_test.json";
+ StringEntity content1 = new StringEntity("anything");
+ String dir2 = "/table";
+ String path2 = "/table/_test.json";
+ StringEntity content2 = new StringEntity("something");
+
+ // cleanup legacy if any
+ store.deleteResource(path1);
+ store.deleteResource(path2);
+
+ StringEntity t;
+
+ // put/get
+ store.putResource(path1, content1, StringEntity.serializer);
+ assertTrue(store.exists(path1));
+ t = store.getResource(path1, StringEntity.class, StringEntity.serializer);
+ assertEquals(content1, t);
+
+ store.putResource(path2, content2, StringEntity.serializer);
+ assertTrue(store.exists(path2));
+ t = store.getResource(path2, StringEntity.class, StringEntity.serializer);
+ assertEquals(content2, t);
+
+ // overwrite
+ t.str = "new string";
+ store.putResource(path2, t, StringEntity.serializer);
+
+ // write conflict
+ try {
+ t.setLastModified(t.getLastModified() - 1);
+ store.putResource(path2, t, StringEntity.serializer);
+ fail("write conflict should trigger IllegalStateException");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+
+ // list
+ NavigableSet<String> list;
+
+ list = store.listResources(dir1);
+ assertTrue(list.contains(path1));
+ assertTrue(list.contains(path2) == false);
+
+ list = store.listResources(dir2);
+ assertTrue(list.contains(path2));
+ assertTrue(list.contains(path1) == false);
+
+ list = store.listResources("/");
+ assertTrue(list.contains(dir1));
+ assertTrue(list.contains(dir2));
+ assertTrue(list.contains(path1) == false);
+ assertTrue(list.contains(path2) == false);
+
+ list = store.listResources(path1);
+ assertNull(list);
+ list = store.listResources(path2);
+ assertNull(list);
+
+ // delete/exist
+ store.deleteResource(path1);
+ assertTrue(store.exists(path1) == false);
+ list = store.listResources(dir1);
+ assertTrue(list == null || list.contains(path1) == false);
+
+ store.deleteResource(path2);
+ assertTrue(store.exists(path2) == false);
+ list = store.listResources(dir2);
+ assertTrue(list == null || list.contains(path2) == false);
+ }
+
+ @SuppressWarnings("serial")
+ public static class StringEntity extends RootPersistentEntity {
+
+ public static final Serializer<StringEntity> serializer = new Serializer<StringEntity>() {
+ @Override
+ public void serialize(StringEntity obj, DataOutputStream out) throws IOException {
+ out.writeUTF(obj.str);
+ }
+
+ @Override
+ public StringEntity deserialize(DataInputStream in) throws IOException {
+ String str = in.readUTF();
+ return new StringEntity(str);
+ }
+ };
+
+ String str;
+
+ public StringEntity(String str) {
+ this.str = str;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result + ((str == null) ? 0 : str.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this)
+ return true;
+ if (!(obj instanceof StringEntity))
+ return false;
+ return StringUtils.equals(this.str, ((StringEntity) obj).str);
+ }
+
+ @Override
+ public String toString() {
+ return str;
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6df837fa/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
index 18a8150..409e01d 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
@@ -18,6 +18,11 @@
package org.apache.kylin.cube;
+import static org.junit.Assert.*;
+
+import java.util.List;
+import java.util.NavigableSet;
+
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.JsonUtil;
@@ -30,12 +35,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NavigableSet;
-
-import static org.junit.Assert.*;
-
/**
* @author yangli9
*/
@@ -126,15 +125,10 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
public void testGetAllCubes() throws Exception {
final ResourceStore store = ResourceStore.getStore(getTestConfig());
final NavigableSet<String> cubePath = store.listResources(ResourceStore.CUBE_RESOURCE_ROOT);
- final Iterator<String> iterator = cubePath.iterator();
- final String firstPath = iterator.next();
- final String secondPath = iterator.next();
- final String lastPath = cubePath.last();
assertTrue(cubePath.size() > 1);
- final List<CubeInstance> cubes = store.getAllResources(firstPath, lastPath, CubeInstance.class, CubeManager.CUBE_SERIALIZER);
+ final List<CubeInstance> cubes = store.getAllResources(ResourceStore.CUBE_RESOURCE_ROOT, CubeInstance.class, CubeManager.CUBE_SERIALIZER);
assertEquals(cubePath.size(), cubes.size());
- assertEquals(cubePath.size() - 1, store.getAllResources(secondPath, lastPath, CubeInstance.class, CubeManager.CUBE_SERIALIZER).size());
}
@Test
http://git-wip-us.apache.org/repos/asf/kylin/blob/6df837fa/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index b3ed2ea..90ec35e 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -311,13 +311,8 @@ public class DictionaryManager {
}
private String checkDupByInfo(DictionaryInfo dictInfo) throws IOException {
- ResourceStore store = MetadataManager.getInstance(config).getStore();
- NavigableSet<String> existings = store.listResources(dictInfo.getResourceDir());
- if (existings == null || existings.isEmpty()) {
- return null;
- }
-
- final List<DictionaryInfo> allResources = MetadataManager.getInstance(config).getStore().getAllResources(existings.first(), existings.last(), DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER);
+ final ResourceStore store = MetadataManager.getInstance(config).getStore();
+ final List<DictionaryInfo> allResources = store.getAllResources(dictInfo.getResourceDir(), DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER);
TableSignature input = dictInfo.getInput();
@@ -330,13 +325,8 @@ public class DictionaryManager {
}
private DictionaryInfo findLargestDictInfo(DictionaryInfo dictInfo) throws IOException {
- ResourceStore store = MetadataManager.getInstance(config).getStore();
- NavigableSet<String> dictInfos = store.listResources(dictInfo.getResourceDir());
- if (dictInfos == null || dictInfos.isEmpty()) {
- return null;
- }
-
- final List<DictionaryInfo> allResources = MetadataManager.getInstance(config).getStore().getAllResources(dictInfos.first(), dictInfos.last(), DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER);
+ final ResourceStore store = MetadataManager.getInstance(config).getStore();
+ final List<DictionaryInfo> allResources = store.getAllResources(dictInfo.getResourceDir(), DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER);
DictionaryInfo largestDict = null;
for (DictionaryInfo dictionaryInfo : allResources) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/6df837fa/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
index 493555d..58f845d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -96,28 +96,16 @@ public class ExecutableDao {
public List<ExecutableOutputPO> getJobOutputs() throws PersistentException {
try {
- NavigableSet<String> resources = store.listResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT);
- if (resources == null || resources.isEmpty()) {
- return Collections.emptyList();
- }
- String rangeStart = resources.first();
- String rangeEnd = resources.last();
- return store.getAllResources(rangeStart, rangeEnd, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER);
+ return store.getAllResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER);
} catch (IOException e) {
logger.error("error get all Jobs:", e);
throw new PersistentException(e);
}
}
- public List<ExecutableOutputPO> getJobOutputs(long timeStartInMillis, long timeEndInMillis) throws PersistentException {
+ public List<ExecutableOutputPO> getJobOutputs(long timeStart, long timeEndExclusive) throws PersistentException {
try {
- NavigableSet<String> resources = store.listResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT);
- if (resources == null || resources.isEmpty()) {
- return Collections.emptyList();
- }
- String rangeStart = resources.first();
- String rangeEnd = resources.last();
- return store.getAllResources(rangeStart, rangeEnd, timeStartInMillis, timeEndInMillis, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER);
+ return store.getAllResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT, timeStart, timeEndExclusive, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER);
} catch (IOException e) {
logger.error("error get all Jobs:", e);
throw new PersistentException(e);
@@ -126,28 +114,16 @@ public class ExecutableDao {
public List<ExecutablePO> getJobs() throws PersistentException {
try {
- final NavigableSet<String> jobIds = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
- if (jobIds == null || jobIds.isEmpty()) {
- return Collections.emptyList();
- }
- String rangeStart = jobIds.first();
- String rangeEnd = jobIds.last();
- return store.getAllResources(rangeStart, rangeEnd, ExecutablePO.class, JOB_SERIALIZER);
+ return store.getAllResources(ResourceStore.EXECUTE_RESOURCE_ROOT, ExecutablePO.class, JOB_SERIALIZER);
} catch (IOException e) {
logger.error("error get all Jobs:", e);
throw new PersistentException(e);
}
}
- public List<ExecutablePO> getJobs(long timeStartInMillis, long timeEndInMillis) throws PersistentException {
+ public List<ExecutablePO> getJobs(long timeStart, long timeEndExclusive) throws PersistentException {
try {
- final NavigableSet<String> jobIds = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
- if (jobIds == null || jobIds.isEmpty()) {
- return Collections.emptyList();
- }
- String rangeStart = jobIds.first();
- String rangeEnd = jobIds.last();
- return store.getAllResources(rangeStart, rangeEnd, timeStartInMillis, timeEndInMillis, ExecutablePO.class, JOB_SERIALIZER);
+ return store.getAllResources(ResourceStore.EXECUTE_RESOURCE_ROOT, timeStart, timeEndExclusive, ExecutablePO.class, JOB_SERIALIZER);
} catch (IOException e) {
logger.error("error get all Jobs:", e);
throw new PersistentException(e);
http://git-wip-us.apache.org/repos/asf/kylin/blob/6df837fa/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHBaseResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHBaseResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHBaseResourceStoreTest.java
index dccdca9..bc5cdf1 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHBaseResourceStoreTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHBaseResourceStoreTest.java
@@ -19,25 +19,16 @@
package org.apache.kylin.storage.hbase;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.NavigableSet;
-
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.persistence.ResourceStoreTest;
+import org.apache.kylin.common.persistence.ResourceStoreTest.StringEntity;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.junit.After;
import org.junit.Before;
@@ -57,7 +48,8 @@ public class ITHBaseResourceStoreTest extends HBaseMetadataTestCase {
@Test
public void testHBaseStore() throws Exception {
- testAStore(ResourceStore.getStore(KylinConfig.getInstanceFromEnv()));
+ ResourceStore store = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
+ ResourceStoreTest.testAStore(store);
}
@Test
@@ -95,120 +87,4 @@ public class ITHBaseResourceStoreTest extends HBaseMetadataTestCase {
}
}
- void testAStore(ResourceStore store) throws IOException {
- String dir1 = "/cube";
- String path1 = "/cube/_test.json";
- StringEntity content1 = new StringEntity("anything");
- String dir2 = "/table";
- String path2 = "/table/_test.json";
- StringEntity content2 = new StringEntity("something");
-
- // cleanup legacy if any
- store.deleteResource(path1);
- store.deleteResource(path2);
-
- StringEntity t;
-
- // put/get
- store.putResource(path1, content1, StringEntity.serializer);
- assertTrue(store.exists(path1));
- t = store.getResource(path1, StringEntity.class, StringEntity.serializer);
- assertEquals(content1, t);
-
- store.putResource(path2, content2, StringEntity.serializer);
- assertTrue(store.exists(path2));
- t = store.getResource(path2, StringEntity.class, StringEntity.serializer);
- assertEquals(content2, t);
-
- // overwrite
- t.str = "new string";
- store.putResource(path2, t, StringEntity.serializer);
-
- // write conflict
- try {
- t.setLastModified(t.getLastModified() - 1);
- store.putResource(path2, t, StringEntity.serializer);
- fail("write conflict should trigger IllegalStateException");
- } catch (IllegalStateException e) {
- // expected
- }
-
- // list
- NavigableSet<String> list;
-
- list = store.listResources(dir1);
- assertTrue(list.contains(path1));
- assertTrue(list.contains(path2) == false);
-
- list = store.listResources(dir2);
- assertTrue(list.contains(path2));
- assertTrue(list.contains(path1) == false);
-
- list = store.listResources("/");
- assertTrue(list.contains(dir1));
- assertTrue(list.contains(dir2));
- assertTrue(list.contains(path1) == false);
- assertTrue(list.contains(path2) == false);
-
- list = store.listResources(path1);
- assertNull(list);
- list = store.listResources(path2);
- assertNull(list);
-
- // delete/exist
- store.deleteResource(path1);
- assertTrue(store.exists(path1) == false);
- list = store.listResources(dir1);
- assertTrue(list == null || list.contains(path1) == false);
-
- store.deleteResource(path2);
- assertTrue(store.exists(path2) == false);
- list = store.listResources(dir2);
- assertTrue(list == null || list.contains(path2) == false);
- }
-
- public static class StringEntity extends RootPersistentEntity {
-
- static final Serializer<StringEntity> serializer = new Serializer<StringEntity>() {
- @Override
- public void serialize(StringEntity obj, DataOutputStream out) throws IOException {
- out.writeUTF(obj.str);
- }
-
- @Override
- public StringEntity deserialize(DataInputStream in) throws IOException {
- String str = in.readUTF();
- return new StringEntity(str);
- }
- };
-
- String str;
-
- public StringEntity(String str) {
- this.str = str;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = super.hashCode();
- result = prime * result + ((str == null) ? 0 : str.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this)
- return true;
- if (!(obj instanceof StringEntity))
- return false;
- return StringUtils.equals(this.str, ((StringEntity) obj).str);
- }
-
- @Override
- public String toString() {
- return str;
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6df837fa/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index bbb5e21..bb5382f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -22,7 +22,9 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.util.*;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
@@ -37,7 +39,11 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.*;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
@@ -88,18 +94,44 @@ public class HBaseResourceStore extends ResourceStore {
}
@Override
- protected NavigableSet<String> listResourcesImpl(String resPath) throws IOException {
- assert resPath.startsWith("/");
- String lookForPrefix = resPath.endsWith("/") ? resPath : resPath + "/";
+ protected boolean existsImpl(String resPath) throws IOException {
+ Result r = getFromHTable(resPath, false, false);
+ return r != null;
+ }
+
+ @Override
+ protected NavigableSet<String> listResourcesImpl(String folderPath) throws IOException {
+ final TreeSet<String> result = new TreeSet<>();
+
+ visitFolder(folderPath, new KeyOnlyFilter(), new FolderVisitor() {
+ @Override
+ public void visit(String childPath, String fullPath, Result hbaseResult) {
+ result.add(childPath);
+ }
+ });
+ // return null to indicate not a folder
+ return result.isEmpty() ? null : result;
+ }
+
+ private void visitFolder(String folderPath, Filter filter, FolderVisitor visitor) throws IOException {
+ assert folderPath.startsWith("/");
+ String lookForPrefix = folderPath.endsWith("/") ? folderPath : folderPath + "/";
byte[] startRow = Bytes.toBytes(lookForPrefix);
byte[] endRow = Bytes.toBytes(lookForPrefix);
endRow[endRow.length - 1]++;
- TreeSet<String> result = new TreeSet<>();
-
HTableInterface table = getConnection().getTable(getAllInOneTableName());
Scan scan = new Scan(startRow, endRow);
- scan.setFilter(new KeyOnlyFilter());
+ if ((filter != null && filter instanceof KeyOnlyFilter) == false) {
+ scan.addColumn(B_FAMILY, B_COLUMN_TS);
+ scan.addColumn(B_FAMILY, B_COLUMN);
+ }
+ if (filter != null) {
+ scan.setFilter(filter);
+ }
+
+ tuneScanParameters(scan);
+
try {
ResultScanner scanner = table.getScanner(scan);
for (Result r : scanner) {
@@ -107,73 +139,54 @@ public class HBaseResourceStore extends ResourceStore {
assert path.startsWith(lookForPrefix);
int cut = path.indexOf('/', lookForPrefix.length());
String child = cut < 0 ? path : path.substring(0, cut);
- result.add(child);
+ visitor.visit(child, path, r);
}
} finally {
IOUtils.closeQuietly(table);
}
- // return null to indicate not a folder
- return result.isEmpty() ? null : result;
}
- @Override
- protected boolean existsImpl(String resPath) throws IOException {
- Result r = getFromHTable(resPath, false, false);
- return r != null;
+ private void tuneScanParameters(Scan scan) {
+ // divide by 10 as some resource like dictionary or snapshot can be very large
+ scan.setCaching(kylinConfig.getHBaseScanCacheRows() / 10);
+ scan.setMaxResultSize(kylinConfig.getHBaseScanMaxResultSize());
+ scan.setCacheBlocks(true);
}
- @Override
- protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException {
- return getAllResources(rangeStart, rangeEnd, -1L, -1L);
+ interface FolderVisitor {
+ void visit(String childPath, String fullPath, Result hbaseResult) throws IOException;
}
@Override
- protected List<RawResource> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis) throws IOException {
- byte[] startRow = Bytes.toBytes(rangeStart);
- byte[] endRow = plusZero(Bytes.toBytes(rangeEnd));
-
- Scan scan = new Scan(startRow, endRow);
- scan.addColumn(B_FAMILY, B_COLUMN_TS);
- scan.addColumn(B_FAMILY, B_COLUMN);
- FilterList filterList = generateTimeFilterList(timeStartInMillis, timeEndInMillis);
- if (filterList != null) {
- scan.setFilter(filterList);
- }
- tuneScanParameters(scan);
-
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
- List<RawResource> result = Lists.newArrayList();
+ protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) throws IOException {
+ FilterList filter = generateTimeFilterList(timeStart, timeEndExclusive);
+ final List<RawResource> result = Lists.newArrayList();
try {
- ResultScanner scanner = table.getScanner(scan);
- for (Result r : scanner) {
- result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r)));
- }
+ visitFolder(folderPath, filter, new FolderVisitor() {
+ @Override
+ public void visit(String childPath, String fullPath, Result hbaseResult) throws IOException {
+ // is a direct child (not grand child)?
+ if (childPath.equals(fullPath))
+ result.add(new RawResource(getInputStream(childPath, hbaseResult), getTimestamp(hbaseResult)));
+ }
+ });
} catch (IOException e) {
for (RawResource rawResource : result) {
IOUtils.closeQuietly(rawResource.inputStream);
}
throw e;
- } finally {
- IOUtils.closeQuietly(table);
}
return result;
}
- private void tuneScanParameters(Scan scan) {
- // divide by 10 as some resource like dictionary or snapshot can be very large
- scan.setCaching(kylinConfig.getHBaseScanCacheRows() / 10);
- scan.setMaxResultSize(kylinConfig.getHBaseScanMaxResultSize());
- scan.setCacheBlocks(true);
- }
-
- private FilterList generateTimeFilterList(long timeStartInMillis, long timeEndInMillis) {
+ private FilterList generateTimeFilterList(long timeStart, long timeEndExclusive) {
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
- if (timeStartInMillis != -1L) {
- SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.GREATER, Bytes.toBytes(timeStartInMillis));
+ if (timeStart != Long.MIN_VALUE) {
+ SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(timeStart));
filterList.addFilter(timeStartFilter);
}
- if (timeEndInMillis != -1L) {
- SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.LESS_OR_EQUAL, Bytes.toBytes(timeEndInMillis));
+ if (timeEndExclusive != Long.MAX_VALUE) {
+ SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.LESS, Bytes.toBytes(timeEndExclusive));
filterList.addFilter(timeEndFilter);
}
return filterList.getFilters().size() == 0 ? null : filterList;
@@ -299,12 +312,6 @@ public class HBaseResourceStore extends ResourceStore {
}
}
- private byte[] plusZero(byte[] startRow) {
- byte[] endRow = Arrays.copyOf(startRow, startRow.length + 1);
- endRow[endRow.length - 1] = 0;
- return endRow;
- }
-
private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
Path redirectPath = bigCellHDFSPath(resPath);
Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();