You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2020/07/16 10:34:54 UTC

[ignite] branch ignite-ducktape updated: Duck is a duck (#7967)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-ducktape by this push:
     new aeb5db9  Duck is a duck  (#7967)
aeb5db9 is described below

commit aeb5db9afe9b117d36ebf8590ff3df1df114f15b
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Thu Jul 16 13:34:28 2020 +0300

    Duck is a duck  (#7967)
---
 .../ducktest/DataGenerationApplication.java        |  53 ++++++
 .../ducktest/LongTxStreamerApplication.java        | 107 ++++++++++++
 .../ducktest/SampleDataStreamerApplication.java    |  61 +++++++
 .../ducktest/SingleKeyTxStreamerApplication.java   |  85 ++++++++++
 .../{test => ducktest}/SparkApplication.java       |  52 +++---
 .../ducktest/utils/IgniteApplicationService.java   |  63 +++++++
 .../ducktest/utils/IgniteAwareApplication.java     | 185 +++++++++++++++++++++
 .../utils/IgniteAwareApplicationService.java       |  47 ++++++
 .../ignite/internal/test/IgniteApplication.java    |  73 --------
 .../ducktests/src/main/resources/log4j.properties  |  14 +-
 modules/ducktests/tests/docker/Dockerfile          |  18 +-
 modules/ducktests/tests/docker/ducker-ignite       |   4 +-
 .../benchmarks/add_node_rebalance_test.py          |  56 -------
 .../tests/ignitetest/services/__init__.py          |   1 -
 .../ducktests/tests/ignitetest/services/ignite.py  |  86 +++-------
 .../tests/ignitetest/services/ignite_app.py        |  38 +++++
 .../tests/ignitetest/services/ignite_client_app.py | 151 -----------------
 .../services/{__init__.py => ignite_spark_app.py}  |  16 ++
 .../ducktests/tests/ignitetest/services/spark.py   |  26 ++-
 .../ignitetest/services/utils/ignite_aware.py      | 105 ++++++++++++
 .../ignitetest/services/utils/ignite_aware_app.py  | 108 ++++++++++++
 .../ignitetest/services/utils/ignite_config.py     |   6 +-
 .../tests/ignitetest/services/utils/ignite_path.py |   6 +-
 .../ignitetest/{ => tests}/benchmarks/__init__.py  |   0
 .../tests/benchmarks/add_node_rebalance_test.py    |  94 +++++++++++
 .../tests/benchmarks/pme_free_switch_test.py       | 117 +++++++++++++
 .../ignitetest/tests/spark_integration_test.py     |  30 +++-
 .../{benchmarks => tests/utils}/__init__.py        |   0
 .../__init__.py => tests/utils/ignite_test.py}     |   9 +
 modules/ducktests/tests/ignitetest/version.py      |   5 +
 scripts/{build.sh => build-module.sh}              |   4 +-
 scripts/build.sh                                   |   2 +-
 32 files changed, 1200 insertions(+), 422 deletions(-)

diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/DataGenerationApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/DataGenerationApplication.java
new file mode 100644
index 0000000..4c5ffcf
--- /dev/null
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/DataGenerationApplication.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+
+/**
+ * Creates the cache named by {@code args[0]} and streams {@code args[1]} entries (key equals value) into it.
+ */
+public class DataGenerationApplication extends IgniteAwareApplication {
+    /**
+     * @param ignite Ignite instance used to create the cache and the data streamer.
+     */
+    public DataGenerationApplication(Ignite ignite) {
+        super(ignite);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void run(String[] args) {
+        log.info("Creating cache...");
+
+        IgniteCache<Integer, Integer> cache = ignite.createCache(args[0]);
+
+        try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(cache.getName())) {
+            for (int i = 0, cnt = Integer.parseInt(args[1]); i < cnt; i++) { // Entry count is parsed only once.
+                stmr.addData(i, i);
+
+                if (i % 10_000 == 0)
+                    log.info("Streamed " + i + " entries");
+            }
+        }
+
+        markSyncExecutionComplete();
+    }
+}
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/LongTxStreamerApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/LongTxStreamerApplication.java
new file mode 100644
index 0000000..4bbc078
--- /dev/null
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/LongTxStreamerApplication.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest;
+
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionState;
+
+/**
+ * Opens {@link #TX_CNT} transactions in separate threads, each putting one key, and idles until terminated.
+ */
+public class LongTxStreamerApplication extends IgniteAwareApplication {
+    /** Tx count. */
+    private static final int TX_CNT = 100;
+
+    /** Counted down once per transaction thread, right after its single put. */
+    private static final CountDownLatch started = new CountDownLatch(TX_CNT);
+
+    /**
+     * @param ignite Ignite.
+     */
+    public LongTxStreamerApplication(Ignite ignite) {
+        super(ignite);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run(String[] args) throws InterruptedException {
+        IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(args[0]);
+
+        log.info("Starting Long Tx...");
+
+        for (int i = 0; i < TX_CNT; i++) {
+            int finalI = i;
+
+            new Thread(() -> {
+                Transaction tx = ignite.transactions().txStart(); // NOTE(review): not committed or closed in this method - confirm intended.
+
+                cache.put(finalI, finalI);
+
+                log.info("Long Tx started [key=" + finalI + "]");
+
+                started.countDown();
+
+                while (!terminated()) {
+                    if (tx.state() != TransactionState.ACTIVE) {
+                        log.info("Transaction broken. [key=" + finalI + "]");
+
+                        break;
+                    }
+
+                    try {
+                        U.sleep(1000);
+                    }
+                    catch (IgniteInterruptedCheckedException ignored) {
+                        // Interruption is ignored; the loop only ends on termination or a tx state change.
+                    }
+                }
+
+                log.info("Stopping tx thread [state=" + tx.state() + "]");
+
+            }).start();
+        }
+
+        started.await(); // Blocks until every transaction thread has performed its put.
+
+        markInitialized();
+
+        while (!terminated()) {
+            Collection<IgniteInternalTx> active =
+                ((IgniteEx)ignite).context().cache().context().tm().activeTransactions();
+
+            log.info("Long Txs are in progress [txs=" + active.size() + "]");
+
+            try {
+                U.sleep(100); // Keeping node/txs alive.
+            }
+            catch (IgniteInterruptedCheckedException ignored) {
+                log.info("Waiting interrupted.");
+            }
+        }
+
+        markFinished();
+    }
+}
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/SampleDataStreamerApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/SampleDataStreamerApplication.java
new file mode 100644
index 0000000..281d202
--- /dev/null
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/SampleDataStreamerApplication.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+
+/**
+ * Fills the cache named by {@code args[0]} with {@code args[1]} entries and creates a small {@code person} SQL table.
+ */
+public class SampleDataStreamerApplication extends IgniteAwareApplication {
+    /**
+     * @param ignite Ignite.
+     */
+    public SampleDataStreamerApplication(Ignite ignite) {
+        super(ignite);
+    }
+
+    /**
+     * @param cache Cache the SQL query is executed through.
+     * @param qry Query text, may contain {@code ?} placeholders.
+     * @param args Query arguments passed to {@code SqlFieldsQuery.setArgs}.
+     */
+    private static void executeSql(IgniteCache<Integer, Integer> cache, String qry, Object... args) {
+        cache.query(new SqlFieldsQuery(qry).setArgs(args)).getAll();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void run(String[] args) {
+        log.info("Creating cache..."); // Shared logger instead of System.out, as in DataGenerationApplication.
+
+        IgniteCache<Integer, Integer> cache = ignite.createCache(args[0]);
+
+        for (int i = 0, cnt = Integer.parseInt(args[1]); i < cnt; i++) // Entry count is parsed only once.
+            cache.put(i, i);
+
+        executeSql(cache, "CREATE TABLE person(id INT, fio VARCHAR, PRIMARY KEY(id))");
+        executeSql(cache, "INSERT INTO person(id, fio) VALUES(?, ?)", 1, "Ivanov Ivan");
+        executeSql(cache, "INSERT INTO person(id, fio) VALUES(?, ?)", 2, "Petrov Petr");
+        executeSql(cache, "INSERT INTO person(id, fio) VALUES(?, ?)", 3, "Sidorov Sidr");
+
+        markSyncExecutionComplete();
+    }
+}
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/SingleKeyTxStreamerApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/SingleKeyTxStreamerApplication.java
new file mode 100644
index 0000000..bff4e7e
--- /dev/null
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/SingleKeyTxStreamerApplication.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+
+/**
+ * Puts sequential keys into the cache named by {@code args[0]} until terminated and reports the worst put latency.
+ */
+public class SingleKeyTxStreamerApplication extends IgniteAwareApplication {
+    /**
+     * @param ignite Ignite.
+     */
+    public SingleKeyTxStreamerApplication(Ignite ignite) {
+        super(ignite);
+    }
+
+    /** {@inheritDoc} Expects {@code args[0]} = cache name and {@code args[1]} = number of warm-up puts. */
+    @Override public void run(String[] args) {
+        IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(args[0]);
+
+        int warmup = Integer.parseInt(args[1]);
+
+        long max = -1;
+
+        int key = 10_000_000;
+
+        int cnt = 0;
+
+        long initTime = 0;
+
+        boolean record = false;
+
+        while (!terminated()) {
+            cnt++;
+
+            long start = System.currentTimeMillis();
+
+            cache.put(key++, key); // Stores (k, k + 1): the value argument is evaluated after the increment.
+
+            long finish = System.currentTimeMillis();
+
+            long time = finish - start;
+
+            if (!record && cnt > warmup) {
+                record = true;
+
+                initTime = System.currentTimeMillis();
+
+                markInitialized();
+            }
+
+            if (record) {
+                // Warm-up puts never contribute to the worst latency.
+                max = Math.max(max, time);
+            }
+
+            if (cnt % 1000 == 0)
+                log.info("Streamed " + cnt + " transactions [max=" + max + "]");
+        }
+
+        recordResult("WORST_LATENCY", max);
+        recordResult("STREAMED", record ? cnt - warmup : 0); // Zero, not negative, if terminated during warm-up.
+        recordResult("MEASURE_DURATION", record ? System.currentTimeMillis() - initTime : 0);
+
+        markFinished();
+    }
+}
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/test/SparkApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/SparkApplication.java
similarity index 85%
rename from modules/ducktests/src/main/java/org/apache/ignite/internal/test/SparkApplication.java
rename to modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/SparkApplication.java
index fc41f7f..0823a82 100644
--- a/modules/ducktests/src/main/java/org/apache/ignite/internal/test/SparkApplication.java
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/SparkApplication.java
@@ -15,20 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.test;
+package org.apache.ignite.internal.ducktest;
 
+import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
 import org.apache.ignite.spark.IgniteDataFrameSettings;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.ignite.IgniteSparkSession;
 
-import static org.apache.ignite.internal.test.IgniteApplication.CONFIG_PATH;
-
 /**
  *
  */
-public class SparkApplication {
+public class SparkApplication extends IgniteAwareApplication {
     /** Home. */
     public static final String HOME = "/opt/ignite-dev";
 
@@ -39,24 +38,9 @@ public class SparkApplication {
     public static final String SPRING_VER = "4.3.26.RELEASE";
 
     /**
-     * @param args Args.
-     */
-    public static void main(String[] args) {
-        System.out.println("SparkApplication.main - args");
-        for (String arg : args)
-            System.out.println("SparkApplication.main - " + arg);
-
-        sparkSession(args[0]);
-
-        igniteSession(args[0]);
-
-        System.out.println("Ignite Client Finish.");
-    }
-
-    /**
      * @param masterUrl Master url.
      */
-    private static void sparkSession(String masterUrl) {
+    private static void sparkSession(String cfgPath, String masterUrl) {
         //Creating spark session.
         try (SparkSession spark = SparkSession.builder()
             .appName("SparkApplication")
@@ -74,18 +58,19 @@ public class SparkApplication {
             spark.sparkContext().addJar(HOME + "/modules/core/target/libs/cache-api-1.0.0.jar");
             spark.sparkContext().addJar(HOME + "/modules/indexing/target/libs/h2-1.4.197.jar");
 
-            sparkDSLExample(spark);
+            sparkDSLExample(cfgPath, spark);
         }
     }
 
     /**
      * @param masterUrl Master url.
+     * @param cfgPath Config path.
      */
-    private static void igniteSession(String masterUrl) {
+    private static void igniteSession(String cfgPath, String masterUrl) {
         //Creating spark session.
         try (IgniteSparkSession spark = IgniteSparkSession.builder()
             .appName("SparkApplication")
-            .igniteConfig(CONFIG_PATH)
+            .igniteConfig(cfgPath)
             .master(masterUrl)
             .getOrCreate()) {
             spark.sparkContext().addJar(HOME + "/modules/core/target/ignite-core-" + VER + ".jar");
@@ -102,20 +87,21 @@ public class SparkApplication {
 
             spark.catalog().listTables().show();
 
-            sparkDSLExample(spark);
+            sparkDSLExample(cfgPath, spark);
         }
     }
 
     /**
      * @param spark Spark.
+     * @param cfgPath Config path.
      */
-    private static void sparkDSLExample(SparkSession spark) {
+    private static void sparkDSLExample(String cfgPath, SparkSession spark) {
         System.out.println("Querying using Spark DSL.");
 
         Dataset<Row> igniteDF = spark.read()
             .format(IgniteDataFrameSettings.FORMAT_IGNITE()) //Data source type.
             .option(IgniteDataFrameSettings.OPTION_TABLE(), "person") //Table to read.
-            .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG_PATH) //Ignite config.
+            .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), cfgPath) //Ignite config.
             .load();
 
         System.out.println("Data frame schema:");
@@ -126,4 +112,18 @@ public class SparkApplication {
 
         igniteDF.show(); //Printing query results to console.
     }
+
+    /** {@inheritDoc} Uses {@code args[0]} as the Ignite config path and {@code args[1]} as the Spark master URL. */
+    @Override protected void run(String[] args) throws Exception {
+        System.out.println("SparkApplication.main - args");
+
+        for (String arg : args)
+            System.out.println("SparkApplication.main - " + arg);
+
+        sparkSession(args[0], args[1]);
+
+        igniteSession(args[0], args[1]);
+
+        markSyncExecutionComplete();
+    }
 }
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteApplicationService.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteApplicationService.java
new file mode 100644
index 0000000..a41f27c
--- /dev/null
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteApplicationService.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest.utils;
+
+import java.util.Arrays;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Entry point that starts an Ignite node and runs an {@link IgniteAwareApplication} on top of it.
+ */
+public class IgniteApplicationService {
+    /** Logger. */
+    private static final Logger log = LogManager.getLogger(IgniteApplicationService.class.getName());
+
+    /**
+     * @param args Single comma-separated argument: application class name, Ignite config path, application params.
+     */
+    public static void main(String[] args) throws Exception {
+        String rawParams = args[0];
+
+        log.info("Starting Application... [params=" + rawParams + "]");
+
+        String[] tokens = rawParams.split(",");
+
+        // A wrong class name fails here, before the configuration is loaded.
+        Class<?> appCls = Class.forName(tokens[0]);
+
+        IgniteConfiguration igniteCfg = IgnitionEx.loadConfiguration(tokens[1]).get1();
+
+        // Checked only when assertions are enabled.
+        assert igniteCfg.isClientMode();
+
+        log.info("Starting Ignite node...");
+
+        try (Ignite node = Ignition.start(igniteCfg)) {
+            IgniteAwareApplication app = (IgniteAwareApplication)appCls.getConstructor(Ignite.class).newInstance(node);
+
+            app.start(Arrays.copyOfRange(tokens, 2, tokens.length));
+        }
+    }
+}
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
new file mode 100644
index 0000000..525c76d
--- /dev/null
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest.utils;
+
+import java.util.Arrays;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Base class for ducktest applications; reports lifecycle milestones and results through log markers.
+ */
+public abstract class IgniteAwareApplication {
+    /** Logger. */
+    protected static final Logger log = LogManager.getLogger(IgniteAwareApplication.class.getName());
+
+    /** Marker logged by {@link #markInitialized()}. */
+    private static final String APP_INITED = "IGNITE_APPLICATION_INITIALIZED";
+
+    /** Marker logged by {@link #markFinished()}. */
+    private static final String APP_FINISHED = "IGNITE_APPLICATION_FINISHED";
+
+    /** Marker logged when the shutdown hook requests termination. */
+    private static final String APP_TERMINATED = "IGNITE_APPLICATION_TERMINATED";
+
+    /** Inited. */
+    private static volatile boolean inited;
+
+    /** Finished. */
+    private static volatile boolean finished;
+
+    /** Terminated. */
+    private static volatile boolean terminated;
+
+    /** Ignite. */
+    protected final Ignite ignite;
+
+    /**
+     * Default constructor: no Ignite instance and no shutdown hook.
+     */
+    protected IgniteAwareApplication() {
+        ignite = null;
+    }
+
+    /**
+     * Registers a shutdown hook that requests termination and waits until the application is finished.
+     */
+    protected IgniteAwareApplication(Ignite ignite) {
+        this.ignite = ignite;
+
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            terminate();
+
+            while (!finished()) {
+                log.info("Waiting for graceful termination.");
+
+                try {
+                    U.sleep(100);
+                }
+                catch (IgniteInterruptedCheckedException e) {
+                    log.warn("Interrupted while waiting for graceful termination.", e);
+                }
+            }
+
+            log.info("SIGTERM recorded.");
+        }));
+
+        log.info("ShutdownHook registered.");
+    }
+
+    /**
+     * Marks the application as initialized, i.e. ready to perform its actions. Suitable for async runs.
+     */
+    protected void markInitialized() {
+        assert !inited;
+
+        log.info(APP_INITED);
+
+        inited = true;
+    }
+
+    /**
+     * Marks the application as finished; this also lets the shutdown hook complete.
+     */
+    protected void markFinished() {
+        assert !finished;
+
+        log.info(APP_FINISHED);
+
+        finished = true;
+    }
+
+    /**
+     * Marks a synchronous application as initialized and finished in one step.
+     */
+    protected void markSyncExecutionComplete() {
+        markInitialized();
+        markFinished();
+    }
+
+    /** @return {@code true} once the run is over, successfully or not. */
+    private boolean finished() {
+        return finished;
+    }
+
+    /**
+     * Records the termination request; called from the shutdown hook.
+     */
+    private void terminate() {
+        assert !terminated;
+
+        log.info(APP_TERMINATED);
+
+        terminated = true;
+    }
+
+    /**
+     * @return {@code true} once termination has been requested.
+     */
+    protected boolean terminated() {
+        return terminated;
+    }
+
+    /**
+     * @param name Name.
+     * @param val Value.
+     */
+    protected void recordResult(String name, String val) {
+        assert !finished;
+
+        log.info(name + "->" + val + "<-");
+    }
+
+    /**
+     * @param name Name.
+     * @param val Value.
+     */
+    protected void recordResult(String name, long val) {
+        recordResult(name, String.valueOf(val));
+    }
+
+    /**
+     * Application body; {@link #start(String[])} asserts that it marked the application initialized and finished.
+     */
+    protected abstract void run(String[] args) throws Exception;
+
+    /**
+     * @param args Args.
+     */
+    public void start(String[] args) {
+        try {
+            log.info("Application params: " + Arrays.toString(args));
+
+            run(args);
+
+            assert inited : "Was not properly initialized.";
+            assert finished : "Was not properly finished.";
+        }
+        catch (Throwable th) {
+            log.error("Unexpected Application failure... ", th);
+        }
+        finally {
+            // Release the shutdown hook even if run() failed before markFinished(), otherwise the JVM cannot exit.
+            finished = true;
+            log.info("Application finished.");
+        }
+    }
+}
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplicationService.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplicationService.java
new file mode 100644
index 0000000..a7836c2
--- /dev/null
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplicationService.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest.utils;
+
+import java.util.Arrays;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Entry point that runs an {@link IgniteAwareApplication} created through its no-arg constructor.
+ */
+public class IgniteAwareApplicationService {
+    /** Logger. */
+    private static final Logger log = LogManager.getLogger(IgniteAwareApplicationService.class.getName());
+
+    /**
+     * @param args Single comma-separated argument: application class name followed by application params.
+     */
+    public static void main(String[] args) throws Exception {
+        String rawParams = args[0];
+
+        log.info("Starting Application... [params=" + rawParams + "]");
+
+        String[] tokens = rawParams.split(",");
+
+        Class<?> appCls = Class.forName(tokens[0]);
+
+        IgniteAwareApplication app = (IgniteAwareApplication)appCls.getConstructor().newInstance();
+
+        app.start(Arrays.copyOfRange(tokens, 1, tokens.length));
+    }
+}
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/test/IgniteApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/test/IgniteApplication.java
deleted file mode 100644
index d6d710f..0000000
--- a/modules/ducktests/src/main/java/org/apache/ignite/internal/test/IgniteApplication.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.test;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.Ignition;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgnitionEx;
-import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
-import org.apache.ignite.lang.IgniteBiTuple;
-
-/**
- *
- */
-public class IgniteApplication {
-    /** Config path. */
-    public static final String CONFIG_PATH = "/mnt/client_app/ignite-client-config.xml";
-
-    /**
-     * @param args Args.
-     */
-    public static void main(String[] args) throws IgniteCheckedException {
-        IgniteBiTuple<IgniteConfiguration, GridSpringResourceContext> cfgs = IgnitionEx.loadConfiguration(CONFIG_PATH);
-        IgniteConfiguration cfg = cfgs.get1();
-
-        cfg.setClientMode(true);
-
-        System.out.println("Starting Ignite client...");
-
-        try (Ignite ign = Ignition.start(cfg)) {
-            System.out.println("Creating cache...");
-
-            IgniteCache<Integer, Integer> cache = ign.createCache("test-cache");
-
-            for (int i = 0; i < 1000; i++)
-                cache.put(i, i);
-
-            executeSql(cache, "CREATE TABLE person(id INT, fio VARCHAR, PRIMARY KEY(id))");
-            executeSql(cache, "INSERT INTO person(id, fio) VALUES(?, ?)", 1, "Ivanov Ivan");
-            executeSql(cache, "INSERT INTO person(id, fio) VALUES(?, ?)", 2, "Petrov Petr");
-            executeSql(cache, "INSERT INTO person(id, fio) VALUES(?, ?)", 3, "Sidorov Sidr");
-
-            System.out.println("Ignite Client Finish.");
-        }
-    }
-
-    /**
-     * @param cache Cache.
-     * @param query Query.
-     * @param args Args.
-     */
-    private static void executeSql(IgniteCache<Integer, Integer> cache, String query, Object... args) {
-        cache.query(new SqlFieldsQuery(query).setArgs(args)).getAll();
-    }
-}
diff --git a/scripts/build.sh b/modules/ducktests/src/main/resources/log4j.properties
old mode 100755
new mode 100644
similarity index 71%
copy from scripts/build.sh
copy to modules/ducktests/src/main/resources/log4j.properties
index 333ba9e..ecfe84a
--- a/scripts/build.sh
+++ b/modules/ducktests/src/main/resources/log4j.properties
@@ -1,4 +1,3 @@
-#!/bin/bash
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -16,10 +15,11 @@
 # limitations under the License.
 #
 
-#
-# Builds project.
-# Run in Ignite sources root directory.
-# Usage: ./build.sh
-#
+# Root logger option
+log4j.rootLogger=INFO, stdout
 
-mvn clean package -Pall-java,all-scala -DskipTests -Dmaven.javadoc.skip=true
\ No newline at end of file
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601}][%-5p][%t][%c{1}] %m%n
diff --git a/modules/ducktests/tests/docker/Dockerfile b/modules/ducktests/tests/docker/Dockerfile
index 790331c..d083ff3 100644
--- a/modules/ducktests/tests/docker/Dockerfile
+++ b/modules/ducktests/tests/docker/Dockerfile
@@ -20,7 +20,7 @@ MAINTAINER Apache Ignite dev@ignite.apache.org
 VOLUME ["/opt/ignite-dev"]
 
 # Set the timezone.
-ENV TZ="/usr/share/zoneinfo/America/Los_Angeles"
+ENV TZ="/usr/share/zoneinfo/Europe/Moscow"
 
 # Do not ask for confirmations when running apt-get, etc.
 ENV DEBIAN_FRONTEND noninteractive
@@ -42,28 +42,22 @@ COPY ./ssh-config /root/.ssh/config
 RUN ssh-keygen -m PEM -q -t rsa -N '' -f /root/.ssh/id_rsa && cp -f /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
 RUN echo 'PermitUserEnvironment yes' >> /etc/ssh/sshd_config
 
-RUN chmod a+wr /opt
-
 ARG APACHE_MIRROR="https://apache-mirror.rbc.ru/pub/apache/"
 ARG APACHE_ARCHIVE="https://archive.apache.org/dist/"
 
 # Install binary test dependencies.
-ARG IGNITE_VERSION="2.8.0"
-ARG IGNITE_NAME="ignite-$IGNITE_VERSION"
-ARG IGNITE_RELEASE_NAME="apache-ignite-$IGNITE_VERSION-bin"
+RUN cd /opt && curl -O $APACHE_ARCHIVE/ignite/2.7.6/apache-ignite-2.7.6-bin.zip && unzip apache-ignite-2.7.6-bin.zip && mv /opt/apache-ignite-2.7.6-bin /opt/ignite-2.7.6
+RUN cd /opt && curl -O $APACHE_ARCHIVE/ignite/2.8.0/apache-ignite-2.8.0-bin.zip && unzip apache-ignite-2.8.0-bin.zip && mv /opt/apache-ignite-2.8.0-bin /opt/ignite-2.8.0
+RUN cd /opt && curl -O $APACHE_ARCHIVE/ignite/2.8.1/apache-ignite-2.8.1-bin.zip && unzip apache-ignite-2.8.1-bin.zip && mv /opt/apache-ignite-2.8.1-bin /opt/ignite-2.8.1
 
-ADD $APACHE_ARCHIVE/ignite/$IGNITE_VERSION/$IGNITE_RELEASE_NAME.zip /opt/
-RUN cd /opt && unzip $IGNITE_RELEASE_NAME.zip && rm $IGNITE_RELEASE_NAME.zip
-RUN mv /opt/$IGNITE_RELEASE_NAME /opt/$IGNITE_NAME
-RUN chmod a+wr /opt/$IGNITE_NAME -R
+RUN rm /opt/apache-ignite-*-bin.zip
 
 # Install spark
 ARG SPARK_VERSION="2.3.4"
 ARG SPARK_NAME="spark-$SPARK_VERSION"
 ARG SPARK_RELEASE_NAME="spark-$SPARK_VERSION-bin-hadoop2.7"
 
-ADD $APACHE_MIRROR/spark/$SPARK_NAME/$SPARK_RELEASE_NAME.tgz /opt/
-RUN cd /opt && tar xvf $SPARK_RELEASE_NAME.tgz && rm $SPARK_RELEASE_NAME.tgz
+RUN cd /opt && curl -O $APACHE_MIRROR/spark/$SPARK_NAME/$SPARK_RELEASE_NAME.tgz && tar xvf $SPARK_RELEASE_NAME.tgz && rm $SPARK_RELEASE_NAME.tgz
 RUN mv /opt/$SPARK_RELEASE_NAME /opt/$SPARK_NAME
 RUN chmod a+wr /opt/$SPARK_NAME -R
 
diff --git a/modules/ducktests/tests/docker/ducker-ignite b/modules/ducktests/tests/docker/ducker-ignite
index 806e529..e41212c 100755
--- a/modules/ducktests/tests/docker/ducker-ignite
+++ b/modules/ducktests/tests/docker/ducker-ignite
@@ -197,7 +197,7 @@ must_do() {
 # Ask the user a yes/no question.
 #
 # $1: The prompt to use
-# $_return: 0 if the user answered no; 1 if the user anМинус - создаст дополнительную нагрузку на core team.swered yes.
+# $_return: 0 if the user answered no; 1 if the user answered yes.
 ask_yes_no() {
     local prompt="${1}"
     while true; do
@@ -258,7 +258,7 @@ docker_run() {
     must_do -v docker run --privileged \
         -d -t -h "${node}" --network ducknet "${expose_ports}" \
         --memory=${docker_run_memory_limit} --memory-swappiness=1 \
-        -v "${ignite_dir}:/opt/ignite-dev" --name "${node}" -- "${image_name}"
+        -v "${ignite_dir}:/opt/ignite-dev:delegated" --name "${node}" -- "${image_name}"
 }
 
 setup_custom_ducktape() {
diff --git a/modules/ducktests/tests/ignitetest/benchmarks/add_node_rebalance_test.py b/modules/ducktests/tests/ignitetest/benchmarks/add_node_rebalance_test.py
deleted file mode 100644
index 828ec5a..0000000
--- a/modules/ducktests/tests/ignitetest/benchmarks/add_node_rebalance_test.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.tests.test import Test
-
-from ignitetest.services.ignite import IgniteService
-from ignitetest.services.ignite_client_app import IgniteClientApp
-
-
-class AddNodeRebalanceTest(Test):
-    NUM_NODES = 3
-    REBALANCE_TIMEOUT = 600
-
-    """
-    Test performs rebalance tests.
-    """
-    def __init__(self, test_context):
-        super(AddNodeRebalanceTest, self).__init__(test_context=test_context)
-        self.ignite = IgniteService(test_context, num_nodes=AddNodeRebalanceTest.NUM_NODES)
-
-    def setUp(self):
-        # starting all nodes except last.
-        for i in range(AddNodeRebalanceTest.NUM_NODES-1):
-            self.ignite.start_node(self.ignite.nodes[i])
-
-    def teardown(self):
-        self.ignite.stop()
-
-    def test_add_node(self):
-        """
-        Test performs add node rebalance test which consists of following steps:
-            * Start cluster.
-            * Put data to it via IgniteClientApp.
-            * Start one more node and awaits for rebalance to finish.
-        """
-        self.logger.info("Start add node rebalance test.")
-
-        # This client just put some data to the cache.
-        IgniteClientApp(self.test_context,
-                        java_class_name="org.apache.ignite.internal.test.IgniteApplication").run()
-
-        self.ignite.start_node(self.ignite.nodes[AddNodeRebalanceTest.NUM_NODES-1],
-                               timeout_sec=AddNodeRebalanceTest.REBALANCE_TIMEOUT,
-                               wait_for_rebalance = True)
diff --git a/modules/ducktests/tests/ignitetest/services/__init__.py b/modules/ducktests/tests/ignitetest/services/__init__.py
index e556dc9..ec20143 100644
--- a/modules/ducktests/tests/ignitetest/services/__init__.py
+++ b/modules/ducktests/tests/ignitetest/services/__init__.py
@@ -12,4 +12,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
diff --git a/modules/ducktests/tests/ignitetest/services/ignite.py b/modules/ducktests/tests/ignitetest/services/ignite.py
index 5559059..07b562f 100644
--- a/modules/ducktests/tests/ignitetest/services/ignite.py
+++ b/modules/ducktests/tests/ignitetest/services/ignite.py
@@ -20,22 +20,17 @@ from ducktape.cluster.remoteaccount import RemoteCommandError
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
 
-from ignitetest.services.utils.ignite_config import IgniteConfig
-from ignitetest.services.utils.ignite_path import IgnitePath
+from ignitetest.services.utils.ignite_aware import IgniteAwareService
 from ignitetest.version import DEV_BRANCH
 
 
-class IgniteService(Service):
-    PERSISTENT_ROOT = "/mnt/ignite"
-    WORK_DIR = os.path.join(PERSISTENT_ROOT, "work")
-    CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "ignite-config.xml")
-    LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "ignite-log4j.xml")
-    HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "ignite-heap.bin")
-    STDOUT_STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "console.log")
+class IgniteService(IgniteAwareService):
+    APP_SERVICE_CLASS = "org.apache.ignite.startup.cmdline.CommandLineStartup"
+    HEAP_DUMP_FILE = os.path.join(IgniteAwareService.PERSISTENT_ROOT, "ignite-heap.bin")
 
     logs = {
         "console_log": {
-            "path": STDOUT_STDERR_CAPTURE,
+            "path": IgniteAwareService.STDOUT_STDERR_CAPTURE,
             "collect_default": True},
 
         "heap_dump": {
@@ -43,56 +38,35 @@ class IgniteService(Service):
             "collect_default": False}
     }
 
-    def __init__(self, context, num_nodes=3, version=DEV_BRANCH):
-        """
-        :param context: test context
-        :param num_nodes: number of Ignite nodes.
-        """
-        Service.__init__(self, context, num_nodes)
+    def __init__(self, context, num_nodes, version=DEV_BRANCH, properties=""):
+        IgniteAwareService.__init__(self, context, num_nodes, version, properties)
 
-        self.log_level = "DEBUG"
-        self.config = IgniteConfig()
-        self.path = IgnitePath()
-
-        for node in self.nodes:
-            node.version = version
-
-    def start(self):
+    def start(self, timeout_sec=180):
         Service.start(self)
 
-        self.logger.info("Waiting for Ignite to start...")
+        self.logger.info("Waiting for Ignite(s) to start...")
+
+        for node in self.nodes:
+            self.await_node_stated(node, timeout_sec)
 
     def start_cmd(self, node):
         jvm_opts = "-J-DIGNITE_SUCCESS_FILE=" + IgniteService.PERSISTENT_ROOT + "/success_file "
-        jvm_opts += "-J-Dlog4j.configDebug=true"
+        jvm_opts += "-J-Dlog4j.configDebug=true "
+        jvm_opts += "-J-XX:+UnlockExperimentalVMOptions -J-XX:+UseCGroupMemoryLimitForHeap"  # java8 docker fix
 
         cmd = "export EXCLUDE_TEST_CLASSES=true; "
         cmd += "export IGNITE_LOG_DIR=" + IgniteService.PERSISTENT_ROOT + "; "
+        cmd += "export USER_LIBS=%s/libs/optional/ignite-log4j/*; " % self.path.home(self.version)
         cmd += "%s %s %s 1>> %s 2>> %s &" % \
-              (self.path.script("ignite.sh", node),
-               jvm_opts,
-               IgniteService.CONFIG_FILE,
-               IgniteService.STDOUT_STDERR_CAPTURE,
-               IgniteService.STDOUT_STDERR_CAPTURE)
+               (self.path.script("ignite.sh", node),
+                jvm_opts,
+                IgniteService.CONFIG_FILE,
+                IgniteService.STDOUT_STDERR_CAPTURE,
+                IgniteService.STDOUT_STDERR_CAPTURE)
         return cmd
 
-    def start_node(self, node, timeout_sec=180, wait_for_rebalance=False):
-        node.account.mkdirs(IgniteService.PERSISTENT_ROOT)
-        node.account.create_file(IgniteService.CONFIG_FILE,
-                                 self.config.render(IgniteService.PERSISTENT_ROOT, IgniteService.WORK_DIR))
-        node.account.create_file(IgniteService.LOG4J_CONFIG_FILE, self.config.render_log4j(IgniteService.WORK_DIR))
-
-        cmd = self.start_cmd(node)
-        self.logger.debug("Attempting to start IgniteService on %s with command: %s" % (str(node.account), cmd))
-
-        wait_for_message = "Topology snapshot"
-        if wait_for_rebalance:
-            wait_for_message = "Completed (final) rebalancing \[grp=test-cache"
-
-        with node.account.monitor_log(IgniteService.STDOUT_STDERR_CAPTURE) as monitor:
-            node.account.ssh(cmd)
-            monitor.wait_until(wait_for_message, timeout_sec=timeout_sec, backoff_sec=5,
-                               err_msg="Ignite server didn't finish startup in %d seconds" % timeout_sec)
+    def await_node_stated(self, node, timeout_sec):
+        self.await_event_on_node("Topology snapshot", node, timeout_sec, from_the_beginning=True)
 
         if len(self.pids(node)) == 0:
             raise Exception("No process ids recorded on node %s" % node.account.hostname)
@@ -112,8 +86,7 @@ class IgniteService(Service):
             raise
 
     def clean_node(self, node):
-        node.account.kill_java_processes(self.java_class_name(),
-                                         clean_shutdown=False, allow_fail=True)
+        node.account.kill_java_processes(self.APP_SERVICE_CLASS, clean_shutdown=False, allow_fail=True)
         node.account.ssh("sudo rm -rf -- %s" % IgniteService.PERSISTENT_ROOT, allow_fail=False)
 
     def thread_dump(self, node):
@@ -126,19 +99,8 @@ class IgniteService(Service):
     def pids(self, node):
         """Return process ids associated with running processes on the given node."""
         try:
-            cmd = "jcmd | grep -e %s | awk '{print $1}'" % self.java_class_name()
+            cmd = "jcmd | grep -e %s | awk '{print $1}'" % self.APP_SERVICE_CLASS
             pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
             return pid_arr
         except (RemoteCommandError, ValueError) as e:
             return []
-
-    def java_class_name(self):
-        return "org.apache.ignite.startup.cmdline.CommandLineStartup"
-
-    def set_version(self, version):
-        for node in self.nodes:
-            node.version = version
-
-    def alive(self, node):
-        return len(self.pids(node)) > 0
-
diff --git a/modules/ducktests/tests/ignitetest/services/ignite_app.py b/modules/ducktests/tests/ignitetest/services/ignite_app.py
new file mode 100644
index 0000000..87cb8e1
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/ignite_app.py
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.service import Service
+
+from ignitetest.services.utils.ignite_aware_app import IgniteAwareApplicationService
+from ignitetest.version import DEV_BRANCH
+
+"""
+The Ignite application service allows running custom logic written in Java.
+"""
+
+
+class IgniteApplicationService(IgniteAwareApplicationService):
+    def __init__(self, context, java_class_name, version=DEV_BRANCH, properties="", params="", timeout_sec=60):
+        IgniteAwareApplicationService.__init__(
+            self, context, java_class_name, version, properties, params, timeout_sec,
+            service_java_class_name="org.apache.ignite.internal.ducktest.utils.IgniteApplicationService")
+
+    def start(self):
+        Service.start(self)
+
+        self.logger.info("Waiting for Ignite Application (%s) to start..." % self.java_class_name)
+
+        self.await_event("Topology snapshot", self.timeout_sec, from_the_beginning=True)
+        self.await_event("IGNITE_APPLICATION_INITIALIZED", self.timeout_sec, from_the_beginning=True)
diff --git a/modules/ducktests/tests/ignitetest/services/ignite_client_app.py b/modules/ducktests/tests/ignitetest/services/ignite_client_app.py
deleted file mode 100644
index 2474abc..0000000
--- a/modules/ducktests/tests/ignitetest/services/ignite_client_app.py
+++ /dev/null
@@ -1,151 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-
-from ducktape.services.background_thread import BackgroundThreadService
-
-from ignitetest.services.utils.ignite_config import IgniteConfig
-from ignitetest.services.utils.ignite_path import IgnitePath
-from ignitetest.version import DEV_BRANCH
-
-"""
-The Ignite client application is a main class that implements custom logic.
-First CMD param is an absolute path to the Ignite config file.
-"""
-
-
-class IgniteClientApp(BackgroundThreadService):
-    # Root directory for persistent output
-    PERSISTENT_ROOT = "/mnt/client_app"
-    STDOUT_STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "console.log")
-    WORK_DIR = os.path.join(PERSISTENT_ROOT, "work")
-    CLIENT_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "ignite-client-config.xml")
-    LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "ignite-log4j.xml")
-
-    logs = {
-        "console_log": {
-            "path": STDOUT_STDERR_CAPTURE,
-            "collect_default": True}
-    }
-
-    def __init__(self, context, java_class_name, version=DEV_BRANCH, num_nodes=1):
-        """
-        Args:
-            num_nodes:                  number of nodes to use (this should be 1)
-        """
-        BackgroundThreadService.__init__(self, context, num_nodes)
-
-        self.log_level = "DEBUG"
-        self.config = IgniteConfig()
-        self.path = IgnitePath()
-        self.java_class_name = java_class_name
-        self.timeout_sec = 60
-        self.stop_timeout_sec = 10
-
-        for node in self.nodes:
-            node.version = version
-
-    def start_cmd(self, node):
-        """Return the start command appropriate for the given node."""
-
-        cmd = self.env()
-        cmd += "%s %s %s 1>> %s 2>> %s " % \
-               (self.path.script("ignite.sh", node),
-                self.jvm_opts(),
-                self.app_args(),
-                IgniteClientApp.STDOUT_STDERR_CAPTURE,
-                IgniteClientApp.STDOUT_STDERR_CAPTURE)
-        return cmd
-
-    def start_node(self, node):
-        BackgroundThreadService.start_node(self, node)
-
-    def stop_node(self, node):
-        self.logger.info("%s Stopping node %s" % (self.__class__.__name__, str(node.account)))
-        node.account.kill_java_processes(self.java_class_name,
-                                         clean_shutdown=True,
-                                         allow_fail=True)
-
-        stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
-        assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
-                        (str(node.account), str(self.stop_timeout_sec))
-
-    def clean_node(self, node):
-        if self.alive(node):
-            self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
-                             (self.__class__.__name__, node.account))
-
-        node.account.kill_java_processes(self.java_class_name,
-                                         clean_shutdown=False,
-                                         allow_fail=True)
-
-        node.account.ssh("rm -rf %s" % IgniteClientApp.PERSISTENT_ROOT, allow_fail=False)
-
-    def pids(self, node):
-        return node.account.java_pids(self.java_class_name)
-
-    def alive(self, node):
-        return len(self.pids(node)) > 0
-
-    def _worker(self, idx, node):
-        create_client_configs(node, self.config)
-
-        # Just run application.
-        cmd = self.start_cmd(node)
-        self.logger.info("Ignite client application command: %s", cmd)
-
-        with node.account.monitor_log(IgniteClientApp.STDOUT_STDERR_CAPTURE) as monitor:
-            node.account.ssh(cmd, allow_fail=False)
-            monitor.wait_until("Ignite Client Finish.", timeout_sec=self.timeout_sec, backoff_sec=5,
-                               err_msg="Ignite client don't finish before timeout %s" % self.timeout_sec)
-
-    def app_args(self):
-        return IgniteClientApp.CLIENT_CONFIG_FILE
-
-    def jvm_opts(self):
-        return "-J-DIGNITE_SUCCESS_FILE=" + IgniteClientApp.PERSISTENT_ROOT + "/success_file " + \
-               "-J-Dlog4j.configDebug=true " \
-               "-J-Xmx1G"
-
-    def env(self):
-        return "export MAIN_CLASS={main_class}; ".format(main_class=self.java_class_name) + \
-               "export EXCLUDE_TEST_CLASSES=true; " + \
-               "export IGNITE_LOG_DIR={log_dir}; ".format(log_dir=IgniteClientApp.PERSISTENT_ROOT)
-
-
-class SparkIgniteClientApp(IgniteClientApp):
-    def __init__(self, context, master_node):
-        IgniteClientApp.__init__(self, context, java_class_name="org.apache.ignite.internal.test.SparkApplication")
-        self.master_node = master_node
-        self.timeout_sec = 120
-
-    def app_args(self):
-        return " spark://" + self.master_node.account.hostname + ":7077"
-
-    def env(self):
-        return IgniteClientApp.env(self) + \
-               "export EXCLUDE_MODULES=\"kubernetes,aws,gce,mesos,rest-http,web-agent,zookeeper,serializers,store," \
-               "rocketmq\"; "
-
-
-def create_client_configs(node, config):
-    node.account.mkdirs(IgniteClientApp.PERSISTENT_ROOT)
-    node.account.create_file(IgniteClientApp.CLIENT_CONFIG_FILE,
-                             config.render(IgniteClientApp.PERSISTENT_ROOT,
-                                           IgniteClientApp.WORK_DIR,
-                                           "true"))
-    node.account.create_file(IgniteClientApp.LOG4J_CONFIG_FILE,
-                             config.render_log4j(IgniteClientApp.WORK_DIR))
diff --git a/modules/ducktests/tests/ignitetest/services/__init__.py b/modules/ducktests/tests/ignitetest/services/ignite_spark_app.py
similarity index 52%
copy from modules/ducktests/tests/ignitetest/services/__init__.py
copy to modules/ducktests/tests/ignitetest/services/ignite_spark_app.py
index e556dc9..dffe7ac 100644
--- a/modules/ducktests/tests/ignitetest/services/__init__.py
+++ b/modules/ducktests/tests/ignitetest/services/ignite_spark_app.py
@@ -13,3 +13,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+"""
+The Ignite-Spark application service.
+"""
+from ignitetest.services.utils.ignite_aware_app import IgniteAwareApplicationService
+from ignitetest.version import DEV_BRANCH
+
+
+class SparkIgniteApplicationService(IgniteAwareApplicationService):
+    def __init__(self, context, java_class_name, version=DEV_BRANCH, properties="", params="", timeout_sec=60):
+        IgniteAwareApplicationService.__init__(
+            self, context, java_class_name, version, properties, params, timeout_sec)
+
+    def env(self):
+        return IgniteAwareApplicationService.env(self) + \
+               "export EXCLUDE_MODULES=\"kubernetes,aws,gce,mesos,rest-http,web-agent,zookeeper,serializers,store," \
+               "rocketmq\"; "
diff --git a/modules/ducktests/tests/ignitetest/services/spark.py b/modules/ducktests/tests/ignitetest/services/spark.py
index 9945fd5..731bcd5 100644
--- a/modules/ducktests/tests/ignitetest/services/spark.py
+++ b/modules/ducktests/tests/ignitetest/services/spark.py
@@ -18,22 +18,23 @@ import os.path
 from ducktape.cluster.remoteaccount import RemoteCommandError
 from ducktape.services.service import Service
 
