You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/23 08:14:07 UTC
kylin git commit: KYLIN-1245 bug fix when reading stats seq file
Repository: kylin
Updated Branches:
refs/heads/2.x-staging db95d72ca -> 54071e311
KYLIN-1245 bug fix when reading stats seq file
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/54071e31
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/54071e31
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/54071e31
Branch: refs/heads/2.x-staging
Commit: 54071e311d1bfcbc96390631a2d98d3ce70aaef6
Parents: db95d72
Author: Li, Yang <ya...@ebay.com>
Authored: Wed Dec 23 15:13:57 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Wed Dec 23 15:13:57 2015 +0800
----------------------------------------------------------------------
.../kylin/engine/mr/common/CubeStatsReader.java | 29 +++++++++++++++-----
1 file changed, 22 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/54071e31/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index fc27a81..bbc724a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -18,6 +18,8 @@
package org.apache.kylin.engine.mr.common;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
@@ -29,7 +31,7 @@ import java.util.Map;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
@@ -76,14 +78,14 @@ public class CubeStatsReader {
public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException {
ResourceStore store = ResourceStore.getStore(kylinConfig);
String statsKey = cubeSegment.getStatisticsResourcePath();
- InputStream is = store.getResource(statsKey).inputStream;
+ File tmpSeqFile = writeTmpSeqFile(store.getResource(statsKey).inputStream);
Reader reader = null;
try {
Configuration hadoopConf = HadoopUtil.getCurrentConfiguration();
- Option streamInput = SequenceFile.Reader.stream(new FSDataInputStream(is));
- reader = new SequenceFile.Reader(hadoopConf, streamInput);
+ Option seqInput = SequenceFile.Reader.file(new Path("file://" + tmpSeqFile.getAbsolutePath()));
+ reader = new SequenceFile.Reader(hadoopConf, seqInput);
int percentage = 100;
double mapperOverlapRatio = 0;
@@ -111,10 +113,23 @@ public class CubeStatsReader {
} finally {
IOUtils.closeStream(reader);
- IOUtils.closeStream(is);
+ tmpSeqFile.delete();
}
}
+ /**
+  * Copies the given stream into a newly created local temp file
+  * (prefix "kylin_stats_tmp", suffix ".seq") and returns that file.
+  *
+  * The input stream is always closed before this method returns, whether
+  * or not the copy succeeded. If anything fails after the temp file has
+  * been created, the partially written file is deleted before the
+  * exception propagates, because the caller never receives a handle it
+  * could use to clean up.
+  *
+  * @param inputStream source of the bytes to copy; closed by this method
+  * @return the temp file holding the copied bytes; the caller is
+  *         responsible for deleting it
+  * @throws IOException if the temp file cannot be created or written
+  */
+ private File writeTmpSeqFile(InputStream inputStream) throws IOException {
+     File tempFile = null;
+     FileOutputStream out = null;
+     boolean copied = false;
+     try {
+         // Created inside the try so that a failure here still closes inputStream.
+         tempFile = File.createTempFile("kylin_stats_tmp", ".seq");
+         out = new FileOutputStream(tempFile);
+         org.apache.commons.io.IOUtils.copy(inputStream, out);
+         copied = true;
+     } finally {
+         IOUtils.closeStream(inputStream);
+         IOUtils.closeStream(out);
+         // On failure nobody else knows about the file, so remove it here.
+         if (!copied && tempFile != null) {
+             tempFile.delete();
+         }
+     }
+     return tempFile;
+ }
+
/**
 * Returns the cuboid row count map produced by
 * {@code getCuboidRowCountMapFromSampling} from this reader's stored
 * {@code cuboidRowCountMap} and {@code samplingPercentage}.
 *
 * NOTE(review): presumably the helper adjusts the sampled counts by the
 * sampling percentage; confirm against its implementation.
 *
 * @return the map returned by the sampling helper
 */
public Map<Long, Long> getCuboidRowCountMap() {
    final Map<Long, Long> rowCountsByCuboid = getCuboidRowCountMapFromSampling(cuboidRowCountMap, samplingPercentage);
    return rowCountsByCuboid;
}
@@ -211,12 +226,12 @@ public class CubeStatsReader {
logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + bytesLength + " bytes." + " Total size is " + ret + "M.");
return ret;
}
-
+
public static void main(String[] args) throws IOException {
KylinConfig config = KylinConfig.getInstanceFromEnv();
CubeInstance cube = CubeManager.getInstance(config).getCube(args[0]);
List<CubeSegment> segments = cube.getSegments(SegmentStatusEnum.READY);
-
+
PrintWriter out = new PrintWriter(System.out);
for (CubeSegment seg : segments) {
new CubeStatsReader(seg, config).print(out);