You are viewing a plain-text version of this content. The canonical version is the original mailing-list archive page (its hyperlink is not preserved in this plain-text copy).
Posted to commits@kylin.apache.org by sh...@apache.org on 2020/03/06 06:22:52 UTC

[kylin] branch master updated: KYLIN-4396 Close FileReader in SaveDictStep

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 526d631  KYLIN-4396 Close FileReader in SaveDictStep
526d631 is described below

commit 526d6319e3dd9e2aa4d3b18ee8b74f83ba791ccf
Author: XiaoxiangYu <hi...@126.com>
AuthorDate: Wed Mar 4 09:51:57 2020 +0800

    KYLIN-4396 Close FileReader in SaveDictStep
---
 .../mr/streaming/ColumnarSplitDictReader.java      |  2 +-
 .../kylin/engine/mr/streaming/DictsReader.java     |  2 +
 .../kylin/engine/mr/streaming/SaveDictStep.java    | 46 +++++++++++-----------
 3 files changed, 26 insertions(+), 24 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDictReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDictReader.java
index 5b46270..de17b23 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDictReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDictReader.java
@@ -68,7 +68,7 @@ public class ColumnarSplitDictReader extends ColumnarSplitReader {
         itr = set.iterator();
         readCount = new AtomicInteger(0);
 
-        logger.debug("Reader for dictinary reader initialized. ");
+        logger.info("Reader for dictionary reader initialized. ");
     }
 
     @Override
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java
index e10a012..ecbe8cc 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java
@@ -51,10 +51,12 @@ public class DictsReader extends ColumnarFilesReader {
         dataInputStream = fs.open(dataFilePath);
         Dictionary dict;
         String colName;
+        logger.info("Reading dictionary from {}", dataFilePath.getName());
         for (DimDictionaryMetaInfo dimDictMetaInfo : dimDictMetaInfos) {
             dataInputStream.seek(dimDictMetaInfo.getStartOffset());
             dict = DictionarySerializer.deserialize(dataInputStream);
             colName = dimDictMetaInfo.getDimName();
+            logger.info("Add dict for {}", colName);
             builder.put(colName, dict);
         }
         return builder.build();
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java
index fdd69ce..ae2fbe4 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java
@@ -108,32 +108,32 @@ public class SaveDictStep extends AbstractExecutable {
                 }
             });
 
-            SequenceFile.Reader reader;
             for (FileStatus file : files) {
-                reader = new SequenceFile.Reader(fs, file.getPath(), conf);
-                Text colName = new Text();
-                Text dictInfo = new Text();
-                while (reader.next(colName, dictInfo)) {
-                    TblColRef colRef = colRefMap.get(colName.toString());
-                    if (colRef == null) {
-                        throw new IllegalArgumentException("Invalid column name " + colName
-                                + " or it need not build dictionary!");
-                    }
-                    DictionaryInfo dictionaryInfo = serializer.deserialize(new DataInputStream(
-                            new ByteArrayInputStream(dictInfo.getBytes())));
-
-                    Dictionary dict = dictionaryInfo.getDictionaryObject();
-                    if (dict != null) {
-                        dictionaryInfo = dictManager.trySaveNewDict(dict, dictionaryInfo);
-                        cubeSeg.putDictResPath(colRef, dictionaryInfo.getResourcePath());
-                        if (cubeSeg.getRowkeyStats() != null) {
-                            cubeSeg.getRowkeyStats().add(
-                                    new Object[] { colRef.getName(), dict.getSize(), dict.getSizeOfId() });
+                try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf)) {
+                    Text colName = new Text();
+                    Text dictInfo = new Text();
+                    while (reader.next(colName, dictInfo)) {
+                        TblColRef colRef = colRefMap.get(colName.toString());
+                        if (colRef == null) {
+                            throw new IllegalArgumentException("Invalid column name " + colName
+                                    + " or it need not build dictionary!");
+                        }
+                        DictionaryInfo dictionaryInfo = serializer.deserialize(new DataInputStream(
+                                new ByteArrayInputStream(dictInfo.getBytes())));
+
+                        Dictionary dict = dictionaryInfo.getDictionaryObject();
+                        if (dict != null) {
+                            dictionaryInfo = dictManager.trySaveNewDict(dict, dictionaryInfo);
+                            cubeSeg.putDictResPath(colRef, dictionaryInfo.getResourcePath());
+                            if (cubeSeg.getRowkeyStats() != null) {
+                                cubeSeg.getRowkeyStats().add(
+                                        new Object[]{colRef.getName(), dict.getSize(), dict.getSizeOfId()});
+                            } else {
+                                logger.error("rowkey_stats field not found!");
+                            }
                         } else {
-                            logger.error("rowkey_stats field not found!");
+                            logger.error("dictionary of column {} not found! ", colRef.getName());
                         }
-                    } else {
-                        logger.error("dictionary of column {} not found! ", colRef.getName());
                     }
                 }
             }