You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/03/05 01:59:08 UTC

[incubator-linkis] 03/03: optimize HttpBmlClient util

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.1.0-datasource
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit ea7862e4f066ca0164297b1b25e53327936e9d8f
Author: casionone <ca...@gmail.com>
AuthorDate: Fri Mar 4 21:26:47 2022 +0800

    optimize HttpBmlClient util
---
 .../linkis/httpclient/request/DownloadAction.scala | 13 ++-
 .../linkis/bml/client/impl/HttpBmlClient.scala     | 95 +++++++++++-----------
 .../org/apache/linkis/bml/http/HttpConf.scala      |  4 +
 .../apache/linkis/bml/request/BmlPOSTAction.scala  | 31 ++++++-
 4 files changed, 92 insertions(+), 51 deletions(-)

diff --git a/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/request/DownloadAction.scala b/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/request/DownloadAction.scala
index 5824db9..5826bcd 100644
--- a/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/request/DownloadAction.scala
+++ b/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/request/DownloadAction.scala
@@ -17,11 +17,22 @@
  
 package org.apache.linkis.httpclient.request
 
+import org.apache.http.HttpResponse
+import org.apache.linkis.common.utils.Logging
+
 import scala.tools.nsc.interpreter.InputStream
 
 
-trait DownloadAction {
+trait DownloadAction extends Logging {
+
+  def getResponse: HttpResponse
+
+  def setResponse(response: HttpResponse)
 
   def write(inputStream: InputStream): Unit
 
+  def write(inputStream: InputStream, response: HttpResponse): Unit = {
+    write(inputStream)
+    setResponse(response)
+  }
 }
diff --git a/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/client/impl/HttpBmlClient.scala b/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/client/impl/HttpBmlClient.scala
index 43f7fd7..0576cdb 100644
--- a/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/client/impl/HttpBmlClient.scala
+++ b/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/client/impl/HttpBmlClient.scala
@@ -17,7 +17,7 @@
  
 package org.apache.linkis.bml.client.impl
 
-import java.io.{File, IOException, InputStream}
+import java.io.{File, IOException, InputStream, OutputStream}
 import java.util
 
 import org.apache.linkis.bml.client.AbstractBmlClient
@@ -29,7 +29,7 @@ import org.apache.linkis.bml.request._
 import org.apache.linkis.bml.response.{BmlCreateBmlProjectResult, _}
 import org.apache.linkis.common.conf.Configuration
 import org.apache.linkis.common.io.FsPath
-import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.httpclient.authentication.AuthenticationStrategy
 import org.apache.linkis.httpclient.config.ClientConfigBuilder
 import org.apache.linkis.httpclient.dws.DWSHttpClient
@@ -38,6 +38,7 @@ import org.apache.linkis.httpclient.dws.config.DWSClientConfig
 import org.apache.linkis.storage.FSFactory
 import org.apache.commons.io.IOUtils
 import org.apache.commons.lang.StringUtils
+import org.apache.http.client.methods.CloseableHttpResponse
 
 class HttpBmlClient(clientConfig: DWSClientConfig,
                    serverUrl: String,
@@ -116,7 +117,7 @@ class HttpBmlClient(clientConfig: DWSClientConfig,
     * @param overwrite 是否是追加
     * @return 返回的inputStream已经被全部读完,所以返回一个null,另外的fullFileName是整个文件的名字
     */
-  override def downloadResource(user: String, resourceId: String, version: String, path: String, overwrite:Boolean = false): BmlDownloadResponse = {
+  override def downloadResource(user: String, resourceId: String, version: String, path: String, overwrite: Boolean = false): BmlDownloadResponse = {
     val fsPath = new FsPath(path)
     val fileSystem = FSFactory.getFsByProxyUser(fsPath, user)
     fileSystem.init(new util.HashMap[String, String]())
@@ -126,31 +127,35 @@ class HttpBmlClient(clientConfig: DWSClientConfig,
     downloadAction.getParameters += "resourceId" -> resourceId
     if(StringUtils.isNotEmpty(version)) downloadAction.getParameters += "version" -> version
     downloadAction.setUser(user)
-    val downloadResult = dwsClient.execute(downloadAction)
-    val fullFilePath = new FsPath(fullFileName)
-    if (downloadResult != null){
-      val inputStream = downloadAction.getInputStream
-      val outputStream = fileSystem.write(fullFilePath, overwrite)
-      try{
-        IOUtils.copy(inputStream, outputStream)
-      }catch{
-        case e:IOException => logger.error("failed to copy inputStream and outputStream (inputStream和outputStream流copy失败)", e)
-          val exception = BmlClientFailException("failed to copy inputStream and outputStream (inputStream和outputStream流copy失败)")
-          exception.initCause(e)
-          throw e
-        case t:Throwable => logger.error("failed to copy stream (流复制失败)",t)
-          throw t
-      }finally{
-        IOUtils.closeQuietly(inputStream)
-        IOUtils.closeQuietly(outputStream)
+    var inputStream: InputStream = null
+    var outputStream: OutputStream = null
+    try {
+      dwsClient.execute(downloadAction)
+      val fullFilePath = new FsPath(fullFileName)
+      outputStream = fileSystem.write(fullFilePath, overwrite)
+      inputStream = downloadAction.getInputStream
+      IOUtils.copy(inputStream, outputStream)
+      downloadAction.getResponse match {
+        case r: CloseableHttpResponse =>
+          Utils.tryAndWarn(r.close())
+        case o: Any =>
+          info(s"Download response : ${o.getClass.getName} cannot close.")
       }
-      BmlDownloadResponse(isSuccess = true, null, resourceId, version, fullFileName)
-    }else{
-      BmlDownloadResponse(isSuccess = false, null, null, null, null)
+    } catch {
+      case e: IOException => logger.error("failed to copy inputStream and outputStream (inputStream和outputStream流copy失败)", e)
+        val exception = BmlClientFailException("failed to copy inputStream and outputStream (inputStream和outputStream流copy失败)")
+        exception.initCause(e)
+        throw exception
+      case t: Throwable => logger.error("failed to copy stream (流复制失败)", t)
+        throw t
+    } finally {
+      if (null != inputStream) IOUtils.closeQuietly(inputStream)
+      if (null != outputStream) IOUtils.closeQuietly(outputStream)
+      fileSystem.close()
     }
+    BmlDownloadResponse(isSuccess = true, null, resourceId, version, fullFileName)
   }
 
-
   override def downloadShareResource(user: String, resourceId: String, version: String, path: String,
                                      overwrite:Boolean = false): BmlDownloadResponse = {
     val fsPath = new FsPath(path)
@@ -162,32 +167,30 @@ class HttpBmlClient(clientConfig: DWSClientConfig,
     downloadAction.getParameters += "resourceId" -> resourceId
     if(StringUtils.isNotEmpty(version)) downloadAction.getParameters += "version" -> version
     downloadAction.setUser(user)
-    val downloadResult = dwsClient.execute(downloadAction)
-    val fullFilePath = new FsPath(fullFileName)
-    if (downloadResult != null){
-      val inputStream = downloadAction.getInputStream
-      val outputStream = fileSystem.write(fullFilePath, overwrite)
-      try{
-        IOUtils.copy(inputStream, outputStream)
-      }catch{
-        case e:IOException => logger.error("failed to copy inputStream and outputStream (inputStream和outputStream流copy失败)", e)
-          val exception = BmlClientFailException("failed to copy inputStream and outputStream (inputStream和outputStream流copy失败)")
-          exception.initCause(e)
-          throw e
-        case t:Throwable => logger.error("failed to copy stream (流复制失败)",t)
-          throw t
-      }finally{
-        IOUtils.closeQuietly(inputStream)
-        IOUtils.closeQuietly(outputStream)
-      }
-      BmlDownloadResponse(isSuccess = true, null, resourceId, version, fullFileName)
-    }else{
-      BmlDownloadResponse(isSuccess = false, null, null, null, null)
+    var inputStream: InputStream = null
+    var outputStream: OutputStream = null
+    try {
+      dwsClient.execute(downloadAction)
+      val fullFilePath = new FsPath(fullFileName)
+      outputStream = fileSystem.write(fullFilePath, overwrite)
+      inputStream = downloadAction.getInputStream
+      IOUtils.copy(inputStream, outputStream)
+    } catch {
+      case e:IOException => logger.error("failed to copy inputStream and outputStream (inputStream和outputStream流copy失败)", e)
+        val exception = BmlClientFailException("failed to copy inputStream and outputStream (inputStream和outputStream流copy失败)")
+        exception.initCause(e)
+        throw exception // rethrow the wrapper (cause = e) instead of the raw IOException, as downloadResource does
+      case t:Throwable => logger.error("failed to copy stream (流复制失败)",t)
+        throw t
+    } finally {
+      if (null != inputStream) IOUtils.closeQuietly(inputStream)
+      if (null != outputStream) IOUtils.closeQuietly(outputStream)
+      fileSystem.close()
     }
+    BmlDownloadResponse(isSuccess = true, null, resourceId, version, fullFileName)
   }
 
 
-
   /**
     * 更新资源信息
     *
diff --git a/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/http/HttpConf.scala b/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/http/HttpConf.scala
index 8447764..1308bd9 100644
--- a/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/http/HttpConf.scala
+++ b/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/http/HttpConf.scala
@@ -43,6 +43,10 @@ object HttpConf {
   val uploadShareResourceUrl:String = urlPrefix + "/" + "uploadShareResource"
   val updateShareResourceUrl:String = urlPrefix + "/" + "updateShareResource"
   val attachUrl:String = urlPrefix + "/" + "attachResourceAndProject"
+  val changeOwnerUrl: String = urlPrefix + "/" + "changeOwner"
+  val rollbackVersionUrl: String = urlPrefix + "/" + "rollbackVersion"
+  val copyResourceUrl: String = urlPrefix + "/" + "copyResourceToAnotherUser"
+
   def main(args: Array[String]): Unit = {
     println(uploadURL)
     println(downloadURL)
diff --git a/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/request/BmlPOSTAction.scala b/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/request/BmlPOSTAction.scala
index 5b2d497..4d8c419 100644
--- a/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/request/BmlPOSTAction.scala
+++ b/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/request/BmlPOSTAction.scala
@@ -24,6 +24,7 @@ import java.{lang, util}
 import com.google.gson._
 import org.apache.linkis.bml.http.HttpConf
 import org.apache.linkis.httpclient.request._
+import org.apache.http.HttpResponse
 
 trait BmlAction extends UserAction{
 
@@ -184,8 +185,9 @@ case class BmlUpdateShareResourceAction(filePaths:Array[String],
 
 case class BmlDownloadAction() extends BmlGETAction with DownloadAction with UserAction{
 
-  private var inputStream:InputStream = _
-  private var user:String = _
+  private var inputStream: InputStream = _
+  private var user: String = _
+  private var respoonse: HttpResponse = _
 
   def getInputStream:InputStream = this.inputStream
 
@@ -198,13 +200,18 @@ case class BmlDownloadAction() extends BmlGETAction with DownloadAction with Use
   override def setUser(user: String): Unit = this.user = user
 
   override def getUser: String = this.user
+
+  override def getResponse: HttpResponse = respoonse
+
+  override def setResponse(response: HttpResponse): Unit = this.respoonse = response
 }
 
 
 case class BmlDownloadShareAction() extends BmlGETAction with DownloadAction with UserAction{
 
-  private var inputStream:InputStream = _
-  private var user:String = _
+  private var inputStream: InputStream = _
+  private var user: String = _
+  private var response: HttpResponse = _
 
   def getInputStream:InputStream = this.inputStream
 
@@ -217,6 +224,10 @@ case class BmlDownloadShareAction() extends BmlGETAction with DownloadAction wit
   override def setUser(user: String): Unit = this.user = user
 
   override def getUser: String = this.user
+
+  override def getResponse: HttpResponse = response
+
+  override def setResponse(response: HttpResponse): Unit = this.response = response
 }
 
 
@@ -280,3 +291,15 @@ case class UpdateBmlProjectAction() extends BmlPOSTAction{
 case class BmlAttachAction()extends BmlPOSTAction{
   override def getURL: String = HttpConf.attachUrl
 }
+
+case class BmlChangeOwnerAction() extends BmlPOSTAction {
+  override def getURL: String = HttpConf.changeOwnerUrl
+}
+
+case class BmlCopyResourceAction() extends BmlPOSTAction {
+  override def getURL: String = HttpConf.copyResourceUrl
+}
+
+case class BmlRollbackVersionAction() extends BmlPOSTAction {
+  override def getURL: String = HttpConf.rollbackVersionUrl
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org