You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/09/07 16:04:20 UTC

[incubator-linkis] branch dev-1.3.1 updated: [linkis-io_file-client] Modification of scala file floating red (#3191)

This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
     new 2d0b7eb45 [linkis-io_file-client] Modification of scala file floating red (#3191)
2d0b7eb45 is described below

commit 2d0b7eb4516d8557804554d57e08b6958c45d8e1
Author: 成彬彬 <10...@users.noreply.github.com>
AuthorDate: Thu Sep 8 00:04:14 2022 +0800

    [linkis-io_file-client] Modification of scala file floating red (#3191)
---
 .../io/iteraceptor/IOMethodInterceptor.scala       | 43 +++++++++++++---------
 1 file changed, 26 insertions(+), 17 deletions(-)

diff --git a/linkis-extensions/linkis-io-file-client/src/main/scala/org/apache/linkis/storage/io/iteraceptor/IOMethodInterceptor.scala b/linkis-extensions/linkis-io-file-client/src/main/scala/org/apache/linkis/storage/io/iteraceptor/IOMethodInterceptor.scala
index e4638e399..b081aded9 100644
--- a/linkis-extensions/linkis-io-file-client/src/main/scala/org/apache/linkis/storage/io/iteraceptor/IOMethodInterceptor.scala
+++ b/linkis-extensions/linkis-io-file-client/src/main/scala/org/apache/linkis/storage/io/iteraceptor/IOMethodInterceptor.scala
@@ -36,9 +36,10 @@ import org.springframework.cglib.proxy.{MethodInterceptor, MethodProxy}
 import java.io.{InputStream, IOException, OutputStream}
 import java.lang.reflect.Method
 import java.net.InetAddress
+import java.util
 
 import scala.beans.BeanProperty
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import com.google.gson.reflect.TypeToken
@@ -47,7 +48,7 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
 
   @BeanProperty var ioClient: IOClient = _
 
-  private val properties: mutable.HashMap[String, String] = mutable.HashMap[String, String]()
+  private val properties: java.util.Map[String, String] = new util.HashMap[String, String]
 
   private var inited = false
 
@@ -102,11 +103,12 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
   }
 
   def initFS(methodName: String = "init"): Unit = {
-    if (!properties.contains(StorageConfiguration.PROXY_USER.key))
+    if (!properties.asScala.contains(StorageConfiguration.PROXY_USER.key)) {
       throw new StorageErrorException(
         52002,
         "no user set, we cannot get the permission information."
       )
+    }
     bindEngineLabel.setIsJobGroupHead("true")
     bindEngineLabel.setIsJobGroupEnd("false")
     val res = ioClient.executeWithRetry(
@@ -118,7 +120,7 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
         getProxyUser,
         getLocalIP,
         methodName,
-        Array(properties.toMap)
+        Array(properties.asScala.toMap)
       ),
       bindEngineLabel
     )
@@ -134,11 +136,12 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
   }
 
   def beforeOperation(): Unit = {
-    if (closed)
+    if (closed) {
       throw new StorageErrorException(
         52002,
         s"$fsType storage($id) engine($bindEngineLabel) has been closed, IO operation was illegal."
       )
+    }
     if (System.currentTimeMillis() - lastAccessTime >= iOEngineExecutorMaxFreeTime) synchronized {
       if (System.currentTimeMillis() - lastAccessTime >= iOEngineExecutorMaxFreeTime) {
         initFS()
@@ -156,14 +159,15 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
       args: Array[AnyRef],
       methodProxy: MethodProxy
   ): AnyRef = {
-    if (closed && method.getName != "close")
+    if (closed && method.getName != "close") {
       throw new StorageErrorException(52002, s"$fsType storage has been closed.")
+    }
     if (System.currentTimeMillis() - lastAccessTime >= iOEngineExecutorMaxFreeTime) synchronized {
       method.getName match {
         case "init" =>
         case "storageName" => return fsType
         case "setUser" =>
-          properties += StorageConfiguration.PROXY_USER.key -> args(0).asInstanceOf[String];
+          properties.asScala += StorageConfiguration.PROXY_USER.key -> args(0).asInstanceOf[String];
           return Unit
         case _ =>
           if (inited) {
@@ -176,20 +180,22 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
     method.getName match {
       case "init" =>
         val user =
-          if (properties.contains(StorageConfiguration.PROXY_USER.key))
-            StorageConfiguration.PROXY_USER.getValue(properties.toMap)
-          else null
+          if (properties.asScala.contains(StorageConfiguration.PROXY_USER.key)) {
+            StorageConfiguration.PROXY_USER.getValue(properties.asScala.toMap)
+          } else null
         if (args.length > 0 && args(0).isInstanceOf[java.util.Map[String, String]]) {
-          properties ++= args(0).asInstanceOf[java.util.Map[String, String]]
+          properties.asScala ++= args(0).asInstanceOf[java.util.Map[String, String]].asScala
+        }
+        if (StringUtils.isNotEmpty(user)) {
+          properties.asScala += StorageConfiguration.PROXY_USER.key -> user
         }
-        if (StringUtils.isNotEmpty(user))
-          properties += StorageConfiguration.PROXY_USER.key -> user
         initFS()
         logger.warn(s"For user($user)inited a $fsType storage($id) .")
         Unit
       case "fsName" => fsType
       case "setUser" =>
-        properties += StorageConfiguration.PROXY_USER.key -> args(0).asInstanceOf[String]; Unit
+        properties.asScala += StorageConfiguration.PROXY_USER.key -> args(0).asInstanceOf[String];
+        Unit
       case "read" =>
         if (!inited) throw new IllegalAccessException("storage has not been inited.")
         new IOInputStream(args)
@@ -197,15 +203,17 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
         if (!inited) throw new IllegalAccessException("storage has not been inited.")
         new IOOutputStream(args)
       case "renameTo" =>
-        if (!inited || args.length < 2)
+        if (!inited || args.length < 2) {
           throw new IllegalAccessException("storage has not been inited.")
+        }
         val params =
           args.map(MethodEntitySerializer.serializerJavaObject(_)).map(_.asInstanceOf[AnyRef])
         executeMethod(method.getName, params)
         new java.lang.Boolean(true)
       case "list" =>
-        if (!inited || args.length < 1)
+        if (!inited || args.length < 1) {
           throw new IllegalAccessException("storage has not been inited.")
+        }
         val params =
           Array(MethodEntitySerializer.serializerJavaObject(args(0))).map(_.asInstanceOf[AnyRef])
         val msg = executeMethod(method.getName, params)
@@ -214,8 +222,9 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
           new TypeToken[java.util.List[FsPath]]() {}.getType
         )
       case "listPathWithError" =>
-        if (!inited || args.length < 1)
+        if (!inited || args.length < 1) {
           throw new IllegalAccessException("storage has not been inited.")
+        }
         val params =
           Array(MethodEntitySerializer.serializerJavaObject(args(0))).map(_.asInstanceOf[AnyRef])
         val msg = executeMethod(method.getName, params)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org