You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/17 06:53:01 UTC

incubator-kylin git commit: KYLIN-1152 ResourceStore to return content and timestamp together

Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging 6ae2bfc48 -> 60daf39b3


KYLIN-1152 ResourceStore to return content and timestamp together


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/60daf39b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/60daf39b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/60daf39b

Branch: refs/heads/2.x-staging
Commit: 60daf39b3dcabe6d48b611a6268729f6af8aeae1
Parents: 6ae2bfc
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Nov 17 13:52:18 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Tue Nov 17 13:52:50 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/DeployUtil.java   | 17 +++++++----
 .../kylin/job/dataGen/FactTableGenerator.java   |  7 ++---
 .../common/persistence/FileResourceStore.java   | 15 +++++----
 .../kylin/common/persistence/RawResource.java   |  4 +--
 .../kylin/common/persistence/ResourceStore.java | 32 +++++++++++---------
 .../kylin/common/persistence/ResourceTool.java  | 12 +++-----
 .../apache/kylin/metadata/MetadataManager.java  |  7 +++--
 .../engine/mr/common/AbstractHadoopJob.java     | 16 ++++------
 .../engine/mr/steps/MergeStatisticsStep.java    |  2 +-
 .../engine/streaming/StreamingManager.java      | 26 ++++++++++------
 .../org/apache/kylin/query/test/H2Database.java |  3 +-
 .../kylin/storage/hbase/HBaseResourceStore.java | 29 ++++++++++--------
 .../storage/hbase/steps/CreateHTableJob.java    |  2 +-
 .../storage/hbase/util/CubeMigrationCLI.java    |  9 +++---
 14 files changed, 99 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 722b6b8..2f51475 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -18,9 +18,17 @@
 
 package org.apache.kylin.job;
 
-import com.google.common.collect.Lists;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
 import kafka.message.Message;
 import kafka.message.MessageAndOffset;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -30,7 +38,6 @@ import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.job.dataGen.FactTableGenerator;
 import org.apache.kylin.job.streaming.KafkaDataLoader;
 import org.apache.kylin.job.streaming.StreamingTableDataGenerator;
