You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by al...@apache.org on 2022/03/09 09:22:34 UTC

[incubator-linkis] 07/09: Modify to create inputStream every time #1640

This is an automated email from the ASF dual-hosted git repository.

alexkun pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit 62945738d94d4b308c3573e80db4aee3ed21ee78
Author: peacewong <wp...@gmail.com>
AuthorDate: Wed Mar 9 16:11:50 2022 +0800

    Modify to create inputStream every time #1640
---
 .../linkis/entrance/log/CacheLogReader.scala       | 16 ++-------
 .../org/apache/linkis/entrance/log/LogReader.scala | 38 +++++++++++++++-------
 2 files changed, 28 insertions(+), 26 deletions(-)

diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala
index 38d1e65..a3f9e2d 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala
@@ -30,7 +30,6 @@ class CacheLogReader(logPath: String, charset: String, sharedCache: Cache, user:
 
   def getCache: Cache = sharedCache
 
-  var inputStream: InputStream = _
 
   var fileSystem: Fs = _
 
@@ -38,7 +37,7 @@ class CacheLogReader(logPath: String, charset: String, sharedCache: Cache, user:
 
 
   private def createInputStream: InputStream = {
-    if (fileSystem == null) this synchronized {
+    if (fileSystem == null) lock synchronized {
       if (fileSystem == null) {
         fileSystem = FSFactory.getFsByProxyUser(new FsPath(logPath), user)
         fileSystem.init(new util.HashMap[String, String]())
@@ -50,12 +49,7 @@ class CacheLogReader(logPath: String, charset: String, sharedCache: Cache, user:
 
 
   override def getInputStream: InputStream = {
-    if (inputStream == null) lock.synchronized{
-      if (inputStream == null) {
-        inputStream = createInputStream
-      }
-    }
-    inputStream
+    createInputStream
   }
 
 
@@ -79,12 +73,6 @@ class CacheLogReader(logPath: String, charset: String, sharedCache: Cache, user:
 
   @throws[IOException]
   override def close(): Unit = {
-    if (inputStream != null) {
-      Utils.tryQuietly(inputStream.close(), t => {
-        warn("Error encounters when closing inputStream.", t)
-      })
-      inputStream = null
-    }
     if (fileSystem != null) {
       Utils.tryQuietly(fileSystem.close(), t => {
         warn("Error encounters when closing fileSystem.", t)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogReader.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogReader.scala
index 11ce89a..eea181c 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogReader.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogReader.scala
@@ -17,8 +17,7 @@
 
 package org.apache.linkis.entrance.log
 
-import java.io.{Closeable, IOException, InputStream}
-
+import java.io.{BufferedReader, Closeable, IOException, InputStream, InputStreamReader}
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.entrance.exception.LogReadFailedException
 import org.apache.commons.io.{IOUtils, LineIterator}
@@ -108,20 +107,35 @@ abstract class LogReader(charset: String) extends Closeable with Logging{
   }
 
   protected def readLog(deal: String => Unit, fromLine: Int, size: Int = 100): Int = {
-    val from = if (fromLine < 0) 0 else fromLine
+    val from = if (fromLine < 1) 0 else fromLine - 1
     var line, read = 0
-    val lineIterator = IOUtils.lineIterator(getInputStream, charset)
-    Utils.tryFinally(
-      while (lineIterator.hasNext && (read < size || size < 0)) {
-        val r = lineIterator.next()
-        if (line >= from) {
-          deal(r)
-          read += 1
+    val inputStream = getInputStream
+    val bufferReader = new BufferedReader(new InputStreamReader(inputStream, charset))
+    Utils.tryFinally {
+      val skipNum = bufferReader.skip(from)
+      if (skipNum > from && size >= 0) {
+        var lineText = bufferReader.readLine()
+        while (lineText != null && read < size) {
+          val r = lineText
+          if (line >= from) {
+            deal(r)
+            read += 1
+          }
+          line += 1
+          lineText = bufferReader.readLine()
         }
-        line += 1
-      })(LineIterator.closeQuietly(lineIterator))
+      }
+    } {
+      if (null != bufferReader) {
+        IOUtils.closeQuietly(bufferReader)
+      }
+      if (null != inputStream) {
+        IOUtils.closeQuietly(inputStream)
+      }
+    }
     read
   }
+
 }
 object LogReader {
   val ERROR_HEADER1:Regex = "[0-9\\-]{10,10} [0-9:]{8,8}.?\\d{0,3} SYSTEM-ERROR ".r.unanchored

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org