You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/01/05 06:05:13 UTC
[2/5] kylin git commit: KYLIN-2351 enforce Path scheme
KYLIN-2351 enforce Path scheme
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ee020cc6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ee020cc6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ee020cc6
Branch: refs/heads/yang22-cdh5.7
Commit: ee020cc69bef3a263e348b594ddae77ca8390efe
Parents: 4f6515d
Author: Li Yang <li...@apache.org>
Authored: Thu Jan 5 11:26:02 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Jan 5 13:24:38 2017 +0800
----------------------------------------------------------------------
.../org/apache/kylin/engine/mr/HadoopUtil.java | 23 +++++++++++++++-----
.../engine/mr/steps/MergeStatisticsStep.java | 2 +-
.../steps/RowKeyDistributionCheckerMapper.java | 3 ++-
.../kylin/engine/mr/SortedColumnReaderTest.java | 20 ++++++++++++-----
4 files changed, 35 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee020cc6/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
index 3d29a02..3119c1e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
@@ -72,17 +72,30 @@ public class HadoopUtil {
}
public static FileSystem getWorkingFileSystem(Configuration conf) throws IOException {
- return getFileSystem(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory(), conf);
+ Path workingPath = new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
+ return getFileSystem(workingPath, conf);
}
public static FileSystem getFileSystem(String path) throws IOException {
- return getFileSystem(path, getCurrentConfiguration());
+ return getFileSystem(new Path(makeURI(path)));
}
-
- static FileSystem getFileSystem(String path, Configuration conf) throws IOException {
- return FileSystem.get(makeURI(path), conf);
+
+ public static FileSystem getFileSystem(Path path) throws IOException {
+ Configuration conf = getCurrentConfiguration();
+ return getFileSystem(path, conf);
}
+ public static FileSystem getFileSystem(Path path, Configuration conf) {
+ if (StringUtils.isBlank(path.toUri().getScheme()))
+ throw new IllegalArgumentException("Path must be qualified: " + path);
+
+ try {
+ return path.getFileSystem(conf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public static URI makeURI(String filePath) {
try {
return new URI(fixWindowsPath(filePath));
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee020cc6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 88f6ba2..af86181 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -123,7 +123,7 @@ public class MergeStatisticsStep extends AbstractExecutable {
averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
CubeStatsWriter.writeCuboidStatistics(conf, new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, averageSamplingPercentage);
Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
- FileSystem fs = statisticsFilePath.getFileSystem(conf);
+ FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
FSDataInputStream is = fs.open(statisticsFilePath);
try {
// put the statistics to metadata store
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee020cc6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
index fca91a6..ee8da6b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.KylinMapper;
/**
@@ -89,7 +90,7 @@ public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Tex
List<byte[]> rowkeyList = new ArrayList<byte[]>();
SequenceFile.Reader reader = null;
try {
- reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf);
+ reader = new SequenceFile.Reader(HadoopUtil.getFileSystem(path, conf), path, conf);
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
while (reader.next(key, value)) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee020cc6/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
index 3c4195f..be440f6 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
@@ -48,10 +48,9 @@ public class SortedColumnReaderTest {
@Test
public void testReadStringMultiFile() throws Exception {
String dirPath = "src/test/resources/multi_file_str";
- StringBytesConverter converter = new StringBytesConverter();
ArrayList<String> correctAnswer = readAllFiles(dirPath);
Collections.sort(correctAnswer, new ByteComparator<String>(new StringBytesConverter()));
- SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("varchar"));
+ SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("varchar"));
IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
ArrayList<String> output = new ArrayList<>();
while (e.moveNext()) {
@@ -124,7 +123,7 @@ public class SortedColumnReaderTest {
}
}
});
- SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("long"));
+ SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("long"));
IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
ArrayList<String> output = new ArrayList<>();
while (e.moveNext()) {
@@ -142,7 +141,7 @@ public class SortedColumnReaderTest {
public void testEmptyDir() throws Exception {
String dirPath = "src/test/resources/empty_dir";
new File(dirPath).mkdirs();
- SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("varchar"));
+ SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("varchar"));
IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
ArrayList<String> output = new ArrayList<>();
while (e.moveNext()) {
@@ -159,7 +158,7 @@ public class SortedColumnReaderTest {
final BytesConverter<String> converter = new StringBytesConverter();
Collections.sort(correctAnswer, new ByteComparator<String>(new StringBytesConverter()));
System.out.println("correct answer:" + correctAnswer);
- SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("varchar"));
+ SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("varchar"));
IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
ArrayList<String> output = new ArrayList<>();
while (e.moveNext()) {
@@ -230,7 +229,7 @@ public class SortedColumnReaderTest {
}
}
});
- SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("double"));
+ SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("double"));
IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
ArrayList<String> output = new ArrayList<>();
while (e.moveNext()) {
@@ -300,4 +299,13 @@ public class SortedColumnReaderTest {
}
return result;
}
+
+ private String qualify(String path) {
+ String absolutePath = new File(path).getAbsolutePath();
+ if (absolutePath.startsWith("/"))
+ return "file://" + absolutePath;
+ else
+ return "file:///" + absolutePath;
+ }
+
}