You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/18 03:39:17 UTC
[23/30] incubator-kylin git commit: KYLIN-1152 Modify resource store to always return content and timestamp together
KYLIN-1152 Modify resource store to always return content and timestamp together
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2f788e71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2f788e71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2f788e71
Branch: refs/heads/1.x-HBase1.1.3
Commit: 2f788e71e7a12ec54cc99b5ec0305924847a178e
Parents: 1afb32d
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Nov 17 13:15:54 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Tue Nov 17 13:17:04 2015 +0800
----------------------------------------------------------------------
.../common/persistence/FileResourceStore.java | 16 +++------
.../common/persistence/HBaseResourceStore.java | 36 ++++++++++----------
.../kylin/common/persistence/RawResource.java | 6 ++--
.../kylin/common/persistence/ResourceStore.java | 27 ++++++---------
.../kylin/common/persistence/ResourceTool.java | 12 +++----
.../apache/kylin/job/CubeMetadataUpgrade.java | 9 ++---
.../kylin/job/hadoop/AbstractHadoopJob.java | 12 +++----
.../job/hadoop/cube/MetadataCleanupJob.java | 9 +++--
.../kylin/job/tools/CubeMigrationCLI.java | 9 +++--
.../java/org/apache/kylin/job/DeployUtil.java | 4 +--
.../kylin/job/dataGen/FactTableGenerator.java | 7 ++--
.../apache/kylin/metadata/MetadataManager.java | 7 ++--
.../org/apache/kylin/query/test/H2Database.java | 15 ++++----
.../apache/kylin/query/test/KylinQueryTest.java | 4 +--
14 files changed, 82 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
index 9648f6b..98c6b18 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
@@ -75,14 +75,14 @@ public class FileResourceStore extends ResourceStore {
for (String resource : resources) {
if (resource.compareTo(rangeStart) >= 0 && resource.compareTo(rangeEnd) <= 0) {
if (existsImpl(resource)) {
- result.add(new RawResource(getResourceImpl(resource), getResourceTimestampImpl(resource)));
+ result.add(getResourceImpl(resource));
}
}
}
return result;
} catch (IOException ex) {
for (RawResource rawResource : result) {
- IOUtils.closeQuietly(rawResource.resource);
+ IOUtils.closeQuietly(rawResource.inputStream);
}
throw ex;
} catch (Exception ex) {
@@ -91,21 +91,15 @@ public class FileResourceStore extends ResourceStore {
}
@Override
- protected InputStream getResourceImpl(String resPath) throws IOException {
+ protected RawResource getResourceImpl(String resPath) throws IOException {
File f = file(resPath);
if (f.exists() && f.isFile())
- return new FileInputStream(file(resPath));
+ return new RawResource(new FileInputStream(f), f.lastModified());
else
return null;
}
@Override
- protected long getResourceTimestampImpl(String resPath) throws IOException {
- File f = file(resPath);
- return f.lastModified();
- }
-
- @Override
protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
File f = file(resPath);
f.getParentFile().mkdirs();
@@ -128,7 +122,7 @@ public class FileResourceStore extends ResourceStore {
putResourceImpl(resPath, new ByteArrayInputStream(content), newTS);
// some FS lose precision on given time stamp
- return getResourceTimestamp(resPath);
+ return f.lastModified();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
index 1c4a7ba..2b14345 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
@@ -141,7 +141,7 @@ public class HBaseResourceStore extends ResourceStore {
@Override
protected boolean existsImpl(String resPath) throws IOException {
- Result r = getByScan(resPath, null, null);
+ Result r = getByScan(resPath, false, false);
return r != null;
}
@@ -163,7 +163,7 @@ public class HBaseResourceStore extends ResourceStore {
}
} catch (IOException e) {
for (RawResource rawResource : result) {
- IOUtils.closeQuietly(rawResource.resource);
+ IOUtils.closeQuietly(rawResource.inputStream);
}
throw e;
} finally {
@@ -179,7 +179,7 @@ public class HBaseResourceStore extends ResourceStore {
byte[] value = r.getValue(B_FAMILY, B_COLUMN);
if (value.length == 0) {
Path redirectPath = bigCellHDFSPath(resPath);
- Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
+ Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
return fileSystem.open(redirectPath);
@@ -197,15 +197,12 @@ public class HBaseResourceStore extends ResourceStore {
}
@Override
- protected InputStream getResourceImpl(String resPath) throws IOException {
- Result r = getByScan(resPath, B_FAMILY, B_COLUMN);
- return getInputStream(resPath, r);
- }
-
- @Override
- protected long getResourceTimestampImpl(String resPath) throws IOException {
- Result r = getByScan(resPath, B_FAMILY, B_COLUMN_TS);
- return getTimestamp(r);
+ protected RawResource getResourceImpl(String resPath) throws IOException {
+ Result r = getByScan(resPath, true, true);
+ if (r == null)
+ return null;
+ else
+ return new RawResource(getInputStream(resPath, r), getTimestamp(r));
}
@Override
@@ -236,8 +233,8 @@ public class HBaseResourceStore extends ResourceStore {
boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put);
if (!ok) {
- long real = getResourceTimestamp(resPath);
- throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + real + ", but it is " + oldTS);
+ long real = getTimestamp(getByScan(resPath, false, true));
+ throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + real + ", but it is " + oldTS);
}
table.flushCommits();
@@ -265,15 +262,18 @@ public class HBaseResourceStore extends ResourceStore {
return getAllInOneTableName() + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
}
- private Result getByScan(String path, byte[] family, byte[] column) throws IOException {
+ private Result getByScan(String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
byte[] startRow = Bytes.toBytes(path);
byte[] endRow = plusZero(startRow);
Scan scan = new Scan(startRow, endRow);
- if (family == null || column == null) {
+ if (!fetchContent && !fetchTimestamp) {
scan.setFilter(new KeyOnlyFilter());
} else {
- scan.addColumn(family, column);
+ if (fetchContent)
+ scan.addColumn(B_FAMILY, B_COLUMN);
+ if (fetchTimestamp)
+ scan.addColumn(B_FAMILY, B_COLUMN_TS);
}
HTableInterface table = getConnection().getTable(getAllInOneTableName());
@@ -297,7 +297,7 @@ public class HBaseResourceStore extends ResourceStore {
private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
Path redirectPath = bigCellHDFSPath(resPath);
- Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
+ Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
if (fileSystem.exists(redirectPath)) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/common/src/main/java/org/apache/kylin/common/persistence/RawResource.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/RawResource.java b/common/src/main/java/org/apache/kylin/common/persistence/RawResource.java
index 8125b86..4f52553 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/RawResource.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/RawResource.java
@@ -24,11 +24,11 @@ import java.io.InputStream;
*/
public class RawResource {
- public final InputStream resource;
+ public final InputStream inputStream;
public final long timestamp;
- public RawResource(InputStream resource, long timestamp) {
- this.resource = resource;
+ public RawResource(InputStream inputStream, long timestamp) {
+ this.inputStream = inputStream;
this.timestamp = timestamp;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 55a1a58..a23a4cd 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -127,22 +127,22 @@ abstract public class ResourceStore {
*/
final public <T extends RootPersistentEntity> T getResource(String resPath, Class<T> clz, Serializer<T> serializer) throws IOException {
resPath = norm(resPath);
- InputStream in = getResourceImpl(resPath);
- if (in == null)
+ RawResource res = getResourceImpl(resPath);
+ if (res == null)
return null;
-
- DataInputStream din = new DataInputStream(in);
+
+ DataInputStream din = new DataInputStream(res.inputStream);
try {
T r = serializer.deserialize(din);
- r.setLastModified(getResourceTimestamp(resPath));
+ r.setLastModified(res.timestamp);
return r;
} finally {
IOUtils.closeQuietly(din);
- IOUtils.closeQuietly(in);
+ IOUtils.closeQuietly(res.inputStream);
}
}
- final public InputStream getResource(String resPath) throws IOException {
+ final public RawResource getResource(String resPath) throws IOException {
return getResourceImpl(norm(resPath));
}
@@ -154,27 +154,22 @@ abstract public class ResourceStore {
List<T> result = Lists.newArrayList();
try {
for (RawResource rawResource : allResources) {
- final T element = serializer.deserialize(new DataInputStream(rawResource.resource));
+ final T element = serializer.deserialize(new DataInputStream(rawResource.inputStream));
element.setLastModified(rawResource.timestamp);
result.add(element);
}
return result;
} finally {
for (RawResource rawResource : allResources) {
- IOUtils.closeQuietly(rawResource.resource);
+ IOUtils.closeQuietly(rawResource.inputStream);
}
}
}
abstract protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException;
- abstract protected InputStream getResourceImpl(String resPath) throws IOException;
-
- final public long getResourceTimestamp(String resPath) throws IOException {
- return getResourceTimestampImpl(norm(resPath));
- }
-
- abstract protected long getResourceTimestampImpl(String resPath) throws IOException;
+ /** returns null if not exists */
+ abstract protected RawResource getResourceImpl(String resPath) throws IOException;
/**
* overwrite a resource without write conflict check
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java b/common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
index 6c15f00..0ebed3d 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
@@ -19,7 +19,6 @@
package org.apache.kylin.common.persistence;
import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
import org.apache.kylin.common.KylinConfig;
@@ -86,13 +85,12 @@ public class ResourceTool {
// case of resource (not a folder)
if (children == null) {
if (matchExclude(path) == false) {
- InputStream content = src.getResource(path);
- long ts = src.getResourceTimestamp(path);
- if (content != null) {
- dst.putResource(path, content, ts);
- content.close();
+ RawResource res = src.getResource(path);
+ if (res != null) {
+ dst.putResource(path, res.inputStream, res.timestamp);
+ res.inputStream.close();
} else {
- System.out.println("Null inputstream for " + path);
+ System.out.println("Resource not exist for " + path);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java b/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
index cd3427e..0a08709 100644
--- a/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
+++ b/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
@@ -193,7 +193,7 @@ public class CubeMetadataUpgrade {
MetadataManager.getInstance(config).reload();
CubeDescManager.clearCache();
CubeDescManager.getInstance(config);
- CubeManager cubeManager = CubeManager.getInstance(config);
+ CubeManager.getInstance(config);
ProjectManager.getInstance(config);
//cleanup();
@@ -267,7 +267,7 @@ public class CubeMetadataUpgrade {
InputStream is = null;
try {
- is = store.getResource(path);
+ is = store.getResource(path).inputStream;
if (is == null) {
continue;
}
@@ -496,7 +496,7 @@ public class CubeMetadataUpgrade {
if (pkToFK.containsKey(key) && !newSeg.getDictionaries().containsKey(pkToFK.get(key))) {
logger.debug("Duplicate dictionary for FK " + pkToFK.get(key) + " in cube " + newInstance.getName());
changedCubes.add(newInstance.getName());
- newDictionaries.add(new Pair(pkToFK.get(key), e.getValue()));
+ newDictionaries.add(new Pair<String, String>(pkToFK.get(key), e.getValue()));
}
}
@@ -617,10 +617,11 @@ public class CubeMetadataUpgrade {
for (int i = 0, size = job.getSteps().size(); i < size; ++i) {
final JobInstance.JobStep jobStep = job.getSteps().get(i);
final String outputPath = ResourceStore.JOB_OUTPUT_PATH_ROOT + "/" + job.getId() + "." + i;
- final InputStream inputStream = getStore().getResource(outputPath);
+ final InputStream inputStream = getStore().getResource(outputPath).inputStream;
String output = null;
if (inputStream != null) {
+ @SuppressWarnings("unchecked")
HashMap<String, String> job_output = JsonUtil.readValue(inputStream, HashMap.class);
if (job_output != null) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index 7b3af95..a851756 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -27,7 +27,6 @@ import static org.apache.hadoop.util.StringUtils.*;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -51,6 +50,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.StringSplitter;
@@ -347,13 +347,11 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
ResourceStore to = ResourceStore.getStore(localConfig);
for (String path : dumpList) {
- InputStream in = from.getResource(path);
- if (in == null)
+ RawResource res = from.getResource(path);
+ if (res == null)
throw new IllegalStateException("No resource found at -- " + path);
- long ts = from.getResourceTimestamp(path);
- to.putResource(path, in, ts);
- //The following log is duplicate with in ResourceStore
- //log.info("Dumped resource " + path + " to " + metaDir.getAbsolutePath());
+ to.putResource(path, res.inputStream, res.timestamp);
+ res.inputStream.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
index b322a4b..cb601c5 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
@@ -28,6 +28,7 @@ import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
@@ -114,7 +115,9 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
if (snapshotNames != null)
for (String snapshot : snapshotNames) {
if (!activeResourceList.contains(snapshot)) {
- if (isOlderThanThreshold(getStore().getResourceTimestamp(snapshot)))
+ RawResource res = getStore().getResource(snapshot);
+ res.inputStream.close();
+ if (isOlderThanThreshold(res.timestamp))
toDeleteResource.add(snapshot);
}
}
@@ -134,7 +137,9 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
if (dictionaries != null)
for (String dict : dictionaries)
if (!activeResourceList.contains(dict)) {
- if (isOlderThanThreshold(getStore().getResourceTimestamp(dict)))
+ RawResource res = getStore().getResource(dict);
+ res.inputStream.close();
+ if (isOlderThanThreshold(res.timestamp))
toDeleteResource.add(dict);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
index b07d6a9..2d4b0bf 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
@@ -19,7 +19,6 @@
package org.apache.kylin.job.tools;
import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -33,6 +32,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.cube.CubeInstance;
@@ -293,10 +293,9 @@ public class CubeMigrationCLI {
}
case COPY_FILE_IN_META: {
String item = (String) opt.params[0];
- InputStream inputStream = srcStore.getResource(item);
- long ts = srcStore.getResourceTimestamp(item);
- dstStore.putResource(item, inputStream, ts);
- inputStream.close();
+ RawResource res = srcStore.getResource(item);
+ dstStore.putResource(item, res.inputStream, res.timestamp);
+ res.inputStream.close();
logger.info("Item " + item + " is copied");
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/job/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DeployUtil.java b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
index 8684aa0..550bddb 100644
--- a/job/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -183,7 +183,7 @@ public class DeployUtil {
// duplicate a copy of this fact table, with a naming convention with fact.csv.inner or fact.csv.left
// so that later test cases can select different data files
ResourceStore store = ResourceStore.getStore(config());
- InputStream in = store.getResource("/data/" + factTableName + ".csv");
+ InputStream in = store.getResource("/data/" + factTableName + ".csv").inputStream;
String factTablePathWithJoinType = "/data/" + factTableName + ".csv." + joinType.toLowerCase();
store.deleteResource(factTablePathWithJoinType);
store.putResource(factTablePathWithJoinType, in, System.currentTimeMillis());
@@ -203,7 +203,7 @@ public class DeployUtil {
File localBufferFile = new File(temp.getParent() + "/" + tablename + ".csv");
localBufferFile.createNewFile();
- InputStream hbaseDataStream = metaMgr.getStore().getResource("/data/" + tablename + ".csv");
+ InputStream hbaseDataStream = metaMgr.getStore().getResource("/data/" + tablename + ".csv").inputStream;
FileOutputStream localFileStream = new FileOutputStream(localBufferFile);
IOUtils.copy(hbaseDataStream, localFileStream);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java b/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
index 2bc4dc3..c9988fc 100644
--- a/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
+++ b/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
@@ -107,8 +107,7 @@ public class FactTableGenerator {
*/
private void loadConfig() {
try {
- InputStream configStream = null;
- configStream = store.getResource("/data/data_gen_config.json");
+ InputStream configStream = store.getResource("/data/data_gen_config.json").inputStream;
this.genConf = GenConfig.loadConfig(configStream);
if (configStream != null)
@@ -136,7 +135,7 @@ public class FactTableGenerator {
}
String path = "/data/" + lookupTableName + ".csv";
- tableStream = store.getResource(path);
+ tableStream = store.getResource(path).inputStream;
tableReader = new BufferedReader(new InputStreamReader(tableStream));
tableReader.mark(0);
int rowCount = 0;
@@ -158,7 +157,7 @@ public class FactTableGenerator {
tableStream = null;
tableReader = null;
- tableStream = store.getResource(path);
+ tableStream = store.getResource(path).inputStream;
tableReader = new BufferedReader(new InputStreamReader(tableStream));
while ((curRow = tableReader.readLine()) != null) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java b/metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
index 377fba7..b540588 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.restclient.Broadcaster;
@@ -215,11 +216,13 @@ public class MetadataManager {
Map<String, String> attrs = Maps.newHashMap();
ResourceStore store = getStore();
- InputStream is = store.getResource(path);
- if (is == null) {
+ RawResource res = store.getResource(path);
+ if (res == null) {
logger.warn("Failed to get table exd info from " + path);
return null;
}
+
+ InputStream is = res.inputStream;
try {
attrs.putAll(JsonUtil.readValue(is, HashMap.class));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/query/src/test/java/org/apache/kylin/query/test/H2Database.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/H2Database.java b/query/src/test/java/org/apache/kylin/query/test/H2Database.java
index eea1a96..564363f 100644
--- a/query/src/test/java/org/apache/kylin/query/test/H2Database.java
+++ b/query/src/test/java/org/apache/kylin/query/test/H2Database.java
@@ -21,7 +21,6 @@ package org.apache.kylin.query.test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
@@ -29,6 +28,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TableDesc;
@@ -76,18 +76,17 @@ public class H2Database {
String normalPath = "/data/" + tableDesc.getIdentity() + ".csv";
// If it's the fact table, there will be a facttable.csv.inner or
- // facttable.csv.left in hbase
- // otherwise just use lookup.csv
- InputStream csvStream = metaMgr.getStore().getResource(normalPath + fileNameSuffix);
- if (csvStream == null) {
- csvStream = metaMgr.getStore().getResource(normalPath);
+ // facttable.csv.left in hbase, otherwise just use lookup.csv
+ RawResource res = metaMgr.getStore().getResource(normalPath + fileNameSuffix);
+ if (res == null) {
+ res = metaMgr.getStore().getResource(normalPath);
} else {
logger.info("H2 decides to load " + (normalPath + fileNameSuffix) + " for table " + tableDesc.getIdentity());
}
- org.apache.commons.io.IOUtils.copy(csvStream, tempFileStream);
+ org.apache.commons.io.IOUtils.copy(res.inputStream, tempFileStream);
- csvStream.close();
+ res.inputStream.close();
tempFileStream.close();
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
index 84f1042..148607a 100644
--- a/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
@@ -43,7 +43,7 @@ import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
-@Ignore("KylinQueryTest is contained by CombinationTest")
+//@Ignore("KylinQueryTest is contained by CombinationTest")
public class KylinQueryTest extends KylinTestBase {
@BeforeClass
@@ -140,7 +140,7 @@ public class KylinQueryTest extends KylinTestBase {
@Test
public void testSingleRunQuery() throws Exception {
- String queryFileName = "src/test/resources/query/sql/query62.sql";
+ String queryFileName = "src/test/resources/query/sql/sample.txt";
File sqlFile = new File(queryFileName);
runSQL(sqlFile, true, true);