+from ignitetest.services.utils.ignite_aware import IgniteAwareService
 from ignitetest.services.utils.ignite_config import IgniteConfig
-from ignitetest.services.ignite_client_app import IgniteClientApp, create_client_configs
+from ignitetest.version import DEV_BRANCH
 
 
-class SparkService(Service):
+class SparkService(IgniteAwareService):
     INSTALL_DIR = "/opt/spark-{version}".format(version="2.3.4")
-    PERSISTENT_ROOT = "/mnt/spark"
+    SPARK_PERSISTENT_ROOT = "/mnt/spark"
 
     logs = {}
 
-    def __init__(self, context, num_nodes=3):
+    def __init__(self, context, version=DEV_BRANCH, num_nodes=3, properties=""):
         """
         :param context: test context
         :param num_nodes: number of Ignite nodes.
         """
-        Service.__init__(self, context, num_nodes)
+        IgniteAwareService.__init__(self, context, num_nodes, version, properties)
 
         self.log_level = "DEBUG"
         self.ignite_config = IgniteConfig()
@@ -61,14 +62,14 @@ class SparkService(Service):
 
         start_script = os.path.join(SparkService.INSTALL_DIR, "sbin", script)
 
-        cmd = "export SPARK_LOG_DIR={spark_dir}; ".format(spark_dir=SparkService.PERSISTENT_ROOT)
-        cmd += "export SPARK_WORKER_DIR={spark_dir}; ".format(spark_dir=SparkService.PERSISTENT_ROOT)
+        cmd = "export SPARK_LOG_DIR={spark_dir}; ".format(spark_dir=SparkService.SPARK_PERSISTENT_ROOT)
+        cmd += "export SPARK_WORKER_DIR={spark_dir}; ".format(spark_dir=SparkService.SPARK_PERSISTENT_ROOT)
         cmd += "{start_script} &".format(start_script=start_script)
 
         return cmd
 
     def start_node(self, node, timeout_sec=30):
