You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vi...@apache.org on 2021/10/19 14:11:37 UTC

[drill] branch master updated: DRILL-7973: Fix GitHub CI intermittent failures (#2293)

This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 304230a  DRILL-7973: Fix GitHub CI intermittent failures (#2293)
304230a is described below

commit 304230a289505526e1ff1bb1aae932517b7b6965
Author: Vitalii Diravka <vi...@apache.org>
AuthorDate: Tue Oct 19 17:11:28 2021 +0300

    DRILL-7973: Fix GitHub CI intermittent failures (#2293)
    
     * Disable flaky Splunk test
     * -DdirectMemoryMb=3000 -DmemoryMb=1800
     * fail-fast: false
     * Update cache version
     * Fix flaky TestHttpPlugin#testSlowResponse
     * Enable all ignored tests in TestDrillbitResilience
     * Set timeout for every test in TestDrillbitResilience
     * Migrate TestDrillbitResilience from junit4 to junit5
     * Release resources in the @AfterEach method in TestDrillbitResilience
     * check timeout for the mock http plugin
     * fix JUnit Tag name for slow tests
     * make memory configs common for builds in all CI. Fix travis OOM
     * refactoring test cases to use ClusterTest and fixtures
     * COMPLETED state for TestDrillbitResilience#cancelAfterEverythingIsCompleted
     * Enable LogFixture for TestDrillbitResilience. Update javacc-maven-plugin for maven parallel execution (see maven message in the beginning with -V)
     * -B -ntp -> --batch-mode --no-transfer-progress. Disable TestConfigLinkage
     * add proper logging. Check whether query is really interrupted. Check whether control is properly injected. Check when UnorderedReceiverBatch starts waiting
     * fix moving from STARTING to CANCELLATION_REQUESTED
    * Doc is added
    * Refactor TestPStoreProviders test. Update curator.version 5.1.0 -> 5.2.0
    * Fix TestDrillbitResilience#cancelWhenQueryIdArrives
---
 .github/workflows/ci.yml                           |   12 +-
 .travis.yml                                        |    3 +-
 .../drill/common/concurrent/ExtendedLatch.java     |    1 +
 .../categories/{SlowTest.java => FlakyTest.java}   |   10 +-
 .../java/org/apache/drill/categories/SlowTest.java |    8 +-
 .../apache/drill/common/util/RepeatTestRule.java   |    3 +
 .../java/org/apache/drill/test/DirTestWatcher.java |    4 +
 .../exec/store/http/HttpStoragePluginConfig.java   |    5 +-
 .../drill/exec/store/http/TestHttpPlugin.java      |    2 +-
 .../drill/store/openTSDB/TestOpenTSDBPlugin.java   |   21 +-
 .../drill/exec/store/splunk/SplunkPluginTest.java  |    1 +
 docs/dev/InjectControls.md                         |   57 ++
 exec/java-exec/pom.xml                             |    2 +-
 .../org/apache/drill/exec/client/DrillClient.java  |    5 +-
 .../unorderedreceiver/UnorderedReceiverBatch.java  |    3 +
 .../drill/exec/record/RecordBatchLoader.java       |    2 +-
 .../apache/drill/exec/server/RemoteServiceSet.java |    6 +
 .../drill/exec/testing/ExecutionControls.java      |    2 +-
 .../org/apache/drill/exec/testing/Injection.java   |    7 +
 .../org/apache/drill/exec/work/WorkManager.java    |    2 +-
 .../apache/drill/exec/work/foreman/Foreman.java    |    1 +
 .../exec/work/foreman/QueryStateProcessor.java     |   26 +-
 .../java/org/apache/drill/SingleRowListener.java   |    2 +-
 .../physical/impl/TestNestedDateTimeTimestamp.java |    3 +
 .../physical/impl/writer/TestParquetWriter.java    |    2 +-
 .../drill/exec/server/TestDrillbitResilience.java  | 1007 ++++++++++----------
 .../store/easy/json/loader/TestExtendedTypes.java  |    3 +-
 .../drill/exec/store/sys/PStoreTestUtil.java       |   66 +-
 .../drill/exec/store/sys/TestPStoreProviders.java  |    8 +-
 .../org/apache/drill/exec/testing/Controls.java    |    4 +-
 .../drill/exec/testing/ControlsInjectionUtil.java  |    4 +-
 .../java/org/apache/drill/test/BaseTestQuery.java  |    2 +-
 .../java/org/apache/drill/test/LogFixture.java     |    2 -
 .../jdbc/test/TestInformationSchemaColumns.java    |    3 +-
 pom.xml                                            |    4 +-
 35 files changed, 694 insertions(+), 599 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 5866b19..e32d7b7 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -34,6 +34,7 @@ jobs:
       matrix:
         # Java versions to run unit tests
         java: [ '8', '11', '14' ]
+      fail-fast: false
     steps:
       - name: Checkout
         uses: actions/checkout@v2
@@ -47,7 +48,7 @@ jobs:
               ${{ runner.os }}-maven-
       # Caches MySQL directory used for JDBC storage plugin tests
       - name: Cache MySQL
-        uses: actions/cache@v1
+        uses: actions/cache@v2
         with:
           path: ~/.embedmysql
           key: ${{ runner.os }}-mysql
@@ -57,7 +58,8 @@ jobs:
           distribution: 'adopt'
           java-version: ${{ matrix.java }}
       - name: Build and test
-        run: mvn install -V -ntp -DdirectMemoryMb=2500 -DmemoryMb=2000 # Note: the total GitHub Actions memory is 7000Mb
+        # The total GitHub Actions memory is 7000Mb. But GitHub CI requires some memory for the container to perform tests
+        run: mvn install --batch-mode --no-transfer-progress # -X -V for debugging
 
   checkstyle_protobuf:
     name: Run checkstyle and generate protobufs
@@ -66,7 +68,7 @@ jobs:
       - name: Checkout
         uses: actions/checkout@v2
       - name: Cache Maven Repository
-        uses: actions/cache@v1
+        uses: actions/cache@v2
         with:
           path: ~/.m2/repository
           key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
@@ -74,7 +76,7 @@ jobs:
             ${{ runner.os }}-maven-
       # Caches built protobuf library
       - name: Cache protobufs
-        uses: actions/cache@v1
+        uses: actions/cache@v2
         with:
           path: ~/protobuf
           key: ${{ runner.os }}-protobuf
@@ -98,7 +100,7 @@ jobs:
       # Builds Drill project, performs license checkstyle goal and regenerates java and C++ protobuf files
       - name: Build
         run: |
-          MAVEN_OPTS="-Xms1G -Xmx1G" mvn install -Drat.skip=false -Dlicense.skip=false --batch-mode -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DskipTests=true -Dmaven.javadoc.skip=true -Dmaven.source.skip=true && \
+          MAVEN_OPTS="-Xms1G -Xmx1G" mvn install -Drat.skip=false -Dlicense.skip=false --batch-mode --no-transfer-progress -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DskipTests=true -Dmaven.javadoc.skip=true -Dmaven.source.skip=true && \
           pushd protocol && mvn process-sources -P proto-compile && popd && \
           mkdir contrib/native/client/build && pushd contrib/native/client/build && cmake -G "Unix Makefiles" .. && make cpProtobufs && popd; \
       # Checks whether project files weren't changed after regenerating protobufs
diff --git a/.travis.yml b/.travis.yml
index 0f059bf..36e3b95 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -72,8 +72,7 @@ install:
   - |
     if [ $PHASE = "tests" ]; then \
       mvn install --batch-mode --no-transfer-progress \
-        -DexcludedGroups="org.apache.drill.categories.SlowTest,org.apache.drill.categories.UnlikelyTest,org.apache.drill.categories.SecurityTest" \
-        -DmemoryMb=1300 -DdirectMemoryMb=3000; \
+        -DexcludedGroups="org.apache.drill.categories.SlowTest,org.apache.drill.categories.UnlikelyTest,org.apache.drill.categories.SecurityTest"; \
     elif [ $PHASE = "build_checkstyle_protobuf" ]; then \
       MAVEN_OPTS="-Xms1G -Xmx1G" mvn install --no-transfer-progress -Drat.skip=false -Dlicense.skip=false --batch-mode -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DskipTests=true -Dmaven.javadoc.skip=true -Dmaven.source.skip=true && \
       pushd protocol && mvn process-sources -P proto-compile && popd && \
diff --git a/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
index 9868537..383cd7f 100644
--- a/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
+++ b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
@@ -52,6 +52,7 @@ public class ExtendedLatch extends CountDownLatch {
         return await(wait, TimeUnit.MILLISECONDS);
       } catch (final InterruptedException e) {
         // if we weren't ready, the while loop will continue to wait
+        logger.warn("Interrupted while waiting for event latch.", e);
       }
     }
     return false;
diff --git a/common/src/test/java/org/apache/drill/categories/SlowTest.java b/common/src/test/java/org/apache/drill/categories/FlakyTest.java
similarity index 77%
copy from common/src/test/java/org/apache/drill/categories/SlowTest.java
copy to common/src/test/java/org/apache/drill/categories/FlakyTest.java
index cf1103f..9a747b2 100644
--- a/common/src/test/java/org/apache/drill/categories/SlowTest.java
+++ b/common/src/test/java/org/apache/drill/categories/FlakyTest.java
@@ -18,8 +18,12 @@
 package org.apache.drill.categories;
 
 /**
- * A category for tests that take a long time to run.
+ * A category for tests that intermittently fail. It is better to run them in one Maven fork to avoid OOM or deadlocks <br>
+ * Junit category marker
  */
-public interface SlowTest {
-  // Junit category marker
+public interface FlakyTest {
+    /**
+     * tag for JUnit5
+     */
+    String TAG = "flaky-test";
 }
diff --git a/common/src/test/java/org/apache/drill/categories/SlowTest.java b/common/src/test/java/org/apache/drill/categories/SlowTest.java
index cf1103f..f4f6ea9 100644
--- a/common/src/test/java/org/apache/drill/categories/SlowTest.java
+++ b/common/src/test/java/org/apache/drill/categories/SlowTest.java
@@ -18,8 +18,12 @@
 package org.apache.drill.categories;
 
 /**
- * A category for tests that take a long time to run.
+ * A category for tests that take a long time to run. <br>
+ * Junit category marker
  */
 public interface SlowTest {
-  // Junit category marker
+    /**
+     * tag for JUnit5
+     */
+    String TAG = "slow-test";
 }
diff --git a/common/src/test/java/org/apache/drill/common/util/RepeatTestRule.java b/common/src/test/java/org/apache/drill/common/util/RepeatTestRule.java
index fd6d95e..07b4da2 100644
--- a/common/src/test/java/org/apache/drill/common/util/RepeatTestRule.java
+++ b/common/src/test/java/org/apache/drill/common/util/RepeatTestRule.java
@@ -38,7 +38,10 @@ import java.lang.annotation.Target;
  * }
  * }
  * </pre>
+ * @deprecated Created for JUnit 4; JUnit 5 has its own {@code @RepeatedTest(num)}. It appears to be unused now, but
+ * it can still be used for manual testing.
  */
+@Deprecated
 public class RepeatTestRule implements TestRule {
 
   @Retention(RetentionPolicy.RUNTIME)
diff --git a/common/src/test/java/org/apache/drill/test/DirTestWatcher.java b/common/src/test/java/org/apache/drill/test/DirTestWatcher.java
index 839fbac..9dea4a5 100644
--- a/common/src/test/java/org/apache/drill/test/DirTestWatcher.java
+++ b/common/src/test/java/org/apache/drill/test/DirTestWatcher.java
@@ -18,6 +18,8 @@
 package org.apache.drill.test;
 
 import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
@@ -87,6 +89,8 @@ import java.nio.file.Paths;
  * {@link DirTestWatcher#getDir()} in myTestMethod1 and myTestMethod2 are
  * <b>my.proj.MyTestClass/myTestMethod1</b> and
  * <b>my.proj.MyTestClass/myTestMethod2</b> respectively.
+ * TODO: need to implement {@link AfterEachCallback} and {@link BeforeEachCallback} to use it with
+ *  JUnit5 @ExtendWith annotation
  * </p>
  */
 public class DirTestWatcher extends TestWatcher {
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
index 280895d..119d4b0 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 
 @JsonTypeName(HttpStoragePluginConfig.NAME)
@@ -47,9 +48,9 @@ public class HttpStoragePluginConfig extends AbstractSecuredStoragePluginConfig
   public final int proxyPort;
   public final String proxyType;
   /**
-   * Timeout in seconds.
+   * Timeout in {@link TimeUnit#SECONDS}.
    */
-  public int timeout;
+  public final int timeout;
 
   @JsonCreator
   public HttpStoragePluginConfig(@JsonProperty("cacheResults") Boolean cacheResults,
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index 57510f3..0375c99 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -877,7 +877,7 @@ public class TestHttpPlugin extends ClusterTest {
       server.enqueue(
         new MockResponse().setResponseCode(200)
           .setBody(TEST_JSON_RESPONSE)
-          .throttleBody(64, 4, TimeUnit.SECONDS)
+          .throttleBody(64, 6, TimeUnit.SECONDS)
       );
 
       String sql = "SELECT sunrise AS sunrise, sunset AS sunset FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1";
diff --git a/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java
index 8c03752..1c8207a 100644
--- a/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java
+++ b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java
@@ -22,6 +22,7 @@ import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.openTSDB.OpenTSDBStoragePluginConfig;
+import org.apache.drill.test.ClientFixture;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.QueryTestUtil;
@@ -196,19 +197,19 @@ public class TestOpenTSDBPlugin extends ClusterTest {
 
   @Test
   public void testInformationSchemaWrongPluginConfig() throws Exception {
-    ClusterFixture cluster = ClusterFixture.bareBuilder(dirTestWatcher)
-        .build();
-    int portNumber = QueryTestUtil.getFreePortNumber(10_000, 200);
-    final StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
-    OpenTSDBStoragePluginConfig storagePluginConfig =
+    try (ClusterFixture cluster = ClusterFixture.bareBuilder(dirTestWatcher).build();
+         ClientFixture client = cluster.clientFixture()) {
+      int portNumber = QueryTestUtil.getFreePortNumber(10_000, 200);
+      final StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
+      OpenTSDBStoragePluginConfig storagePluginConfig =
         new OpenTSDBStoragePluginConfig(String.format("http://localhost:%s/", portNumber));
-    storagePluginConfig.setEnabled(true);
-    pluginRegistry.put(OpenTSDBStoragePluginConfig.NAME, storagePluginConfig);
-    String query = "select * from information_schema.`views`";
-    cluster.clientFixture()
-        .queryBuilder()
+      storagePluginConfig.setEnabled(true);
+      pluginRegistry.put(OpenTSDBStoragePluginConfig.NAME, storagePluginConfig);
+      String query = "select * from information_schema.`views`";
+      client.queryBuilder()
         .sql(query)
         .run();
+    }
   }
 
   private long runQuery(String query) throws Exception {
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
index a9e7aff..7d52008 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
@@ -172,6 +172,7 @@ public class SplunkPluginTest extends SplunkBaseTest {
   }
 
   @Test
+  @Ignore("the result is not consistent on system tables")
   public void testExplicitFieldsWithSourceType() throws Exception {
     String sql = "SELECT action, _sourcetype, _subsecond, _time FROM splunk._audit WHERE sourcetype='audittrail' LIMIT 5";
     client.testBuilder()
diff --git a/docs/dev/InjectControls.md b/docs/dev/InjectControls.md
new file mode 100644
index 0000000..1ab1029
--- /dev/null
+++ b/docs/dev/InjectControls.md
@@ -0,0 +1,57 @@
+# InjectControls (DRILL-2383)
+_This is a copy of the following doc:
+https://docs.google.com/document/d/1qwsV5Uq2ftJn6ZMzjydxGl8lqeIiS4gmUTMcmdz4gDY/edit <p>_
+This feature is useful for developers to test Drill resiliency and to verify behavior when an exception is thrown. Use pauses to test cancellations.
+Injection sites can be added to classes that can get hold of ExecutionControls from either FragmentContext, QueryContext or OperatorContext. Injection sites can be chained.
+1) Declare an injector just like a Logger:
+   private final static ExecutionControlsInjector injector =
+   ExecutionControlsInjector.getInjector(FragmentExecutor.class);
+
+2) Add an injection site (unique descriptor per site):
+   // ...code...
+   // function signature: (controls, descriptor, exception)
+   injector.injectChecked(fragmentContext.getExecutionControls(),     
+   "fragment-execution-checked", IOException.class);
+   // function signature: (controls, descriptor)
+   injector.injectUnchecked(fragmentContext.getExecutionControls(),     
+   "fragment-execution-unchecked");
+   // ...code...
+   OR
+   // ...code...
+   // function signature: (controls, descriptor, logger)
+   injector.injectPause(queryContext.getExecutionControls(), "pause-run-plan",
+   logger);
+   // ...code...
+
+To set controls:
+> ALTER SESSION SET `drill.exec.testing.controls`='{
+"injections":[ {
+"siteClass": "org.apache.drill.exec.work.fragment.FragmentExecutor",
+"desc": "fragment-execution-checked",
+"nSkip": 0,
+"nFire": 1,
+"type":"exception",
+"exceptionClass": "java.io.IOException"
+}, {
+"siteClass": "org.apache.drill.exec.work.foreman.Foreman",
+"desc": "pause-run-plan",
+"nSkip": 0,
+"nFire": 1,
+"address": "10.10.10.10",
+"port": 31019,
+"type":"pause"
+}
+] }';
+> SELECT * FROM sys.memory;
+...
+
+For the above query, an IOException with fragment-execution message will be thrown on all drillbits. Also, if the foreman is on 10.10.10.10:31019, it will pause at pause-run-plan. For other examples, see TestDrillbitResilience, TestExceptionInjection and TestPauseInjection.
+
+_NOTE:_
+Controls are fired only if assertions are enabled.
+Controls are fired only on one query after the alter session query.
+If controls are specified, they are passed to every fragment as part of options.
+address and port fields are optional.
+If these fields are set, controls will be fired only on specified drillbit.
+If they are not set, the injection will be fired on EVERY drillbit that reaches the site.
+
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 7997c37..167b675 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -889,7 +889,7 @@
       <plugin> <!-- generate the parser (Parser.jj is itself generated wit fmpp above) -->
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>javacc-maven-plugin</artifactId>
-        <version>2.4</version>
+        <version>2.6</version>
         <executions>
           <execution>
             <id>javacc</id>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index fdbb971..e5562dd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -832,7 +832,7 @@ public class DrillClient implements Closeable, ConnectionThrottle {
     private final UserProtos.RunQuery query;
 
     public ListHoldingResultsListener(UserProtos.RunQuery query) {
-      logger.debug( "Listener created for query \"\"\"{}\"\"\"", query );
+      logger.debug( "Listener created for query \"{}\"", query );
       this.query = query;
     }
 
@@ -867,7 +867,8 @@ public class DrillClient implements Closeable, ConnectionThrottle {
 
     @Override
     public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
-      logger.debug("Result arrived:  Result: {}", result);
+      logger.debug("Result arrived:  Row count: {}", result.getHeader().getRowCount());
+      logger.trace("Result batch: {}", result);
       results.add(result);
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index bfbad64..0ac2565 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -229,6 +229,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
 
   @Override
   public void close() {
+    logger.debug("Closing {}", getClass().getCanonicalName());
     batchLoader.clear();
   }
 
@@ -279,6 +280,8 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
 
     @Override
     public void interrupted(InterruptedException e) {
+      logger.debug("{} interrupted. shouldContinue value: {}", getClass().getCanonicalName(),
+        context.getExecutorState().shouldContinue());
       if (context.getExecutorState().shouldContinue()) {
         String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message";
         logger.error(errMsg, e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 1eb2ac0..83e1e85 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -47,7 +47,7 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 /**
  * Holds record batch loaded from record batch message.
  */
-public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapper<?>>{
+public class RecordBatchLoader implements VectorAccessible {
   private final static Logger logger = LoggerFactory.getLogger(RecordBatchLoader.class);
 
   private final BufferAllocator allocator;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
index aeb4475..723cd0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
@@ -20,6 +20,9 @@ package org.apache.drill.exec.server;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.local.LocalClusterCoordinator;
 
+/**
+ * It is necessary to start Drillbit. For more info check {@link ClusterCoordinator}
+ */
 public class RemoteServiceSet implements AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class);
 
@@ -40,6 +43,9 @@ public class RemoteServiceSet implements AutoCloseable {
     coordinator.close();
   }
 
+  /**
+   * @return Use a non-null service set so that the drillbits can use port hunting
+   */
   public static RemoteServiceSet getLocalServiceSet() {
     return new RemoteServiceSet(new LocalClusterCoordinator());
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
index 29a3a2a..3543253 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
@@ -123,7 +123,7 @@ public final class ExecutionControls {
   /**
    * The default value for controls.
    */
-  public static final String DEFAULT_CONTROLS = "{}";
+  public static final String EMPTY_CONTROLS = "{\"injections\" : []}";
 
   /**
    * Caches the currently specified controls.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
index d8349c4..3414f24 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.testing;
 
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import java.util.concurrent.atomic.AtomicInteger;
@@ -24,6 +26,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 /**
  * The base class for all types of injections (currently, pause and exception).
  */
+@Slf4j
+@ToString
 public abstract class Injection {
 
   protected final String address;  // the address of the drillbit on which to inject
@@ -67,6 +71,9 @@ public abstract class Injection {
    * @return if the injection should be injected now
    */
   protected boolean injectNow() {
+    if(logger.isDebugEnabled()) {
+      logger.debug(toString());
+    }
     return nSkip.decrementAndGet() < 0 && nFire.decrementAndGet() >= 0;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index de49e28..5fcdac1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -282,7 +282,7 @@ public class WorkManager implements AutoCloseable {
         final String originalName = currentThread.getName();
         try {
           currentThread.setName(queryIdString + ":foreman:cancel");
-          logger.debug("Canceling foreman");
+          logger.debug("Canceling foreman. Thread: {}", originalName);
           foreman.cancel();
         } catch (Throwable t) {
           logger.warn("Exception while canceling foreman", t);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 64df6c4..c12c713 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -201,6 +201,7 @@ public class Foreman implements Runnable {
    * Query execution will be canceled once possible.
    */
   public void cancel() {
+    logger.debug("Cancel Foreman");
     queryStateProcessor.cancel();
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
index 3bc75c3..17a7c5d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
@@ -239,10 +239,12 @@ public class QueryStateProcessor implements AutoCloseable {
         return;
       case COMPLETED:
         wrapUpCompletion();
+        return;
       case CANCELLATION_REQUESTED:
         // since during starting state fragments are sent to the remote nodes,
         // we don't want to cancel until they all are sent out
-        addToEventQueue(QueryState.CANCELLATION_REQUESTED, null);
+        assert exception == null;
+        wrapUpCancellation();
         return;
     }
 
@@ -256,23 +258,13 @@ public class QueryStateProcessor implements AutoCloseable {
        * cause this to be called recursively.
        */
     switch (newState) {
-      case CANCELLATION_REQUESTED: {
+      case CANCELLATION_REQUESTED:
         assert exception == null;
-        recordNewState(QueryState.CANCELLATION_REQUESTED);
-        queryManager.cancelExecutingFragments(drillbitContext);
-        foremanResult.setCompleted(QueryState.CANCELED);
-        /*
-         * We don't close the foremanResult until we've gotten
-         * acknowledgments, which happens below in the case for current state
-         * == CANCELLATION_REQUESTED.
-         */
+        wrapUpCancellation();
         return;
-      }
-
-      case COMPLETED: {
+      case COMPLETED:
         wrapUpCompletion();
         return;
-      }
     }
     checkCommonStates(newState, exception);
   }
@@ -303,7 +295,13 @@ public class QueryStateProcessor implements AutoCloseable {
 
   private void wrapUpCancellation() {
     recordNewState(QueryState.CANCELLATION_REQUESTED);
+    queryManager.cancelExecutingFragments(drillbitContext);
     foremanResult.setCompleted(QueryState.CANCELED);
+    /*
+     * We don't close the foremanResult until we've gotten
+     * acknowledgments, which happens below in the case for current state
+     * == CANCELLATION_REQUESTED.
+     */
   }
 
   private void wrapUpCompletion() {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java b/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
index 65c900c..5532abe 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
@@ -68,7 +68,7 @@ public abstract class SingleRowListener implements UserResultsListener {
   }
 
   @Override
-  public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+  public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
     final QueryData queryData = result.getHeader();
     if (result.hasData()) {
       final int nRows = this.nRows.addAndGet(queryData.getRowCount());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestNestedDateTimeTimestamp.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestNestedDateTimeTimestamp.java
index 8d79ebe..05697aa 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestNestedDateTimeTimestamp.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestNestedDateTimeTimestamp.java
@@ -29,16 +29,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.drill.categories.FlakyTest;
 import org.apache.drill.exec.expr.fn.impl.DateUtility;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.test.TestBuilder;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 /**
  * For DRILL-6242, output for Date, Time, Timestamp should use different classes
  */
+@Category(FlakyTest.class)
 public class TestNestedDateTimeTimestamp extends BaseTestQuery {
   private static final String              DATAFILE       = "cp.`datetime.parquet`";
   private static final Map<String, Object> expectedRecord = new TreeMap<String, Object>();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index dbae776..887af0d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -641,7 +641,7 @@ public class TestParquetWriter extends BaseTestQuery {
  * have written the correct type. For every CTAS operation we use both the readers to verify results.
  */
   @Test
-  public void testCTASWithIntervalTypes() throws Exception {
+  public void testCTASWithIntervalTypes() throws Exception { // TODO: investigate NPE errors during the test execution
     test("use dfs.tmp");
 
     String tableName = "drill_1980_t1";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 206221d..fed9c4d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -19,34 +19,40 @@ package org.apache.drill.exec.server;
 
 import static org.apache.drill.exec.ExecConstants.SLICE_TARGET;
 import static org.apache.drill.exec.ExecConstants.SLICE_TARGET_DEFAULT;
-import static org.apache.drill.exec.planner.physical.PlannerSettings.HASHAGG;
+import static org.apache.drill.exec.planner.physical.PlannerSettings.ENABLE_HASH_AGG_OPTION;
 import static org.apache.drill.exec.planner.physical.PlannerSettings.PARTITION_SENDER_SET_THREADS;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
 
+import ch.qos.logback.classic.Level;
 import org.apache.commons.math3.util.Pair;
+import org.apache.drill.categories.FlakyTest;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
+import org.apache.drill.exec.work.WorkManager;
+import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.foreman.ForemanException;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
 import org.apache.drill.exec.work.foreman.FragmentsRunner;
+import org.apache.drill.exec.work.foreman.QueryStateProcessor;
 import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.LogFixture;
 import org.apache.drill.test.QueryTestUtil;
 import org.apache.drill.SingleRowListener;
 import org.apache.drill.common.DrillAutoCloseables;
 import org.apache.drill.common.concurrent.ExtendedLatch;
-import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.util.RepeatTestRule.Repeat;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ZookeeperHelper;
 import org.apache.drill.exec.ZookeeperTestUtil;
-import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.RootAllocatorFactory;
 import org.apache.drill.exec.physical.impl.ScreenCreator;
@@ -56,10 +62,7 @@ import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
 import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
 import org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.apache.drill.exec.proto.UserBitShared.ExceptionWrapper;
 import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -78,18 +81,18 @@ import org.apache.drill.exec.store.pojo.PojoRecordReader;
 import org.apache.drill.exec.testing.Controls;
 import org.apache.drill.exec.testing.ControlsInjectionUtil;
 import org.apache.drill.exec.util.Pointer;
-import org.apache.drill.exec.work.foreman.Foreman;
-import org.apache.drill.exec.work.foreman.ForemanException;
-import org.apache.drill.exec.work.foreman.ForemanSetupException;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
-import org.apache.drill.test.DrillTest;
 import org.apache.drill.categories.SlowTest;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.TestInstantiationException;
 import org.slf4j.Logger;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -98,19 +101,19 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
  * Test how resilient drillbits are to throwing exceptions during various phases of query
  * execution by injecting exceptions at various points, and to cancellations in various phases.
  */
-@Category({SlowTest.class})
-public class TestDrillbitResilience extends DrillTest {
+@Tag(SlowTest.TAG)
+@Tag(FlakyTest.TAG)
+public class TestDrillbitResilience extends ClusterTest {
   private static final Logger logger = org.slf4j.LoggerFactory.getLogger(TestDrillbitResilience.class);
-
-  private static ZookeeperHelper zkHelper;
-  private static RemoteServiceSet remoteServiceSet;
-  private static final Map<String, Drillbit> drillbits = new HashMap<>();
-  private static DrillClient drillClient;
+  protected static LogFixture logFixture;
 
   /**
-   * The number of times test (that are repeated) should be repeated.
+   * The number of times each repeated test should be run. To exercise this functionality properly, set this value to 1000.
    */
   private static final int NUM_RUNS = 3;
+  private static final int PROBLEMATIC_TEST_NUM_RUNS = 3;
+  private static final int TIMEOUT = 10;
+  private final static Level CURRENT_LOG_LEVEL = Level.INFO;
 
   /**
    * Note: Counting sys.memory executes a fragment on every drillbit. This is a better check in comparison to
@@ -118,48 +121,6 @@ public class TestDrillbitResilience extends DrillTest {
    */
   private static final String TEST_QUERY = "select * from sys.memory";
 
-  private static void startDrillbit(final String name, final RemoteServiceSet remoteServiceSet) {
-    if (drillbits.containsKey(name)) {
-      throw new IllegalStateException("Drillbit named \"" + name + "\" already exists");
-    }
-
-    try {
-      final Drillbit drillbit = Drillbit.start(zkHelper.getConfig(), remoteServiceSet);
-      drillbits.put(name, drillbit);
-    } catch (final DrillbitStartupException e) {
-      throw new RuntimeException("Failed to start Drillbit \"" + name + "\"", e);
-    }
-  }
-
-  /**
-   * Shutdown the specified drillbit.
-   *
-   * @param name name of the drillbit
-   */
-  private static void stopDrillbit(final String name) {
-    final Drillbit drillbit = drillbits.get(name);
-    if (drillbit == null) {
-      throw new IllegalStateException("No Drillbit named \"" + name + "\" found");
-    }
-
-    try {
-      drillbit.close();
-    } catch (final Exception e) {
-      final String message = "Error shutting down Drillbit \"" + name + "\"";
-      logger.warn(message, e);
-    }
-  }
-
-  /**
-   * Shutdown all the drillbits.
-   */
-  private static void stopAllDrillbits() {
-    for (String name : drillbits.keySet()) {
-      stopDrillbit(name);
-    }
-    drillbits.clear();
-  }
-
   /*
    * Canned drillbit names.
    */
@@ -167,71 +128,42 @@ public class TestDrillbitResilience extends DrillTest {
   private final static String DRILLBIT_BETA = "beta";
   private final static String DRILLBIT_GAMMA = "gamma";
 
-  /**
-   * Get the endpoint for the drillbit, if it is running
-   * @param name name of the drillbit
-   * @return endpoint of the drillbit
-   */
-  private static DrillbitEndpoint getEndpoint(final String name) {
-    final Drillbit drillbit = drillbits.get(name);
-    if (drillbit == null) {
-      throw new IllegalStateException("No Drillbit named \"" + name + "\" found.");
-    }
-    return drillbit.getContext().getEndpoint();
-  }
-
-  @BeforeClass
+  @BeforeAll
   public static void startSomeDrillbits() throws Exception {
-    // turn off the HTTP server to avoid port conflicts between the drill bits
-    System.setProperty(ExecConstants.HTTP_ENABLE, "false");
-
+    logFixture = LogFixture.builder()
+      .toConsole()
+      .logger(TestDrillbitResilience.class, CURRENT_LOG_LEVEL)
+      .logger(DrillClient.class, CURRENT_LOG_LEVEL)
+      .logger(QueryStateProcessor.class, CURRENT_LOG_LEVEL)
+      .logger(WorkManager.class, CURRENT_LOG_LEVEL)
+      .logger(UnorderedReceiverBatch.class, CURRENT_LOG_LEVEL)
+      .logger(ExtendedLatch.class, CURRENT_LOG_LEVEL)
+      .logger(Foreman.class, CURRENT_LOG_LEVEL)
+      .logger(QueryStateProcessor.class, CURRENT_LOG_LEVEL)
+      .logger(ExecutionControlsInjector.class, CURRENT_LOG_LEVEL)
+      .build();
     ZookeeperTestUtil.setJaasTestConfigFile();
-
-    // turn on error for failure in cancelled fragments
-    zkHelper = new ZookeeperHelper(true, true);
-    zkHelper.startZookeeper(1);
-
-    // use a non-null service set so that the drillbits can use port hunting
-    remoteServiceSet = RemoteServiceSet.getLocalServiceSet();
-
-    // create name-addressable drillbits
-    startDrillbit(DRILLBIT_ALPHA, remoteServiceSet);
-    startDrillbit(DRILLBIT_BETA, remoteServiceSet);
-    startDrillbit(DRILLBIT_GAMMA, remoteServiceSet);
-
-    // create a client
-    final DrillConfig drillConfig = zkHelper.getConfig();
-    drillClient = QueryTestUtil.createClient(drillConfig, remoteServiceSet, 1, new Properties());
+    dirTestWatcher.start(TestDrillbitResilience.class); // until DirTestWatcher is implemented for JUnit5
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+      .configProperty(ExecConstants.HTTP_ENABLE, false) // turn off the HTTP server to avoid port conflicts between the drill bits
+      .withBits(DRILLBIT_ALPHA, DRILLBIT_BETA, DRILLBIT_GAMMA);
+    startCluster(builder);
     clearAllInjections();
+    logger.debug("Start 3 drillbits Test Drill cluster: {}, {}, {}", DRILLBIT_ALPHA, DRILLBIT_BETA, DRILLBIT_GAMMA);
   }
 
-  @AfterClass
-  public static void shutdownAllDrillbits() {
-    if (drillClient != null) {
-      drillClient.close();
-      drillClient = null;
-    }
-
-    stopAllDrillbits();
-
-    if (remoteServiceSet != null) {
-      try {
-        remoteServiceSet.close();
-      } catch (Exception e) {
-        logger.warn("Failure on close()", e);
-      }
-      remoteServiceSet = null;
-    }
-
-    zkHelper.stopZookeeper();
+  @AfterAll
+  public static void tearDownAfterClass() throws Exception {
+    logFixture.close();
   }
 
   /**
    * Clear all injections.
    */
   private static void clearAllInjections() {
-    Preconditions.checkNotNull(drillClient);
-    ControlsInjectionUtil.clearControls(drillClient);
+    logger.debug("Clear all injections");
+    Preconditions.checkNotNull(client);
+    ControlsInjectionUtil.clearControls(client.client());
   }
 
   /**
@@ -240,323 +172,128 @@ public class TestDrillbitResilience extends DrillTest {
    * <p>The current implementation does this by counting the number of drillbits using a query.
    */
   private static void assertDrillbitsOk() {
-      final SingleRowListener listener = new SingleRowListener() {
-          private final BufferAllocator bufferAllocator = RootAllocatorFactory.newRoot(zkHelper.getConfig());
-          private final RecordBatchLoader loader = new RecordBatchLoader(bufferAllocator);
-
-          @Override
-          public void rowArrived(final QueryDataBatch queryResultBatch) {
-            // load the single record
-            final QueryData queryData = queryResultBatch.getHeader();
-            // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
-            // SchemaChangeException.
-            loader.load(queryData.getDef(), queryResultBatch.getData());
-            assertEquals(1, loader.getRecordCount());
-
-            // there should only be one column
-            final BatchSchema batchSchema = loader.getSchema();
-            assertEquals(1, batchSchema.getFieldCount());
-
-            // the column should be an integer
-            final MaterializedField countField = batchSchema.getColumn(0);
-            final MinorType fieldType = countField.getType().getMinorType();
-            assertEquals(MinorType.BIGINT, fieldType);
-
-            // get the column value
-            final VectorWrapper<?> vw = loader.iterator().next();
-            final Object obj = vw.getValueVector().getAccessor().getObject(0);
-            assertTrue(obj instanceof Long);
-            final Long countValue = (Long) obj;
-
-            // assume this means all the drillbits are still ok
-            assertEquals(drillbits.size(), countValue.intValue());
-
-            loader.clear();
-          }
+    SingleRowListener listener = new SingleRowListener() {
+      private final BufferAllocator bufferAllocator = RootAllocatorFactory.newRoot(cluster.config());
+      private final RecordBatchLoader loader = new RecordBatchLoader(bufferAllocator);
 
-          @Override
-          public void cleanup() {
-            DrillAutoCloseables.closeNoChecked(bufferAllocator);
-          }
-        };
+      @Override
+      public void rowArrived(QueryDataBatch queryResultBatch) {
+        // load the single record
+        final QueryData queryData = queryResultBatch.getHeader();
+        // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
+        // SchemaChangeException.
+        loader.load(queryData.getDef(), queryResultBatch.getData());
+        assertEquals(1, loader.getRecordCount());
+
+        // there should only be one column
+        final BatchSchema batchSchema = loader.getSchema();
+        assertEquals(1, batchSchema.getFieldCount());
+
+        // the column should be an integer
+        final MaterializedField countField = batchSchema.getColumn(0);
+        final MinorType fieldType = countField.getType().getMinorType();
+        assertEquals(MinorType.BIGINT, fieldType);
+
+        // get the column value
+        final VectorWrapper<?> vw = loader.iterator().next();
+        final Object obj = vw.getValueVector().getAccessor().getObject(0);
+        assertTrue(obj instanceof Long);
+        final Long countValue = (Long) obj;
+
+        // assume this means all the drillbits are still ok
+        assertEquals(cluster.drillbits().size(), countValue.intValue());
+
+        loader.clear();
+      }
+
+      @Override
+      public void cleanup() {
+        loader.clear();
+        DrillAutoCloseables.closeNoChecked(bufferAllocator);
+      }
+    };
 
     try {
-      QueryTestUtil.testWithListener(drillClient, QueryType.SQL, "select count(*) from sys.memory", listener);
+      QueryTestUtil.testWithListener(client.client(), QueryType.SQL, "select count(*) from sys.memory", listener);
       listener.waitForCompletion();
-      final QueryState state = listener.getQueryState();
-      assertTrue(String.format("QueryState should be COMPLETED (and not %s).", state), state == QueryState.COMPLETED);
+      QueryState state = listener.getQueryState();
+      assertSame(state, QueryState.COMPLETED, () -> String.format("QueryState should be COMPLETED (and not %s).", state));
+      assertTrue(listener.getErrorList().isEmpty(), "There should not be any errors when checking if Drillbits are OK");
     } catch (final Exception e) {
       throw new RuntimeException("Couldn't query active drillbits", e);
+    } finally {
+      logger.debug("Cleanup listener");
+      listener.cleanup();
     }
+    logger.debug("Drillbits are ok.");
+  }
 
-    final List<DrillPBError> errorList = listener.getErrorList();
-    assertTrue("There should not be any errors when checking if Drillbits are OK.", errorList.isEmpty());
+  @BeforeEach
+  void setUp(TestInfo testInfo) {
+    String testName = testInfo.getTestMethod().orElseThrow(() -> new TestInstantiationException("Can't get method neme")).getName();
+    String testOrRepetitionName = testInfo.getDisplayName();
+    if(testOrRepetitionName.startsWith("repetition")) {
+      logger.debug("{} for {} test started", testOrRepetitionName, testName);
+    } else {
+      logger.debug("{} test started", testName);
+    }
   }
 
-  @After
-  public void checkDrillbits() {
+  @AfterEach
+  public void checkDrillbits(TestInfo testInfo) {
     clearAllInjections(); // so that the drillbit check itself doesn't trigger anything
     assertDrillbitsOk(); // TODO we need a way to do this without using a query
-  }
-
-  /**
-   * Set the given controls.
-   */
-  private static void setControls(final String controls) {
-    ControlsInjectionUtil.setControls(drillClient, controls);
-  }
-
-  /**
-   * Sets a session option.
-   */
-  private static void setSessionOption(final String option, final String value) {
-    ControlsInjectionUtil.setSessionOption(drillClient, option, value);
-  }
-
-  private static void resetSessionOption(final String option) {
-    try {
-      final List<QueryDataBatch> results = drillClient.runQuery(
-        UserBitShared.QueryType.SQL, String.format("ALTER session RESET `%s`",
-          option));
-      for (final QueryDataBatch data : results) {
-        data.release();
-      }
-    } catch (final RpcException e) {
-      fail("Could not reset option: " + e.toString());
+    String testName = testInfo.getTestMethod().orElseThrow(() -> new TestInstantiationException("Can't get method neme")).getName();
+    String testOrRepetitionName = testInfo.getDisplayName();
+    if(testOrRepetitionName.startsWith("repetition")) {
+      logger.debug("{} for {} test finished", testOrRepetitionName, testName);
+    } else {
+      logger.debug("{} test finished", testName);
     }
   }
 
-  /**
-   * Check that the injected exception is what we were expecting.
-   *
-   * @param throwable      the throwable that was caught (by the test)
-   * @param exceptionClass the expected exception class
-   * @param desc           the expected exception site description
-   */
-  private static void assertExceptionMessage(final Throwable throwable, final Class<? extends Throwable> exceptionClass,
-                                             final String desc) {
-    assertTrue("Throwable was not of UserException type.", throwable instanceof UserException);
-    final ExceptionWrapper cause = ((UserException) throwable).getOrCreatePBError(false).getException();
-    assertEquals("Exception class names should match.", exceptionClass.getName(), cause.getExceptionClass());
-    assertEquals("Exception sites should match.", desc, cause.getMessage());
-  }
-
   @Test
+  @Timeout(value = TIMEOUT)
   public void settingNoOpInjectionsAndQuery() {
     final long before = countAllocatedMemory();
 
     final String controls = Controls.newBuilder()
-      .addExceptionOnBit(getClass(), "noop", RuntimeException.class, getEndpoint(DRILLBIT_BETA))
+      .addExceptionOnBit(getClass(), "noop", RuntimeException.class, cluster.drillbit(DRILLBIT_BETA).getContext().getEndpoint())
       .build();
     setControls(controls);
     final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
-    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+    QueryTestUtil.testWithListener(client.client(), QueryType.SQL, TEST_QUERY, listener);
     final Pair<QueryState, Exception> pair = listener.waitForCompletion();
     assertStateCompleted(pair, QueryState.COMPLETED);
 
     final long after = countAllocatedMemory();
-    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+    assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
   }
 
-  /**
-   * Test throwing exceptions from sites within the Foreman class, as specified by the site
-   * description
-   *
-   * @param desc site description
-   */
-  private static void testForeman(final String desc) {
-    final String controls = Controls.newBuilder()
-      .addException(Foreman.class, desc, ForemanException.class)
-      .build();
-    assertFailsWithException(controls, ForemanException.class, desc);
-  }
-
-  @Test
-  @Repeat(count = NUM_RUNS)
+  @RepeatedTest(NUM_RUNS)
+  @Timeout(value = TIMEOUT)
   public void foreman_runTryBeginning() {
     final long before = countAllocatedMemory();
 
     testForeman("run-try-beginning");
 
     final long after = countAllocatedMemory();
-    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+    assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
   }
 
-  @Test
-  @Ignore // TODO(DRILL-3163, DRILL-3167)
-  //@Repeat(count = NUM_RUNS)
+  @Test // DRILL-3163, DRILL-3167
+  @Timeout(value = TIMEOUT)
   public void foreman_runTryEnd() {
     final long before = countAllocatedMemory();
 
     testForeman("run-try-end");
 
     final long after = countAllocatedMemory();
-    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
-  }
-
-  /**
-   * Tests can use this listener to wait, until the submitted query completes or fails, by
-   * calling #waitForCompletion.
-   */
-  private static class WaitUntilCompleteListener implements UserResultsListener {
-    private final ExtendedLatch latch = new ExtendedLatch(1); // to signal completion
-    protected QueryId queryId = null;
-    protected volatile Pointer<Exception> ex = new Pointer<>();
-    protected volatile QueryState state = null;
-
-    /**
-     * Method that sets the exception if the condition is not met.
-     */
-    protected final void check(final boolean condition, final String format, final Object... args) {
-      if (!condition) {
-        ex.value = new IllegalStateException(String.format(format, args));
-      }
-    }
-
-    /**
-     * Method that cancels and resumes the query, in order.
-     */
-    protected final void cancelAndResume() {
-      Preconditions.checkNotNull(queryId);
-      final ExtendedLatch trigger = new ExtendedLatch(1);
-      (new CancellingThread(queryId, ex, trigger)).start();
-      (new ResumingThread(queryId, ex, trigger)).start();
-    }
-
-    @Override
-    public void queryIdArrived(final QueryId queryId) {
-      this.queryId = queryId;
-    }
-
-    @Override
-    public void submissionFailed(final UserException ex) {
-      this.ex.value = ex;
-      state = QueryState.FAILED;
-      latch.countDown();
-    }
-
-    @Override
-    public void queryCompleted(final QueryState state) {
-      this.state = state;
-      latch.countDown();
-    }
-
-    @Override
-    public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
-      result.release();
-    }
-
-    public final Pair<QueryState, Exception> waitForCompletion() {
-      latch.awaitUninterruptibly();
-      return new Pair<>(state, ex.value);
-    }
-  }
-
-  private static class ListenerThatCancelsQueryAfterFirstBatchOfData extends WaitUntilCompleteListener {
-    private boolean cancelRequested = false;
-
-    @Override
-    public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
-      if (!cancelRequested) {
-        check(queryId != null, "Query id should not be null, since we have waited long enough.");
-        (new CancellingThread(queryId, ex, null)).start();
-        cancelRequested = true;
-      }
-      result.release();
-    }
-  }
-
-  /**
-   * Thread that cancels the given query id. After the cancel is acknowledged, the latch is counted down.
-   */
-  private static class CancellingThread extends Thread {
-    private final QueryId queryId;
-    private final Pointer<Exception> ex;
-    private final ExtendedLatch latch;
-
-    public CancellingThread(final QueryId queryId, final Pointer<Exception> ex, final ExtendedLatch latch) {
-      this.queryId = queryId;
-      this.ex = ex;
-      this.latch = latch;
-    }
-
-    @Override
-    public void run() {
-      final DrillRpcFuture<Ack> cancelAck = drillClient.cancelQuery(queryId);
-      try {
-        cancelAck.checkedGet();
-      } catch (final RpcException ex) {
-        this.ex.value = ex;
-      }
-      if (latch != null) {
-        latch.countDown();
-      }
-    }
-  }
-
-  /**
-   * Thread that resumes the given query id. After the latch is counted down, the resume signal is sent, until then
-   * the thread waits without interruption.
-   */
-  private static class ResumingThread extends Thread {
-    private final QueryId queryId;
-    private final Pointer<Exception> ex;
-    private final ExtendedLatch latch;
-
-    public ResumingThread(final QueryId queryId, final Pointer<Exception> ex, final ExtendedLatch latch) {
-      this.queryId = queryId;
-      this.ex = ex;
-      this.latch = latch;
-    }
-
-    @Override
-    public void run() {
-      latch.awaitUninterruptibly();
-      final DrillRpcFuture<Ack> resumeAck = drillClient.resumeQuery(queryId);
-      try {
-        resumeAck.checkedGet();
-      } catch (final RpcException ex) {
-        this.ex.value = ex;
-      }
-    }
-  }
-
-  /**
-   * Given the result of {@link WaitUntilCompleteListener#waitForCompletion},
-   * this method fails if the completed state is not as expected, or if an
-   * exception is thrown. The completed state could be COMPLETED or CANCELED.
-   * This state is set when {@link WaitUntilCompleteListener#queryCompleted} is
-   * called.
-   */
-  private static void assertStateCompleted(final Pair<QueryState, Exception> result, final QueryState expectedState) {
-    final QueryState actualState = result.getFirst();
-    final Exception exception = result.getSecond();
-    if (actualState != expectedState || exception != null) {
-      fail(String.format("Query state is incorrect (expected: %s, actual: %s) AND/OR \nException thrown: %s",
-        expectedState, actualState, exception == null ? "none." : exception));
-    }
-  }
-
-  /**
-   * Given a set of controls, this method ensures that the given query completes with a CANCELED state.
-   */
-  private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener,
-                                                      final String query) {
-    setControls(controls);
-
-    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, query, listener);
-    final Pair<QueryState, Exception> result = listener.waitForCompletion();
-    assertStateCompleted(result, QueryState.CANCELED);
-  }
-
-  /**
-   * Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state.
-   */
-  private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener) {
-    assertCancelledWithoutException(controls, listener, TEST_QUERY);
+    assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
   }
 
   @Test // To test pause and resume. Test hangs and times out if resume did not happen.
+  @Timeout(value = TIMEOUT)
   public void passThrough() {
     final long before = countAllocatedMemory();
 
@@ -575,20 +312,19 @@ public class TestDrillbitResilience extends DrillTest {
       .build();
     setControls(controls);
 
-    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+    QueryTestUtil.testWithListener(client.client(), QueryType.SQL, TEST_QUERY, listener);
     final Pair<QueryState, Exception> result = listener.waitForCompletion();
     assertStateCompleted(result, QueryState.COMPLETED);
 
     final long after = countAllocatedMemory();
-    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+    assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
   }
 
   // DRILL-3052: Since root fragment is waiting on data and leaf fragments are cancelled before they send any
   // data to root, root will never run. This test will timeout if the root did not send the final state to Foreman.
   // DRILL-2383: Cancellation TC 1: cancel before any result set is returned.
-  @Test
-  @Ignore // TODO(DRILL-3192)
-  //@Repeat(count = NUM_RUNS)
+  @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // DRILL-3192
+  @Timeout(value = TIMEOUT)
   public void cancelWhenQueryIdArrives() {
     final long before = countAllocatedMemory();
 
@@ -597,7 +333,7 @@ public class TestDrillbitResilience extends DrillTest {
       @Override
       public void queryIdArrived(final QueryId queryId) {
         super.queryIdArrived(queryId);
-        cancelAndResume();
+        cancelAndResume(true);
       }
     };
 
@@ -607,12 +343,11 @@ public class TestDrillbitResilience extends DrillTest {
     assertCancelledWithoutException(controls, listener);
 
     final long after = countAllocatedMemory();
-    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+    assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
   }
 
-  @Test // DRILL-2383: Cancellation TC 2: cancel in the middle of fetching result set
-  @Repeat(count = NUM_RUNS)
-  @Ignore("DRILL-6228")
+  @RepeatedTest(NUM_RUNS) // DRILL-6228
+  @Timeout(value = TIMEOUT) // DRILL-2383: Cancellation TC 2: cancel in the middle of fetching result set
   public void cancelInMiddleOfFetchingResults() {
     final long before = countAllocatedMemory();
 
@@ -623,7 +358,7 @@ public class TestDrillbitResilience extends DrillTest {
       public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
         if (!cancelRequested) {
           check(queryId != null, "Query id should not be null, since we have waited long enough.");
-          cancelAndResume();
+          cancelAndResume(false);
           cancelRequested = true;
         }
         result.release();
@@ -637,13 +372,12 @@ public class TestDrillbitResilience extends DrillTest {
     assertCancelledWithoutException(controls, listener);
 
     final long after = countAllocatedMemory();
-    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+    assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
   }
 
 
-  @Test // DRILL-2383: Cancellation TC 3: cancel after all result set are produced but not all are fetched
-  @Repeat(count = NUM_RUNS)
-  @Ignore("DRILL-6228")
+  @RepeatedTest(NUM_RUNS) // DRILL-6228
+  @Timeout(value = TIMEOUT) // DRILL-2383: Cancellation TC 3: cancel after all result set are produced but not all are fetched
   public void cancelAfterAllResultsProduced() {
     final long before = countAllocatedMemory();
 
@@ -652,9 +386,9 @@ public class TestDrillbitResilience extends DrillTest {
 
       @Override
       public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
-        if (++count == drillbits.size()) {
+        if (lastDrillbit()) {
           check(queryId != null, "Query id should not be null, since we have waited long enough.");
-          cancelAndResume();
+          cancelAndResume(false);
         }
         result.release();
       }
@@ -666,23 +400,23 @@ public class TestDrillbitResilience extends DrillTest {
     assertCancelledWithoutException(controls, listener);
 
     final long after = countAllocatedMemory();
-    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+    assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
   }
 
-  @Test // DRILL-2383: Cancellation TC 4: cancel after everything is completed and fetched
-  @Repeat(count = NUM_RUNS)
-  @Ignore("DRILL-3967")
+  @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // DRILL-3967
+  @Timeout(value = TIMEOUT) // DRILL-2383: Cancellation TC 4: cancel after everything is completed and fetched
   public void cancelAfterEverythingIsCompleted() {
     final long before = countAllocatedMemory();
 
     final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
-      private int count = 0;
 
       @Override
       public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
-        if (++count == drillbits.size()) {
+        if (lastDrillbit()) {
           check(queryId != null, "Query id should not be null, since we have waited long enough.");
-          cancelAndResume();
+          // wait before cancelling until all batches are processed: the foreman-cleanup pause happens earlier
+          // than the moment the client receives queryCompleted() through its UserResultsListener
+          cancelAndResume(true);
         }
         result.release();
       }
@@ -691,45 +425,28 @@ public class TestDrillbitResilience extends DrillTest {
     final String controls = Controls.newBuilder()
       .addPause(Foreman.class, "foreman-cleanup")
       .build();
-    assertCancelledWithoutException(controls, listener);
+    assertCompletedWithoutException(controls, listener); // changed from CANCELLED
 
     final long after = countAllocatedMemory();
-    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+    assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
   }
 
   @Test // DRILL-2383: Completion TC 1: success
+  @Timeout(value = TIMEOUT)
   public void successfullyCompletes() {
     final long before = countAllocatedMemory();
 
     final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
-    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+    QueryTestUtil.testWithListener(client.client(), QueryType.SQL, TEST_QUERY, listener);
     final Pair<QueryState, Exception> result = listener.waitForCompletion();
     assertStateCompleted(result, QueryState.COMPLETED);
 
     final long after = countAllocatedMemory();
-    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
-  }
-
-  /**
-   * Given a set of controls, this method ensures TEST_QUERY fails with the given class and desc.
-   */
-  private static void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass,
-                                               final String exceptionDesc, final String query) {
-    setControls(controls);
-    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
-    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, query, listener);
-    final Pair<QueryState, Exception> result = listener.waitForCompletion();
-    final QueryState state = result.getFirst();
-    assertTrue(String.format("Query state should be FAILED (and not %s).", state), state == QueryState.FAILED);
-    assertExceptionMessage(result.getSecond(), exceptionClass, exceptionDesc);
-  }
-
-  private static void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass,
-                                               final String exceptionDesc) {
-    assertFailsWithException(controls, exceptionClass, exceptionDesc, TEST_QUERY);
+    assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
   }
 
   @Test // DRILL-2383: Completion TC 2: failed query - before query is executed - while sql parsing
+  @Timeout(value = TIMEOUT)
   public void failsWhenParsing() {
     final long before = countAllocatedMemory();
 
@@ -743,26 +460,27 @@ public class TestDrillbitResilience extends DrillTest {
     assertFailsWithException(controls, exceptionClass, exceptionDesc);
 
     final long after = countAllocatedMemory();
-    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+    assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
   }
 
-  @Test // DRILL-2383: Completion TC 3: failed query - before query is executed - while sending fragments to other
-  // drillbits
+  @Test // DRILL-2383: Completion TC 3: failed query - before query is executed, while sending fragments to other drillbits
+  @Timeout(value = TIMEOUT)
   public void failsWhenSendingFragments() {
     final long before = countAllocatedMemory();
 
     final String exceptionDesc = "send-fragments";
     final Class<? extends Throwable> exceptionClass = ForemanException.class;
     final String controls = Controls.newBuilder()
-    .addException(FragmentsRunner.class, exceptionDesc, exceptionClass)
+      .addException(FragmentsRunner.class, exceptionDesc, exceptionClass)
       .build();
     assertFailsWithException(controls, exceptionClass, exceptionDesc);
 
     final long after = countAllocatedMemory();
-    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+    assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
   }
 
   @Test // DRILL-2383: Completion TC 4: failed query - during query execution
+  @Timeout(value = TIMEOUT)
   public void failsDuringExecution() {
     final long before = countAllocatedMemory();
 
@@ -774,15 +492,15 @@ public class TestDrillbitResilience extends DrillTest {
     assertFailsWithException(controls, exceptionClass, exceptionDesc);
 
     final long after = countAllocatedMemory();
-    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+    assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
   }
 
   /**
    * Test canceling query interrupts currently blocked FragmentExecutor threads waiting for some event to happen.
    * Specifically tests canceling fragment which has {@link MergingRecordBatch} blocked waiting for data.
    */
-  @Test
-  @Repeat(count = NUM_RUNS)
+  @RepeatedTest(NUM_RUNS)
+  @Timeout(value = TIMEOUT)
   public void interruptingBlockedMergingRecordBatch() {
     final long before = countAllocatedMemory();
 
@@ -792,38 +510,26 @@ public class TestDrillbitResilience extends DrillTest {
     interruptingBlockedFragmentsWaitingForData(control);
 
     final long after = countAllocatedMemory();
-    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+    assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
   }
 
   /**
    * Test canceling query interrupts currently blocked FragmentExecutor threads waiting for some event to happen.
    * Specifically tests canceling fragment which has {@link UnorderedReceiverBatch} blocked waiting for data.
    */
-  @Test
-  @Repeat(count = NUM_RUNS)
+  @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS)
+  @Timeout(value = TIMEOUT)
   public void interruptingBlockedUnorderedReceiverBatch() {
     final long before = countAllocatedMemory();
 
     final String control = Controls.newBuilder()
       .addPause(UnorderedReceiverBatch.class, "waiting-for-data", 1)
       .build();
+    logger.debug("Start interruptingBlockedFragmentsWaitingForData");
     interruptingBlockedFragmentsWaitingForData(control);
 
     final long after = countAllocatedMemory();
-    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
-  }
-
-  private static void interruptingBlockedFragmentsWaitingForData(final String control) {
-    try {
-      setSessionOption(SLICE_TARGET, "1");
-      setSessionOption(HASHAGG.getOptionName(), "false");
-
-      final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
-      assertCancelledWithoutException(control, new ListenerThatCancelsQueryAfterFirstBatchOfData(), query);
-    } finally {
-      resetSessionOption(SLICE_TARGET);
-      resetSessionOption(HASHAGG.getOptionName());
-    }
+    assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
   }
 
   /**
@@ -831,13 +537,13 @@ public class TestDrillbitResilience extends DrillTest {
    * {@link PartitionSenderRootExec} spawns threads for partitioner. Interrupting fragment thread should also interrupt
    * the partitioner threads.
    */
-  @Test
-  @Repeat(count = NUM_RUNS)
+  @RepeatedTest(NUM_RUNS)
+  @Timeout(value = TIMEOUT)
   public void interruptingPartitionerThreadFragment() {
     try {
-      setSessionOption(SLICE_TARGET, "1");
-      setSessionOption(HASHAGG.getOptionName(), "true");
-      setSessionOption(PARTITION_SENDER_SET_THREADS.getOptionName(), "6");
+      client.alterSession(SLICE_TARGET, "1");
+      client.alterSession(ENABLE_HASH_AGG_OPTION, "true");
+      client.alterSession(PARTITION_SENDER_SET_THREADS.getOptionName(), "6");
 
       final long before = countAllocatedMemory();
 
@@ -850,17 +556,16 @@ public class TestDrillbitResilience extends DrillTest {
       assertCancelledWithoutException(controls, new ListenerThatCancelsQueryAfterFirstBatchOfData(), query);
 
       final long after = countAllocatedMemory();
-      assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+      assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
     } finally {
-      resetSessionOption(SLICE_TARGET);
-      resetSessionOption(HASHAGG.getOptionName());
-      resetSessionOption(PARTITION_SENDER_SET_THREADS.getOptionName());
+      client.resetSession(SLICE_TARGET);
+      client.resetSession(ENABLE_HASH_AGG_OPTION);
+      client.resetSession(PARTITION_SENDER_SET_THREADS.getOptionName());
     }
   }
 
-  @Test
-  @Ignore // TODO(DRILL-3193)
-  //@Repeat(count = NUM_RUNS)
+  @Test // DRILL-3193
+  @Timeout(value = TIMEOUT)
   public void interruptingWhileFragmentIsBlockedInAcquiringSendingTicket() {
     final long before = countAllocatedMemory();
 
@@ -870,13 +575,13 @@ public class TestDrillbitResilience extends DrillTest {
     assertCancelledWithoutException(control, new ListenerThatCancelsQueryAfterFirstBatchOfData());
 
     final long after = countAllocatedMemory();
-    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+    assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
   }
 
-  @Test
-  @Repeat(count = NUM_RUNS)
+  @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS)
+  @Timeout(value = TIMEOUT)
   public void memoryLeaksWhenCancelled() {
-    setSessionOption(SLICE_TARGET, "10");
+    client.alterSession(SLICE_TARGET, "10");
 
     final long before = countAllocatedMemory();
 
@@ -889,7 +594,7 @@ public class TestDrillbitResilience extends DrillTest {
         query = BaseTestQuery.getFile("queries/tpch/09.sql");
         query = query.substring(0, query.length() - 1); // drop the ";"
       } catch (final IOException e) {
-        fail("Failed to get query file: " + e);
+        fail("Failed to get query file", e);
       }
 
       final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
@@ -899,7 +604,7 @@ public class TestDrillbitResilience extends DrillTest {
         public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
           if (!cancelRequested) {
             check(queryId != null, "Query id should not be null, since we have waited long enough.");
-            cancelAndResume();
+            cancelAndResume(false);
             cancelRequested = true;
           }
           result.release();
@@ -909,17 +614,16 @@ public class TestDrillbitResilience extends DrillTest {
       assertCancelledWithoutException(controls, listener, query);
 
       final long after = countAllocatedMemory();
-      assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+      assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
     } finally {
-      setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+      client.alterSession(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
     }
   }
 
-  @Test
-  @Ignore // TODO(DRILL-3194)
-  //@Repeat(count = NUM_RUNS)
+  @RepeatedTest(NUM_RUNS) // DRILL-3194
+  @Timeout(value = TIMEOUT)
   public void memoryLeaksWhenFailed() {
-    setSessionOption(SLICE_TARGET, "10");
+    client.alterSession(SLICE_TARGET, "10");
 
     final long before = countAllocatedMemory();
 
@@ -941,14 +645,15 @@ public class TestDrillbitResilience extends DrillTest {
       assertFailsWithException(controls, exceptionClass, exceptionDesc, query);
 
       final long after = countAllocatedMemory();
-      assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+      assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
 
     } finally {
-      setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+      client.alterSession(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
     }
   }
 
-  @Test // DRILL-3065
+  @Test
+  @Timeout(value = TIMEOUT) // DRILL-3065
   public void failsAfterMSorterSorting() {
 
     // Note: must use an input table that returns more than one
@@ -966,10 +671,11 @@ public class TestDrillbitResilience extends DrillTest {
     assertFailsWithException(controls, typeOfException, ExternalSortBatch.INTERRUPTION_AFTER_SORT, query);
 
     final long after = countAllocatedMemory();
-    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+    assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
   }
 
-  @Test // DRILL-3085
+  @Test
+  @Timeout(value = TIMEOUT) // DRILL-3085
   public void failsAfterMSorterSetup() {
 
     // Note: must use an input table that returns more than one
@@ -987,22 +693,305 @@ public class TestDrillbitResilience extends DrillTest {
     assertFailsWithException(controls, typeOfException, ExternalSortBatch.INTERRUPTION_AFTER_SETUP, query);
 
     final long after = countAllocatedMemory();
-    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+    assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
+  }
+
+  /**
+   * Listener that lets a test block until the submitted query completes or fails, by calling
+   * {@link #waitForCompletion()}, which returns the final state together with any recorded exception.
+   */
+  private static class WaitUntilCompleteListener implements UserResultsListener {
+    protected final ExtendedLatch latch = new ExtendedLatch(1); // to signal completion
+    protected QueryId queryId = null;
+    protected volatile Pointer<Exception> ex = new Pointer<>();
+    protected volatile QueryState state = null;
+    private int count = 0; // number of lastDrillbit() calls made so far
+
+    /**
+     * Records an {@link IllegalStateException} in {@code ex} if the condition is false; it does not throw.
+     */
+    protected final void check(final boolean condition, final String format, final Object... args) {
+      if (!condition) {
+        ex.value = new IllegalStateException(String.format(format, args));
+      }
+    }
+
+    /**
+     * Starts a cancelling thread and a resuming thread sharing one latch, so the resume follows the cancel step.
+     */
+    protected final void cancelAndResume(boolean sleepBeforeStart) {
+      Preconditions.checkNotNull(queryId);
+      final ExtendedLatch trigger = new ExtendedLatch(1);
+      (new CancellingThread(queryId, ex, trigger, sleepBeforeStart)).start();
+      (new ResumingThread(queryId, ex, trigger)).start();
+    }
+
+    @Override
+    public void queryIdArrived(final QueryId queryId) {
+      this.queryId = queryId;
+    }
+
+    @Override
+    public void submissionFailed(final UserException ex) {
+      this.ex.value = ex;
+      state = QueryState.FAILED;
+      latch.countDown();
+    }
+
+    @Override
+    public void queryCompleted(final QueryState state) {
+      this.state = state;
+      latch.countDown();
+    }
+
+    @Override
+    public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+      result.release();
+    }
+
+    public final Pair<QueryState, Exception> waitForCompletion() {
+      try {
+        logger.debug("Wait for completion. latch: {}", latch.getCount());
+        latch.await(); // interruptible on purpose: awaitUninterruptibly() here would keep the JUnit timeout from working
+      } catch (InterruptedException e) {
+        logger.error("Interrupted while waiting for event latch");
+      }
+      logger.debug("Completed. Wait finished");
+      return new Pair<>(state, ex.value);
+    }
+
+    /**
+     * Useful for queries that return data from all drillbits: call it once per {@link #dataArrived} invocation to
+     * detect the call whose number equals the number of drillbits. Every call increments the internal counter.
+     *
+     * @return true only on the call whose number equals {@code cluster.drillbits().size()}
+     */
+    boolean lastDrillbit() {
+      return ++count == cluster.drillbits().size();
+    }
+  }
+
+  private static class ListenerThatCancelsQueryAfterFirstBatchOfData extends WaitUntilCompleteListener {
+    private boolean cancelRequested = false; // ensures the cancel is issued only once
+
+    @Override
+    public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+      if (!cancelRequested) {
+        logger.debug("First batch arrived, so cancelling thread started");
+        check(queryId != null, "Query id should not be null, since we have waited long enough.");
+        (new CancellingThread(queryId, ex, null, false)).start(); // null latch, no sleep before cancelling
+        cancelRequested = true;
+      }
+      result.release(); // every batch is released, whether or not it triggered the cancel
+    }
+  }
+
+  /**
+   * Thread that optionally sleeps 1 s, cancels the given query, then counts the latch down (if any) even on failure.
+   */
+  private static class CancellingThread extends Thread {
+    private final QueryId queryId;
+    private final Pointer<Exception> ex; // receives the RpcException if the cancel request fails
+    private final ExtendedLatch latch; // may be null when nothing waits for the cancel
+    private final boolean sleepBeforeStart;
+
+    public CancellingThread(QueryId queryId, Pointer<Exception> ex, ExtendedLatch latch, boolean sleepBeforeStart) {
+      this.queryId = queryId;
+      this.ex = ex;
+      this.latch = latch;
+      this.sleepBeforeStart = sleepBeforeStart;
+      logger.debug("Cancelling thread created");
+    }
+
+    @Override
+    public void run() {
+      if (sleepBeforeStart) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          logger.debug("Sleep thread interrupted. Ignore it");
+          // just ignore
+        }
+      }
+      logger.debug("Cancelling {} query started", queryId);
+      final DrillRpcFuture<Ack> cancelAck = client.client().cancelQuery(queryId);
+      logger.debug("Check future: {}", cancelAck);
+      try {
+        Ack ack = cancelAck.checkedGet();
+        logger.debug("Sleep thread for 0.01 seconds");
+        Thread.sleep(10);
+        logger.debug("Ack: {}", ack);
+      } catch (RpcException ex) {
+        this.ex.value = ex;
+        logger.debug("The query wasn't canceled.", ex); // throwable form keeps the stack trace in the log
+      } catch (InterruptedException e) {
+        logger.debug("Sleep thread interrupted. Ignore it");
+        // just ignore
+      }
+      if (latch != null) {
+        latch.countDown();
+      }
+      logger.debug("Finish cancelling thread");
+    }
+  }
+
+  /**
+   * Thread that resumes the given query. It blocks uninterruptibly until the latch is counted down, then
+   * sends the resume signal and records any {@link RpcException} from it in {@code ex}.
+   */
+  private static class ResumingThread extends Thread {
+    private final QueryId queryId;
+    private final Pointer<Exception> ex;
+    private final ExtendedLatch latch;
+
+    public ResumingThread(final QueryId queryId, final Pointer<Exception> ex, final ExtendedLatch latch) {
+      this.queryId = queryId;
+      this.ex = ex;
+      this.latch = latch;
+      logger.debug("Resuming thread created");
+    }
+
+    @Override
+    public void run() {
+      latch.awaitUninterruptibly();
+      final DrillRpcFuture<Ack> resumeAck = client.client().resumeQuery(queryId);
+      try {
+        resumeAck.checkedGet();
+      } catch (final RpcException ex) {
+        this.ex.value = ex;
+      }
+    }
   }
 
-  private static long countAllocatedMemory() {
+  /**
+   * Given the result of {@link WaitUntilCompleteListener#waitForCompletion},
+   * this method fails the test if the final state differs from the expected one,
+   * or if the result carries an exception. Callers expect COMPLETED or CANCELED.
+   * The state is set when {@link WaitUntilCompleteListener#queryCompleted} is
+   * called.
+   */
+  private void assertStateCompleted(final Pair<QueryState, Exception> result, final QueryState expectedState) {
+    final QueryState actualState = result.getFirst();
+    final Exception exception = result.getSecond();
+    if (actualState != expectedState || exception != null) {
+      fail(String.format("Query state is incorrect (expected: %s, actual: %s) AND/OR \nException thrown: %s",
+        expectedState, actualState, exception == null ? "none." : exception));
+    }
+  }
+
+  /**
+   * Given a set of controls, this method ensures that the given query completes with a {@link QueryState#CANCELED} state.
+   */
+  private void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener,
+                                                      final String query) {
+    setControls(controls);
+
+    QueryTestUtil.testWithListener(client.client(), QueryType.SQL, query, listener);
+    final Pair<QueryState, Exception> result = listener.waitForCompletion();
+    assertStateCompleted(result, QueryState.CANCELED);
+  }
+
+  /**
+   * Given a set of controls, this method ensures that TEST_QUERY completes with a {@link QueryState#COMPLETED} state.
+   * {@link QueryState#COMPLETED} is a terminal state and can't be canceled.
+   * See more: {@link org.apache.drill.exec.work.foreman.QueryStateProcessor#cancel}
+   */
+  private void assertCompletedWithoutException(final String controls, final WaitUntilCompleteListener listener) {
+    setControls(controls);
+
+    QueryTestUtil.testWithListener(client.client(), QueryType.SQL, TEST_QUERY, listener);
+    assertStateCompleted(listener.waitForCompletion(), QueryState.COMPLETED);
+  }
+
+  /**
+   * Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state.
+   */
+  private void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener) {
+    assertCancelledWithoutException(controls, listener, TEST_QUERY);
+  }
+
+  private long countAllocatedMemory() { // sums the allocators' allocated memory over all drillbits of the cluster
     // wait to make sure all fragments finished cleaning up
     try {
-      Thread.sleep(2000);
-    } catch (final InterruptedException e) {
+      logger.debug("Sleep thread for 0.5 seconds");
+      Thread.sleep(500);
+    } catch (InterruptedException e) {
+      logger.debug("Sleep thread interrupted. Ignore it", e);
       // just ignore
     }
 
     long allocated = 0;
-    for (final String name : drillbits.keySet()) {
-      allocated += drillbits.get(name).getContext().getAllocator().getAllocatedMemory();
+    for (Drillbit drillbit : cluster.drillbits()) {
+      allocated += drillbit.getContext().getAllocator().getAllocatedMemory();
     }
-
+    logger.debug("Allocated memory: {}", allocated);
     return allocated;
   }
+
+  private void interruptingBlockedFragmentsWaitingForData(final String control) {
+    try {
+      client.alterSession(SLICE_TARGET, "1");
+      client.alterSession(ENABLE_HASH_AGG_OPTION, "false");
+
+      final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
+      assertCancelledWithoutException(control, new ListenerThatCancelsQueryAfterFirstBatchOfData(), query);
+    } finally {
+      client.resetSession(SLICE_TARGET);
+      client.resetSession(ENABLE_HASH_AGG_OPTION);
+    }
+  }
+
+  /**
+   * Given a set of controls, this method ensures the given query fails with the given class and desc.
+   */
+  private void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass,
+                                               final String exceptionDesc, final String query) {
+    setControls(controls);
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
+    QueryTestUtil.testWithListener(client.client(), QueryType.SQL, query, listener);
+    final Pair<QueryState, Exception> result = listener.waitForCompletion();
+    final QueryState state = result.getFirst();
+    assertSame(QueryState.FAILED, state, () -> String.format("Query state should be FAILED (and not %s).", state));
+    assertExceptionMessage(result.getSecond(), exceptionClass, exceptionDesc);
+  }
+
+  private void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass,
+                                               final String exceptionDesc) {
+    assertFailsWithException(controls, exceptionClass, exceptionDesc, TEST_QUERY);
+  }
+
+  /**
+   * Test throwing exceptions from sites within the Foreman class, as specified by the site
+   * description
+   *
+   * @param desc site description
+   */
+  private void testForeman(final String desc) {
+    final String controls = Controls.newBuilder()
+      .addException(Foreman.class, desc, ForemanException.class)
+      .build();
+    assertFailsWithException(controls, ForemanException.class, desc);
+  }
+
+  /**
+   * Set the given controls.
+   */
+  private void setControls(final String controls) {
+    ControlsInjectionUtil.setControls(client.client(), controls);
+  }
+
+  /**
+   * Check that the injected exception is what we were expecting.
+   *
+   * @param throwable      the throwable that was caught (by the test)
+   * @param exceptionClass the expected exception class
+   * @param desc           the expected exception site description
+   */
+  private void assertExceptionMessage(final Throwable throwable, final Class<? extends Throwable> exceptionClass,
+                                      final String desc) {
+    assertTrue(throwable instanceof UserException, "Throwable was not of UserException type");
+    final ExceptionWrapper cause = ((UserException) throwable).getOrCreatePBError(false).getException();
+    assertEquals(exceptionClass.getName(), cause.getExceptionClass(), "Exception class names should match");
+    assertEquals(desc, cause.getMessage(), "Exception sites should match.");
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestExtendedTypes.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestExtendedTypes.java
index 7c46e02..ae69b46 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestExtendedTypes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestExtendedTypes.java
@@ -33,6 +33,7 @@ import java.util.TimeZone;
 import static org.apache.drill.test.rowSet.RowSetUtilities.dec;
 import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
 
+import org.apache.drill.categories.FlakyTest;
 import org.apache.drill.categories.JsonTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -43,7 +44,7 @@ import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(JsonTest.class)
+@Category({JsonTest.class, FlakyTest.class})
 public class TestExtendedTypes extends BaseJsonLoaderTest {
 
   @Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
index 610558a..853a022 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
@@ -21,58 +21,66 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import lombok.val;
 
 public class PStoreTestUtil {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PStoreTestUtil.class);
 
-  public static void test(PersistentStoreProvider provider) throws Exception{
-    PersistentStore<String> store = provider.getOrCreateStore(PersistentStoreConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build());
-    String[] keys = {"first", "second"};
-    String[] values = {"value1", "value2"};
-    Map<String, String> expectedMap = Maps.newHashMap();
+  public static void test(PersistentStoreProvider provider) throws Exception {
+    val store = provider.getOrCreateStore(
+      PersistentStoreConfig.newJacksonBuilder(new ObjectMapper(), String.class)
+        .name("sys.test")
+        .build()
+    );
+    try {
+      // Plain HashMap: double-brace initialization would create a needless anonymous subclass.
+      val expectedMap = new HashMap<String, String>(); // todo: rewrite with Java11
+      expectedMap.put("first", "value1");
+      expectedMap.put("second", "value2");
+      expectedMap.forEach(store::put);
+      waitForNumProps(store, expectedMap.size());
 
-    for(int i =0; i < keys.length; i++){
-      expectedMap.put(keys[i], values[i]);
-      store.put(keys[i], values[i]);
-    }
+      Iterator<Map.Entry<String, String>> iter = store.getAll();
+      for(int i =0; i < expectedMap.size(); i++) {
+        Entry<String, String> storeEntry = iter.next();
 
-    // Wait for store caches to update, this is necessary because ZookeeperClient caches can update asynchronously in some cases.
-    waitForNumProps(store, keys.length);
+        assertTrue(String.format("This element wasn't added to PStore, storeEntry: %s", storeEntry),
+          expectedMap.containsKey(storeEntry.getKey()));
 
-    {
-      Iterator<Map.Entry<String, String>> iter = store.getAll();
-      for(int i =0; i < keys.length; i++){
-        Entry<String, String> e = iter.next();
-        assertTrue(expectedMap.containsKey(e.getKey()));
-        assertEquals(expectedMap.get(e.getKey()), e.getValue());
+        String expectedValue = expectedMap.get(storeEntry.getKey());
+        assertEquals(String.format("The value is different in PStore for this key: %s. Expected value: %s, Actual: %s",
+          storeEntry.getKey(), expectedValue, storeEntry.getValue()), expectedValue, storeEntry.getValue());
       }
 
       assertFalse(iter.hasNext());
-    }
-
-    {
+    } finally { // clean up the stored entries even when an assertion above fails
       Iterator<Map.Entry<String, String>> iter = store.getAll();
-      while(iter.hasNext()){
+      while(iter.hasNext()) {
         final String key = iter.next().getKey();
         store.delete(key);
       }
-    }
 
-    // Wait for store caches to update, this is necessary because ZookeeperClient caches can update asynchronously in some cases.
-    waitForNumProps(store, 0);
-    assertFalse(store.getAll().hasNext());
+      waitForNumProps(store, 0);
+      assertFalse(store.getAll().hasNext());
+    }
   }
 
+  /**
+   * Polls every 100 ms until the store reports exactly {@code expected} entries or 30 seconds have passed.
+   * Needed because ZookeeperClient caches can update asynchronously, on deletion as well as on insertion.
+   *
+   * @param store PersistentStore whose entries are counted
+   * @param expected number of entries to wait for
+   * @throws InterruptedException if interrupted while sleeping between polls; fails the calling test
+   */
   private static void waitForNumProps(PersistentStore store, int expected) throws InterruptedException {
-    for (int numProps = getNumProps(store);
-         numProps < expected;
-         numProps = getNumProps(store)) {
+    for (long deadline = System.currentTimeMillis() + 30_000L; getNumProps(store) != expected && System.currentTimeMillis() < deadline;) {
       Thread.sleep(100L);
     }
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
index cab9877..db95c02 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
@@ -18,7 +18,9 @@
 package org.apache.drill.exec.store.sys;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.val;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.drill.categories.FlakyTest;
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.DrillFileUtils;
@@ -45,7 +47,7 @@ import java.io.File;
 
 import static org.junit.Assert.assertTrue;
 
-@Category({SlowTest.class})
+@Category({SlowTest.class, FlakyTest.class})
 public class TestPStoreProviders extends TestWithZookeeper {
   @Rule
   public BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
@@ -59,9 +61,9 @@ public class TestPStoreProviders extends TestWithZookeeper {
 
   @Test
   public void verifyZkStore() throws Exception {
-    try(CuratorFramework curator = createCurator()){
+    try(CuratorFramework curator = createCurator()) {
       curator.start();
-      ZookeeperPersistentStoreProvider provider = new ZookeeperPersistentStoreProvider(zkHelper.getConfig(), curator);
+      val provider = new ZookeeperPersistentStoreProvider(zkHelper.getConfig(), curator);
       PStoreTestUtil.test(provider);
     }
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/Controls.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/Controls.java
index ec264c7..5add50c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/Controls.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/Controls.java
@@ -23,9 +23,9 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
-public class Controls {
+import static org.apache.drill.exec.testing.ExecutionControls.EMPTY_CONTROLS;
 
-  private static final String EMPTY_CONTROLS = "{\"injections\" : []}";
+public class Controls {
 
   /**
    * Returns a builder that can be used to add injections.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
index 576b101..28693be 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
@@ -57,7 +57,7 @@ public class ControlsInjectionUtil {
         data.release();
       }
     } catch (final RpcException e) {
-      fail("Could not set option: " + e.toString());
+      fail("Could not set option: " + e);
     }
   }
 
@@ -187,6 +187,6 @@ public class ControlsInjectionUtil {
    * Clears all the controls.
    */
   public static void clearControls(final DrillClient client) {
-    setControls(client, ExecutionControls.DEFAULT_CONTROLS);
+    setControls(client, ExecutionControls.EMPTY_CONTROLS);
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
index f5e8064..da595cf 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
@@ -451,7 +451,7 @@ public class BaseTestQuery extends ExecTest {
     errorMsgTestHelper(testSqlQuery, UserBitShared.DrillPBError.ErrorType.PARSE.name());
   }
 
-  public static String getFile(String resource) throws IOException{
+  public static String getFile(String resource) throws IOException {
     URL url = Resources.getResource(resource);
     if (url == null) {
       throw new IOException(String.format("Unable to find path %s.", resource));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
index b62a188..acd1947 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
@@ -216,8 +216,6 @@ public class LogFixture implements AutoCloseable {
     appender.setEncoder(ple);
     appender.start();
 
-    Logger root = (Logger)LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
-    root.addAppender(appender);
     drillLogger.addAppender(appender);
   }
 
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestInformationSchemaColumns.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestInformationSchemaColumns.java
index 76f2553..218c618 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestInformationSchemaColumns.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestInformationSchemaColumns.java
@@ -33,6 +33,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
 
+import org.apache.drill.categories.FlakyTest;
 import org.apache.drill.jdbc.Driver;
 import org.apache.drill.jdbc.JdbcTestBase;
 import org.apache.drill.categories.JdbcTest;
@@ -54,7 +55,7 @@ import org.junit.experimental.categories.Category;
 /**
  * Test class for Drill's INFORMATION_SCHEMA.COLUMNS implementation.
  */
-@Category(JdbcTest.class)
+@Category({JdbcTest.class, FlakyTest.class})
 public class TestInformationSchemaColumns extends JdbcTestBase {
 
   private static final String VIEW_NAME =
diff --git a/pom.xml b/pom.xml
index 31bbc9e..8292caf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,7 +73,7 @@
     <commons.io.version>2.7</commons.io.version>
     <commons.collections.version>4.4</commons.collections.version>
     <hamcrest.version>2.2</hamcrest.version>
-    <curator.version>5.1.0</curator.version>
+    <curator.version>5.2.0</curator.version>
     <wiremock.standalone.version>2.23.2</wiremock.standalone.version>
     <jmockit.version>1.47</jmockit.version>
     <logback.version>1.2.3</logback.version>
@@ -98,7 +98,7 @@
     <javax.validation.api>2.0.1.Final</javax.validation.api>
     <asm.version>7.3.1</asm.version>
     <excludedGroups />
-    <memoryMb>4000</memoryMb>
+    <memoryMb>1800</memoryMb>
     <directMemoryMb>3000</directMemoryMb>
     <rat.skip>true</rat.skip>
     <license.skip>true</license.skip>