You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by GitBox <gi...@apache.org> on 2021/08/05 02:15:40 UTC

[GitHub] [incubator-kyuubi] ulysses-you commented on a change in pull request #892: Make JsonEventLogger compatible with HDFS-like FileSystems

ulysses-you commented on a change in pull request #892:
URL: https://github.com/apache/incubator-kyuubi/pull/892#discussion_r683078533



##########
File path: externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/JsonEventLogger.scala
##########
@@ -33,32 +39,50 @@ import org.apache.kyuubi.service.AbstractService
  * The {eventType} is based on core concepts of the Kyuubi systems, e.g. engine/session/statement
  * @param logName the engine id formed of appId + attemptId(if any)
  */
-class JsonEventLogger(logName: String)
+class JsonEventLogger(logName: String, hadoopConf: Configuration)
   extends AbstractService("JsonEventLogger") with EventLogger with Logging {
 
-  private var logRoot: Path = _
-  private val writers = new scala.collection.mutable.HashMap[String, PrintWriter]()
+  type Logger = (PrintWriter, Option[FSDataOutputStream])
 
-  private def getOrUpdate(event: KyuubiEvent): PrintWriter = synchronized {
+  private var logRoot: URI = _
+  private var fs: FileSystem = _
+  private val writers = HashMap.empty[String, Logger]
+
+  private def getOrUpdate(event: KyuubiEvent): Logger = synchronized {
     writers.getOrElseUpdate(event.eventType, {
-      val eventDir = Files.createDirectories(Paths.get(logRoot.toString, event.eventType))
-      Files.setPosixFilePermissions(eventDir, JSON_LOG_DIR_PERM)
-      val eventPath = Files.createFile(Paths.get(eventDir.toString, logName +  ".json"))
-      // TODO: make it support Hadoop compatible filesystems
-      val newWriter = new PrintWriter(Files.newBufferedWriter(eventPath, StandardCharsets.UTF_8))
-      Files.setPosixFilePermissions(eventPath, JSON_LOG_FILE_PERM)
-      newWriter
+      val eventPath = new Path(new Path(logRoot), event.eventType)
+      FileSystem.mkdirs(fs, eventPath, JSON_LOG_DIR_PERM)
+      val logFile = new Path(eventPath, logName + ".json")
+      var hadoopDataStream: FSDataOutputStream = null
+      val rawStream = if (logFile.toUri.getScheme == "file") {
+        new FileOutputStream(logFile.toUri.getPath)
+      } else {
+        hadoopDataStream = fs.create(logFile)
+        hadoopDataStream
+      }

Review comment:
       Why do we need to distinguish the scheme here? Can't we just do this?
   ```
   val stream = fs.create(logFile)
   val bStream = new BufferedOutputStream(stream)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org