You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by al...@apache.org on 2022/03/09 09:22:34 UTC
[incubator-linkis] 07/09: Modify to create inputStream every time #1640
This is an automated email from the ASF dual-hosted git repository.
alexkun pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
commit 62945738d94d4b308c3573e80db4aee3ed21ee78
Author: peacewong <wp...@gmail.com>
AuthorDate: Wed Mar 9 16:11:50 2022 +0800
Modify to create inputStream every time #1640
---
.../linkis/entrance/log/CacheLogReader.scala | 16 ++-------
.../org/apache/linkis/entrance/log/LogReader.scala | 38 +++++++++++++++-------
2 files changed, 28 insertions(+), 26 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala
index 38d1e65..a3f9e2d 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala
@@ -30,7 +30,6 @@ class CacheLogReader(logPath: String, charset: String, sharedCache: Cache, user:
def getCache: Cache = sharedCache
- var inputStream: InputStream = _
var fileSystem: Fs = _
@@ -38,7 +37,7 @@ class CacheLogReader(logPath: String, charset: String, sharedCache: Cache, user:
private def createInputStream: InputStream = {
- if (fileSystem == null) this synchronized {
+ if (fileSystem == null) lock synchronized {
if (fileSystem == null) {
fileSystem = FSFactory.getFsByProxyUser(new FsPath(logPath), user)
fileSystem.init(new util.HashMap[String, String]())
@@ -50,12 +49,7 @@ class CacheLogReader(logPath: String, charset: String, sharedCache: Cache, user:
override def getInputStream: InputStream = {
- if (inputStream == null) lock.synchronized{
- if (inputStream == null) {
- inputStream = createInputStream
- }
- }
- inputStream
+ createInputStream
}
@@ -79,12 +73,6 @@ class CacheLogReader(logPath: String, charset: String, sharedCache: Cache, user:
@throws[IOException]
override def close(): Unit = {
- if (inputStream != null) {
- Utils.tryQuietly(inputStream.close(), t => {
- warn("Error encounters when closing inputStream.", t)
- })
- inputStream = null
- }
if (fileSystem != null) {
Utils.tryQuietly(fileSystem.close(), t => {
warn("Error encounters when closing fileSystem.", t)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogReader.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogReader.scala
index 11ce89a..eea181c 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogReader.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogReader.scala
@@ -17,8 +17,7 @@
package org.apache.linkis.entrance.log
-import java.io.{Closeable, IOException, InputStream}
-
+import java.io.{BufferedReader, Closeable, IOException, InputStream, InputStreamReader}
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.entrance.exception.LogReadFailedException
import org.apache.commons.io.{IOUtils, LineIterator}
@@ -108,20 +107,35 @@ abstract class LogReader(charset: String) extends Closeable with Logging{
}
protected def readLog(deal: String => Unit, fromLine: Int, size: Int = 100): Int = {
- val from = if (fromLine < 0) 0 else fromLine
+ val from = if (fromLine < 1) 0 else fromLine - 1
var line, read = 0
- val lineIterator = IOUtils.lineIterator(getInputStream, charset)
- Utils.tryFinally(
- while (lineIterator.hasNext && (read < size || size < 0)) {
- val r = lineIterator.next()
- if (line >= from) {
- deal(r)
- read += 1
+ val inputStream = getInputStream
+ val bufferReader = new BufferedReader(new InputStreamReader(inputStream, charset))
+ Utils.tryFinally {
+ val skipNum = bufferReader.skip(from)
+ if (skipNum > from && size >= 0) {
+ var lineText = bufferReader.readLine()
+ while (lineText != null && read < size) {
+ val r = lineText
+ if (line >= from) {
+ deal(r)
+ read += 1
+ }
+ line += 1
+ lineText = bufferReader.readLine()
}
- line += 1
- })(LineIterator.closeQuietly(lineIterator))
+ }
+ } {
+ if (null != bufferReader) {
+ IOUtils.closeQuietly(bufferReader)
+ }
+ if (null != inputStream) {
+ IOUtils.closeQuietly(inputStream)
+ }
+ }
read
}
+
}
object LogReader {
val ERROR_HEADER1:Regex = "[0-9\\-]{10,10} [0-9:]{8,8}.?\\d{0,3} SYSTEM-ERROR ".r.unanchored
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org