You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/03/05 01:59:08 UTC
[incubator-linkis] 03/03: optimize HttpBmlClient util
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.1.0-datasource
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
commit ea7862e4f066ca0164297b1b25e53327936e9d8f
Author: casionone <ca...@gmail.com>
AuthorDate: Fri Mar 4 21:26:47 2022 +0800
optimize HttpBmlClient util
---
.../linkis/httpclient/request/DownloadAction.scala | 13 ++-
.../linkis/bml/client/impl/HttpBmlClient.scala | 95 +++++++++++-----------
.../org/apache/linkis/bml/http/HttpConf.scala | 4 +
.../apache/linkis/bml/request/BmlPOSTAction.scala | 31 ++++++-
4 files changed, 92 insertions(+), 51 deletions(-)
diff --git a/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/request/DownloadAction.scala b/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/request/DownloadAction.scala
index 5824db9..5826bcd 100644
--- a/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/request/DownloadAction.scala
+++ b/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/request/DownloadAction.scala
@@ -17,11 +17,22 @@
package org.apache.linkis.httpclient.request
+import org.apache.http.HttpResponse
+import org.apache.linkis.common.utils.Logging
+
import scala.tools.nsc.interpreter.InputStream
-trait DownloadAction {
+trait DownloadAction extends Logging {
+
+ def getResponse: HttpResponse
+
+ def setResponse(response: HttpResponse)
def write(inputStream: InputStream): Unit
+ def write(inputStream: InputStream, response: HttpResponse): Unit = {
+ write(inputStream)
+ setResponse(response)
+ }
}
diff --git a/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/client/impl/HttpBmlClient.scala b/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/client/impl/HttpBmlClient.scala
index 43f7fd7..0576cdb 100644
--- a/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/client/impl/HttpBmlClient.scala
+++ b/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/client/impl/HttpBmlClient.scala
@@ -17,7 +17,7 @@
package org.apache.linkis.bml.client.impl
-import java.io.{File, IOException, InputStream}
+import java.io.{File, IOException, InputStream, OutputStream}
import java.util
import org.apache.linkis.bml.client.AbstractBmlClient
@@ -29,7 +29,7 @@ import org.apache.linkis.bml.request._
import org.apache.linkis.bml.response.{BmlCreateBmlProjectResult, _}
import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.common.io.FsPath
-import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.httpclient.authentication.AuthenticationStrategy
import org.apache.linkis.httpclient.config.ClientConfigBuilder
import org.apache.linkis.httpclient.dws.DWSHttpClient
@@ -38,6 +38,7 @@ import org.apache.linkis.httpclient.dws.config.DWSClientConfig
import org.apache.linkis.storage.FSFactory
import org.apache.commons.io.IOUtils
import org.apache.commons.lang.StringUtils
+import org.apache.http.client.methods.CloseableHttpResponse
class HttpBmlClient(clientConfig: DWSClientConfig,
serverUrl: String,
@@ -116,7 +117,7 @@ class HttpBmlClient(clientConfig: DWSClientConfig,
* @param overwrite 是否是追加
* @return 返回的inputStream已经被全部读完,所以返回一个null,另外的fullFileName是整个文件的名字
*/
- override def downloadResource(user: String, resourceId: String, version: String, path: String, overwrite:Boolean = false): BmlDownloadResponse = {
+ override def downloadResource(user: String, resourceId: String, version: String, path: String, overwrite: Boolean = false): BmlDownloadResponse = {
val fsPath = new FsPath(path)
val fileSystem = FSFactory.getFsByProxyUser(fsPath, user)
fileSystem.init(new util.HashMap[String, String]())
@@ -126,31 +127,35 @@ class HttpBmlClient(clientConfig: DWSClientConfig,
downloadAction.getParameters += "resourceId" -> resourceId
if(StringUtils.isNotEmpty(version)) downloadAction.getParameters += "version" -> version
downloadAction.setUser(user)
- val downloadResult = dwsClient.execute(downloadAction)
- val fullFilePath = new FsPath(fullFileName)
- if (downloadResult != null){
- val inputStream = downloadAction.getInputStream
- val outputStream = fileSystem.write(fullFilePath, overwrite)
- try{
- IOUtils.copy(inputStream, outputStream)
- }catch{
- case e:IOException => logger.error("failed to copy inputStream and outputStream (inputStream和outputStream流copy失败)", e)
- val exception = BmlClientFailException("failed to copy inputStream and outputStream (inputStream和outputStream流copy失败)")
- exception.initCause(e)
- throw e
- case t:Throwable => logger.error("failed to copy stream (流复制失败)",t)
- throw t
- }finally{
- IOUtils.closeQuietly(inputStream)
- IOUtils.closeQuietly(outputStream)
+ var inputStream: InputStream = null
+ var outputStream: OutputStream = null
+ try {
+ dwsClient.execute(downloadAction)
+ val fullFilePath = new FsPath(fullFileName)
+ outputStream = fileSystem.write(fullFilePath, overwrite)
+ inputStream = downloadAction.getInputStream
+ IOUtils.copy(inputStream, outputStream)
+ downloadAction.getResponse match {
+ case r: CloseableHttpResponse =>
+ Utils.tryAndWarn(r.close())
+ case o: Any =>
+ info(s"Download response : ${o.getClass.getName} cannot close.")
}
- BmlDownloadResponse(isSuccess = true, null, resourceId, version, fullFileName)
- }else{
- BmlDownloadResponse(isSuccess = false, null, null, null, null)
+ } catch {
+ case e: IOException => logger.error("failed to copy inputStream and outputStream (inputStream和outputStream流copy失败)", e)
+ val exception = BmlClientFailException("failed to copy inputStream and outputStream (inputStream和outputStream流copy失败)")
+ exception.initCause(e)
+ throw exception
+ case t: Throwable => logger.error("failed to copy stream (流复制失败)", t)
+ throw t
+ } finally {
+ if (null != inputStream) IOUtils.closeQuietly(inputStream)
+ if (null != outputStream) IOUtils.closeQuietly(outputStream)
+ fileSystem.close()
}
+ BmlDownloadResponse(isSuccess = true, null, resourceId, version, fullFileName)
}
-
override def downloadShareResource(user: String, resourceId: String, version: String, path: String,
overwrite:Boolean = false): BmlDownloadResponse = {
val fsPath = new FsPath(path)
@@ -162,32 +167,30 @@ class HttpBmlClient(clientConfig: DWSClientConfig,
downloadAction.getParameters += "resourceId" -> resourceId
if(StringUtils.isNotEmpty(version)) downloadAction.getParameters += "version" -> version
downloadAction.setUser(user)
- val downloadResult = dwsClient.execute(downloadAction)
- val fullFilePath = new FsPath(fullFileName)
- if (downloadResult != null){
- val inputStream = downloadAction.getInputStream
- val outputStream = fileSystem.write(fullFilePath, overwrite)
- try{
- IOUtils.copy(inputStream, outputStream)
- }catch{
- case e:IOException => logger.error("failed to copy inputStream and outputStream (inputStream和outputStream流copy失败)", e)
- val exception = BmlClientFailException("failed to copy inputStream and outputStream (inputStream和outputStream流copy失败)")
- exception.initCause(e)
- throw e
- case t:Throwable => logger.error("failed to copy stream (流复制失败)",t)
- throw t
- }finally{
- IOUtils.closeQuietly(inputStream)
- IOUtils.closeQuietly(outputStream)
- }
- BmlDownloadResponse(isSuccess = true, null, resourceId, version, fullFileName)
- }else{
- BmlDownloadResponse(isSuccess = false, null, null, null, null)
+ var inputStream: InputStream = null
+ var outputStream: OutputStream = null
+ try {
+ dwsClient.execute(downloadAction)
+ val fullFilePath = new FsPath(fullFileName)
+ outputStream = fileSystem.write(fullFilePath, overwrite)
+ inputStream = downloadAction.getInputStream
+ IOUtils.copy(inputStream, outputStream)
+ } catch {
+ case e:IOException => logger.error("failed to copy inputStream and outputStream (inputStream和outputStream流copy失败)", e)
+ val exception = BmlClientFailException("failed to copy inputStream and outputStream (inputStream和outputStream流copy失败)")
+ exception.initCause(e)
+ throw e
+ case t:Throwable => logger.error("failed to copy stream (流复制失败)",t)
+ throw t
+ } finally {
+ if (null != inputStream) IOUtils.closeQuietly(inputStream)
+ if (null != outputStream) IOUtils.closeQuietly(outputStream)
+ fileSystem.close()
}
+ BmlDownloadResponse(isSuccess = true, null, resourceId, version, fullFileName)
}
-
/**
* 更新资源信息
*
diff --git a/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/http/HttpConf.scala b/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/http/HttpConf.scala
index 8447764..1308bd9 100644
--- a/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/http/HttpConf.scala
+++ b/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/http/HttpConf.scala
@@ -43,6 +43,10 @@ object HttpConf {
val uploadShareResourceUrl:String = urlPrefix + "/" + "uploadShareResource"
val updateShareResourceUrl:String = urlPrefix + "/" + "updateShareResource"
val attachUrl:String = urlPrefix + "/" + "attachResourceAndProject"
+ val changeOwnerUrl: String = urlPrefix + "/" + "changeOwner"
+ val rollbackVersionUrl: String = urlPrefix + "/" + "rollbackVersion"
+ val copyResourceUrl: String = urlPrefix + "/" + "copyResourceToAnotherUser"
+
def main(args: Array[String]): Unit = {
println(uploadURL)
println(downloadURL)
diff --git a/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/request/BmlPOSTAction.scala b/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/request/BmlPOSTAction.scala
index 5b2d497..4d8c419 100644
--- a/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/request/BmlPOSTAction.scala
+++ b/linkis-public-enhancements/linkis-bml/linkis-bml-client/src/main/scala/org/apache/linkis/bml/request/BmlPOSTAction.scala
@@ -24,6 +24,7 @@ import java.{lang, util}
import com.google.gson._
import org.apache.linkis.bml.http.HttpConf
import org.apache.linkis.httpclient.request._
+import org.apache.http.HttpResponse
trait BmlAction extends UserAction{
@@ -184,8 +185,9 @@ case class BmlUpdateShareResourceAction(filePaths:Array[String],
case class BmlDownloadAction() extends BmlGETAction with DownloadAction with UserAction{
- private var inputStream:InputStream = _
- private var user:String = _
+ private var inputStream: InputStream = _
+ private var user: String = _
+ private var respoonse: HttpResponse = _
def getInputStream:InputStream = this.inputStream
@@ -198,13 +200,18 @@ case class BmlDownloadAction() extends BmlGETAction with DownloadAction with Use
override def setUser(user: String): Unit = this.user = user
override def getUser: String = this.user
+
+ override def getResponse: HttpResponse = respoonse
+
+ override def setResponse(response: HttpResponse): Unit = this.respoonse = response
}
case class BmlDownloadShareAction() extends BmlGETAction with DownloadAction with UserAction{
- private var inputStream:InputStream = _
- private var user:String = _
+ private var inputStream: InputStream = _
+ private var user: String = _
+ private var response: HttpResponse = _
def getInputStream:InputStream = this.inputStream
@@ -217,6 +224,10 @@ case class BmlDownloadShareAction() extends BmlGETAction with DownloadAction wit
override def setUser(user: String): Unit = this.user = user
override def getUser: String = this.user
+
+ override def getResponse: HttpResponse = response
+
+ override def setResponse(response: HttpResponse): Unit = this.response = response
}
@@ -280,3 +291,15 @@ case class UpdateBmlProjectAction() extends BmlPOSTAction{
case class BmlAttachAction()extends BmlPOSTAction{
override def getURL: String = HttpConf.attachUrl
}
+
+case class BmlChangeOwnerAction() extends BmlPOSTAction {
+ override def getURL: String = HttpConf.changeOwnerUrl
+}
+
+case class BmlCopyResourceAction() extends BmlPOSTAction {
+ override def getURL: String = HttpConf.copyResourceUrl
+}
+
+case class BmlRollbackVersionAction() extends BmlPOSTAction {
+ override def getURL: String = HttpConf.rollbackVersionUrl
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org