You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/17 06:53:01 UTC
incubator-kylin git commit: KYLIN-1152 ResourceStore to return content and timestamp together
Repository: incubator-kylin
Updated Branches:
refs/heads/2.x-staging 6ae2bfc48 -> 60daf39b3
KYLIN-1152 ResourceStore to return content and timestamp together
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/60daf39b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/60daf39b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/60daf39b
Branch: refs/heads/2.x-staging
Commit: 60daf39b3dcabe6d48b611a6268729f6af8aeae1
Parents: 6ae2bfc
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Nov 17 13:52:18 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Tue Nov 17 13:52:50 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/job/DeployUtil.java | 17 +++++++----
.../kylin/job/dataGen/FactTableGenerator.java | 7 ++---
.../common/persistence/FileResourceStore.java | 15 +++++----
.../kylin/common/persistence/RawResource.java | 4 +--
.../kylin/common/persistence/ResourceStore.java | 32 +++++++++++---------
.../kylin/common/persistence/ResourceTool.java | 12 +++-----
.../apache/kylin/metadata/MetadataManager.java | 7 +++--
.../engine/mr/common/AbstractHadoopJob.java | 16 ++++------
.../engine/mr/steps/MergeStatisticsStep.java | 2 +-
.../engine/streaming/StreamingManager.java | 26 ++++++++++------
.../org/apache/kylin/query/test/H2Database.java | 3 +-
.../kylin/storage/hbase/HBaseResourceStore.java | 29 ++++++++++--------
.../storage/hbase/steps/CreateHTableJob.java | 2 +-
.../storage/hbase/util/CubeMigrationCLI.java | 9 +++---
14 files changed, 99 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 722b6b8..2f51475 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -18,9 +18,17 @@
package org.apache.kylin.job;
-import com.google.common.collect.Lists;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
import kafka.message.Message;
import kafka.message.MessageAndOffset;
+
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
@@ -30,7 +38,6 @@ import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.job.dataGen.FactTableGenerator;
import org.apache.kylin.job.streaming.KafkaDataLoader;
import org.apache.kylin.job.streaming.StreamingTableDataGenerator;
@@ -39,7 +46,6 @@ import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.hive.HiveClient;
-import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.TimedJsonStreamParser;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.maven.model.Model;
@@ -47,8 +53,7 @@ import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.*;
-import java.util.List;
+import com.google.common.collect.Lists;
public class DeployUtil {
private static final Logger logger = LoggerFactory.getLogger(DeployUtil.class);
@@ -192,7 +197,7 @@ public class DeployUtil {
File localBufferFile = new File(temp.getParent() + "/" + tablename + ".csv");
localBufferFile.createNewFile();
- InputStream hbaseDataStream = metaMgr.getStore().getResource("/data/" + tablename + ".csv");
+ InputStream hbaseDataStream = metaMgr.getStore().getResource("/data/" + tablename + ".csv").inputStream;
FileOutputStream localFileStream = new FileOutputStream(localBufferFile);
IOUtils.copy(hbaseDataStream, localFileStream);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
index a965753..5a0fee7 100644
--- a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
+++ b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
@@ -106,8 +106,7 @@ public class FactTableGenerator {
*/
private void loadConfig() {
try {
- InputStream configStream = null;
- configStream = store.getResource("/data/data_gen_config.json");
+ InputStream configStream = store.getResource("/data/data_gen_config.json").inputStream;
this.genConf = GenConfig.loadConfig(configStream);
if (configStream != null)
@@ -135,7 +134,7 @@ public class FactTableGenerator {
}
String path = "/data/" + lookupTableName + ".csv";
- tableStream = store.getResource(path);
+ tableStream = store.getResource(path).inputStream;
tableReader = new BufferedReader(new InputStreamReader(tableStream));
tableReader.mark(0);
int rowCount = 0;
@@ -157,7 +156,7 @@ public class FactTableGenerator {
tableStream = null;
tableReader = null;
- tableStream = store.getResource(path);
+ tableStream = store.getResource(path).inputStream;
tableReader = new BufferedReader(new InputStreamReader(tableStream));
while ((curRow = tableReader.readLine()) != null) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
index e74a9b2..89e3a1d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
@@ -79,14 +79,14 @@ public class FileResourceStore extends ResourceStore {
for (String resource : resources) {
if (resource.compareTo(rangeStart) >= 0 && resource.compareTo(rangeEnd) <= 0) {
if (existsImpl(resource)) {
- result.add(new RawResource(getResourceImpl(resource), getResourceTimestampImpl(resource)));
+ result.add(getResourceImpl(resource));
}
}
}
return result;
} catch (IOException ex) {
for (RawResource rawResource : result) {
- IOUtils.closeQuietly(rawResource.resource);
+ IOUtils.closeQuietly(rawResource.inputStream);
}
throw ex;
} catch (Exception ex) {
@@ -95,13 +95,13 @@ public class FileResourceStore extends ResourceStore {
}
@Override
- protected InputStream getResourceImpl(String resPath) throws IOException {
+ protected RawResource getResourceImpl(String resPath) throws IOException {
File f = file(resPath);
if (f.exists() && f.isFile()) {
if (f.length() == 0) {
logger.warn("Zero length file: " + f.getAbsolutePath());
}
- return new FileInputStream(f);
+ return new RawResource(new FileInputStream(f), f.lastModified());
} else {
return null;
}
@@ -110,7 +110,10 @@ public class FileResourceStore extends ResourceStore {
@Override
protected long getResourceTimestampImpl(String resPath) throws IOException {
File f = file(resPath);
- return f.lastModified();
+ if (f.exists() && f.isFile())
+ return f.lastModified();
+ else
+ return 0;
}
@Override
@@ -136,7 +139,7 @@ public class FileResourceStore extends ResourceStore {
putResourceImpl(resPath, new ByteArrayInputStream(content), newTS);
// some FS lose precision on given time stamp
- return getResourceTimestamp(resPath);
+ return f.lastModified();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/core-common/src/main/java/org/apache/kylin/common/persistence/RawResource.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/RawResource.java b/core-common/src/main/java/org/apache/kylin/common/persistence/RawResource.java
index 2c3238f..c0b5b21 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/RawResource.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/RawResource.java
@@ -6,11 +6,11 @@ import java.io.InputStream;
*/
public class RawResource {
- public final InputStream resource;
+ public final InputStream inputStream;
public final long timestamp;
public RawResource(InputStream resource, long timestamp) {
- this.resource = resource;
+ this.inputStream = resource;
this.timestamp = timestamp;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 89100a2..7a2178c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -138,25 +138,29 @@ abstract public class ResourceStore {
*/
final public <T extends RootPersistentEntity> T getResource(String resPath, Class<T> clz, Serializer<T> serializer) throws IOException {
resPath = norm(resPath);
- InputStream in = getResourceImpl(resPath);
- if (in == null)
+ RawResource res = getResourceImpl(resPath);
+ if (res == null)
return null;
-
- DataInputStream din = new DataInputStream(in);
+
+ DataInputStream din = new DataInputStream(res.inputStream);
try {
T r = serializer.deserialize(din);
- r.setLastModified(getResourceTimestamp(resPath));
+ r.setLastModified(res.timestamp);
return r;
} finally {
IOUtils.closeQuietly(din);
- IOUtils.closeQuietly(in);
+ IOUtils.closeQuietly(res.inputStream);
}
}
- final public InputStream getResource(String resPath) throws IOException {
+ final public RawResource getResource(String resPath) throws IOException {
return getResourceImpl(norm(resPath));
}
+ final public long getResourceTimestamp(String resPath) throws IOException {
+ return getResourceTimestampImpl(norm(resPath));
+ }
+
final public <T extends RootPersistentEntity> List<T> getAllResources(String rangeStart, String rangeEnd, Class<T> clazz, Serializer<T> serializer) throws IOException {
final List<RawResource> allResources = getAllResources(rangeStart, rangeEnd);
if (allResources.isEmpty()) {
@@ -165,28 +169,26 @@ abstract public class ResourceStore {
List<T> result = Lists.newArrayList();
try {
for (RawResource rawResource : allResources) {
- final T element = serializer.deserialize(new DataInputStream(rawResource.resource));
+ final T element = serializer.deserialize(new DataInputStream(rawResource.inputStream));
element.setLastModified(rawResource.timestamp);
result.add(element);
}
return result;
} finally {
for (RawResource rawResource : allResources) {
- IOUtils.closeQuietly(rawResource.resource);
+ IOUtils.closeQuietly(rawResource.inputStream);
}
}
}
abstract protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException;
- abstract protected InputStream getResourceImpl(String resPath) throws IOException;
-
- final public long getResourceTimestamp(String resPath) throws IOException {
- return getResourceTimestampImpl(norm(resPath));
- }
+ /** returns null if not exists */
+ abstract protected RawResource getResourceImpl(String resPath) throws IOException;
+ /** returns 0 if not exists */
abstract protected long getResourceTimestampImpl(String resPath) throws IOException;
-
+
/**
* overwrite a resource without write conflict check
*/
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
index 7e89c21..0efea6d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
@@ -19,7 +19,6 @@
package org.apache.kylin.common.persistence;
import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
import org.apache.kylin.common.KylinConfig;
@@ -85,13 +84,12 @@ public class ResourceTool {
// case of resource (not a folder)
if (children == null) {
if (matchExclude(path) == false) {
- InputStream content = src.getResource(path);
- long ts = src.getResourceTimestamp(path);
- if (content != null) {
- dst.putResource(path, content, ts);
- content.close();
+ RawResource res = src.getResource(path);
+ if (res != null) {
+ dst.putResource(path, res.inputStream, res.timestamp);
+ res.inputStream.close();
} else {
- System.out.println("Null inputstream for " + path);
+ System.out.println("Resource not exist for " + path);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
index c907afd..d2caa35 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.restclient.Broadcaster;
@@ -226,11 +227,13 @@ public class MetadataManager {
Map<String, String> attrs = Maps.newHashMap();
ResourceStore store = getStore();
- InputStream is = store.getResource(path);
- if (is == null) {
+ RawResource res = store.getResource(path);
+ if (res == null) {
logger.warn("Failed to get table exd info from " + path);
return null;
}
+
+ InputStream is = res.inputStream;
try {
attrs.putAll(JsonUtil.readValue(is, HashMap.class));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 366a730..f031f76 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -27,7 +27,6 @@ import static org.apache.hadoop.util.StringUtils.*;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -52,16 +51,16 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.common.util.StringSplitter;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
@@ -395,14 +394,11 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
ResourceStore to = ResourceStore.getStore(localConfig);
for (String path : dumpList) {
- InputStream in = from.getResource(path);
- if (in == null)
+ RawResource res = from.getResource(path);
+ if (res == null)
throw new IllegalStateException("No resource found at -- " + path);
- long ts = from.getResourceTimestamp(path);
- to.putResource(path, in, ts);
- in.close();
- //The following log is duplicate with in ResourceStore
- //log.info("Dumped resource " + path + " to " + metaDir.getAbsolutePath());
+ to.putResource(path, res.inputStream, res.timestamp);
+ res.inputStream.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 45282df..3502271 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -83,7 +83,7 @@ public class MergeStatisticsStep extends AbstractExecutable {
int averageSamplingPercentage = 0;
for (String segmentId : this.getMergingSegmentIds()) {
String fileKey = CubeSegment.getStatisticsResourcePath(getCubeName(), segmentId);
- InputStream is = rs.getResource(fileKey);
+ InputStream is = rs.getResource(fileKey).inputStream;
File tempFile = null;
FileOutputStream tempFileStream = null;
try {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index 8cabe1b..583eb7b 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -47,9 +47,11 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.restclient.Broadcaster;
@@ -119,10 +121,6 @@ public class StreamingManager {
}
}
- private boolean checkExistence(String name) {
- return true;
- }
-
private String formatStreamingConfigPath(String name) {
return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
}
@@ -224,17 +222,21 @@ public class StreamingManager {
public long getOffset(String streaming, int shard) {
final String resPath = formatStreamingOutputPath(streaming, shard);
+ InputStream inputStream = null;
try {
- final InputStream inputStream = getStore().getResource(resPath);
- if (inputStream == null) {
+ final RawResource res = getStore().getResource(resPath);
+ if (res == null) {
return 0;
} else {
+ inputStream = res.inputStream;
final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
return Long.parseLong(br.readLine());
}
} catch (Exception e) {
logger.error("error get offset, path:" + resPath, e);
throw new RuntimeException("error get offset, path:" + resPath, e);
+ } finally {
+ IOUtils.closeQuietly(inputStream);
}
}
@@ -252,16 +254,20 @@ public class StreamingManager {
public Map<Integer, Long> getOffset(String streaming, List<Integer> partitions) {
Collections.sort(partitions);
final String resPath = formatStreamingOutputPath(streaming, partitions);
+ InputStream inputStream = null;
try {
- final InputStream inputStream = getStore().getResource(resPath);
- if (inputStream == null) {
- return Collections.emptyMap();
- }
+ RawResource res = getStore().getResource(resPath);
+ if (res == null)
+ return Collections.emptyMap();
+
+ inputStream = res.inputStream;
final HashMap<Integer, Long> result = mapper.readValue(inputStream, mapType);
return result;
} catch (IOException e) {
logger.error("error get offset, path:" + resPath, e);
throw new RuntimeException("error get offset, path:" + resPath, e);
+ } finally {
+ IOUtils.closeQuietly(inputStream);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/query/src/test/java/org/apache/kylin/query/test/H2Database.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/H2Database.java b/query/src/test/java/org/apache/kylin/query/test/H2Database.java
index 54e0914..cab51d3 100644
--- a/query/src/test/java/org/apache/kylin/query/test/H2Database.java
+++ b/query/src/test/java/org/apache/kylin/query/test/H2Database.java
@@ -36,6 +36,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class H2Database {
+ @SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(H2Database.class);
private static final String[] ALL_TABLES = new String[] { "edw.test_cal_dt", "default.test_category_groupings", "default.test_kylin_fact", "edw.test_seller_type_dim", "edw.test_sites", "default.streaming_table" };
@@ -72,7 +73,7 @@ public class H2Database {
tempFile = File.createTempFile("tmp_h2", ".csv");
FileOutputStream tempFileStream = new FileOutputStream(tempFile);
String normalPath = "/data/" + tableDesc.getIdentity() + ".csv";
- InputStream csvStream = metaMgr.getStore().getResource(normalPath);
+ InputStream csvStream = metaMgr.getStore().getResource(normalPath).inputStream;
org.apache.commons.io.IOUtils.copy(csvStream, tempFileStream);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 560850f..eecbfb5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -146,7 +146,7 @@ public class HBaseResourceStore extends ResourceStore {
@Override
protected boolean existsImpl(String resPath) throws IOException {
- Result r = getByScan(resPath, null, null);
+ Result r = getByScan(resPath, false, false);
return r != null;
}
@@ -168,7 +168,7 @@ public class HBaseResourceStore extends ResourceStore {
}
} catch (IOException e) {
for (RawResource rawResource : result) {
- IOUtils.closeQuietly(rawResource.resource);
+ IOUtils.closeQuietly(rawResource.inputStream);
}
throw e;
} finally {
@@ -202,17 +202,19 @@ public class HBaseResourceStore extends ResourceStore {
}
@Override
- protected InputStream getResourceImpl(String resPath) throws IOException {
- Result r = getByScan(resPath, B_FAMILY, B_COLUMN);
- return getInputStream(resPath, r);
+ protected RawResource getResourceImpl(String resPath) throws IOException {
+ Result r = getByScan(resPath, true, true);
+ if (r == null)
+ return null;
+ else
+ return new RawResource(getInputStream(resPath, r), getTimestamp(r));
}
@Override
protected long getResourceTimestampImpl(String resPath) throws IOException {
- Result r = getByScan(resPath, B_FAMILY, B_COLUMN_TS);
- return getTimestamp(r);
+ return getTimestamp(getByScan(resPath, false, true));
}
-
+
@Override
protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
ByteArrayOutputStream bout = new ByteArrayOutputStream();
@@ -242,7 +244,7 @@ public class HBaseResourceStore extends ResourceStore {
boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put);
logger.info("Update row " + resPath + " from oldTs: " + oldTS + ", to newTs: " + newTS + ", operation result: " + ok);
if (!ok) {
- long real = getResourceTimestamp(resPath);
+ long real = getResourceTimestampImpl(resPath);
throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real);
}
@@ -271,15 +273,18 @@ public class HBaseResourceStore extends ResourceStore {
return getAllInOneTableName() + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
}
- private Result getByScan(String path, byte[] family, byte[] column) throws IOException {
+ private Result getByScan(String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
byte[] startRow = Bytes.toBytes(path);
byte[] endRow = plusZero(startRow);
Scan scan = new Scan(startRow, endRow);
- if (family == null || column == null) {
+ if (!fetchContent && !fetchTimestamp) {
scan.setFilter(new KeyOnlyFilter());
} else {
- scan.addColumn(family, column);
+ if (fetchContent)
+ scan.addColumn(B_FAMILY, B_COLUMN);
+ if (fetchTimestamp)
+ scan.addColumn(B_FAMILY, B_COLUMN_TS);
}
HTableInterface table = getConnection().getTable(getAllInOneTableName());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 8bff4d1..e8b21d3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -165,7 +165,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(CubeSegment cubeSegment, KylinConfig kylinConfig, Configuration conf) throws IOException {
ResourceStore rs = ResourceStore.getStore(kylinConfig);
String fileKey = cubeSegment.getStatisticsResourcePath();
- InputStream is = rs.getResource(fileKey);
+ InputStream is = rs.getResource(fileKey).inputStream;
File tempFile = null;
FileOutputStream tempFileStream = null;
try {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index aec1d5b..6a4d5e3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -19,7 +19,6 @@
package org.apache.kylin.storage.hbase.util;
import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -34,6 +33,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.cube.CubeInstance;
@@ -305,10 +305,9 @@ public class CubeMigrationCLI {
}
case COPY_FILE_IN_META: {
String item = (String) opt.params[0];
- InputStream inputStream = srcStore.getResource(item);
- long ts = srcStore.getResourceTimestamp(item);
- dstStore.putResource(item, inputStream, ts);
- inputStream.close();
+ RawResource res = srcStore.getResource(item);
+ dstStore.putResource(item, res.inputStream, res.timestamp);
+ res.inputStream.close();
logger.info("Item " + item + " is copied");
break;
}