-        create_client_configs(node, self.ignite_config)
+        self.init_persistent(node)
 
         cmd = self.start_cmd(node)
         self.logger.debug("Attempting to start SparkService on %s with command: %s" % (str(node.account), cmd))
@@ -99,7 +100,7 @@ class SparkService(Service):
     def clean_node(self, node):
         node.account.kill_java_processes(self.java_class_name(node),
                                          clean_shutdown=False, allow_fail=True)
-        node.account.ssh("sudo rm -rf -- %s" % SparkService.PERSISTENT_ROOT, allow_fail=False)
+        node.account.ssh("sudo rm -rf -- %s" % SparkService.SPARK_PERSISTENT_ROOT, allow_fail=False)
 
     def pids(self, node):
         """Return process ids associated with running processes on the given node."""
@@ -116,19 +117,16 @@ class SparkService(Service):
         else:
             return "org.apache.spark.deploy.worker.Worker"
 
-    def alive(self, node):
-        return len(self.pids(node)) > 0
-
     def master_log_path(self, node):
         return "{SPARK_LOG_DIR}/spark-{userID}-org.apache.spark.deploy.master.Master-{instance}-{host}.out".format(
-            SPARK_LOG_DIR=SparkService.PERSISTENT_ROOT,
+            SPARK_LOG_DIR=SparkService.SPARK_PERSISTENT_ROOT,
             userID=node.account.user,
             instance=1,
             host=node.account.hostname)
 
     def slave_log_path(self, node):
         return "{SPARK_LOG_DIR}/spark-{userID}-org.apache.spark.deploy.worker.Worker-{instance}-{host}.out".format(
-            SPARK_LOG_DIR=SparkService.PERSISTENT_ROOT,
+            SPARK_LOG_DIR=SparkService.SPARK_PERSISTENT_ROOT,
             userID=node.account.user,
             instance=1,
             host=node.account.hostname)
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
new file mode 100644
index 0000000..dd82feb
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+from abc import abstractmethod
+
+from ducktape.services.background_thread import BackgroundThreadService
+
+from ignitetest.services.utils.ignite_config import IgniteConfig
+from ignitetest.services.utils.ignite_path import IgnitePath
+
+"""
+The base class to build services aware of Ignite.
+"""
+
+
+class IgniteAwareService(BackgroundThreadService):
+    # Root directory for persistent output
+    PERSISTENT_ROOT = "/mnt/service"
+    STDOUT_STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "console.log")
+    WORK_DIR = os.path.join(PERSISTENT_ROOT, "work")
+    CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "ignite-config.xml")
+    LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "ignite-log4j.xml")
+
+    logs = {
+        "console_log": {
+            "path": STDOUT_STDERR_CAPTURE,
+            "collect_default": True}
+    }
+
+    def __init__(self, context, num_nodes, version, properties):
+        BackgroundThreadService.__init__(self, context, num_nodes)
+
+        self.log_level = "DEBUG"
+        self.config = IgniteConfig()
+        self.path = IgnitePath()
+        self.properties = properties
+        self.version = version
+
+        for node in self.nodes:
+            node.version = version
+
+    def start_node(self, node):
+        self.init_persistent(node)
+
+        BackgroundThreadService.start_node(self, node)
+
+    def init_persistent(self, node):
+        node.account.mkdirs(self.PERSISTENT_ROOT)
+        node.account.create_file(self.CONFIG_FILE, self.config.render(
+            self.PERSISTENT_ROOT, self.WORK_DIR, properties=self.properties))
+        node.account.create_file(self.LOG4J_CONFIG_FILE, self.config.render_log4j(self.WORK_DIR))
+
+    @abstractmethod
+    def start_cmd(self, node):
+        raise NotImplementedError
+
+    @abstractmethod
+    def pids(self, node):
+        raise NotImplementedError
+
+    def _worker(self, idx, node):
+        cmd = self.start_cmd(node)
+
+        self.logger.debug("Attempting to start Application Service on %s with command: %s" % (str(node.account), cmd))
+
+        node.account.ssh(cmd)
+
+    def alive(self, node):
+        return len(self.pids(node)) > 0
+
+    def await_event_on_node(self, evt_message, node, timeout_sec, from_the_beginning=False, backoff_sec=5):
+        with node.account.monitor_log(self.STDOUT_STDERR_CAPTURE) as monitor:
+            if from_the_beginning:
+                monitor.offset = 0
+
+            monitor.wait_until(evt_message, timeout_sec=timeout_sec, backoff_sec=backoff_sec,
+                               err_msg="Event [%s] was not triggered in %d seconds" % (evt_message, timeout_sec))
+
+    def await_event(self, evt_message, timeout_sec, from_the_beginning=False, backoff_sec=5):
+        assert len(self.nodes) == 1
+
+        self.await_event_on_node(evt_message, self.nodes[0], timeout_sec, from_the_beginning=from_the_beginning,
+                                 backoff_sec=backoff_sec)
+
+    def execute(self, command):
+        for node in self.nodes:
+            cmd = "%s 1>> %s 2>> %s" % \
+                  (self.path.script(command, node),
+                   self.STDOUT_STDERR_CAPTURE,
+                   self.STDOUT_STDERR_CAPTURE)
+
+            node.account.ssh(cmd)
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware_app.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware_app.py
new file mode 100644
index 0000000..f569c12
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware_app.py
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import re
+
+from ducktape.services.service import Service
+
+from ignitetest.services.utils.ignite_aware import IgniteAwareService
+
+"""
+The base class to build Ignite aware application written on java.
+"""
+
+
+class IgniteAwareApplicationService(IgniteAwareService):
+    def __init__(self, context, java_class_name, version, properties, params, timeout_sec,
+                 service_java_class_name="org.apache.ignite.internal.ducktest.utils.IgniteAwareApplicationService"):
+        IgniteAwareService.__init__(self, context, 1, version, properties)
+
+        self.servicejava_class_name = service_java_class_name
+        self.java_class_name = java_class_name
+        self.timeout_sec = timeout_sec
+        self.stop_timeout_sec = 10
+        self.params = params
+
+    def start(self):
+        Service.start(self)
+
+        self.logger.info("Waiting for Ignite aware Application (%s) to start..." % self.java_class_name)
+
+        self.await_event("IGNITE_APPLICATION_INITIALIZED", self.timeout_sec, from_the_beginning=True)
+
+    def start_cmd(self, node):
+        cmd = self.env()
+        cmd += "%s %s %s 1>> %s 2>> %s &" % \
+               (self.path.script("ignite.sh", node),
+                self.jvm_opts(),
+                self.app_args(),
+                self.STDOUT_STDERR_CAPTURE,
+                self.STDOUT_STDERR_CAPTURE)
+        return cmd
+
+    def stop_node(self, node, clean_shutdown=True, timeout_sec=20):
+        """Stops the application on the node; the finish marker is awaited only after a clean shutdown."""
+        self.logger.info("%s Stopping node %s" % (self.__class__.__name__, str(node.account)))
+        node.account.kill_java_processes(self.servicejava_class_name, clean_shutdown=clean_shutdown, allow_fail=True)
+        stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+        assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+                        (str(node.account), str(self.stop_timeout_sec))
+        if clean_shutdown:  # a forcefully killed application cannot be expected to log the finish marker
+            self.await_event("IGNITE_APPLICATION_FINISHED", from_the_beginning=True, timeout_sec=timeout_sec)
+
+    def clean_node(self, node):
+        if self.alive(node):
+            self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
+                             (self.__class__.__name__, node.account))
+
+        node.account.kill_java_processes(self.servicejava_class_name, clean_shutdown=False, allow_fail=True)
+
+        node.account.ssh("rm -rf %s" % self.PERSISTENT_ROOT, allow_fail=False)
+
+    def app_args(self):
+        """Builds the comma-separated argument: application class, config file and optional params."""
+        args = self.java_class_name + "," + IgniteAwareApplicationService.CONFIG_FILE
+        if self.params:  # covers both "" and None, so None no longer raises TypeError on concatenation
+            args += "," + self.params
+
+        return args
+
+    def pids(self, node):
+        return node.account.java_pids(self.servicejava_class_name)
+
+    def jvm_opts(self):
+        return "-J-DIGNITE_SUCCESS_FILE=" + self.PERSISTENT_ROOT + "/success_file " + \
+               "-J-Dlog4j.configDebug=true " \
+               "-J-Xmx1G " \
+               "-J-ea " \
+               "-J-DIGNITE_ALLOW_ATOMIC_OPS_IN_TX=false " \
+               "-J-XX:+UnlockExperimentalVMOptions -J-XX:+UseCGroupMemoryLimitForHeap"  # java8 docker fix
+
+    def env(self):
+        return "export MAIN_CLASS={main_class}; ".format(main_class=self.servicejava_class_name) + \
+               "export EXCLUDE_TEST_CLASSES=true; " + \
+               "export IGNITE_LOG_DIR={log_dir}; ".format(log_dir=self.PERSISTENT_ROOT) + \
+               "export USER_LIBS=%s/libs/optional/ignite-log4j/*:/opt/ignite-dev/modules/ducktests/target/*; " \
+               % self.path.home(self.version)
+
+    def extract_result(self, name):
+        """Returns the value of the last complete "name->value<-" marker found in the captured output."""
+        res = ""
+        pattern = re.compile("%s->(.*)<-" % re.escape(name))  # escape so the name is matched literally
+        for line in self.nodes[0].account.ssh_capture(
+                "grep '%s' %s" % (name + "->", self.STDOUT_STDERR_CAPTURE), allow_fail=False):
+            match = pattern.search(line)
+            if match:  # grep keys on "name->" only, so a line may lack the closing "<-"
+                res = match.group(1)
+        return res
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_config.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_config.py
index f8bd92b..5ffaa81 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_config.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_config.py
@@ -22,7 +22,7 @@ class IgniteConfig:
     def __init__(self, project="ignite"):
         self.project = project
 
