You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/09/07 16:04:20 UTC
[incubator-linkis] branch dev-1.3.1 updated: [linkis-io_file-client] Modification of scala file floating red (#3191)
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
new 2d0b7eb45 [linkis-io_file-client] Modification of scala file floating red (#3191)
2d0b7eb45 is described below
commit 2d0b7eb4516d8557804554d57e08b6958c45d8e1
Author: 成彬彬 <10...@users.noreply.github.com>
AuthorDate: Thu Sep 8 00:04:14 2022 +0800
[linkis-io_file-client] Modification of scala file floating red (#3191)
---
.../io/iteraceptor/IOMethodInterceptor.scala | 43 +++++++++++++---------
1 file changed, 26 insertions(+), 17 deletions(-)
diff --git a/linkis-extensions/linkis-io-file-client/src/main/scala/org/apache/linkis/storage/io/iteraceptor/IOMethodInterceptor.scala b/linkis-extensions/linkis-io-file-client/src/main/scala/org/apache/linkis/storage/io/iteraceptor/IOMethodInterceptor.scala
index e4638e399..b081aded9 100644
--- a/linkis-extensions/linkis-io-file-client/src/main/scala/org/apache/linkis/storage/io/iteraceptor/IOMethodInterceptor.scala
+++ b/linkis-extensions/linkis-io-file-client/src/main/scala/org/apache/linkis/storage/io/iteraceptor/IOMethodInterceptor.scala
@@ -36,9 +36,10 @@ import org.springframework.cglib.proxy.{MethodInterceptor, MethodProxy}
import java.io.{InputStream, IOException, OutputStream}
import java.lang.reflect.Method
import java.net.InetAddress
+import java.util
import scala.beans.BeanProperty
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable
import com.google.gson.reflect.TypeToken
@@ -47,7 +48,7 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
@BeanProperty var ioClient: IOClient = _
- private val properties: mutable.HashMap[String, String] = mutable.HashMap[String, String]()
+ private val properties: java.util.Map[String, String] = new util.HashMap[String, String]
private var inited = false
@@ -102,11 +103,12 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
}
def initFS(methodName: String = "init"): Unit = {
- if (!properties.contains(StorageConfiguration.PROXY_USER.key))
+ if (!properties.asScala.contains(StorageConfiguration.PROXY_USER.key)) {
throw new StorageErrorException(
52002,
"no user set, we cannot get the permission information."
)
+ }
bindEngineLabel.setIsJobGroupHead("true")
bindEngineLabel.setIsJobGroupEnd("false")
val res = ioClient.executeWithRetry(
@@ -118,7 +120,7 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
getProxyUser,
getLocalIP,
methodName,
- Array(properties.toMap)
+ Array(properties.asScala.toMap)
),
bindEngineLabel
)
@@ -134,11 +136,12 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
}
def beforeOperation(): Unit = {
- if (closed)
+ if (closed) {
throw new StorageErrorException(
52002,
s"$fsType storage($id) engine($bindEngineLabel) has been closed, IO operation was illegal."
)
+ }
if (System.currentTimeMillis() - lastAccessTime >= iOEngineExecutorMaxFreeTime) synchronized {
if (System.currentTimeMillis() - lastAccessTime >= iOEngineExecutorMaxFreeTime) {
initFS()
@@ -156,14 +159,15 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
args: Array[AnyRef],
methodProxy: MethodProxy
): AnyRef = {
- if (closed && method.getName != "close")
+ if (closed && method.getName != "close") {
throw new StorageErrorException(52002, s"$fsType storage has been closed.")
+ }
if (System.currentTimeMillis() - lastAccessTime >= iOEngineExecutorMaxFreeTime) synchronized {
method.getName match {
case "init" =>
case "storageName" => return fsType
case "setUser" =>
- properties += StorageConfiguration.PROXY_USER.key -> args(0).asInstanceOf[String];
+ properties.asScala += StorageConfiguration.PROXY_USER.key -> args(0).asInstanceOf[String];
return Unit
case _ =>
if (inited) {
@@ -176,20 +180,22 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
method.getName match {
case "init" =>
val user =
- if (properties.contains(StorageConfiguration.PROXY_USER.key))
- StorageConfiguration.PROXY_USER.getValue(properties.toMap)
- else null
+ if (properties.asScala.contains(StorageConfiguration.PROXY_USER.key)) {
+ StorageConfiguration.PROXY_USER.getValue(properties.asScala.toMap)
+ } else null
if (args.length > 0 && args(0).isInstanceOf[java.util.Map[String, String]]) {
- properties ++= args(0).asInstanceOf[java.util.Map[String, String]]
+ properties.asScala ++= args(0).asInstanceOf[java.util.Map[String, String]].asScala
+ }
+ if (StringUtils.isNotEmpty(user)) {
+ properties.asScala += StorageConfiguration.PROXY_USER.key -> user
}
- if (StringUtils.isNotEmpty(user))
- properties += StorageConfiguration.PROXY_USER.key -> user
initFS()
logger.warn(s"For user($user)inited a $fsType storage($id) .")
Unit
case "fsName" => fsType
case "setUser" =>
- properties += StorageConfiguration.PROXY_USER.key -> args(0).asInstanceOf[String]; Unit
+ properties.asScala += StorageConfiguration.PROXY_USER.key -> args(0).asInstanceOf[String];
+ Unit
case "read" =>
if (!inited) throw new IllegalAccessException("storage has not been inited.")
new IOInputStream(args)
@@ -197,15 +203,17 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
if (!inited) throw new IllegalAccessException("storage has not been inited.")
new IOOutputStream(args)
case "renameTo" =>
- if (!inited || args.length < 2)
+ if (!inited || args.length < 2) {
throw new IllegalAccessException("storage has not been inited.")
+ }
val params =
args.map(MethodEntitySerializer.serializerJavaObject(_)).map(_.asInstanceOf[AnyRef])
executeMethod(method.getName, params)
new java.lang.Boolean(true)
case "list" =>
- if (!inited || args.length < 1)
+ if (!inited || args.length < 1) {
throw new IllegalAccessException("storage has not been inited.")
+ }
val params =
Array(MethodEntitySerializer.serializerJavaObject(args(0))).map(_.asInstanceOf[AnyRef])
val msg = executeMethod(method.getName, params)
@@ -214,8 +222,9 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
new TypeToken[java.util.List[FsPath]]() {}.getType
)
case "listPathWithError" =>
- if (!inited || args.length < 1)
+ if (!inited || args.length < 1) {
throw new IllegalAccessException("storage has not been inited.")
+ }
val params =
Array(MethodEntitySerializer.serializerJavaObject(args(0))).map(_.asInstanceOf[AnyRef])
val msg = executeMethod(method.getName, params)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org