You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/17 06:17:16 UTC

incubator-kylin git commit: KYLIN-1152 Modify resource store to always return content and timestamp together

Repository: incubator-kylin
Updated Branches:
  refs/heads/1.x-staging 1afb32dc5 -> 2f788e71e


KYLIN-1152 Modify resource store to always return content and timestamp together


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2f788e71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2f788e71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2f788e71

Branch: refs/heads/1.x-staging
Commit: 2f788e71e7a12ec54cc99b5ec0305924847a178e
Parents: 1afb32d
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Nov 17 13:15:54 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Tue Nov 17 13:17:04 2015 +0800

----------------------------------------------------------------------
 .../common/persistence/FileResourceStore.java   | 16 +++------
 .../common/persistence/HBaseResourceStore.java  | 36 ++++++++++----------
 .../kylin/common/persistence/RawResource.java   |  6 ++--
 .../kylin/common/persistence/ResourceStore.java | 27 ++++++---------
 .../kylin/common/persistence/ResourceTool.java  | 12 +++----
 .../apache/kylin/job/CubeMetadataUpgrade.java   |  9 ++---
 .../kylin/job/hadoop/AbstractHadoopJob.java     | 12 +++----
 .../job/hadoop/cube/MetadataCleanupJob.java     |  9 +++--
 .../kylin/job/tools/CubeMigrationCLI.java       |  9 +++--
 .../java/org/apache/kylin/job/DeployUtil.java   |  4 +--
 .../kylin/job/dataGen/FactTableGenerator.java   |  7 ++--
 .../apache/kylin/metadata/MetadataManager.java  |  7 ++--
 .../org/apache/kylin/query/test/H2Database.java | 15 ++++----
 .../apache/kylin/query/test/KylinQueryTest.java |  4 +--
 14 files changed, 82 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