-    def render(self, config_dir, work_dir, client_mode="false"):
+    def render(self, config_dir, work_dir, properties=""):
         return """<?xml version="1.0" encoding="UTF-8"?>
 
 <beans xmlns="http://www.springframework.org/schema/beans"
@@ -36,12 +36,12 @@ class IgniteConfig:
                 <constructor-arg type="java.lang.String" value="{config_dir}/ignite-log4j.xml"/>
             </bean>
         </property>
-        <property name="clientMode" value="{client_mode}" />
+        {properties}
     </bean>
 </beans>
         """.format(config_dir=config_dir,
                    work_dir=work_dir,
-                   client_mode=client_mode)
+                   properties=properties)
 
     def render_log4j(self, work_dir):
         return """<?xml version="1.0" encoding="UTF-8"?>
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_path.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_path.py
index 6307175..ed1a3fb 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_path.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_path.py
@@ -15,7 +15,7 @@
 
 import os
 
-from ignitetest.version import get_version, IgniteVersion, DEV_BRANCH
+from ignitetest.version import get_version, IgniteVersion
 
 """
 This module provides Ignite path methods
@@ -37,7 +37,7 @@ class IgnitePath:
     def __init__(self, project="ignite"):
         self.project = project
 
-    def home(self, node_or_version=DEV_BRANCH, project=None):
+    def home(self, node_or_version, project=None):
         version = self._version(node_or_version)
         home_dir = project or self.project
         if version is not None:
