You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/09 14:17:35 UTC
[3/9] kylin git commit: KYLIN-1726 fix 'FileSystem Closed' error
KYLIN-1726 fix 'FileSystem Closed' error
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/14c680c9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/14c680c9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/14c680c9
Branch: refs/heads/KYLIN-2072
Commit: 14c680c9444b4a2e1a64baad4ca81498e9c27e47
Parents: bc3cd88
Author: shaofengshi <sh...@apache.org>
Authored: Sun Oct 9 19:06:07 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sun Oct 9 19:06:17 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/source/kafka/UpdateTimeRangeStep.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/14c680c9/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
index bb64bf9..9e902d8 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
@@ -21,6 +21,7 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -63,7 +64,12 @@ public class UpdateTimeRangeStep extends AbstractExecutable {
final Path outputFile = new Path(outputPath, partitionCol.getName());
String minValue = null, maxValue = null, currentValue = null;
- try (FileSystem fs = HadoopUtil.getFileSystem(outputPath); FSDataInputStream inputStream = fs.open(outputFile); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
+ FSDataInputStream inputStream = null;
+ BufferedReader bufferedReader = null;
+ try {
+ FileSystem fs = HadoopUtil.getFileSystem(outputPath);
+ inputStream = fs.open(outputFile);
+ bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
minValue = currentValue = bufferedReader.readLine();
while (currentValue != null) {
maxValue = currentValue;
@@ -72,6 +78,9 @@ public class UpdateTimeRangeStep extends AbstractExecutable {
} catch (IOException e) {
logger.error("fail to read file " + outputFile, e);
return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ } finally {
+ IOUtils.closeQuietly(bufferedReader);
+ IOUtils.closeQuietly(inputStream);
}
final DataType partitionColType = partitionCol.getType();