You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/30 07:59:47 UTC

[inlong] branch master updated: [INLONG-6672][Manager] Add suspend and restart command (#7721)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a3b4c8d1c [INLONG-6672][Manager] Add suspend and restart command (#7721)
a3b4c8d1c is described below

commit a3b4c8d1c7a9b78e263bce5056b44806a23ee8ec
Author: haifxu <xh...@gmail.com>
AuthorDate: Thu Mar 30 15:59:41 2023 +0800

    [INLONG-6672][Manager] Add suspend and restart command (#7721)
---
 .../inlong/manager/client/cli/CommandToolMain.java |  2 +
 .../inlong/manager/client/cli/CreateCommand.java   |  9 ++-
 .../inlong/manager/client/cli/DeleteCommand.java   |  9 ++-
 .../inlong/manager/client/cli/RestartCommand.java  | 68 ++++++++++++++++++++++
 .../inlong/manager/client/cli/SuspendCommand.java  | 68 ++++++++++++++++++++++
 .../client/api/inner/client/InlongGroupClient.java | 35 +++++++----
 .../manager/client/api/service/InlongGroupApi.java |  4 +-
 7 files changed, 179 insertions(+), 16 deletions(-)

diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandToolMain.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandToolMain.java
index e8da1bb42..298db76a1 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandToolMain.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandToolMain.java
@@ -38,6 +38,8 @@ public class CommandToolMain {
         jcommander.addCommand("list", new ListCommand());
         jcommander.addCommand("describe", new DescribeCommand());
         jcommander.addCommand("create", new CreateCommand());
+        jcommander.addCommand("suspend", new SuspendCommand());
+        jcommander.addCommand("restart", new RestartCommand());
         jcommander.addCommand("delete", new DeleteCommand());
         jcommander.addCommand("update", new UpdateCommand());
         jcommander.addCommand("log", new LogCommand());
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
index 7674ec334..4b4e602ef 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
@@ -22,12 +22,14 @@ import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.FileConverter;
 import org.apache.inlong.manager.client.api.InlongClient;
 import org.apache.inlong.manager.client.api.InlongGroup;
+import org.apache.inlong.manager.client.api.InlongGroupContext;
 import org.apache.inlong.manager.client.api.InlongStreamBuilder;
 import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient;
 import org.apache.inlong.manager.client.api.inner.client.UserClient;
 import org.apache.inlong.manager.client.cli.pojo.CreateGroupConf;
 import org.apache.inlong.manager.client.cli.util.ClientUtils;
 import org.apache.inlong.manager.client.cli.validator.UserTypeValidator;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
 import org.apache.inlong.manager.common.enums.UserTypeEnum;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
@@ -91,8 +93,11 @@ public class CreateCommand extends AbstractCommand {
                 streamBuilder.transform(groupConf.getStreamTransform());
                 streamBuilder.initOrUpdate();
                 // initialize the new stream group
-                group.init();
-                System.out.println("Create group success!");
+                InlongGroupContext context = group.init();
+                if (!SimpleGroupStatus.STARTED.equals(context.getStatus())) {
+                    throw new Exception("Start group failed, current status: " + context.getStatus());
+                }
+                System.out.println("Start group success!");
             } catch (Exception e) {
                 System.out.println("Create group failed!");
                 System.out.println(e.getMessage());
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java
index ffa2e1e89..81db2df4f 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java
@@ -21,9 +21,11 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import org.apache.inlong.manager.client.api.InlongClient;
 import org.apache.inlong.manager.client.api.InlongGroup;
+import org.apache.inlong.manager.client.api.InlongGroupContext;
 import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient;
 import org.apache.inlong.manager.client.api.inner.client.UserClient;
 import org.apache.inlong.manager.client.cli.util.ClientUtils;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
 
 import java.util.List;
 
@@ -31,7 +33,7 @@ import java.util.List;
  * The delete command used for deleting inlong group instances.
  * Please refer to the document for parameters
  */
-@Parameters(commandDescription = "Delete resource by json file")
+@Parameters(commandDescription = "Delete resource by group id")
 public class DeleteCommand extends AbstractCommand {
 
     @Parameter()
@@ -62,7 +64,10 @@ public class DeleteCommand extends AbstractCommand {
             try {
                 InlongClient inlongClient = ClientUtils.getClient();
                 InlongGroup group = inlongClient.getGroup(inlongGroupId);
-                group.delete();
+                InlongGroupContext context = group.delete();
+                if (!SimpleGroupStatus.STOPPED.equals(context.getStatus())) {
+                    throw new Exception("Delete group failed, current status: " + context.getStatus());
+                }
                 System.out.println("Delete group success!");
             } catch (Exception e) {
                 System.out.format("Delete group failed! message: %s \n", e.getMessage());
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/RestartCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/RestartCommand.java
new file mode 100644
index 000000000..9154288f0
--- /dev/null
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/RestartCommand.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.cli;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import org.apache.inlong.manager.client.api.InlongClient;
+import org.apache.inlong.manager.client.api.InlongGroup;
+import org.apache.inlong.manager.client.api.InlongGroupContext;
+import org.apache.inlong.manager.client.cli.util.ClientUtils;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
+
+import java.util.List;
+
+/**
+ * Restart inlong group: registers the "group" sub-command of "restart", which restarts the group given by --group/-g.
+ */
+@Parameters(commandDescription = "Restart resource by group id")
+public class RestartCommand extends AbstractCommand {
+
+    @Parameter()
+    private List<String> params;
+
+    public RestartCommand() {
+        super("restart");
+        jcommander.addCommand("group", new RestartCommand.RestartGroup());
+    }
+
+    @Parameters(commandDescription = "Restart the inlong group task")
+    private static class RestartGroup extends AbstractCommandRunner {
+
+        @Parameter()
+        private List<String> params;
+
+        @Parameter(names = {"--group", "-g"}, required = true, description = "inlong group id")
+        private String inlongGroupId;
+
+        @Override
+        void run() { // prints the outcome to stdout; never propagates an exception to the caller
+            try {
+                InlongClient inlongClient = ClientUtils.getClient();
+                InlongGroup group = inlongClient.getGroup(inlongGroupId);
+                InlongGroupContext context = group.restart();
+                if (!SimpleGroupStatus.STARTED.equals(context.getStatus())) { // not STARTED => failed
+                    throw new IllegalStateException("current status: " + context.getStatus());
+                }
+                System.out.println("Restart group success!");
+            } catch (Exception e) { // also covers client/HTTP errors, so name the failed operation
+                System.out.format("Restart group failed! message: %s \n", e.getMessage());
+            }
+        }
+    }
+}
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/SuspendCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/SuspendCommand.java
new file mode 100644
index 000000000..f9a6f7e11
--- /dev/null
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/SuspendCommand.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.cli;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import org.apache.inlong.manager.client.api.InlongClient;
+import org.apache.inlong.manager.client.api.InlongGroup;
+import org.apache.inlong.manager.client.api.InlongGroupContext;
+import org.apache.inlong.manager.client.cli.util.ClientUtils;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
+
+import java.util.List;
+
+/**
+ * Suspend inlong group: registers the "group" sub-command of "suspend", which suspends the group given by --group/-g.
+ */
+@Parameters(commandDescription = "Suspend resource by group id")
+public class SuspendCommand extends AbstractCommand {
+
+    @Parameter()
+    private List<String> params;
+
+    public SuspendCommand() {
+        super("suspend");
+        jcommander.addCommand("group", new SuspendCommand.SuspendGroup());
+    }
+
+    @Parameters(commandDescription = "Suspend the inlong group task")
+    private static class SuspendGroup extends AbstractCommandRunner {
+
+        @Parameter()
+        private List<String> params;
+
+        @Parameter(names = {"--group", "-g"}, required = true, description = "inlong group id")
+        private String inlongGroupId;
+
+        @Override
+        void run() { // prints the outcome to stdout; never propagates an exception to the caller
+            try {
+                InlongClient inlongClient = ClientUtils.getClient();
+                InlongGroup group = inlongClient.getGroup(inlongGroupId);
+                InlongGroupContext context = group.suspend();
+                if (!SimpleGroupStatus.STOPPED.equals(context.getStatus())) { // not STOPPED => failed
+                    throw new IllegalStateException("current status: " + context.getStatus());
+                }
+                System.out.println("Suspend group success!");
+            } catch (Exception e) { // also covers client/HTTP errors, so name the failed operation
+                System.out.format("Suspend group failed! message: %s \n", e.getMessage());
+            }
+        }
+    }
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
index 514c87077..862bf605c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
@@ -199,19 +199,34 @@ public class InlongGroupClient {
     }
 
     public boolean operateInlongGroup(String groupId, SimpleGroupStatus status, boolean async) {
+        if (async) { // async requests are delegated unchanged to operateInlongGroupAsync
+            return operateInlongGroupAsync(groupId, status);
+        }
+
+        // Sync: STOPPED -> suspendProcess, STARTED -> restartProcess; both responses carry a WorkflowResult
+        Call<Response<WorkflowResult>> responseCall;
+        if (status == SimpleGroupStatus.STOPPED) {
+            responseCall = inlongGroupApi.suspendProcess(groupId);
+        } else if (status == SimpleGroupStatus.STARTED) {
+            responseCall = inlongGroupApi.restartProcess(groupId);
+        } else {
+            throw new IllegalArgumentException(String.format("Unsupported inlong group status: %s", status));
+        }
+
+        Response<WorkflowResult> responseBody = ClientUtils.executeHttpCall(responseCall);
+
+        String errMsg = responseBody.getErrMsg();
+        return responseBody.isSuccess() // NOTE(review): also true for any failure lacking "not allowed" - confirm
+                || errMsg == null
+                || !errMsg.contains("not allowed");
+    }
+
+    public boolean operateInlongGroupAsync(String groupId, SimpleGroupStatus status) {
         Call<Response<String>> responseCall;
         if (status == SimpleGroupStatus.STOPPED) {
-            if (async) {
-                responseCall = inlongGroupApi.suspendProcessAsync(groupId);
-            } else {
-                responseCall = inlongGroupApi.suspendProcess(groupId);
-            }
+            responseCall = inlongGroupApi.suspendProcessAsync(groupId);
         } else if (status == SimpleGroupStatus.STARTED) {
-            if (async) {
-                responseCall = inlongGroupApi.restartProcessAsync(groupId);
-            } else {
-                responseCall = inlongGroupApi.restartProcess(groupId);
-            }
+            responseCall = inlongGroupApi.restartProcessAsync(groupId);
         } else {
             throw new IllegalArgumentException(String.format("Unsupported inlong group status: %s", status));
         }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
index 10cd103b9..6c01de281 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
@@ -59,13 +59,13 @@ public interface InlongGroupApi {
     Call<Response<String>> suspendProcessAsync(@Path("id") String id);
 
     @POST("group/suspendProcess/{id}")
-    Call<Response<String>> suspendProcess(@Path("id") String id);
+    Call<Response<WorkflowResult>> suspendProcess(@Path("id") String id);
 
     @POST("group/restartProcessAsync/{id}")
     Call<Response<String>> restartProcessAsync(@Path("id") String id);
 
     @POST("group/restartProcess/{id}")
-    Call<Response<String>> restartProcess(@Path("id") String id);
+    Call<Response<WorkflowResult>> restartProcess(@Path("id") String id);
 
     @DELETE("group/deleteAsync/{id}")
     Call<Response<String>> deleteGroupAsync(@Path("id") String id);