You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/09/14 02:43:17 UTC

[incubator-linkis] branch dev-1.3.1 updated: Fix the issue that the writer's close method is called repeatedly (#3347)

This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
     new b606cc899 Fix the issue that the writer's close method is called repeatedly (#3347)
b606cc899 is described below

commit b606cc899e40cf9e680e79b0f1260494c3e5ddec
Author: peacewong <wp...@gmail.com>
AuthorDate: Wed Sep 14 10:43:12 2022 +0800

    Fix the issue that the writer's close method is called repeatedly (#3347)
---
 .../linkis/storage/resultset/StorageResultSetWriter.scala     |  7 +++++--
 .../linkis/storage/script/writer/StorageScriptFsWriter.scala  |  4 +++-
 .../engineconnplugin/flink/executor/FlinkExecutor.scala       |  6 ++----
 .../engineplugin/hive/executor/HiveEngineConnExecutor.scala   |  1 -
 .../openlookeng/executor/OpenLooKengEngineConnExecutor.java   |  3 ++-
 .../presto/executer/PrestoEngineConnExecutor.scala            | 11 ++++++-----
 .../engineplugin/trino/executor/TrinoEngineConnExecutor.scala |  9 +++++----
 7 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala
index c5f798264..9d56adc45 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala
@@ -25,6 +25,7 @@ import org.apache.linkis.storage.conf.LinkisStorageConf
 import org.apache.linkis.storage.domain.Dolphin
 import org.apache.linkis.storage.utils.{FileSystemUtils, StorageUtils}
 
+import org.apache.commons.io.IOUtils
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream
 
 import java.io.{IOException, OutputStream}
@@ -146,7 +147,8 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](
 
   def closeFs: Unit = {
     if (fs != null) {
-      fs.close()
+      IOUtils.closeQuietly(fs)
+      fs = null
     }
   }
 
@@ -154,7 +156,8 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](
     Utils.tryFinally(if (outputStream != null) flush()) {
       closeFs
       if (outputStream != null) {
-        outputStream.close()
+        IOUtils.closeQuietly(outputStream)
+        outputStream = null
       }
     }
   }
diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/script/writer/StorageScriptFsWriter.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/script/writer/StorageScriptFsWriter.scala
index a318c5504..bd64d24f0 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/script/writer/StorageScriptFsWriter.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/script/writer/StorageScriptFsWriter.scala
@@ -71,7 +71,9 @@ class StorageScriptFsWriter(
   }
 
   override def close(): Unit = {
-    IOUtils.closeQuietly(outputStream)
+    if (outputStream != null) {
+      IOUtils.closeQuietly(outputStream)
+    }
   }
 
   override def flush(): Unit = if (outputStream != null) {
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkExecutor.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkExecutor.scala
index d7f92e792..ff2eaafae 100644
--- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkExecutor.scala
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkExecutor.scala
@@ -135,10 +135,8 @@ object FlinkExecutor {
   ): Unit = {
     val resultSetWriter =
       engineExecutionContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
-    Utils.tryFinally {
-      writeResultSet(resultSet, resultSetWriter)
-      engineExecutionContext.sendResultSet(resultSetWriter)
-    }(IOUtils.closeQuietly(resultSetWriter))
+    writeResultSet(resultSet, resultSetWriter)
+    engineExecutionContext.sendResultSet(resultSetWriter)
   }
 
 }
diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
index 5fee64676..f1bc20914 100644
--- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
@@ -325,7 +325,6 @@ class HiveEngineConnExecutor(
       result.clear()
     }
     engineExecutorContext.sendResultSet(resultSetWriter)
-    IOUtils.closeQuietly(resultSetWriter)
     rows
   }
 
diff --git a/linkis-engineconn-plugins/openlookeng/src/main/java/org/apache/linkis/engineplugin/openlookeng/executor/OpenLooKengEngineConnExecutor.java b/linkis-engineconn-plugins/openlookeng/src/main/java/org/apache/linkis/engineplugin/openlookeng/executor/OpenLooKengEngineConnExecutor.java
index f0e9b4b0b..994ff2a50 100644
--- a/linkis-engineconn-plugins/openlookeng/src/main/java/org/apache/linkis/engineplugin/openlookeng/executor/OpenLooKengEngineConnExecutor.java
+++ b/linkis-engineconn-plugins/openlookeng/src/main/java/org/apache/linkis/engineplugin/openlookeng/executor/OpenLooKengEngineConnExecutor.java
@@ -345,8 +345,9 @@ public class OpenLooKengEngineConnExecutor extends ConcurrentComputationExecutor
       }
       LOG.warn("Fetched {} col(s) : {} row(s) in openlookeng", columnCount, rows);
       engineExecutorContext.sendResultSet(resultSetWriter);
-    } finally {
+    } catch (Exception e) {
       IOUtils.closeQuietly(resultSetWriter);
+      throw e;
     }
   }
 
diff --git a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala
index 59b0a1b65..b80867078 100644
--- a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala
@@ -279,7 +279,7 @@ class PrestoEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
     var columnCount = 0
     var rows = 0
     val resultSetWriter = engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
-    Utils.tryFinally({
+    Utils.tryCatch {
       var results: QueryStatusInfo = null
       if (statement.isRunning) {
         results = statement.currentStatusInfo()
@@ -304,14 +304,15 @@ class PrestoEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
         engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId))
         statement.advance()
       }
-    })(IOUtils.closeQuietly(resultSetWriter))
-
-    info(s"Fetched $columnCount col(s) : $rows row(s) in presto")
+    } { case e: Exception =>
+      IOUtils.closeQuietly(resultSetWriter)
+      throw e
+    }
+    logger.info(s"Fetched $columnCount col(s) : $rows row(s) in presto")
     engineExecutorContext.appendStdout(
       LogUtils.generateInfo(s"Fetched $columnCount col(s) : $rows row(s) in presto")
     );
     engineExecutorContext.sendResultSet(resultSetWriter)
-    IOUtils.closeQuietly(resultSetWriter)
   }
 
   // check presto error
diff --git a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
index b1103fcd9..f9ecd2f0d 100644
--- a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
@@ -397,7 +397,7 @@ class TrinoEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
     var columnCount = 0
     var rows = 0
     val resultSetWriter = engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
-    Utils.tryFinally({
+    Utils.tryCatch {
       var results: QueryStatusInfo = null
       if (statement.isRunning) {
         results = statement.currentStatusInfo()
@@ -422,14 +422,15 @@ class TrinoEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
         engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId))
         statement.advance()
       }
-    })(IOUtils.closeQuietly(resultSetWriter))
-
+    } { case e: Exception =>
+      IOUtils.closeQuietly(resultSetWriter)
+      throw e
+    }
     logger.info(s"Fetched $columnCount col(s) : $rows row(s) in trino")
     engineExecutorContext.appendStdout(
       LogUtils.generateInfo(s"Fetched $columnCount col(s) : $rows row(s) in trino")
     );
     engineExecutorContext.sendResultSet(resultSetWriter)
-    IOUtils.closeQuietly(resultSetWriter)
   }
 
   // check trino error


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org