You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/30 07:59:47 UTC
[inlong] branch master updated: [INLONG-6672][Manager] Add suspend and restart command (#7721)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a3b4c8d1c [INLONG-6672][Manager] Add suspend and restart command (#7721)
a3b4c8d1c is described below
commit a3b4c8d1c7a9b78e263bce5056b44806a23ee8ec
Author: haifxu <xh...@gmail.com>
AuthorDate: Thu Mar 30 15:59:41 2023 +0800
[INLONG-6672][Manager] Add suspend and restart command (#7721)
---
.../inlong/manager/client/cli/CommandToolMain.java | 2 +
.../inlong/manager/client/cli/CreateCommand.java | 9 ++-
.../inlong/manager/client/cli/DeleteCommand.java | 9 ++-
.../inlong/manager/client/cli/RestartCommand.java | 68 ++++++++++++++++++++++
.../inlong/manager/client/cli/SuspendCommand.java | 68 ++++++++++++++++++++++
.../client/api/inner/client/InlongGroupClient.java | 35 +++++++----
.../manager/client/api/service/InlongGroupApi.java | 4 +-
7 files changed, 179 insertions(+), 16 deletions(-)
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandToolMain.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandToolMain.java
index e8da1bb42..298db76a1 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandToolMain.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandToolMain.java
@@ -38,6 +38,8 @@ public class CommandToolMain {
jcommander.addCommand("list", new ListCommand());
jcommander.addCommand("describe", new DescribeCommand());
jcommander.addCommand("create", new CreateCommand());
+ jcommander.addCommand("suspend", new SuspendCommand());
+ jcommander.addCommand("restart", new RestartCommand());
jcommander.addCommand("delete", new DeleteCommand());
jcommander.addCommand("update", new UpdateCommand());
jcommander.addCommand("log", new LogCommand());
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
index 7674ec334..4b4e602ef 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
@@ -22,12 +22,14 @@ import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.FileConverter;
import org.apache.inlong.manager.client.api.InlongClient;
import org.apache.inlong.manager.client.api.InlongGroup;
+import org.apache.inlong.manager.client.api.InlongGroupContext;
import org.apache.inlong.manager.client.api.InlongStreamBuilder;
import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient;
import org.apache.inlong.manager.client.api.inner.client.UserClient;
import org.apache.inlong.manager.client.cli.pojo.CreateGroupConf;
import org.apache.inlong.manager.client.cli.util.ClientUtils;
import org.apache.inlong.manager.client.cli.validator.UserTypeValidator;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
import org.apache.inlong.manager.common.enums.UserTypeEnum;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
@@ -91,8 +93,11 @@ public class CreateCommand extends AbstractCommand {
streamBuilder.transform(groupConf.getStreamTransform());
streamBuilder.initOrUpdate();
// initialize the new stream group
- group.init();
- System.out.println("Create group success!");
+ InlongGroupContext context = group.init();
+ if (!SimpleGroupStatus.STARTED.equals(context.getStatus())) {
+ throw new Exception("Start group failed, current status: " + context.getStatus());
+ }
+ System.out.println("Start group success!");
} catch (Exception e) {
System.out.println("Create group failed!");
System.out.println(e.getMessage());
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java
index ffa2e1e89..81db2df4f 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java
@@ -21,9 +21,11 @@ import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import org.apache.inlong.manager.client.api.InlongClient;
import org.apache.inlong.manager.client.api.InlongGroup;
+import org.apache.inlong.manager.client.api.InlongGroupContext;
import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient;
import org.apache.inlong.manager.client.api.inner.client.UserClient;
import org.apache.inlong.manager.client.cli.util.ClientUtils;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
import java.util.List;
@@ -31,7 +33,7 @@ import java.util.List;
* The delete command used for deleting inlong group instances.
* Please refer to the document for parameters
*/
-@Parameters(commandDescription = "Delete resource by json file")
+@Parameters(commandDescription = "Delete resource by group id")
public class DeleteCommand extends AbstractCommand {
@Parameter()
@@ -62,7 +64,10 @@ public class DeleteCommand extends AbstractCommand {
try {
InlongClient inlongClient = ClientUtils.getClient();
InlongGroup group = inlongClient.getGroup(inlongGroupId);
- group.delete();
+ InlongGroupContext context = group.delete();
+ if (!SimpleGroupStatus.STOPPED.equals(context.getStatus())) {
+ throw new Exception("Delete group failed, current status: " + context.getStatus());
+ }
System.out.println("Delete group success!");
} catch (Exception e) {
System.out.format("Delete group failed! message: %s \n", e.getMessage());
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/RestartCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/RestartCommand.java
new file mode 100644
index 000000000..9154288f0
--- /dev/null
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/RestartCommand.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.cli;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import org.apache.inlong.manager.client.api.InlongClient;
+import org.apache.inlong.manager.client.api.InlongGroup;
+import org.apache.inlong.manager.client.api.InlongGroupContext;
+import org.apache.inlong.manager.client.cli.util.ClientUtils;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
+
+import java.util.List;
+
+/**
+ * The restart command used for restarting inlong group instances.
+ * It registers a single "group" sub-command that takes the inlong group id.
+ */
+@Parameters(commandDescription = "Restart resource by group id")
+public class RestartCommand extends AbstractCommand {
+
+    @Parameter()
+    private List<String> params;
+
+    /**
+     * Creates the "restart" command and registers its "group" sub-command.
+     */
+    public RestartCommand() {
+        super("restart");
+        jcommander.addCommand("group", new RestartCommand.RestartGroup());
+    }
+
+    /**
+     * Sub-command that restarts one inlong group and prints the outcome to stdout.
+     */
+    @Parameters(commandDescription = "Restart the inlong group task")
+    private static class RestartGroup extends AbstractCommandRunner {
+
+        @Parameter()
+        private List<String> params;
+
+        @Parameter(names = {"--group", "-g"}, required = true, description = "inlong group id")
+        private String inlongGroupId;
+
+        /**
+         * Restarts the group and reports success only when the returned context is in STARTED status.
+         * Any other status, or any exception raised by the client, is reported as a failure.
+         */
+        @Override
+        void run() {
+            try {
+                InlongClient inlongClient = ClientUtils.getClient();
+                InlongGroup group = inlongClient.getGroup(inlongGroupId);
+                InlongGroupContext context = group.restart();
+                if (SimpleGroupStatus.STARTED.equals(context.getStatus())) {
+                    System.out.println("Restart group success!");
+                } else {
+                    // Report the unexpected status directly rather than throwing a raw Exception
+                    // just to reach the catch block below.
+                    System.out.println("Restart group failed, current status: " + context.getStatus());
+                }
+            } catch (Exception e) {
+                // Prefix the message so a bare (or null) exception message is still recognisable
+                // as a failed restart; the format mirrors the delete command's failure output.
+                System.out.format("Restart group failed! message: %s \n", e.getMessage());
+            }
+        }
+    }
+}
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/SuspendCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/SuspendCommand.java
new file mode 100644
index 000000000..f9a6f7e11
--- /dev/null
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/SuspendCommand.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.cli;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import org.apache.inlong.manager.client.api.InlongClient;
+import org.apache.inlong.manager.client.api.InlongGroup;
+import org.apache.inlong.manager.client.api.InlongGroupContext;
+import org.apache.inlong.manager.client.cli.util.ClientUtils;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
+
+import java.util.List;
+
+/**
+ * The suspend command used for suspending inlong group instances.
+ * It registers a single "group" sub-command that takes the inlong group id.
+ */
+@Parameters(commandDescription = "Suspend resource by group id")
+public class SuspendCommand extends AbstractCommand {
+
+    @Parameter()
+    private List<String> params;
+
+    /**
+     * Creates the "suspend" command and registers its "group" sub-command.
+     */
+    public SuspendCommand() {
+        super("suspend");
+        jcommander.addCommand("group", new SuspendCommand.SuspendGroup());
+    }
+
+    /**
+     * Sub-command that suspends one inlong group and prints the outcome to stdout.
+     */
+    @Parameters(commandDescription = "Suspend the inlong group task")
+    private static class SuspendGroup extends AbstractCommandRunner {
+
+        @Parameter()
+        private List<String> params;
+
+        @Parameter(names = {"--group", "-g"}, required = true, description = "inlong group id")
+        private String inlongGroupId;
+
+        /**
+         * Suspends the group and reports success only when the returned context is in STOPPED status.
+         * Any other status, or any exception raised by the client, is reported as a failure.
+         */
+        @Override
+        void run() {
+            try {
+                InlongClient inlongClient = ClientUtils.getClient();
+                InlongGroup group = inlongClient.getGroup(inlongGroupId);
+                InlongGroupContext context = group.suspend();
+                if (SimpleGroupStatus.STOPPED.equals(context.getStatus())) {
+                    System.out.println("Suspend group success!");
+                } else {
+                    // Report the unexpected status directly rather than throwing a raw Exception
+                    // just to reach the catch block below.
+                    System.out.println("Suspend group failed, current status: " + context.getStatus());
+                }
+            } catch (Exception e) {
+                // Prefix the message so a bare (or null) exception message is still recognisable
+                // as a failed suspend; the format mirrors the delete command's failure output.
+                System.out.format("Suspend group failed! message: %s \n", e.getMessage());
+            }
+        }
+    }
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
index 514c87077..862bf605c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
@@ -199,19 +199,34 @@ public class InlongGroupClient {
}
public boolean operateInlongGroup(String groupId, SimpleGroupStatus status, boolean async) {
+        // Async requests use the String-payload endpoints and are delegated unchanged.
+        if (async) {
+            return operateInlongGroupAsync(groupId, status);
+        }
+        // Sync: pick the blocking endpoint that matches the requested target status.
+        final Call<Response<WorkflowResult>> syncCall;
+        if (SimpleGroupStatus.STARTED == status) {
+            syncCall = inlongGroupApi.restartProcess(groupId);
+        } else if (SimpleGroupStatus.STOPPED == status) {
+            syncCall = inlongGroupApi.suspendProcess(groupId);
+        } else {
+            throw new IllegalArgumentException(String.format("Unsupported inlong group status: %s", status));
+        }
+        Response<WorkflowResult> result = ClientUtils.executeHttpCall(syncCall);
+        String errMsg = result.getErrMsg();
+        if (result.isSuccess() || errMsg == null) {
+            return true;
+        }
+        // An unsuccessful response yields false only when its message contains "not allowed".
+        return !errMsg.contains("not allowed");
+    }
+
+ public boolean operateInlongGroupAsync(String groupId, SimpleGroupStatus status) {
Call<Response<String>> responseCall;
if (status == SimpleGroupStatus.STOPPED) {
- if (async) {
- responseCall = inlongGroupApi.suspendProcessAsync(groupId);
- } else {
- responseCall = inlongGroupApi.suspendProcess(groupId);
- }
+ responseCall = inlongGroupApi.suspendProcessAsync(groupId);
} else if (status == SimpleGroupStatus.STARTED) {
- if (async) {
- responseCall = inlongGroupApi.restartProcessAsync(groupId);
- } else {
- responseCall = inlongGroupApi.restartProcess(groupId);
- }
+ responseCall = inlongGroupApi.restartProcessAsync(groupId);
} else {
throw new IllegalArgumentException(String.format("Unsupported inlong group status: %s", status));
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
index 10cd103b9..6c01de281 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
@@ -59,13 +59,13 @@ public interface InlongGroupApi {
Call<Response<String>> suspendProcessAsync(@Path("id") String id);
@POST("group/suspendProcess/{id}")
- Call<Response<String>> suspendProcess(@Path("id") String id);
+ Call<Response<WorkflowResult>> suspendProcess(@Path("id") String id);
@POST("group/restartProcessAsync/{id}")
Call<Response<String>> restartProcessAsync(@Path("id") String id);
@POST("group/restartProcess/{id}")
- Call<Response<String>> restartProcess(@Path("id") String id);
+ Call<Response<WorkflowResult>> restartProcess(@Path("id") String id);
@DELETE("group/deleteAsync/{id}")
Call<Response<String>> deleteGroupAsync(@Path("id") String id);