You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2020/05/25 08:08:25 UTC

[ignite] 10/24: ignite-ducktape: WIP.

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 84c302b811c9b1587ba6371364cc90d9755eef5b
Author: Nikolay Izhikov <ni...@apache.org>
AuthorDate: Mon Apr 13 12:29:28 2020 +0300

    ignite-ducktape: WIP.
---
 .../org/apache/ignite/test/IgniteApplication.java  |  27 ++++
 tests/ignitetest/services/ignite_client_app.py     | 146 +++++++++++++++++++++
 2 files changed, 173 insertions(+)

diff --git a/modules/spring/src/main/java/org/apache/ignite/test/IgniteApplication.java b/modules/spring/src/main/java/org/apache/ignite/test/IgniteApplication.java
new file mode 100644
index 0000000..d3446fc
--- /dev/null
+++ b/modules/spring/src/main/java/org/apache/ignite/test/IgniteApplication.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.test;
+
+import java.io.File;
+
+public class IgniteApplication { // Minimal stub application: prints a marker line and creates a marker file.
+    public static void main(String[] args) throws Exception { // args are not read; any exception propagates.
+        System.out.println("IgniteApplication.main"); // Marker line on stdout.
+        new File("/opt/hello-from-app.txt").createNewFile(); // Result (false if the file already exists) is discarded.
+    }
+}
diff --git a/tests/ignitetest/services/ignite_client_app.py b/tests/ignitetest/services/ignite_client_app.py
new file mode 100644
index 0000000..4462e71
--- /dev/null
+++ b/tests/ignitetest/services/ignite_client_app.py
@@ -0,0 +1,146 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from ducktape.cluster.remoteaccount import RemoteCommandError
+from ducktape.services.background_thread import BackgroundThreadService
+
+from ignitetest.directory_layout.ignite_path import IgnitePathResolverMixin
+
+from ignitetest.version import DEV_BRANCH
+
+"""
+This service is a work-in-progress adaptation of the Kafka console consumer service (the console consumer is a tool that reads data from Kafka and outputs it to standard output).
+"""
+
+
+class IgniteClientApp(IgnitePathResolverMixin, BackgroundThreadService):
+    # Root directory for persistent output
+    PERSISTENT_ROOT = "/mnt/client_app"
+    STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "client_app.stdout")
+    STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "client_app.stderr")
+    LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
+    LOG_FILE = os.path.join(LOG_DIR, "client_app.log")
+
+    logs = {
+        "consumer_stdout": {
+            "path": STDOUT_CAPTURE,
+            "collect_default": False},
+        "consumer_stderr": {
+            "path": STDERR_CAPTURE,
+            "collect_default": False},
+        "consumer_log": {
+            "path": LOG_FILE,
+            "collect_default": True}
+    }
+
+    def __init__(self, context, version=DEV_BRANCH, num_nodes=1):
+        """
+        Args:
+            num_nodes:                  number of nodes to use (this should be 1)
+        """
+        BackgroundThreadService.__init__(self, context, num_nodes)
+
+        for node in self.nodes:
+            node.version = version
+
+    def start_cmd(self, node):
+        """Return the start command appropriate for the given node."""
+        args = self.args.copy()
+        args['stdout'] = IgniteClientApp.STDOUT_CAPTURE
+        args['stderr'] = IgniteClientApp.STDERR_CAPTURE
+        args['log_dir'] = IgniteClientApp.LOG_DIR
+        args['config_file'] = IgniteClientApp.CONFIG_FILE
+        args['stdout'] = IgniteClientApp.STDOUT_CAPTURE
+        args['console_consumer'] = self.path.script("kafka-console-consumer.sh", node)
+
+        cmd = "%(console_consumer)s " \
+              "--topic %(topic)s " \
+              "--consumer.config %(config_file)s " % args
+
+        cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
+        return cmd
+
+    def pids(self, node):
+        return node.account.java_pids(self.java_class_name())
+
+    def alive(self, node):
+        return len(self.pids(node)) > 0
+
+    def _worker(self, idx, node):
+        node.account.ssh("mkdir -p %s" % IgniteClientApp.PERSISTENT_ROOT, allow_fail=False)
+
+        self.security_config = self.kafka.security_config.client_config(node=node,
+                                                                        jaas_override_variables=self.jaas_override_variables)
+        self.security_config.setup_node(node)
+
+        if self.client_prop_file_override:
+            prop_file = self.client_prop_file_override
+        else:
+            prop_file = self.prop_file(node)
+
+        # TODO: create ignite config: node.account.create_file(IgniteClientApp.CONFIG_FILE, prop_file)
+
+        # Run and capture output
+        cmd = self.start_cmd(node)
+        self.logger.debug("Console consumer %d command: %s", idx, cmd)
+
+        consumer_output = node.account.ssh_capture(cmd, allow_fail=False)
+
+        # TODO: create ignite version of waiting for a app finish
+        for line in consumer_output:
+            msg = line.strip()
+            if msg == "shutdown_complete":
+                # Note that we can only rely on shutdown_complete message if running 0.10.0 or greater
+                if node in self.clean_shutdown_nodes:
+                    raise Exception(
+                        "Unexpected shutdown event from consumer, already shutdown. Consumer index: %d" % idx)
+                self.clean_shutdown_nodes.add(node)
+            else:
+                if self.message_validator is not None:
+                    msg = self.message_validator(msg)
+                if msg is not None:
+                    self.messages_consumed[idx].append(msg)
+
+    def start_node(self, node):
+        BackgroundThreadService.start_node(self, node)
+
+    def stop_node(self, node):
+        self.logger.info("%s Stopping node %s" % (self.__class__.__name__, str(node.account)))
+        node.account.kill_java_processes(self.java_class_name(),
+                                         clean_shutdown=True, allow_fail=True)
+
+        stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+        assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+                        (str(node.account), str(self.stop_timeout_sec))
+
+    def clean_node(self, node):
+        if self.alive(node):
+            self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
+                             (self.__class__.__name__, node.account))
+        node.account.kill_java_processes(self.java_class_name(), clean_shutdown=False, allow_fail=True)
+        node.account.ssh("rm -rf %s" % IgniteClientApp.PERSISTENT_ROOT, allow_fail=False)
+        self.security_config.clean_node(node)
+
+    def java_class_name(self):
+        return "org.apache.ignite.test.IgniteApplication"
+
+    def has_log_message(self, node, message):
+        try:
+            node.account.ssh("grep '%s' %s" % (message, IgniteClientApp.LOG_FILE))
+        except RemoteCommandError:
+            return False
+        return True