You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2022/06/16 01:10:31 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2628][FOLLOWUP] Refine kyuubi-ctl batch commands
This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 27330ddb8 [KYUUBI #2628][FOLLOWUP] Refine kyuubi-ctl batch commands
27330ddb8 is described below
commit 27330ddb8e93dfeb5fc7ede619afe3e2e135b701
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Thu Jun 16 09:10:24 2022 +0800
[KYUUBI #2628][FOLLOWUP] Refine kyuubi-ctl batch commands
### _Why are the changes needed?_
Refine kyuubi-ctl batch commands
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #2889 from turboFei/refine_batch_command.
Closes #2628
56bb7e3d [Fei Wang] print current state
d7be6b72 [Fei Wang] print batch report
3c94fb2d [Fei Wang] set debug for doRequest
7c5fd65a [Fei Wang] fix ut
bcd28a74 [Fei Wang] exit if batch job fail
5d732267 [Fei Wang] refactor
4a2bad8e [Fei Wang] refactor the state
9c982190 [Fei Wang] revert --conf
c2b7008d [Fei Wang] fix ut
10778df5 [Fei Wang] print close resp first
a453084e [Fei Wang] support --conf
Authored-by: Fei Wang <fw...@ebay.com>
Signed-off-by: ulysses-you <ul...@apache.org>
---
.../org/apache/kyuubi/ctl/RestClientFactory.scala | 5 +--
.../kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala | 22 +++++++++----
.../kyuubi/ctl/cmd/log/LogBatchCommand.scala | 4 +--
.../kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala | 16 +++++++---
.../org/apache/kyuubi/ctl/util/BatchUtil.scala | 37 ++++++++++++----------
.../java/org/apache/kyuubi/client/RestClient.java | 4 +--
.../org/apache/kyuubi/client/util/JsonUtil.java | 6 ++--
7 files changed, 57 insertions(+), 37 deletions(-)
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/RestClientFactory.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/RestClientFactory.scala
index a0fcb0458..eb98dcfff 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/RestClientFactory.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/RestClientFactory.scala
@@ -45,7 +45,8 @@ object RestClientFactory {
map: JMap[String, Object],
conf: KyuubiConf): KyuubiRestClient = {
val version = getApiVersion(map)
- val hostUrl = getRestConfig("hostUrl", conf.get(CTL_REST_CLIENT_BASE_URL).get, cliConfig, map)
+ val hostUrl =
+ getRestConfig("hostUrl", conf.get(CTL_REST_CLIENT_BASE_URL).orNull, cliConfig, map)
val authSchema =
getRestConfig("authSchema", conf.get(CTL_REST_CLIENT_AUTH_SCHEMA), cliConfig, map)
@@ -67,7 +68,7 @@ object RestClientFactory {
.build()
case "spnego" =>
val spnegoHost =
- getRestConfig("spnegoHost", conf.get(CTL_REST_CLIENT_SPNEGO_HOST).get, cliConfig, map)
+ getRestConfig("spnegoHost", conf.get(CTL_REST_CLIENT_SPNEGO_HOST).orNull, cliConfig, map)
kyuubiRestClient = KyuubiRestClient.builder(hostUrl)
.apiVersion(KyuubiRestClient.ApiVersion.valueOf(version))
.authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.SPNEGO)
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala
index e871d311d..be206f39e 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala
@@ -18,24 +18,32 @@ package org.apache.kyuubi.ctl.cmd.delete
import org.apache.kyuubi.client.BatchRestApi
import org.apache.kyuubi.client.util.JsonUtil
-import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.ctl.{CliConfig, ControlCliException}
import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
import org.apache.kyuubi.ctl.cmd.Command
+import org.apache.kyuubi.ctl.util.BatchUtil
class DeleteBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
-
- var result: String = null
-
def validate(): Unit = {}
def run(): Unit = {
withKyuubiRestClient(normalizedCliConfig, null, conf) { kyuubiRestClient =>
val batchRestApi: BatchRestApi = new BatchRestApi(kyuubiRestClient)
+ val batchId = normalizedCliConfig.batchOpts.batchId
+
+ val result = batchRestApi.deleteBatch(batchId, normalizedCliConfig.batchOpts.hs2ProxyUser)
- val result = batchRestApi.deleteBatch(
- normalizedCliConfig.batchOpts.batchId,
- normalizedCliConfig.batchOpts.hs2ProxyUser)
info(JsonUtil.toJson(result))
+
+ if (!result.isSuccess) {
+ val batch = batchRestApi.getBatchById(batchId)
+ if (!BatchUtil.isTerminalState(batch.getState)) {
+ error(s"Failed to delete batch $batchId, its current state is ${batch.getState}")
+ throw ControlCliException(1)
+ } else {
+ warn(s"Batch $batchId is already in terminal state ${batch.getState}.")
+ }
+ }
}
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala
index 40ad5ec8c..d57cd01bd 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala
@@ -23,6 +23,7 @@ import org.apache.kyuubi.client.api.v1.dto.{Batch, OperationLog}
import org.apache.kyuubi.ctl.CliConfig
import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
import org.apache.kyuubi.ctl.cmd.Command
+import org.apache.kyuubi.ctl.util.BatchUtil
class LogBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
@@ -59,8 +60,7 @@ class LogBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
Thread.sleep(DEFAULT_LOG_QUERY_INTERVAL)
batch = batchRestApi.getBatchById(batchId)
- if (log.getLogRowSet.size() == 0 && batch.getState() != "PENDING"
- && batch.getState() != "RUNNING") {
+ if (log.getLogRowSet.size() == 0 && BatchUtil.isTerminalState(batch.getState)) {
done = true
}
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala
index 956aaf5d1..2936ba4dc 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala
@@ -22,10 +22,11 @@ import scala.collection.JavaConverters._
import org.apache.kyuubi.client.BatchRestApi
import org.apache.kyuubi.client.api.v1.dto.{Batch, BatchRequest, OperationLog}
-import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.client.util.JsonUtil
+import org.apache.kyuubi.ctl.{CliConfig, ControlCliException}
import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
import org.apache.kyuubi.ctl.cmd.Command
-import org.apache.kyuubi.ctl.util.{CtlUtils, Validator}
+import org.apache.kyuubi.ctl.util.{BatchUtil, CtlUtils, Validator}
class SubmitBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
@@ -65,12 +66,17 @@ class SubmitBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
Thread.sleep(DEFAULT_LOG_QUERY_INTERVAL)
batch = batchRestApi.getBatchById(batchId)
- if (log.getLogRowSet.size() == 0 && batch.getState() != "PENDING"
- && batch.getState() != "RUNNING") {
+ if (log.getLogRowSet.size() == 0 && BatchUtil.isTerminalState(batch.getState)) {
done = true
}
}
+
+ if (BatchUtil.isTerminalState(batch.getState) && !BatchUtil.isFinishedState(batch.getState)) {
+ error(s"Batch $batchId failed: ${JsonUtil.toJson(batch)}")
+ throw ControlCliException(1)
+ } else {
+ info(s"Batch report for $batchId: ${JsonUtil.toJson(batch)}")
+ }
}
}
-
}
diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/util/JsonUtil.java b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/BatchUtil.scala
similarity index 51%
copy from kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/util/JsonUtil.java
copy to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/BatchUtil.scala
index dcedcbbf8..b5e3c9d93 100644
--- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/util/JsonUtil.java
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/BatchUtil.scala
@@ -15,28 +15,31 @@
* limitations under the License.
*/
-package org.apache.kyuubi.client.util;
+package org.apache.kyuubi.ctl.util
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kyuubi.client.exception.KyuubiRestException;
+import java.util.Locale
-public final class JsonUtil {
+object BatchUtil {
+ private val PENDING_STATE = "PENDING"
+ private val RUNNING_STATE = "RUNNING"
+ private val FINISHED_STATE = "FINISHED"
+ private val ERROR_STATE = "ERROR"
+ private val CANCELED_STATE = "CANCELED"
+ private val terminalBatchStates = Seq(FINISHED_STATE, ERROR_STATE, CANCELED_STATE)
- private static ObjectMapper MAPPER = new ObjectMapper();
+ def isPendingState(state: String): Boolean = {
+ PENDING_STATE.equalsIgnoreCase(state)
+ }
+
+ def isRunningState(state: String): Boolean = {
+ RUNNING_STATE.equalsIgnoreCase(state)
+ }
- public static String toJson(Object object) {
- try {
- return MAPPER.writeValueAsString(object);
- } catch (Exception e) {
- throw new KyuubiRestException("Failed to convert object to json", e);
- }
+ def isFinishedState(state: String): Boolean = {
+ FINISHED_STATE.equalsIgnoreCase(state)
}
- public static <T> T toObject(String json, Class<T> clazz) {
- try {
- return MAPPER.readValue(json, clazz);
- } catch (Exception e) {
- throw new KyuubiRestException("Failed to convert json string to object", e);
- }
+ def isTerminalState(state: String): Boolean = {
+ state != null && terminalBatchStates.contains(state.toUpperCase(Locale.ROOT))
}
}
diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/RestClient.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/RestClient.java
index 5de920e36..0ee8f68fa 100644
--- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/RestClient.java
+++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/RestClient.java
@@ -111,7 +111,7 @@ public class RestClient implements IRestClient {
.setHeader(HttpHeaders.CONTENT_TYPE, "application/json")
.build();
- LOG.info("Executing {} request: {}", httpRequest.getMethod(), uri);
+ LOG.debug("Executing {} request: {}", httpRequest.getMethod(), uri);
ResponseHandler<String> responseHandler =
resp -> {
@@ -126,7 +126,7 @@ public class RestClient implements IRestClient {
};
response = httpclient.execute(httpRequest, responseHandler);
- LOG.info("Response: {}", response);
+ LOG.debug("Response: {}", response);
} catch (ConnectException | ConnectTimeoutException e) {
// net exception can be retried by connecting to other Kyuubi server
throw new KyuubiRetryableException("Api request failed for " + uri.toString(), e);
diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/util/JsonUtil.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/util/JsonUtil.java
index dcedcbbf8..af4b01c93 100644
--- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/util/JsonUtil.java
+++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/util/JsonUtil.java
@@ -28,7 +28,8 @@ public final class JsonUtil {
try {
return MAPPER.writeValueAsString(object);
} catch (Exception e) {
- throw new KyuubiRestException("Failed to convert object to json", e);
+ throw new KyuubiRestException(
+ String.format("Failed to convert object(%s) to json", object), e);
}
}
@@ -36,7 +37,8 @@ public final class JsonUtil {
try {
return MAPPER.readValue(json, clazz);
} catch (Exception e) {
- throw new KyuubiRestException("Failed to convert json string to object", e);
+ throw new KyuubiRestException(
+ String.format("Failed to convert json string(%s) to %s", json, clazz.getName()), e);
}
}
}