@@ -45,7 +45,7 @@ class IgnitePath:
 
         return os.path.join(IgnitePath.IGNITE_INSTALL_ROOT, home_dir)
 
-    def script(self, script_name, node_or_version=DEV_BRANCH, project=None):
+    def script(self, script_name, node_or_version, project=None):
         version = self._version(node_or_version)
         return os.path.join(self.home(version, project=project), "bin", script_name)
 
diff --git a/modules/ducktests/tests/ignitetest/benchmarks/__init__.py b/modules/ducktests/tests/ignitetest/tests/benchmarks/__init__.py
similarity index 100%
copy from modules/ducktests/tests/ignitetest/benchmarks/__init__.py
copy to modules/ducktests/tests/ignitetest/tests/benchmarks/__init__.py
diff --git a/modules/ducktests/tests/ignitetest/tests/benchmarks/add_node_rebalance_test.py b/modules/ducktests/tests/ignitetest/tests/benchmarks/add_node_rebalance_test.py
new file mode 100644
index 0000000..2c725d7
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/tests/benchmarks/add_node_rebalance_test.py
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import time
+
+from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
+
+from ignitetest.services.ignite import IgniteService
+from ignitetest.services.ignite_app import IgniteApplicationService
+from ignitetest.tests.utils.ignite_test import IgniteTest
+from ignitetest.version import DEV_BRANCH, IgniteVersion, LATEST
+
+
+class AddNodeRebalanceTest(IgniteTest):
+    NUM_NODES = 4
+    PRELOAD_TIMEOUT = 60
+    DATA_AMOUNT = 1000000
+    REBALANCE_TIMEOUT = 60
+
+    """
+    Test performs rebalance tests.
+    """
+
+    @staticmethod
+    def properties(client_mode="false"):
+        return """
+            <property name="clientMode" value="{client_mode}"/>
+        """.format(client_mode=client_mode)
+
+    def __init__(self, test_context):
+        super(AddNodeRebalanceTest, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        pass
+
+    def teardown(self):
+        pass
+
+    @cluster(num_nodes=NUM_NODES + 1)
+    @parametrize(version=str(DEV_BRANCH))
+    @parametrize(version=str(LATEST))
+    def test_add_node(self, version):
+        """
+        Performs the add-node rebalance test, which consists of the following steps:
+            * Start a cluster of NUM_NODES - 1 Ignite nodes.
+            * Put data into it by running DataGenerationApplication in client mode.
+            * Start one more node and wait for its rebalance event; the wait time is returned as the result.
+        """
+        ignite_version = IgniteVersion(version)
+
+        self.stage("Start Ignite nodes")
+
+        ignites = IgniteService(self.test_context, num_nodes=AddNodeRebalanceTest.NUM_NODES - 1, version=ignite_version)
+
+        ignites.start()
+
+        self.stage("Starting DataGenerationApplication")
+
+        # This client just puts some data into the cache.
+        IgniteApplicationService(self.test_context,
+                                 java_class_name="org.apache.ignite.internal.ducktest.DataGenerationApplication",
+                                 properties=self.properties(client_mode="true"),
+                                 version=ignite_version,
+                                 params="test-cache,%d" % self.DATA_AMOUNT,
+                                 timeout_sec=self.PRELOAD_TIMEOUT).run()
+
+        ignite = IgniteService(self.test_context, num_nodes=1, version=ignite_version)
+
+        self.stage("Starting Ignite node")
+
+        ignite.start()
+
+        start = time.time()
+
+        ignite.await_event("rebalanced=true, wasRebalanced=false",
+                           timeout_sec=AddNodeRebalanceTest.REBALANCE_TIMEOUT,
+                           from_the_beginning=True,
+                           backoff_sec=1)
+
+        data = {"Rebalanced in (sec)": time.time() - start}
+
+        return data
diff --git a/modules/ducktests/tests/ignitetest/tests/benchmarks/pme_free_switch_test.py b/modules/ducktests/tests/ignitetest/tests/benchmarks/pme_free_switch_test.py
new file mode 100644
index 0000000..d06b603
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/tests/benchmarks/pme_free_switch_test.py
@@ -0,0 +1,117 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import time
+
+from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
+
+from ignitetest.services.ignite import IgniteService
+from ignitetest.services.ignite_app import IgniteApplicationService
+from ignitetest.tests.utils.ignite_test import IgniteTest
+from ignitetest.version import DEV_BRANCH, LATEST_2_7, V_2_8_0, IgniteVersion
+
+
+class PmeFreeSwitchTest(IgniteTest):
+    NUM_NODES = 3
+
+    @staticmethod
+    def properties(client_mode="false"):
+        return """
+            <property name="clientMode" value="{client_mode}"/>
+            <property name="cacheConfiguration">
+                <list>
+                    <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                        <property name="name" value="test-cache"/>
+                        <property name="backups" value="2"/>
+                        <property name="atomicityMode" value="TRANSACTIONAL"/>
+                    </bean>
+                </list>
+            </property>
+        """.format(client_mode=client_mode)
+
+    def __init__(self, test_context):
+        super(PmeFreeSwitchTest, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        pass
+
+    def teardown(self):
+        pass
+
+    @cluster(num_nodes=NUM_NODES + 2)
+    @parametrize(version=str(DEV_BRANCH))
+    @parametrize(version=str(LATEST_2_7))
+    def test(self, version):
+        data = {}
+
+        self.stage("Starting nodes")
+
+        ignite_version = IgniteVersion(version)
+
+        ignites = IgniteService(
+            self.test_context,
+            num_nodes=self.NUM_NODES,
+            properties=self.properties(),
+            version=ignite_version)
+
+        ignites.start()
+
+        self.stage("Starting long_tx_streamer")
+
+        long_tx_streamer = IgniteApplicationService(
+            self.test_context,
+            java_class_name="org.apache.ignite.internal.ducktest.LongTxStreamerApplication",
+            properties=self.properties(client_mode="true"),
+            params="test-cache",
+            version=ignite_version)
+
+        long_tx_streamer.start()
+
+        self.stage("Starting single_key_tx_streamer")
+
+        single_key_tx_streamer = IgniteApplicationService(
+            self.test_context,
+            java_class_name="org.apache.ignite.internal.ducktest.SingleKeyTxStreamerApplication",
+            properties=self.properties(client_mode="true"),
+            params="test-cache,1000",
+            version=ignite_version)
+
+        single_key_tx_streamer.start()
+
+        if ignite_version >= V_2_8_0:
+            long_tx_streamer.execute(
+                "control.sh --host %s --baseline auto_adjust disable --yes" % ignites.nodes[0].account.hostname)
+
+        self.stage("Stopping server node")
+
+        ignites.stop_node(ignites.nodes[1])
+
+        long_tx_streamer.await_event("Node left topology", 60, from_the_beginning=True)
+
+        time.sleep(30)  # keeping txs alive for 30 seconds.
+
+        self.stage("Stopping long_tx_streamer")
+
+        long_tx_streamer.stop()
+
+        self.stage("Stopping single_key_tx_streamer")
+
+        single_key_tx_streamer.stop()
+
+        data["Worst latency (ms)"] = single_key_tx_streamer.extract_result("WORST_LATENCY")
+        data["Streamed txs"] = single_key_tx_streamer.extract_result("STREAMED")
+        data["Measure duration (ms)"] = single_key_tx_streamer.extract_result("MEASURE_DURATION")
+
+        return data
diff --git a/modules/ducktests/tests/ignitetest/tests/spark_integration_test.py b/modules/ducktests/tests/ignitetest/tests/spark_integration_test.py
index a239a4a..4fa46bb 100644
--- a/modules/ducktests/tests/ignitetest/tests/spark_integration_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/spark_integration_test.py
@@ -13,14 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.tests.test import Test
-
 from ignitetest.services.ignite import IgniteService
-from ignitetest.services.ignite_client_app import IgniteClientApp, SparkIgniteClientApp
+from ignitetest.services.ignite_app import IgniteApplicationService
+from ignitetest.services.ignite_spark_app import SparkIgniteApplicationService
 from ignitetest.services.spark import SparkService
+from ignitetest.tests.utils.ignite_test import IgniteTest
 
 
-class SparkIntegrationTest(Test):
+class SparkIntegrationTest(IgniteTest):
     """
     Test performs:
     1. Start of Spark cluster.
@@ -28,13 +28,18 @@ class SparkIntegrationTest(Test):
     3. Checks results of client application.
     """
 
+    @staticmethod
+    def properties(client_mode="false"):
+        return """
+            <property name="clientMode" value="{client_mode}"/>
+        """.format(client_mode=client_mode)
+
     def __init__(self, test_context):
         super(SparkIntegrationTest, self).__init__(test_context=test_context)
         self.spark = SparkService(test_context, num_nodes=2)
         self.ignite = IgniteService(test_context, num_nodes=1)
 
     def setUp(self):
-        # starting all nodes except last.
         self.spark.start()
         self.ignite.start()
 
@@ -43,9 +48,16 @@ class SparkIntegrationTest(Test):
         self.ignite.stop()
 
     def test_spark_client(self):
-        self.logger.info("Spark integration test.")
+        self.stage("Starting sample data generator")
+
+        IgniteApplicationService(self.test_context,
+                                 java_class_name="org.apache.ignite.internal.ducktest.SampleDataStreamerApplication",
+                                 params="cache,1000",
+                                 properties=self.properties(client_mode="true")).run()
 
-        IgniteClientApp(self.test_context,
-                        java_class_name="org.apache.ignite.internal.test.IgniteApplication").run()
+        self.stage("Starting Spark application")
 
-        SparkIgniteClientApp(self.test_context, self.spark.nodes[0]).run()
+        SparkIgniteApplicationService(self.test_context,
+                                      "org.apache.ignite.internal.ducktest.SparkApplication",
+                                      params="spark://" + self.spark.nodes[0].account.hostname + ":7077",
+                                      timeout_sec=120).run()
diff --git a/modules/ducktests/tests/ignitetest/benchmarks/__init__.py b/modules/ducktests/tests/ignitetest/tests/utils/__init__.py
similarity index 100%
rename from modules/ducktests/tests/ignitetest/benchmarks/__init__.py
rename to modules/ducktests/tests/ignitetest/tests/utils/__init__.py
diff --git a/modules/ducktests/tests/ignitetest/services/__init__.py b/modules/ducktests/tests/ignitetest/tests/utils/ignite_test.py
similarity index 75%
copy from modules/ducktests/tests/ignitetest/services/__init__.py
copy to modules/ducktests/tests/ignitetest/tests/utils/ignite_test.py
index e556dc9..1ec2132 100644
--- a/modules/ducktests/tests/ignitetest/services/__init__.py
+++ b/modules/ducktests/tests/ignitetest/tests/utils/ignite_test.py
@@ -13,3 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.tests.test import Test
+
+
+class IgniteTest(Test):
+    def __init__(self, test_context):
+        super(IgniteTest, self).__init__(test_context=test_context)
+
+    def stage(self, msg):
+        self.logger.info("[TEST_STAGE] " + msg + "...")
diff --git a/modules/ducktests/tests/ignitetest/version.py b/modules/ducktests/tests/ignitetest/version.py
index 31ad73d..9ba5c8e 100644
--- a/modules/ducktests/tests/ignitetest/version.py
+++ b/modules/ducktests/tests/ignitetest/version.py
@@ -67,6 +67,11 @@ DEV_VERSION = IgniteVersion("2.9.0-SNAPSHOT")
 
 # 2.7.x versions
 V_2_7_6 = IgniteVersion("2.7.6")
+LATEST_2_7 = V_2_7_6
 
 # 2.8.0 versions
 V_2_8_0 = IgniteVersion("2.8.0")
+V_2_8_1 = IgniteVersion("2.8.1")
+LATEST_2_8 = V_2_8_1
+
+LATEST = LATEST_2_8
diff --git a/scripts/build.sh b/scripts/build-module.sh
similarity index 86%
copy from scripts/build.sh
copy to scripts/build-module.sh
index 333ba9e..ca788fe 100755
--- a/scripts/build.sh
+++ b/scripts/build-module.sh
@@ -19,7 +19,7 @@
 #
 # Builds project.
 # Run in Ignite sources root directory.
-# Usage: ./build.sh
+# Usage: ./scripts/build-module.sh ducktests
 #
 
-mvn clean package -Pall-java,all-scala -DskipTests -Dmaven.javadoc.skip=true
\ No newline at end of file
+mvn clean package -pl ":ignite-${1:?usage: ./scripts/build-module.sh <module>, e.g. ducktests}" -Pall-java,all-scala -DskipTests -Dmaven.javadoc.skip=true -am
\ No newline at end of file
diff --git a/scripts/build.sh b/scripts/build.sh
index 333ba9e..d6d8305 100755
--- a/scripts/build.sh
+++ b/scripts/build.sh
@@ -19,7 +19,7 @@
 #
 # Builds project.
 # Run in Ignite sources root directory.
-# Usage: ./build.sh
+# Usage: ./scripts/build.sh
 #
 
 mvn clean package -Pall-java,all-scala -DskipTests -Dmaven.javadoc.skip=true
\ No newline at end of file