You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/02/17 03:56:19 UTC

[kyuubi] branch branch-1.6 updated: [KYUUBI #4282] Fix ConcurrentModificationException when log4j2 async enabled

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.6 by this push:
     new 36b91ba2e [KYUUBI #4282] Fix ConcurrentModificationException when log4j2 async enabled
36b91ba2e is described below

commit 36b91ba2eb3f18650983295c2aead131fbe20d93
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Fri Feb 17 11:39:32 2023 +0800

    [KYUUBI #4282] Fix ConcurrentModificationException when log4j2 async enabled
    
    This PR proposes to fix #4282. Since log4j2 supports async mode, we need to make sure that `Log4j2DivertAppender#append` is thread-safe.
    
    This PR also changes the return type of `OperationLog.getCurrentOperationLog` from `OperationLog` to `Option[OperationLog]`.
    
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4300 from pan3793/log.
    
    Closes #4282
    
    010e34b0 [Cheng Pan] fix
    068405b2 [Cheng Pan] fix compile
    c79dedd5 [Cheng Pan] Use write lock instead
    3daf8a4d [Cheng Pan] nit
    94176a04 [Cheng Pan] [KYUUBI #4282] Fix ConcurrentModificationException when log4j2 async enabled
    
    Authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Cheng Pan <ch...@apache.org>
    (cherry picked from commit 0be3cbff6e35c8e86635bfe6d856d0dfa148247d)
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../operation/log/Log4j12DivertAppender.scala      |  5 ++-
 .../operation/log/Log4j2DivertAppender.scala       | 38 ++++++++++++----------
 .../apache/kyuubi/operation/log/OperationLog.scala |  2 +-
 .../kyuubi/operation/log/OperationLogSuite.scala   |  4 +--
 .../kyuubi/server/api/v1/BatchesResource.scala     | 19 +++++++----
 5 files changed, 38 insertions(+), 30 deletions(-)

diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j12DivertAppender.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j12DivertAppender.scala
index 1191e94ae..df2ef93d8 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j12DivertAppender.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j12DivertAppender.scala
@@ -39,7 +39,7 @@ class Log4j12DivertAppender extends WriterAppender {
   setLayout(lo)
 
   addFilter { _: LoggingEvent =>
-    if (OperationLog.getCurrentOperationLog == null) Filter.DENY else Filter.NEUTRAL
+    if (OperationLog.getCurrentOperationLog.isDefined) Filter.NEUTRAL else Filter.DENY
   }
 
   /**
@@ -51,8 +51,7 @@ class Log4j12DivertAppender extends WriterAppender {
     // That should've gone into our writer. Notify the LogContext.
     val logOutput = writer.toString
     writer.reset()
-    val log = OperationLog.getCurrentOperationLog
-    if (log != null) log.write(logOutput)
+    OperationLog.getCurrentOperationLog.foreach(_.write(logOutput))
   }
 }
 
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala
index 68753cf98..dc4b24a8c 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala
@@ -18,6 +18,7 @@
 package org.apache.kyuubi.operation.log
 
 import java.io.CharArrayWriter
+import java.util.concurrent.locks.ReadWriteLock
 
 import scala.collection.JavaConverters._
 
@@ -27,6 +28,8 @@ import org.apache.logging.log4j.core.appender.{AbstractWriterAppender, ConsoleAp
 import org.apache.logging.log4j.core.filter.AbstractFilter
 import org.apache.logging.log4j.core.layout.PatternLayout
 
+import org.apache.kyuubi.reflection.DynFields
+
 class Log4j2DivertAppender(
     name: String,
     layout: StringLayout,
@@ -52,22 +55,19 @@ class Log4j2DivertAppender(
 
   addFilter(new AbstractFilter() {
     override def filter(event: LogEvent): Filter.Result = {
-      if (OperationLog.getCurrentOperationLog == null) {
-        Filter.Result.DENY
-      } else {
+      if (OperationLog.getCurrentOperationLog.isDefined) {
         Filter.Result.NEUTRAL
+      } else {
+        Filter.Result.DENY
       }
     }
   })
 
-  def initLayout(): StringLayout = {
-    LogManager.getRootLogger.asInstanceOf[org.apache.logging.log4j.core.Logger]
-      .getAppenders.values().asScala
-      .find(ap => ap.isInstanceOf[ConsoleAppender] && ap.getLayout.isInstanceOf[StringLayout])
-      .map(_.getLayout.asInstanceOf[StringLayout])
-      .getOrElse(PatternLayout.newBuilder().withPattern(
-        "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n").build())
-  }
+  private val writeLock = DynFields.builder()
+    .hiddenImpl(classOf[AbstractWriterAppender[_]], "readWriteLock")
+    .build[ReadWriteLock](this)
+    .get()
+    .writeLock
 
   /**
    * Overrides AbstractWriterAppender.append(), which does the real logging. No need
@@ -75,11 +75,15 @@ class Log4j2DivertAppender(
    */
   override def append(event: LogEvent): Unit = {
     super.append(event)
-    // That should've gone into our writer. Notify the LogContext.
-    val logOutput = writer.toString
-    writer.reset()
-    val log = OperationLog.getCurrentOperationLog
-    if (log != null) log.write(logOutput)
+    writeLock.lock()
+    try {
+      // That should've gone into our writer. Notify the LogContext.
+      val logOutput = writer.toString
+      writer.reset()
+      OperationLog.getCurrentOperationLog.foreach(_.write(logOutput))
+    } finally {
+      writeLock.unlock()
+    }
   }
 }
 
@@ -95,7 +99,7 @@ object Log4j2DivertAppender {
 
   def initialize(): Unit = {
     val ap = new Log4j2DivertAppender()
-    org.apache.logging.log4j.LogManager.getRootLogger()
+    org.apache.logging.log4j.LogManager.getRootLogger
       .asInstanceOf[org.apache.logging.log4j.core.Logger].addAppender(ap)
     ap.start()
   }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index 84c4ed55c..e6312d0fb 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -44,7 +44,7 @@ object OperationLog extends Logging {
     OPERATION_LOG.set(operationLog)
   }
 
-  def getCurrentOperationLog: OperationLog = OPERATION_LOG.get()
+  def getCurrentOperationLog: Option[OperationLog] = Option(OPERATION_LOG.get)
 
   def removeCurrentOperationLog(): Unit = OPERATION_LOG.remove()
 
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
index 758eeeeaf..fe3cbc7fc 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
@@ -61,10 +61,10 @@ class OperationLogSuite extends KyuubiFunSuite {
     assert(!Files.exists(logFile))
 
     OperationLog.setCurrentOperationLog(operationLog)
-    assert(OperationLog.getCurrentOperationLog === operationLog)
+    assert(OperationLog.getCurrentOperationLog === Some(operationLog))
 
     OperationLog.removeCurrentOperationLog()
-    assert(OperationLog.getCurrentOperationLog === null)
+    assert(OperationLog.getCurrentOperationLog.isEmpty)
 
     operationLog.write(msg1 + "\n")
     assert(Files.exists(logFile))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index e815ad73b..86beb5281 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -17,7 +17,8 @@
 
 package org.apache.kyuubi.server.api.v1
 
-import java.util.Locale
+import java.util
+import java.util.{Collections, Locale}
 import java.util.concurrent.ConcurrentHashMap
 import javax.ws.rs._
 import javax.ws.rs.core.MediaType
@@ -270,12 +271,16 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
     Option(sessionManager.getBatchSessionImpl(sessionHandle)).map { batchSession =>
       try {
         val submissionOp = batchSession.batchJobSubmissionOp
-        val rowSet = submissionOp.getOperationLogRowSet(
-          FetchOrientation.FETCH_NEXT,
-          from,
-          size)
-        val logRowSet = rowSet.getColumns.get(0).getStringVal.getValues.asScala
-        new OperationLog(logRowSet.asJava, logRowSet.size)
+        val rowSet = submissionOp.getOperationLogRowSet(FetchOrientation.FETCH_NEXT, from, size)
+        val columns = rowSet.getColumns
+        val logRowSet: util.List[String] =
+          if (columns == null || columns.size == 0) {
+            Collections.emptyList()
+          } else {
+            assert(columns.size == 1)
+            columns.get(0).getStringVal.getValues
+          }
+        new OperationLog(logRowSet, logRowSet.size)
       } catch {
         case NonFatal(e) =>
           val errorMsg = s"Error getting operation log for batchId: $batchId"