@@ -39,7 +46,6 @@ import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.hive.HiveClient;
-import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.TimedJsonStreamParser;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.maven.model.Model;
@@ -47,8 +53,7 @@ import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.*;
-import java.util.List;
+import com.google.common.collect.Lists;
 
 public class DeployUtil {
     private static final Logger logger = LoggerFactory.getLogger(DeployUtil.class);
@@ -192,7 +197,7 @@ public class DeployUtil {
             File localBufferFile = new File(temp.getParent() + "/" + tablename + ".csv");
             localBufferFile.createNewFile();
 
-            InputStream hbaseDataStream = metaMgr.getStore().getResource("/data/" + tablename + ".csv");
+            InputStream hbaseDataStream = metaMgr.getStore().getResource("/data/" + tablename + ".csv").inputStream;
             FileOutputStream localFileStream = new FileOutputStream(localBufferFile);
             IOUtils.copy(hbaseDataStream, localFileStream);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
index a965753..5a0fee7 100644
--- a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
+++ b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
@@ -106,8 +106,7 @@ public class FactTableGenerator {
      */
     private void loadConfig() {
         try {
-            InputStream configStream = null;
-            configStream = store.getResource("/data/data_gen_config.json");
+            InputStream configStream = store.getResource("/data/data_gen_config.json").inputStream;
             this.genConf = GenConfig.loadConfig(configStream);
 
             if (configStream != null)
@@ -135,7 +134,7 @@ public class FactTableGenerator {
             }
 
             String path = "/data/" + lookupTableName + ".csv";
-            tableStream = store.getResource(path);
+            tableStream = store.getResource(path).inputStream;
             tableReader = new BufferedReader(new InputStreamReader(tableStream));
             tableReader.mark(0);
             int rowCount = 0;
@@ -157,7 +156,7 @@ public class FactTableGenerator {
             tableStream = null;
             tableReader = null;
 
-            tableStream = store.getResource(path);
+            tableStream = store.getResource(path).inputStream;
             tableReader = new BufferedReader(new InputStreamReader(tableStream));
 
             while ((curRow = tableReader.readLine()) != null) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
index e74a9b2..89e3a1d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
@@ -79,14 +79,14 @@ public class FileResourceStore extends ResourceStore {
             for (String resource : resources) {
                 if (resource.compareTo(rangeStart) >= 0 && resource.compareTo(rangeEnd) <= 0) {
                     if (existsImpl(resource)) {
-                        result.add(new RawResource(getResourceImpl(resource), getResourceTimestampImpl(resource)));
+                        result.add(getResourceImpl(resource));
                     }
                 }
             }
             return result;
         } catch (IOException ex) {
             for (RawResource rawResource : result) {
-                IOUtils.closeQuietly(rawResource.resource);
+                IOUtils.closeQuietly(rawResource.inputStream);
             }
             throw ex;
         } catch (Exception ex) {
@@ -95,13 +95,13 @@ public class FileResourceStore extends ResourceStore {
     }
 
     @Override
-    protected InputStream getResourceImpl(String resPath) throws IOException {
+    protected RawResource getResourceImpl(String resPath) throws IOException {
         File f = file(resPath);
         if (f.exists() && f.isFile()) {
             if (f.length() == 0) {
                 logger.warn("Zero length file: " + f.getAbsolutePath());
             }
-            return new FileInputStream(f);
+            return new RawResource(new FileInputStream(f), f.lastModified());
         } else {
             return null;
         }
@@ -110,7 +110,10 @@ public class FileResourceStore extends ResourceStore {
     @Override
     protected long getResourceTimestampImpl(String resPath) throws IOException {
         File f = file(resPath);
-        return f.lastModified();
+        if (f.exists() && f.isFile())
+            return f.lastModified();
+        else
+            return 0;
     }
 
     @Override
@@ -136,7 +139,7 @@ public class FileResourceStore extends ResourceStore {
         putResourceImpl(resPath, new ByteArrayInputStream(content), newTS);
 
         // some FS lose precision on given time stamp
-        return getResourceTimestamp(resPath);
+        return f.lastModified();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/core-common/src/main/java/org/apache/kylin/common/persistence/RawResource.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/RawResource.java b/core-common/src/main/java/org/apache/kylin/common/persistence/RawResource.java
index 2c3238f..c0b5b21 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/RawResource.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/RawResource.java
@@ -6,11 +6,11 @@ import java.io.InputStream;
  */
 public class RawResource {
 
-    public final InputStream resource;
+    public final InputStream inputStream;
     public final long timestamp;
 
     public RawResource(InputStream resource, long timestamp) {
-        this.resource = resource;
+        this.inputStream = resource;
         this.timestamp = timestamp;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 89100a2..7a2178c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -138,25 +138,29 @@ abstract public class ResourceStore {
      */
     final public <T extends RootPersistentEntity> T getResource(String resPath, Class<T> clz, Serializer<T> serializer) throws IOException {
         resPath = norm(resPath);
-        InputStream in = getResourceImpl(resPath);
-        if (in == null)
+        RawResource res = getResourceImpl(resPath);
+        if (res == null)
             return null;
-
-        DataInputStream din = new DataInputStream(in);
+        
+        DataInputStream din = new DataInputStream(res.inputStream);
         try {
             T r = serializer.deserialize(din);
-            r.setLastModified(getResourceTimestamp(resPath));
+            r.setLastModified(res.timestamp);
             return r;
         } finally {
             IOUtils.closeQuietly(din);
-            IOUtils.closeQuietly(in);
+            IOUtils.closeQuietly(res.inputStream);
         }
     }
 
-    final public InputStream getResource(String resPath) throws IOException {
+    final public RawResource getResource(String resPath) throws IOException {
         return getResourceImpl(norm(resPath));
     }
 
+    final public long getResourceTimestamp(String resPath) throws IOException {
+        return getResourceTimestampImpl(norm(resPath));
+    }
+    
     final public <T extends RootPersistentEntity> List<T> getAllResources(String rangeStart, String rangeEnd, Class<T> clazz, Serializer<T> serializer) throws IOException {
         final List<RawResource> allResources = getAllResources(rangeStart, rangeEnd);
         if (allResources.isEmpty()) {
@@ -165,28 +169,26 @@ abstract public class ResourceStore {
         List<T> result = Lists.newArrayList();
         try {
             for (RawResource rawResource : allResources) {
-                final T element = serializer.deserialize(new DataInputStream(rawResource.resource));
+                final T element = serializer.deserialize(new DataInputStream(rawResource.inputStream));
                 element.setLastModified(rawResource.timestamp);
                 result.add(element);
             }
             return result;
         } finally {
             for (RawResource rawResource : allResources) {
-                IOUtils.closeQuietly(rawResource.resource);
+                IOUtils.closeQuietly(rawResource.inputStream);
             }
         }
     }
 
     abstract protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException;
 
-    abstract protected InputStream getResourceImpl(String resPath) throws IOException;
-
-    final public long getResourceTimestamp(String resPath) throws IOException {
-        return getResourceTimestampImpl(norm(resPath));
-    }
+    /** returns null if not exists */
+    abstract protected RawResource getResourceImpl(String resPath) throws IOException;
 
+    /** returns 0 if not exists */
     abstract protected long getResourceTimestampImpl(String resPath) throws IOException;
-
+    
     /**
      * overwrite a resource without write conflict check
      */

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
index 7e89c21..0efea6d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.common.persistence;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 
 import org.apache.kylin.common.KylinConfig;
@@ -85,13 +84,12 @@ public class ResourceTool {
         // case of resource (not a folder)
         if (children == null) {
             if (matchExclude(path) == false) {
-                InputStream content = src.getResource(path);
-                long ts = src.getResourceTimestamp(path);
-                if (content != null) {
-                    dst.putResource(path, content, ts);
-                    content.close();
+                RawResource res = src.getResource(path);
+                if (res != null) {
+                    dst.putResource(path, res.inputStream, res.timestamp);
+                    res.inputStream.close();
                 } else {
-                    System.out.println("Null inputstream for " + path);
+                    System.out.println("Resource not exist for " + path);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
index c907afd..d2caa35 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
@@ -226,11 +227,13 @@ public class MetadataManager {
         Map<String, String> attrs = Maps.newHashMap();
 
         ResourceStore store = getStore();
-        InputStream is = store.getResource(path);
-        if (is == null) {
+        RawResource res = store.getResource(path);
+        if (res == null) {
             logger.warn("Failed to get table exd info from " + path);
             return null;
         }
+        
+        InputStream is = res.inputStream;
 
         try {
             attrs.putAll(JsonUtil.readValue(is, HashMap.class));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 366a730..f031f76 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -27,7 +27,6 @@ import static org.apache.hadoop.util.StringUtils.*;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
@@ -52,16 +51,16 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.common.util.StringSplitter;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
@@ -395,14 +394,11 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
         ResourceStore to = ResourceStore.getStore(localConfig);
         for (String path : dumpList) {
-            InputStream in = from.getResource(path);
-            if (in == null)
+            RawResource res = from.getResource(path);
+            if (res == null)
                 throw new IllegalStateException("No resource found at -- " + path);
-            long ts = from.getResourceTimestamp(path);
-            to.putResource(path, in, ts);
-            in.close();
-            //The following log is duplicate with in ResourceStore
-            //log.info("Dumped resource " + path + " to " + metaDir.getAbsolutePath());
+            to.putResource(path, res.inputStream, res.timestamp);
+            res.inputStream.close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 45282df..3502271 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -83,7 +83,7 @@ public class MergeStatisticsStep extends AbstractExecutable {
             int averageSamplingPercentage = 0;
             for (String segmentId : this.getMergingSegmentIds()) {
                 String fileKey = CubeSegment.getStatisticsResourcePath(getCubeName(), segmentId);
-                InputStream is = rs.getResource(fileKey);
+                InputStream is = rs.getResource(fileKey).inputStream;
                 File tempFile = null;
                 FileOutputStream tempFileStream = null;
                 try {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index 8cabe1b..583eb7b 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -47,9 +47,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
@@ -119,10 +121,6 @@ public class StreamingManager {
         }
     }
 
-    private boolean checkExistence(String name) {
-        return true;
-    }
-
     private String formatStreamingConfigPath(String name) {
         return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
     }
@@ -224,17 +222,21 @@ public class StreamingManager {
 
     public long getOffset(String streaming, int shard) {
         final String resPath = formatStreamingOutputPath(streaming, shard);
+        InputStream inputStream = null; 
         try {
-            final InputStream inputStream = getStore().getResource(resPath);
-            if (inputStream == null) {
+            final RawResource res = getStore().getResource(resPath);
+            if (res == null) {
                 return 0;
             } else {
+            	inputStream = res.inputStream;
                 final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
                 return Long.parseLong(br.readLine());
             }
         } catch (Exception e) {
             logger.error("error get offset, path:" + resPath, e);
             throw new RuntimeException("error get offset, path:" + resPath, e);
+        } finally {
+        	IOUtils.closeQuietly(inputStream);
         }
     }
 
@@ -252,16 +254,20 @@ public class StreamingManager {
     public Map<Integer, Long> getOffset(String streaming, List<Integer> partitions) {
         Collections.sort(partitions);
         final String resPath = formatStreamingOutputPath(streaming, partitions);
+        InputStream inputStream = null;
         try {
-            final InputStream inputStream = getStore().getResource(resPath);
-            if (inputStream == null) {
-                return Collections.emptyMap();
-            }
+        	RawResource res = getStore().getResource(resPath);
+        	if (res == null)
+        		return Collections.emptyMap();
+        	
+        	inputStream = res.inputStream;
             final HashMap<Integer, Long> result = mapper.readValue(inputStream, mapType);
             return result;
         } catch (IOException e) {
             logger.error("error get offset, path:" + resPath, e);
             throw new RuntimeException("error get offset, path:" + resPath, e);
+        } finally {
+        	IOUtils.closeQuietly(inputStream);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/query/src/test/java/org/apache/kylin/query/test/H2Database.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/H2Database.java b/query/src/test/java/org/apache/kylin/query/test/H2Database.java
index 54e0914..cab51d3 100644
--- a/query/src/test/java/org/apache/kylin/query/test/H2Database.java
+++ b/query/src/test/java/org/apache/kylin/query/test/H2Database.java
@@ -36,6 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class H2Database {
+    @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(H2Database.class);
 
     private static final String[] ALL_TABLES = new String[] { "edw.test_cal_dt", "default.test_category_groupings", "default.test_kylin_fact", "edw.test_seller_type_dim", "edw.test_sites", "default.streaming_table" };
@@ -72,7 +73,7 @@ public class H2Database {
             tempFile = File.createTempFile("tmp_h2", ".csv");
             FileOutputStream tempFileStream = new FileOutputStream(tempFile);
             String normalPath = "/data/" + tableDesc.getIdentity() + ".csv";
-            InputStream csvStream = metaMgr.getStore().getResource(normalPath);
+            InputStream csvStream = metaMgr.getStore().getResource(normalPath).inputStream;
 
             org.apache.commons.io.IOUtils.copy(csvStream, tempFileStream);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 560850f..eecbfb5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -146,7 +146,7 @@ public class HBaseResourceStore extends ResourceStore {
 
     @Override
     protected boolean existsImpl(String resPath) throws IOException {
-        Result r = getByScan(resPath, null, null);
+        Result r = getByScan(resPath, false, false);
         return r != null;
     }
 
@@ -168,7 +168,7 @@ public class HBaseResourceStore extends ResourceStore {
             }
         } catch (IOException e) {
             for (RawResource rawResource : result) {
-                IOUtils.closeQuietly(rawResource.resource);
+                IOUtils.closeQuietly(rawResource.inputStream);
             }
             throw e;
         } finally {
@@ -202,17 +202,19 @@ public class HBaseResourceStore extends ResourceStore {
     }
 
     @Override
-    protected InputStream getResourceImpl(String resPath) throws IOException {
-        Result r = getByScan(resPath, B_FAMILY, B_COLUMN);
-        return getInputStream(resPath, r);
+    protected RawResource getResourceImpl(String resPath) throws IOException {
+        Result r = getByScan(resPath, true, true);
+        if (r == null)
+            return null;
+        else
+            return new RawResource(getInputStream(resPath, r), getTimestamp(r));
     }
 
     @Override
     protected long getResourceTimestampImpl(String resPath) throws IOException {
-        Result r = getByScan(resPath, B_FAMILY, B_COLUMN_TS);
-        return getTimestamp(r);
+        return getTimestamp(getByScan(resPath, false, true));
     }
-
+    
     @Override
     protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
         ByteArrayOutputStream bout = new ByteArrayOutputStream();
@@ -242,7 +244,7 @@ public class HBaseResourceStore extends ResourceStore {
             boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put);
             logger.info("Update row " + resPath + " from oldTs: " + oldTS + ", to newTs: " + newTS + ", operation result: " + ok);
             if (!ok) {
-                long real = getResourceTimestamp(resPath);
+                long real = getResourceTimestampImpl(resPath);
                 throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real);
             }
 
@@ -271,15 +273,18 @@ public class HBaseResourceStore extends ResourceStore {
         return getAllInOneTableName() + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
     }
 
-    private Result getByScan(String path, byte[] family, byte[] column) throws IOException {
+    private Result getByScan(String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
         byte[] startRow = Bytes.toBytes(path);
         byte[] endRow = plusZero(startRow);
 
         Scan scan = new Scan(startRow, endRow);
-        if (family == null || column == null) {
+        if (!fetchContent && !fetchTimestamp) {
             scan.setFilter(new KeyOnlyFilter());
         } else {
-            scan.addColumn(family, column);
+            if (fetchContent)
+                scan.addColumn(B_FAMILY, B_COLUMN);
+            if (fetchTimestamp)
+                scan.addColumn(B_FAMILY, B_COLUMN_TS);
         }
 
         HTableInterface table = getConnection().getTable(getAllInOneTableName());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 8bff4d1..e8b21d3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -165,7 +165,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
     public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(CubeSegment cubeSegment, KylinConfig kylinConfig, Configuration conf) throws IOException {
         ResourceStore rs = ResourceStore.getStore(kylinConfig);
         String fileKey = cubeSegment.getStatisticsResourcePath();
-        InputStream is = rs.getResource(fileKey);
+        InputStream is = rs.getResource(fileKey).inputStream;
         File tempFile = null;
         FileOutputStream tempFileStream = null;
         try {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/60daf39b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index aec1d5b..6a4d5e3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.storage.hbase.util;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -34,6 +33,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.cube.CubeInstance;
@@ -305,10 +305,9 @@ public class CubeMigrationCLI {
         }
         case COPY_FILE_IN_META: {
             String item = (String) opt.params[0];
-            InputStream inputStream = srcStore.getResource(item);
-            long ts = srcStore.getResourceTimestamp(item);
-            dstStore.putResource(item, inputStream, ts);
-            inputStream.close();
+            RawResource res = srcStore.getResource(item);
+            dstStore.putResource(item, res.inputStream, res.timestamp);
+            res.inputStream.close();
             logger.info("Item " + item + " is copied");
             break;
         }