index 9648f6b..98c6b18 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
@@ -75,14 +75,14 @@ public class FileResourceStore extends ResourceStore {
             for (String resource : resources) {
                 if (resource.compareTo(rangeStart) >= 0 && resource.compareTo(rangeEnd) <= 0) {
                     if (existsImpl(resource)) {
-                        result.add(new RawResource(getResourceImpl(resource), getResourceTimestampImpl(resource)));
+                        result.add(getResourceImpl(resource));
                     }
                 }
             }
             return result;
         } catch (IOException ex) {
             for (RawResource rawResource : result) {
-                IOUtils.closeQuietly(rawResource.resource);
+                IOUtils.closeQuietly(rawResource.inputStream);
             }
             throw ex;
         } catch (Exception ex) {
@@ -91,21 +91,15 @@ public class FileResourceStore extends ResourceStore {
     }
 
     @Override
-    protected InputStream getResourceImpl(String resPath) throws IOException {
+    protected RawResource getResourceImpl(String resPath) throws IOException {
         File f = file(resPath);
         if (f.exists() && f.isFile())
-            return new FileInputStream(file(resPath));
+            return new RawResource(new FileInputStream(f), f.lastModified());
         else
             return null;
     }
 
     @Override
-    protected long getResourceTimestampImpl(String resPath) throws IOException {
-        File f = file(resPath);
-        return f.lastModified();
-    }
-
-    @Override
     protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
         File f = file(resPath);
         f.getParentFile().mkdirs();
@@ -128,7 +122,7 @@ public class FileResourceStore extends ResourceStore {
         putResourceImpl(resPath, new ByteArrayInputStream(content), newTS);
 
         // some FS lose precision on given time stamp
-        return getResourceTimestamp(resPath);
+        return f.lastModified();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
index 1c4a7ba..2b14345 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
@@ -141,7 +141,7 @@ public class HBaseResourceStore extends ResourceStore {
 
     @Override
     protected boolean existsImpl(String resPath) throws IOException {
-        Result r = getByScan(resPath, null, null);
+        Result r = getByScan(resPath, false, false);
         return r != null;
     }
 
@@ -163,7 +163,7 @@ public class HBaseResourceStore extends ResourceStore {
             }
         } catch (IOException e) {
             for (RawResource rawResource : result) {
-                IOUtils.closeQuietly(rawResource.resource);
+                IOUtils.closeQuietly(rawResource.inputStream);
             }
             throw e;
         } finally {
@@ -179,7 +179,7 @@ public class HBaseResourceStore extends ResourceStore {
         byte[] value = r.getValue(B_FAMILY, B_COLUMN);
         if (value.length == 0) {
             Path redirectPath = bigCellHDFSPath(resPath);
-            Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
+            Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
             FileSystem fileSystem = FileSystem.get(hconf);
 
             return fileSystem.open(redirectPath);
@@ -197,15 +197,12 @@ public class HBaseResourceStore extends ResourceStore {
     }
 
     @Override
-    protected InputStream getResourceImpl(String resPath) throws IOException {
-        Result r = getByScan(resPath, B_FAMILY, B_COLUMN);
-        return getInputStream(resPath, r);
-    }
-
-    @Override
-    protected long getResourceTimestampImpl(String resPath) throws IOException {
-        Result r = getByScan(resPath, B_FAMILY, B_COLUMN_TS);
-        return getTimestamp(r);
+    protected RawResource getResourceImpl(String resPath) throws IOException {
+        Result r = getByScan(resPath, true, true);
+        if (r == null)
+            return null;
+        else
+            return new RawResource(getInputStream(resPath, r), getTimestamp(r));
     }
 
     @Override
@@ -236,8 +233,8 @@ public class HBaseResourceStore extends ResourceStore {
 
             boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put);
             if (!ok) {
-                long real = getResourceTimestamp(resPath);
-                throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + real + ", but it is " + oldTS);
+                long real = getTimestamp(getByScan(resPath, false, true));
+                throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + real + ", but it is " + oldTS);
             }
 
             table.flushCommits();
@@ -265,15 +262,18 @@ public class HBaseResourceStore extends ResourceStore {
         return getAllInOneTableName() + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
     }
 
-    private Result getByScan(String path, byte[] family, byte[] column) throws IOException {
+    private Result getByScan(String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
         byte[] startRow = Bytes.toBytes(path);
         byte[] endRow = plusZero(startRow);
 
         Scan scan = new Scan(startRow, endRow);
-        if (family == null || column == null) {
+        if (!fetchContent && !fetchTimestamp) {
             scan.setFilter(new KeyOnlyFilter());
         } else {
-            scan.addColumn(family, column);
+            if (fetchContent)
+                scan.addColumn(B_FAMILY, B_COLUMN);
+            if (fetchTimestamp)
+                scan.addColumn(B_FAMILY, B_COLUMN_TS);
         }
 
         HTableInterface table = getConnection().getTable(getAllInOneTableName());
@@ -297,7 +297,7 @@ public class HBaseResourceStore extends ResourceStore {
 
     private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
         Path redirectPath = bigCellHDFSPath(resPath);
-        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
+        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
         FileSystem fileSystem = FileSystem.get(hconf);
 
         if (fileSystem.exists(redirectPath)) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/common/src/main/java/org/apache/kylin/common/persistence/RawResource.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/RawResource.java b/common/src/main/java/org/apache/kylin/common/persistence/RawResource.java
index 8125b86..4f52553 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/RawResource.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/RawResource.java
@@ -24,11 +24,11 @@ import java.io.InputStream;
  */
 public class RawResource {
 
-    public final InputStream resource;
+    public final InputStream inputStream;
     public final long timestamp;
 
-    public RawResource(InputStream resource, long timestamp) {
-        this.resource = resource;
+    public RawResource(InputStream inputStream, long timestamp) {
+        this.inputStream = inputStream;
         this.timestamp = timestamp;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 55a1a58..a23a4cd 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -127,22 +127,22 @@ abstract public class ResourceStore {
      */
     final public <T extends RootPersistentEntity> T getResource(String resPath, Class<T> clz, Serializer<T> serializer) throws IOException {
         resPath = norm(resPath);
-        InputStream in = getResourceImpl(resPath);
-        if (in == null)
+        RawResource res = getResourceImpl(resPath);
+        if (res == null)
             return null;
-
-        DataInputStream din = new DataInputStream(in);
+        
+        DataInputStream din = new DataInputStream(res.inputStream);
         try {
             T r = serializer.deserialize(din);
-            r.setLastModified(getResourceTimestamp(resPath));
+            r.setLastModified(res.timestamp);
             return r;
         } finally {
             IOUtils.closeQuietly(din);
-            IOUtils.closeQuietly(in);
+            IOUtils.closeQuietly(res.inputStream);
         }
     }
 
-    final public InputStream getResource(String resPath) throws IOException {
+    final public RawResource getResource(String resPath) throws IOException {
         return getResourceImpl(norm(resPath));
     }
 
@@ -154,27 +154,22 @@ abstract public class ResourceStore {
         List<T> result = Lists.newArrayList();
         try {
             for (RawResource rawResource : allResources) {
-                final T element = serializer.deserialize(new DataInputStream(rawResource.resource));
+                final T element = serializer.deserialize(new DataInputStream(rawResource.inputStream));
                 element.setLastModified(rawResource.timestamp);
                 result.add(element);
             }
             return result;
         } finally {
             for (RawResource rawResource : allResources) {
-                IOUtils.closeQuietly(rawResource.resource);
+                IOUtils.closeQuietly(rawResource.inputStream);
             }
         }
     }
 
     abstract protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException;
 
-    abstract protected InputStream getResourceImpl(String resPath) throws IOException;
-
-    final public long getResourceTimestamp(String resPath) throws IOException {
-        return getResourceTimestampImpl(norm(resPath));
-    }
-
-    abstract protected long getResourceTimestampImpl(String resPath) throws IOException;
+    /** returns null if not exists */
+    abstract protected RawResource getResourceImpl(String resPath) throws IOException;
 
     /**
      * overwrite a resource without write conflict check

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java b/common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
index 6c15f00..0ebed3d 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.common.persistence;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 
 import org.apache.kylin.common.KylinConfig;
@@ -86,13 +85,12 @@ public class ResourceTool {
         // case of resource (not a folder)
         if (children == null) {
             if (matchExclude(path) == false) {
-                InputStream content = src.getResource(path);
-                long ts = src.getResourceTimestamp(path);
-                if (content != null) {
-                    dst.putResource(path, content, ts);
-                    content.close();
+                RawResource res = src.getResource(path);
+                if (res != null) {
+                    dst.putResource(path, res.inputStream, res.timestamp);
+                    res.inputStream.close();
                 } else {
-                    System.out.println("Null inputstream for " + path);
+                    System.out.println("Resource not exist for " + path);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java b/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
index cd3427e..0a08709 100644
--- a/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
+++ b/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
@@ -193,7 +193,7 @@ public class CubeMetadataUpgrade {
         MetadataManager.getInstance(config).reload();
         CubeDescManager.clearCache();
         CubeDescManager.getInstance(config);
-        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeManager.getInstance(config);
         ProjectManager.getInstance(config);
         //cleanup();
 
@@ -267,7 +267,7 @@ public class CubeMetadataUpgrade {
 
             InputStream is = null;
             try {
-                is = store.getResource(path);
+                is = store.getResource(path).inputStream;
                 if (is == null) {
                     continue;
                 }
@@ -496,7 +496,7 @@ public class CubeMetadataUpgrade {
                         if (pkToFK.containsKey(key) && !newSeg.getDictionaries().containsKey(pkToFK.get(key))) {
                             logger.debug("Duplicate dictionary for FK " + pkToFK.get(key) + " in cube " + newInstance.getName());
                             changedCubes.add(newInstance.getName());
-                            newDictionaries.add(new Pair(pkToFK.get(key), e.getValue()));
+                            newDictionaries.add(new Pair<String, String>(pkToFK.get(key), e.getValue()));
 
                         }
                     }
@@ -617,10 +617,11 @@ public class CubeMetadataUpgrade {
         for (int i = 0, size = job.getSteps().size(); i < size; ++i) {
             final JobInstance.JobStep jobStep = job.getSteps().get(i);
             final String outputPath = ResourceStore.JOB_OUTPUT_PATH_ROOT + "/" + job.getId() + "." + i;
-            final InputStream inputStream = getStore().getResource(outputPath);
+            final InputStream inputStream = getStore().getResource(outputPath).inputStream;
 
             String output = null;
             if (inputStream != null) {
+                @SuppressWarnings("unchecked")
                 HashMap<String, String> job_output = JsonUtil.readValue(inputStream, HashMap.class);
 
                 if (job_output != null) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index 7b3af95..a851756 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -27,7 +27,6 @@ import static org.apache.hadoop.util.StringUtils.*;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
@@ -51,6 +50,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.common.util.StringSplitter;
@@ -347,13 +347,11 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
         ResourceStore to = ResourceStore.getStore(localConfig);
         for (String path : dumpList) {
-            InputStream in = from.getResource(path);
-            if (in == null)
+            RawResource res = from.getResource(path);
+            if (res == null)
                 throw new IllegalStateException("No resource found at -- " + path);
-            long ts = from.getResourceTimestamp(path);
-            to.putResource(path, in, ts);
-            //The following log is duplicate with in ResourceStore
-            //log.info("Dumped resource " + path + " to " + metaDir.getAbsolutePath());
+            to.putResource(path, res.inputStream, res.timestamp);
+            res.inputStream.close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
index b322a4b..cb601c5 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
@@ -28,6 +28,7 @@ import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
@@ -114,7 +115,9 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
                     if (snapshotNames != null)
                         for (String snapshot : snapshotNames) {
                             if (!activeResourceList.contains(snapshot)) {
-                                if (isOlderThanThreshold(getStore().getResourceTimestamp(snapshot)))
+                                RawResource res = getStore().getResource(snapshot);
+                                res.inputStream.close();
+                                if (isOlderThanThreshold(res.timestamp))
                                     toDeleteResource.add(snapshot);
                             }
                         }
@@ -134,7 +137,9 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
                         if (dictionaries != null)
                             for (String dict : dictionaries)
                                 if (!activeResourceList.contains(dict)) {
-                                    if (isOlderThanThreshold(getStore().getResourceTimestamp(dict)))
+                                    RawResource res = getStore().getResource(dict);
+                                    res.inputStream.close();
+                                    if (isOlderThanThreshold(res.timestamp))
                                         toDeleteResource.add(dict);
                                 }
                     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
index b07d6a9..2d4b0bf 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.job.tools;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -33,6 +32,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.cube.CubeInstance;
@@ -293,10 +293,9 @@ public class CubeMigrationCLI {
         }
         case COPY_FILE_IN_META: {
             String item = (String) opt.params[0];
-            InputStream inputStream = srcStore.getResource(item);
-            long ts = srcStore.getResourceTimestamp(item);
-            dstStore.putResource(item, inputStream, ts);
-            inputStream.close();
+            RawResource res = srcStore.getResource(item);
+            dstStore.putResource(item, res.inputStream, res.timestamp);
+            res.inputStream.close();
             logger.info("Item " + item + " is copied");
             break;
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/job/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DeployUtil.java b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
index 8684aa0..550bddb 100644
--- a/job/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -183,7 +183,7 @@ public class DeployUtil {
         // duplicate a copy of this fact table, with a naming convention with fact.csv.inner or fact.csv.left
         // so that later test cases can select different data files
         ResourceStore store = ResourceStore.getStore(config());
-        InputStream in = store.getResource("/data/" + factTableName + ".csv");
+        InputStream in = store.getResource("/data/" + factTableName + ".csv").inputStream;
         String factTablePathWithJoinType = "/data/" + factTableName + ".csv." + joinType.toLowerCase();
         store.deleteResource(factTablePathWithJoinType);
         store.putResource(factTablePathWithJoinType, in, System.currentTimeMillis());
@@ -203,7 +203,7 @@ public class DeployUtil {
             File localBufferFile = new File(temp.getParent() + "/" + tablename + ".csv");
             localBufferFile.createNewFile();
 
-            InputStream hbaseDataStream = metaMgr.getStore().getResource("/data/" + tablename + ".csv");
+            InputStream hbaseDataStream = metaMgr.getStore().getResource("/data/" + tablename + ".csv").inputStream;
             FileOutputStream localFileStream = new FileOutputStream(localBufferFile);
             IOUtils.copy(hbaseDataStream, localFileStream);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java b/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
index 2bc4dc3..c9988fc 100644
--- a/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
+++ b/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
@@ -107,8 +107,7 @@ public class FactTableGenerator {
      */
     private void loadConfig() {
         try {
-            InputStream configStream = null;
-            configStream = store.getResource("/data/data_gen_config.json");
+            InputStream configStream = store.getResource("/data/data_gen_config.json").inputStream;
             this.genConf = GenConfig.loadConfig(configStream);
 
             if (configStream != null)
@@ -136,7 +135,7 @@ public class FactTableGenerator {
             }
 
             String path = "/data/" + lookupTableName + ".csv";
-            tableStream = store.getResource(path);
+            tableStream = store.getResource(path).inputStream;
             tableReader = new BufferedReader(new InputStreamReader(tableStream));
             tableReader.mark(0);
             int rowCount = 0;
@@ -158,7 +157,7 @@ public class FactTableGenerator {
             tableStream = null;
             tableReader = null;
 
-            tableStream = store.getResource(path);
+            tableStream = store.getResource(path).inputStream;
             tableReader = new BufferedReader(new InputStreamReader(tableStream));
 
             while ((curRow = tableReader.readLine()) != null) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java b/metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
index 377fba7..b540588 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
@@ -215,11 +216,13 @@ public class MetadataManager {
         Map<String, String> attrs = Maps.newHashMap();
 
         ResourceStore store = getStore();
-        InputStream is = store.getResource(path);
-        if (is == null) {
+        RawResource res = store.getResource(path);
+        if (res == null) {
             logger.warn("Failed to get table exd info from " + path);
             return null;
         }
+        
+        InputStream is = res.inputStream;
 
         try {
             attrs.putAll(JsonUtil.readValue(is, HashMap.class));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/query/src/test/java/org/apache/kylin/query/test/H2Database.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/H2Database.java b/query/src/test/java/org/apache/kylin/query/test/H2Database.java
index eea1a96..564363f 100644
--- a/query/src/test/java/org/apache/kylin/query/test/H2Database.java
+++ b/query/src/test/java/org/apache/kylin/query/test/H2Database.java
@@ -21,7 +21,6 @@ package org.apache.kylin.query.test;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -29,6 +28,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
@@ -76,18 +76,17 @@ public class H2Database {
             String normalPath = "/data/" + tableDesc.getIdentity() + ".csv";
 
             // If it's the fact table, there will be a facttable.csv.inner or
-            // facttable.csv.left in hbase
-            // otherwise just use lookup.csv
-            InputStream csvStream = metaMgr.getStore().getResource(normalPath + fileNameSuffix);
-            if (csvStream == null) {
-                csvStream = metaMgr.getStore().getResource(normalPath);
+            // facttable.csv.left in hbase, otherwise just use lookup.csv
+            RawResource res = metaMgr.getStore().getResource(normalPath + fileNameSuffix);
+            if (res == null) {
+                res = metaMgr.getStore().getResource(normalPath);
             } else {
                 logger.info("H2 decides to load " + (normalPath + fileNameSuffix) + " for table " + tableDesc.getIdentity());
             }
 
-            org.apache.commons.io.IOUtils.copy(csvStream, tempFileStream);
+            org.apache.commons.io.IOUtils.copy(res.inputStream, tempFileStream);
 
-            csvStream.close();
+            res.inputStream.close();
             tempFileStream.close();
 
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f788e71/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
index 84f1042..148607a 100644
--- a/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
@@ -43,7 +43,7 @@ import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
-@Ignore("KylinQueryTest is contained by CombinationTest")
+//@Ignore("KylinQueryTest is contained by CombinationTest")
 public class KylinQueryTest extends KylinTestBase {
 
     @BeforeClass
@@ -140,7 +140,7 @@ public class KylinQueryTest extends KylinTestBase {
     @Test
     public void testSingleRunQuery() throws Exception {
 
-        String queryFileName = "src/test/resources/query/sql/query62.sql";
+        String queryFileName = "src/test/resources/query/sql/sample.txt";
 
         File sqlFile = new File(queryFileName);
         runSQL(sqlFile, true, true);