You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/01/05 06:05:13 UTC

[2/5] kylin git commit: KYLIN-2351 enforce Path scheme

KYLIN-2351 enforce Path scheme


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ee020cc6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ee020cc6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ee020cc6

Branch: refs/heads/yang22-cdh5.7
Commit: ee020cc69bef3a263e348b594ddae77ca8390efe
Parents: 4f6515d
Author: Li Yang <li...@apache.org>
Authored: Thu Jan 5 11:26:02 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Jan 5 13:24:38 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/engine/mr/HadoopUtil.java  | 23 +++++++++++++++-----
 .../engine/mr/steps/MergeStatisticsStep.java    |  2 +-
 .../steps/RowKeyDistributionCheckerMapper.java  |  3 ++-
 .../kylin/engine/mr/SortedColumnReaderTest.java | 20 ++++++++++++-----
 4 files changed, 35 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/ee020cc6/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
index 3d29a02..3119c1e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
@@ -72,17 +72,30 @@ public class HadoopUtil {
     }
     
     public static FileSystem getWorkingFileSystem(Configuration conf) throws IOException {
-        return getFileSystem(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory(), conf);
+        Path workingPath = new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
+        return getFileSystem(workingPath, conf);
     }
     
     public static FileSystem getFileSystem(String path) throws IOException {
-        return getFileSystem(path, getCurrentConfiguration());
+        return getFileSystem(new Path(makeURI(path)));
     }
-
-    static FileSystem getFileSystem(String path, Configuration conf) throws IOException {
-        return FileSystem.get(makeURI(path), conf);
+    
+    public static FileSystem getFileSystem(Path path) throws IOException {
+        Configuration conf = getCurrentConfiguration();
+        return getFileSystem(path, conf);
     }
     
+    public static FileSystem getFileSystem(Path path, Configuration conf) {
+        if (StringUtils.isBlank(path.toUri().getScheme()))
+            throw new IllegalArgumentException("Path must be qualified: " + path);
+        
+        try {
+            return path.getFileSystem(conf);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     public static URI makeURI(String filePath) {
         try {
             return new URI(fixWindowsPath(filePath));

http://git-wip-us.apache.org/repos/asf/kylin/blob/ee020cc6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 88f6ba2..af86181 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -123,7 +123,7 @@ public class MergeStatisticsStep extends AbstractExecutable {
             averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
             CubeStatsWriter.writeCuboidStatistics(conf, new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, averageSamplingPercentage);
             Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
-            FileSystem fs = statisticsFilePath.getFileSystem(conf);
+            FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
             FSDataInputStream is = fs.open(statisticsFilePath);
             try {
                 // put the statistics to metadata store

http://git-wip-us.apache.org/repos/asf/kylin/blob/ee020cc6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
index fca91a6..ee8da6b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.KylinMapper;
 
 /**
@@ -89,7 +90,7 @@ public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Tex
         List<byte[]> rowkeyList = new ArrayList<byte[]>();
         SequenceFile.Reader reader = null;
         try {
-            reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf);
+            reader = new SequenceFile.Reader(HadoopUtil.getFileSystem(path, conf), path, conf);
             Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
             Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
             while (reader.next(key, value)) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/ee020cc6/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
index 3c4195f..be440f6 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
@@ -48,10 +48,9 @@ public class SortedColumnReaderTest {
     @Test
     public void testReadStringMultiFile() throws Exception {
         String dirPath = "src/test/resources/multi_file_str";
-        StringBytesConverter converter = new StringBytesConverter();
         ArrayList<String> correctAnswer = readAllFiles(dirPath);
         Collections.sort(correctAnswer, new ByteComparator<String>(new StringBytesConverter()));
-        SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("varchar"));
+        SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("varchar"));
         IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
         ArrayList<String> output = new ArrayList<>();
         while (e.moveNext()) {
@@ -124,7 +123,7 @@ public class SortedColumnReaderTest {
                 }
             }
         });
-        SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("long"));
+        SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("long"));
         IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
         ArrayList<String> output = new ArrayList<>();
         while (e.moveNext()) {
@@ -142,7 +141,7 @@ public class SortedColumnReaderTest {
     public void testEmptyDir() throws Exception {
         String dirPath = "src/test/resources/empty_dir";
         new File(dirPath).mkdirs();
-        SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("varchar"));
+        SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("varchar"));
         IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
         ArrayList<String> output = new ArrayList<>();
         while (e.moveNext()) {
@@ -159,7 +158,7 @@ public class SortedColumnReaderTest {
         final BytesConverter<String> converter = new StringBytesConverter();
         Collections.sort(correctAnswer, new ByteComparator<String>(new StringBytesConverter()));
         System.out.println("correct answer:" + correctAnswer);
-        SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("varchar"));
+        SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("varchar"));
         IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
         ArrayList<String> output = new ArrayList<>();
         while (e.moveNext()) {
@@ -230,7 +229,7 @@ public class SortedColumnReaderTest {
                 }
             }
         });
-        SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("double"));
+        SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("double"));
         IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
         ArrayList<String> output = new ArrayList<>();
         while (e.moveNext()) {
@@ -300,4 +299,13 @@ public class SortedColumnReaderTest {
         }
         return result;
     }
+    
+    private String qualify(String path) {
+        String absolutePath = new File(path).getAbsolutePath();
+        if (absolutePath.startsWith("/"))
+            return "file://" + absolutePath;
+        else
+            return "file:///" + absolutePath;
+    }
+
 }