You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2020/03/06 06:22:52 UTC
[kylin] branch master updated: KYLIN-4396 Close FileReader in SaveDictStep
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 526d631 KYLIN-4396 Close FileReader in SaveDictStep
526d631 is described below
commit 526d6319e3dd9e2aa4d3b18ee8b74f83ba791ccf
Author: XiaoxiangYu <hi...@126.com>
AuthorDate: Wed Mar 4 09:51:57 2020 +0800
KYLIN-4396 Close FileReader in SaveDictStep
---
.../mr/streaming/ColumnarSplitDictReader.java | 2 +-
.../kylin/engine/mr/streaming/DictsReader.java | 2 +
.../kylin/engine/mr/streaming/SaveDictStep.java | 46 +++++++++++-----------
3 files changed, 26 insertions(+), 24 deletions(-)
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDictReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDictReader.java
index 5b46270..de17b23 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDictReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDictReader.java
@@ -68,7 +68,7 @@ public class ColumnarSplitDictReader extends ColumnarSplitReader {
itr = set.iterator();
readCount = new AtomicInteger(0);
- logger.debug("Reader for dictinary reader initialized. ");
+ logger.info("Reader for dictionary reader initialized. ");
}
@Override
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java
index e10a012..ecbe8cc 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java
@@ -51,10 +51,12 @@ public class DictsReader extends ColumnarFilesReader {
dataInputStream = fs.open(dataFilePath);
Dictionary dict;
String colName;
+ logger.info("Reading dictionary from {}", dataFilePath.getName());
for (DimDictionaryMetaInfo dimDictMetaInfo : dimDictMetaInfos) {
dataInputStream.seek(dimDictMetaInfo.getStartOffset());
dict = DictionarySerializer.deserialize(dataInputStream);
colName = dimDictMetaInfo.getDimName();
+ logger.info("Add dict for {}", colName);
builder.put(colName, dict);
}
return builder.build();
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java
index fdd69ce..ae2fbe4 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java
@@ -108,32 +108,32 @@ public class SaveDictStep extends AbstractExecutable {
}
});
- SequenceFile.Reader reader;
for (FileStatus file : files) {
- reader = new SequenceFile.Reader(fs, file.getPath(), conf);
- Text colName = new Text();
- Text dictInfo = new Text();
- while (reader.next(colName, dictInfo)) {
- TblColRef colRef = colRefMap.get(colName.toString());
- if (colRef == null) {
- throw new IllegalArgumentException("Invalid column name " + colName
- + " or it need not build dictionary!");
- }
- DictionaryInfo dictionaryInfo = serializer.deserialize(new DataInputStream(
- new ByteArrayInputStream(dictInfo.getBytes())));
-
- Dictionary dict = dictionaryInfo.getDictionaryObject();
- if (dict != null) {
- dictionaryInfo = dictManager.trySaveNewDict(dict, dictionaryInfo);
- cubeSeg.putDictResPath(colRef, dictionaryInfo.getResourcePath());
- if (cubeSeg.getRowkeyStats() != null) {
- cubeSeg.getRowkeyStats().add(
- new Object[] { colRef.getName(), dict.getSize(), dict.getSizeOfId() });
+ try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf)) {
+ Text colName = new Text();
+ Text dictInfo = new Text();
+ while (reader.next(colName, dictInfo)) {
+ TblColRef colRef = colRefMap.get(colName.toString());
+ if (colRef == null) {
+ throw new IllegalArgumentException("Invalid column name " + colName
+ + " or it need not build dictionary!");
+ }
+ DictionaryInfo dictionaryInfo = serializer.deserialize(new DataInputStream(
+ new ByteArrayInputStream(dictInfo.getBytes())));
+
+ Dictionary dict = dictionaryInfo.getDictionaryObject();
+ if (dict != null) {
+ dictionaryInfo = dictManager.trySaveNewDict(dict, dictionaryInfo);
+ cubeSeg.putDictResPath(colRef, dictionaryInfo.getResourcePath());
+ if (cubeSeg.getRowkeyStats() != null) {
+ cubeSeg.getRowkeyStats().add(
+ new Object[]{colRef.getName(), dict.getSize(), dict.getSizeOfId()});
+ } else {
+ logger.error("rowkey_stats field not found!");
+ }
} else {
- logger.error("rowkey_stats field not found!");
+ logger.error("dictionary of column {} not found! ", colRef.getName());
}
- } else {
- logger.error("dictionary of column {} not found! ", colRef.getName());
}
}
}