You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/09/14 02:43:17 UTC
[incubator-linkis] branch dev-1.3.1 updated: Fix the issue that the writer's close method is called repeatedly (#3347)
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
new b606cc899 Fix the issue that the writer's close method is called repeatedly (#3347)
b606cc899 is described below
commit b606cc899e40cf9e680e79b0f1260494c3e5ddec
Author: peacewong <wp...@gmail.com>
AuthorDate: Wed Sep 14 10:43:12 2022 +0800
Fix the issue that the writer's close method is called repeatedly (#3347)
---
.../linkis/storage/resultset/StorageResultSetWriter.scala | 7 +++++--
.../linkis/storage/script/writer/StorageScriptFsWriter.scala | 4 +++-
.../engineconnplugin/flink/executor/FlinkExecutor.scala | 6 ++----
.../engineplugin/hive/executor/HiveEngineConnExecutor.scala | 1 -
.../openlookeng/executor/OpenLooKengEngineConnExecutor.java | 3 ++-
.../presto/executer/PrestoEngineConnExecutor.scala | 11 ++++++-----
.../engineplugin/trino/executor/TrinoEngineConnExecutor.scala | 9 +++++----
7 files changed, 23 insertions(+), 18 deletions(-)
diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala
index c5f798264..9d56adc45 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala
@@ -25,6 +25,7 @@ import org.apache.linkis.storage.conf.LinkisStorageConf
import org.apache.linkis.storage.domain.Dolphin
import org.apache.linkis.storage.utils.{FileSystemUtils, StorageUtils}
+import org.apache.commons.io.IOUtils
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream
import java.io.{IOException, OutputStream}
@@ -146,7 +147,8 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](
def closeFs: Unit = {
if (fs != null) {
- fs.close()
+ IOUtils.closeQuietly(fs)
+ fs = null
}
}
@@ -154,7 +156,8 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](
Utils.tryFinally(if (outputStream != null) flush()) {
closeFs
if (outputStream != null) {
- outputStream.close()
+ IOUtils.closeQuietly(outputStream)
+ outputStream = null
}
}
}
diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/script/writer/StorageScriptFsWriter.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/script/writer/StorageScriptFsWriter.scala
index a318c5504..bd64d24f0 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/script/writer/StorageScriptFsWriter.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/script/writer/StorageScriptFsWriter.scala
@@ -71,7 +71,9 @@ class StorageScriptFsWriter(
}
override def close(): Unit = {
- IOUtils.closeQuietly(outputStream)
+ if (outputStream != null) {
+ IOUtils.closeQuietly(outputStream)
+ }
}
override def flush(): Unit = if (outputStream != null) {
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkExecutor.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkExecutor.scala
index d7f92e792..ff2eaafae 100644
--- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkExecutor.scala
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkExecutor.scala
@@ -135,10 +135,8 @@ object FlinkExecutor {
): Unit = {
val resultSetWriter =
engineExecutionContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
- Utils.tryFinally {
- writeResultSet(resultSet, resultSetWriter)
- engineExecutionContext.sendResultSet(resultSetWriter)
- }(IOUtils.closeQuietly(resultSetWriter))
+ writeResultSet(resultSet, resultSetWriter)
+ engineExecutionContext.sendResultSet(resultSetWriter)
}
}
diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
index 5fee64676..f1bc20914 100644
--- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
@@ -325,7 +325,6 @@ class HiveEngineConnExecutor(
result.clear()
}
engineExecutorContext.sendResultSet(resultSetWriter)
- IOUtils.closeQuietly(resultSetWriter)
rows
}
diff --git a/linkis-engineconn-plugins/openlookeng/src/main/java/org/apache/linkis/engineplugin/openlookeng/executor/OpenLooKengEngineConnExecutor.java b/linkis-engineconn-plugins/openlookeng/src/main/java/org/apache/linkis/engineplugin/openlookeng/executor/OpenLooKengEngineConnExecutor.java
index f0e9b4b0b..994ff2a50 100644
--- a/linkis-engineconn-plugins/openlookeng/src/main/java/org/apache/linkis/engineplugin/openlookeng/executor/OpenLooKengEngineConnExecutor.java
+++ b/linkis-engineconn-plugins/openlookeng/src/main/java/org/apache/linkis/engineplugin/openlookeng/executor/OpenLooKengEngineConnExecutor.java
@@ -345,8 +345,9 @@ public class OpenLooKengEngineConnExecutor extends ConcurrentComputationExecutor
}
LOG.warn("Fetched {} col(s) : {} row(s) in openlookeng", columnCount, rows);
engineExecutorContext.sendResultSet(resultSetWriter);
- } finally {
+ } catch (Exception e) {
IOUtils.closeQuietly(resultSetWriter);
+ throw e;
}
}
diff --git a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala
index 59b0a1b65..b80867078 100644
--- a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala
@@ -279,7 +279,7 @@ class PrestoEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
var columnCount = 0
var rows = 0
val resultSetWriter = engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
- Utils.tryFinally({
+ Utils.tryCatch {
var results: QueryStatusInfo = null
if (statement.isRunning) {
results = statement.currentStatusInfo()
@@ -304,14 +304,15 @@ class PrestoEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId))
statement.advance()
}
- })(IOUtils.closeQuietly(resultSetWriter))
-
- info(s"Fetched $columnCount col(s) : $rows row(s) in presto")
+ } { case e: Exception =>
+ IOUtils.closeQuietly(resultSetWriter)
+ throw e
+ }
+ logger.info(s"Fetched $columnCount col(s) : $rows row(s) in presto")
engineExecutorContext.appendStdout(
LogUtils.generateInfo(s"Fetched $columnCount col(s) : $rows row(s) in presto")
);
engineExecutorContext.sendResultSet(resultSetWriter)
- IOUtils.closeQuietly(resultSetWriter)
}
// check presto error
diff --git a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
index b1103fcd9..f9ecd2f0d 100644
--- a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
@@ -397,7 +397,7 @@ class TrinoEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
var columnCount = 0
var rows = 0
val resultSetWriter = engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
- Utils.tryFinally({
+ Utils.tryCatch {
var results: QueryStatusInfo = null
if (statement.isRunning) {
results = statement.currentStatusInfo()
@@ -422,14 +422,15 @@ class TrinoEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId))
statement.advance()
}
- })(IOUtils.closeQuietly(resultSetWriter))
-
+ } { case e: Exception =>
+ IOUtils.closeQuietly(resultSetWriter)
+ throw e
+ }
logger.info(s"Fetched $columnCount col(s) : $rows row(s) in trino")
engineExecutorContext.appendStdout(
LogUtils.generateInfo(s"Fetched $columnCount col(s) : $rows row(s) in trino")
);
engineExecutorContext.sendResultSet(resultSetWriter)
- IOUtils.closeQuietly(resultSetWriter)
}
// check trino error
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org