You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/02/17 03:56:19 UTC
[kyuubi] branch branch-1.6 updated: [KYUUBI #4282] Fix ConcurrentModificationException when log4j2 async enabled
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.6 by this push:
new 36b91ba2e [KYUUBI #4282] Fix ConcurrentModificationException when log4j2 async enabled
36b91ba2e is described below
commit 36b91ba2eb3f18650983295c2aead131fbe20d93
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Fri Feb 17 11:39:32 2023 +0800
[KYUUBI #4282] Fix ConcurrentModificationException when log4j2 async enabled
This PR proposes to fix #4282, since log4j2 supports async mode, we need to make sure the `Log4j2DivertAppender#append` is thread-safe.
This PR also changes `OperationLog.getCurrentOperationLog` from `OperationLog` to `Option[OperationLog]`
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #4300 from pan3793/log.
Closes #4282
010e34b0 [Cheng Pan] fix
068405b2 [Cheng Pan] fix compile
c79dedd5 [Cheng Pan] Use write lock instead
3daf8a4d [Cheng Pan] nit
94176a04 [Cheng Pan] [KYUUBI #4282] Fix ConcurrentModificationException when log4j2 async enabled
Authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: Cheng Pan <ch...@apache.org>
(cherry picked from commit 0be3cbff6e35c8e86635bfe6d856d0dfa148247d)
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../operation/log/Log4j12DivertAppender.scala | 5 ++-
.../operation/log/Log4j2DivertAppender.scala | 38 ++++++++++++----------
.../apache/kyuubi/operation/log/OperationLog.scala | 2 +-
.../kyuubi/operation/log/OperationLogSuite.scala | 4 +--
.../kyuubi/server/api/v1/BatchesResource.scala | 19 +++++++----
5 files changed, 38 insertions(+), 30 deletions(-)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j12DivertAppender.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j12DivertAppender.scala
index 1191e94ae..df2ef93d8 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j12DivertAppender.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j12DivertAppender.scala
@@ -39,7 +39,7 @@ class Log4j12DivertAppender extends WriterAppender {
setLayout(lo)
addFilter { _: LoggingEvent =>
- if (OperationLog.getCurrentOperationLog == null) Filter.DENY else Filter.NEUTRAL
+ if (OperationLog.getCurrentOperationLog.isDefined) Filter.NEUTRAL else Filter.DENY
}
/**
@@ -51,8 +51,7 @@ class Log4j12DivertAppender extends WriterAppender {
// That should've gone into our writer. Notify the LogContext.
val logOutput = writer.toString
writer.reset()
- val log = OperationLog.getCurrentOperationLog
- if (log != null) log.write(logOutput)
+ OperationLog.getCurrentOperationLog.foreach(_.write(logOutput))
}
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala
index 68753cf98..dc4b24a8c 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.operation.log
import java.io.CharArrayWriter
+import java.util.concurrent.locks.ReadWriteLock
import scala.collection.JavaConverters._
@@ -27,6 +28,8 @@ import org.apache.logging.log4j.core.appender.{AbstractWriterAppender, ConsoleAp
import org.apache.logging.log4j.core.filter.AbstractFilter
import org.apache.logging.log4j.core.layout.PatternLayout
+import org.apache.kyuubi.reflection.DynFields
+
class Log4j2DivertAppender(
name: String,
layout: StringLayout,
@@ -52,22 +55,19 @@ class Log4j2DivertAppender(
addFilter(new AbstractFilter() {
override def filter(event: LogEvent): Filter.Result = {
- if (OperationLog.getCurrentOperationLog == null) {
- Filter.Result.DENY
- } else {
+ if (OperationLog.getCurrentOperationLog.isDefined) {
Filter.Result.NEUTRAL
+ } else {
+ Filter.Result.DENY
}
}
})
- def initLayout(): StringLayout = {
- LogManager.getRootLogger.asInstanceOf[org.apache.logging.log4j.core.Logger]
- .getAppenders.values().asScala
- .find(ap => ap.isInstanceOf[ConsoleAppender] && ap.getLayout.isInstanceOf[StringLayout])
- .map(_.getLayout.asInstanceOf[StringLayout])
- .getOrElse(PatternLayout.newBuilder().withPattern(
- "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n").build())
- }
+ private val writeLock = DynFields.builder()
+ .hiddenImpl(classOf[AbstractWriterAppender[_]], "readWriteLock")
+ .build[ReadWriteLock](this)
+ .get()
+ .writeLock
/**
* Overrides AbstractWriterAppender.append(), which does the real logging. No need
@@ -75,11 +75,15 @@ class Log4j2DivertAppender(
*/
override def append(event: LogEvent): Unit = {
super.append(event)
- // That should've gone into our writer. Notify the LogContext.
- val logOutput = writer.toString
- writer.reset()
- val log = OperationLog.getCurrentOperationLog
- if (log != null) log.write(logOutput)
+ writeLock.lock()
+ try {
+ // That should've gone into our writer. Notify the LogContext.
+ val logOutput = writer.toString
+ writer.reset()
+ OperationLog.getCurrentOperationLog.foreach(_.write(logOutput))
+ } finally {
+ writeLock.unlock()
+ }
}
}
@@ -95,7 +99,7 @@ object Log4j2DivertAppender {
def initialize(): Unit = {
val ap = new Log4j2DivertAppender()
- org.apache.logging.log4j.LogManager.getRootLogger()
+ org.apache.logging.log4j.LogManager.getRootLogger
.asInstanceOf[org.apache.logging.log4j.core.Logger].addAppender(ap)
ap.start()
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index 84c4ed55c..e6312d0fb 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -44,7 +44,7 @@ object OperationLog extends Logging {
OPERATION_LOG.set(operationLog)
}
- def getCurrentOperationLog: OperationLog = OPERATION_LOG.get()
+ def getCurrentOperationLog: Option[OperationLog] = Option(OPERATION_LOG.get)
def removeCurrentOperationLog(): Unit = OPERATION_LOG.remove()
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
index 758eeeeaf..fe3cbc7fc 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
@@ -61,10 +61,10 @@ class OperationLogSuite extends KyuubiFunSuite {
assert(!Files.exists(logFile))
OperationLog.setCurrentOperationLog(operationLog)
- assert(OperationLog.getCurrentOperationLog === operationLog)
+ assert(OperationLog.getCurrentOperationLog === Some(operationLog))
OperationLog.removeCurrentOperationLog()
- assert(OperationLog.getCurrentOperationLog === null)
+ assert(OperationLog.getCurrentOperationLog.isEmpty)
operationLog.write(msg1 + "\n")
assert(Files.exists(logFile))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index e815ad73b..86beb5281 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -17,7 +17,8 @@
package org.apache.kyuubi.server.api.v1
-import java.util.Locale
+import java.util
+import java.util.{Collections, Locale}
import java.util.concurrent.ConcurrentHashMap
import javax.ws.rs._
import javax.ws.rs.core.MediaType
@@ -270,12 +271,16 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
Option(sessionManager.getBatchSessionImpl(sessionHandle)).map { batchSession =>
try {
val submissionOp = batchSession.batchJobSubmissionOp
- val rowSet = submissionOp.getOperationLogRowSet(
- FetchOrientation.FETCH_NEXT,
- from,
- size)
- val logRowSet = rowSet.getColumns.get(0).getStringVal.getValues.asScala
- new OperationLog(logRowSet.asJava, logRowSet.size)
+ val rowSet = submissionOp.getOperationLogRowSet(FetchOrientation.FETCH_NEXT, from, size)
+ val columns = rowSet.getColumns
+ val logRowSet: util.List[String] =
+ if (columns == null || columns.size == 0) {
+ Collections.emptyList()
+ } else {
+ assert(columns.size == 1)
+ columns.get(0).getStringVal.getValues
+ }
+ new OperationLog(logRowSet, logRowSet.size)
} catch {
case NonFatal(e) =>
val errorMsg = s"Error getting operation log for batchId: $batchId"