You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vi...@apache.org on 2021/10/19 14:11:37 UTC
[drill] branch master updated: DRILL-7973: Fix GitHub CI
intermittent failures (#2293)
This is an automated email from the ASF dual-hosted git repository.
vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 304230a DRILL-7973: Fix GitHub CI intermittent failures (#2293)
304230a is described below
commit 304230a289505526e1ff1bb1aae932517b7b6965
Author: Vitalii Diravka <vi...@apache.org>
AuthorDate: Tue Oct 19 17:11:28 2021 +0300
DRILL-7973: Fix GitHub CI intermittent failures (#2293)
* Disable flaky Splunk test
* DdirectMemoryMb=3000 -DmemoryMb=1800
* fail-fast: false
* Update cache version
* Fix flaky TestHttpPlugin#testSlowResponse
* Enable all ignored tests in TestDrillbitResilience
* Set timeout for every test in TestDrillbitResilience
* Migrate TestDrillbitResilience from junit4 to junit5
* Release resources in the @AfterEach method in TestDrillbitResilience
* check timeout for the mock http plugin
* fix JUnit Tag name for slow tests
* make memory configs are common for builds in all CI. Fix travis OOM
* refactoring test cases to use ClusterTest and fixtures
* COMPLETED state for TestDrillbitResilience#cancelAfterEverythingIsCompleted
* Enable LogFixture for TestDrillbitResilience. Udpate javacc-maven-plugin for maven parallel execution (see maven message in the beginning with -V)
* -B -ntp -> --batch-mode --no-transfer-progress. Disable TestConfigLinkage
* add proper logging. Check whether query is really interrupted. Check whether control is properly injected. Check when UnorderReceiverBatch is starting waiting
* fix moving from STARTING to CANCELLATION_REQUESTED
* Doc is added
* Refactor TestPStoreProviders test. Update curator.version 5.1.0 -> 5.2.0
* Fix TestDrillbitResilience#cancelWhenQueryIdArrives
---
.github/workflows/ci.yml | 12 +-
.travis.yml | 3 +-
.../drill/common/concurrent/ExtendedLatch.java | 1 +
.../categories/{SlowTest.java => FlakyTest.java} | 10 +-
.../java/org/apache/drill/categories/SlowTest.java | 8 +-
.../apache/drill/common/util/RepeatTestRule.java | 3 +
.../java/org/apache/drill/test/DirTestWatcher.java | 4 +
.../exec/store/http/HttpStoragePluginConfig.java | 5 +-
.../drill/exec/store/http/TestHttpPlugin.java | 2 +-
.../drill/store/openTSDB/TestOpenTSDBPlugin.java | 21 +-
.../drill/exec/store/splunk/SplunkPluginTest.java | 1 +
docs/dev/InjectControls.md | 57 ++
exec/java-exec/pom.xml | 2 +-
.../org/apache/drill/exec/client/DrillClient.java | 5 +-
.../unorderedreceiver/UnorderedReceiverBatch.java | 3 +
.../drill/exec/record/RecordBatchLoader.java | 2 +-
.../apache/drill/exec/server/RemoteServiceSet.java | 6 +
.../drill/exec/testing/ExecutionControls.java | 2 +-
.../org/apache/drill/exec/testing/Injection.java | 7 +
.../org/apache/drill/exec/work/WorkManager.java | 2 +-
.../apache/drill/exec/work/foreman/Foreman.java | 1 +
.../exec/work/foreman/QueryStateProcessor.java | 26 +-
.../java/org/apache/drill/SingleRowListener.java | 2 +-
.../physical/impl/TestNestedDateTimeTimestamp.java | 3 +
.../physical/impl/writer/TestParquetWriter.java | 2 +-
.../drill/exec/server/TestDrillbitResilience.java | 1007 ++++++++++----------
.../store/easy/json/loader/TestExtendedTypes.java | 3 +-
.../drill/exec/store/sys/PStoreTestUtil.java | 66 +-
.../drill/exec/store/sys/TestPStoreProviders.java | 8 +-
.../org/apache/drill/exec/testing/Controls.java | 4 +-
.../drill/exec/testing/ControlsInjectionUtil.java | 4 +-
.../java/org/apache/drill/test/BaseTestQuery.java | 2 +-
.../java/org/apache/drill/test/LogFixture.java | 2 -
.../jdbc/test/TestInformationSchemaColumns.java | 3 +-
pom.xml | 4 +-
35 files changed, 694 insertions(+), 599 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 5866b19..e32d7b7 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -34,6 +34,7 @@ jobs:
matrix:
# Java versions to run unit tests
java: [ '8', '11', '14' ]
+ fail-fast: false
steps:
- name: Checkout
uses: actions/checkout@v2
@@ -47,7 +48,7 @@ jobs:
${{ runner.os }}-maven-
# Caches MySQL directory used for JDBC storage plugin tests
- name: Cache MySQL
- uses: actions/cache@v1
+ uses: actions/cache@v2
with:
path: ~/.embedmysql
key: ${{ runner.os }}-mysql
@@ -57,7 +58,8 @@ jobs:
distribution: 'adopt'
java-version: ${{ matrix.java }}
- name: Build and test
- run: mvn install -V -ntp -DdirectMemoryMb=2500 -DmemoryMb=2000 # Note: the total GitHub Actions memory is 7000Mb
+ # The total GitHub Actions memory is 7000Mb. But GitHub CI requires some memory for the container to perform tests
+ run: mvn install --batch-mode --no-transfer-progress # -X -V for debugging
checkstyle_protobuf:
name: Run checkstyle and generate protobufs
@@ -66,7 +68,7 @@ jobs:
- name: Checkout
uses: actions/checkout@v2
- name: Cache Maven Repository
- uses: actions/cache@v1
+ uses: actions/cache@v2
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
@@ -74,7 +76,7 @@ jobs:
${{ runner.os }}-maven-
# Caches built protobuf library
- name: Cache protobufs
- uses: actions/cache@v1
+ uses: actions/cache@v2
with:
path: ~/protobuf
key: ${{ runner.os }}-protobuf
@@ -98,7 +100,7 @@ jobs:
# Builds Drill project, performs license checkstyle goal and regenerates java and C++ protobuf files
- name: Build
run: |
- MAVEN_OPTS="-Xms1G -Xmx1G" mvn install -Drat.skip=false -Dlicense.skip=false --batch-mode -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DskipTests=true -Dmaven.javadoc.skip=true -Dmaven.source.skip=true && \
+ MAVEN_OPTS="-Xms1G -Xmx1G" mvn install -Drat.skip=false -Dlicense.skip=false --batch-mode --no-transfer-progress -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DskipTests=true -Dmaven.javadoc.skip=true -Dmaven.source.skip=true && \
pushd protocol && mvn process-sources -P proto-compile && popd && \
mkdir contrib/native/client/build && pushd contrib/native/client/build && cmake -G "Unix Makefiles" .. && make cpProtobufs && popd; \
# Checks whether project files weren't changed after regenerating protobufs
diff --git a/.travis.yml b/.travis.yml
index 0f059bf..36e3b95 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -72,8 +72,7 @@ install:
- |
if [ $PHASE = "tests" ]; then \
mvn install --batch-mode --no-transfer-progress \
- -DexcludedGroups="org.apache.drill.categories.SlowTest,org.apache.drill.categories.UnlikelyTest,org.apache.drill.categories.SecurityTest" \
- -DmemoryMb=1300 -DdirectMemoryMb=3000; \
+ -DexcludedGroups="org.apache.drill.categories.SlowTest,org.apache.drill.categories.UnlikelyTest,org.apache.drill.categories.SecurityTest"; \
elif [ $PHASE = "build_checkstyle_protobuf" ]; then \
MAVEN_OPTS="-Xms1G -Xmx1G" mvn install --no-transfer-progress -Drat.skip=false -Dlicense.skip=false --batch-mode -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DskipTests=true -Dmaven.javadoc.skip=true -Dmaven.source.skip=true && \
pushd protocol && mvn process-sources -P proto-compile && popd && \
diff --git a/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
index 9868537..383cd7f 100644
--- a/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
+++ b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
@@ -52,6 +52,7 @@ public class ExtendedLatch extends CountDownLatch {
return await(wait, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
// if we weren't ready, the while loop will continue to wait
+ logger.warn("Interrupted while waiting for event latch.", e);
}
}
return false;
diff --git a/common/src/test/java/org/apache/drill/categories/SlowTest.java b/common/src/test/java/org/apache/drill/categories/FlakyTest.java
similarity index 77%
copy from common/src/test/java/org/apache/drill/categories/SlowTest.java
copy to common/src/test/java/org/apache/drill/categories/FlakyTest.java
index cf1103f..9a747b2 100644
--- a/common/src/test/java/org/apache/drill/categories/SlowTest.java
+++ b/common/src/test/java/org/apache/drill/categories/FlakyTest.java
@@ -18,8 +18,12 @@
package org.apache.drill.categories;
/**
- * A category for tests that take a long time to run.
+ * A category for tests that intermittently fail. It is better to run them in one Maven fork to avoid OOM or deadlocks <br>
+ * Junit category marker
*/
-public interface SlowTest {
- // Junit category marker
+public interface FlakyTest {
+ /**
+ * tag for JUnit5
+ */
+ String TAG = "flaky-test";
}
diff --git a/common/src/test/java/org/apache/drill/categories/SlowTest.java b/common/src/test/java/org/apache/drill/categories/SlowTest.java
index cf1103f..f4f6ea9 100644
--- a/common/src/test/java/org/apache/drill/categories/SlowTest.java
+++ b/common/src/test/java/org/apache/drill/categories/SlowTest.java
@@ -18,8 +18,12 @@
package org.apache.drill.categories;
/**
- * A category for tests that take a long time to run.
+ * A category for tests that take a long time to run. <br>
+ * Junit category marker
*/
public interface SlowTest {
- // Junit category marker
+ /**
+ * tag for JUnit5
+ */
+ String TAG = "slow-test";
}
diff --git a/common/src/test/java/org/apache/drill/common/util/RepeatTestRule.java b/common/src/test/java/org/apache/drill/common/util/RepeatTestRule.java
index fd6d95e..07b4da2 100644
--- a/common/src/test/java/org/apache/drill/common/util/RepeatTestRule.java
+++ b/common/src/test/java/org/apache/drill/common/util/RepeatTestRule.java
@@ -38,7 +38,10 @@ import java.lang.annotation.Target;
* }
* }
* </pre>
+ * @deprecated It was created for Junit4. Junit5 has its own @RepeatedTest(num). And looks like it is not used now, but
+ * it can be used for manual testing
*/
+@Deprecated
public class RepeatTestRule implements TestRule {
@Retention(RetentionPolicy.RUNTIME)
diff --git a/common/src/test/java/org/apache/drill/test/DirTestWatcher.java b/common/src/test/java/org/apache/drill/test/DirTestWatcher.java
index 839fbac..9dea4a5 100644
--- a/common/src/test/java/org/apache/drill/test/DirTestWatcher.java
+++ b/common/src/test/java/org/apache/drill/test/DirTestWatcher.java
@@ -18,6 +18,8 @@
package org.apache.drill.test;
import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
@@ -87,6 +89,8 @@ import java.nio.file.Paths;
* {@link DirTestWatcher#getDir()} in myTestMethod1 and myTestMethod2 are
* <b>my.proj.MyTestClass/myTestMethod1</b> and
* <b>my.proj.MyTestClass/myTestMethod2</b> respectively.
+ * TODO: need to implement {@link AfterEachCallback} and {@link BeforeEachCallback} to use it with
+ * JUnit5 @ExtendWith annotation
* </p>
*/
public class DirTestWatcher extends TestWatcher {
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
index 280895d..119d4b0 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
@JsonTypeName(HttpStoragePluginConfig.NAME)
@@ -47,9 +48,9 @@ public class HttpStoragePluginConfig extends AbstractSecuredStoragePluginConfig
public final int proxyPort;
public final String proxyType;
/**
- * Timeout in seconds.
+ * Timeout in {@link TimeUnit#SECONDS}.
*/
- public int timeout;
+ public final int timeout;
@JsonCreator
public HttpStoragePluginConfig(@JsonProperty("cacheResults") Boolean cacheResults,
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index 57510f3..0375c99 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -877,7 +877,7 @@ public class TestHttpPlugin extends ClusterTest {
server.enqueue(
new MockResponse().setResponseCode(200)
.setBody(TEST_JSON_RESPONSE)
- .throttleBody(64, 4, TimeUnit.SECONDS)
+ .throttleBody(64, 6, TimeUnit.SECONDS)
);
String sql = "SELECT sunrise AS sunrise, sunset AS sunset FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1";
diff --git a/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java
index 8c03752..1c8207a 100644
--- a/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java
+++ b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java
@@ -22,6 +22,7 @@ import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.openTSDB.OpenTSDBStoragePluginConfig;
+import org.apache.drill.test.ClientFixture;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.QueryTestUtil;
@@ -196,19 +197,19 @@ public class TestOpenTSDBPlugin extends ClusterTest {
@Test
public void testInformationSchemaWrongPluginConfig() throws Exception {
- ClusterFixture cluster = ClusterFixture.bareBuilder(dirTestWatcher)
- .build();
- int portNumber = QueryTestUtil.getFreePortNumber(10_000, 200);
- final StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
- OpenTSDBStoragePluginConfig storagePluginConfig =
+ try (ClusterFixture cluster = ClusterFixture.bareBuilder(dirTestWatcher).build();
+ ClientFixture client = cluster.clientFixture()) {
+ int portNumber = QueryTestUtil.getFreePortNumber(10_000, 200);
+ final StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
+ OpenTSDBStoragePluginConfig storagePluginConfig =
new OpenTSDBStoragePluginConfig(String.format("http://localhost:%s/", portNumber));
- storagePluginConfig.setEnabled(true);
- pluginRegistry.put(OpenTSDBStoragePluginConfig.NAME, storagePluginConfig);
- String query = "select * from information_schema.`views`";
- cluster.clientFixture()
- .queryBuilder()
+ storagePluginConfig.setEnabled(true);
+ pluginRegistry.put(OpenTSDBStoragePluginConfig.NAME, storagePluginConfig);
+ String query = "select * from information_schema.`views`";
+ client.queryBuilder()
.sql(query)
.run();
+ }
}
private long runQuery(String query) throws Exception {
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
index a9e7aff..7d52008 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
@@ -172,6 +172,7 @@ public class SplunkPluginTest extends SplunkBaseTest {
}
@Test
+ @Ignore("the result is not consistent on system tables")
public void testExplicitFieldsWithSourceType() throws Exception {
String sql = "SELECT action, _sourcetype, _subsecond, _time FROM splunk._audit WHERE sourcetype='audittrail' LIMIT 5";
client.testBuilder()
diff --git a/docs/dev/InjectControls.md b/docs/dev/InjectControls.md
new file mode 100644
index 0000000..1ab1029
--- /dev/null
+++ b/docs/dev/InjectControls.md
@@ -0,0 +1,57 @@
+# InjectControls (DRILL-2383)
+_This is the copy of the following doc:
+https://docs.google.com/document/d/1qwsV5Uq2ftJn6ZMzjydxGl8lqeIiS4gmUTMcmdz4gDY/edit <p>_
+This feature is useful for developers to test Drill resiliency and to verify behavior when an exception is thrown. Use pauses to test cancellations.
+Injection sites can be added to classes that can get hold of ExecutionControls from either FragmentContext, QueryContext or OperatorContext. Injection sites can be chained.
+1) Declare an injector just like a Logger:
+ private final static ExecutionControlsInjector injector =
+ ExecutionControlsInjector.getInjector(FragmentExecutor.class);
+
+2) Add an injection site (unique descriptor per site):
+ // ...code...
+ // function signature: (controls, descriptor, exception)
+ injector.injectChecked(fragmentContext.getExecutionControls(),
+ "fragment-execution-checked", IOException.class);
+ // function signature: (controls, descriptor)
+ injector.injectUnchecked(fragmentContext.getExecutionControls(),
+ "fragment-execution-unchecked");
+ // ...code...
+ OR
+ // ...code...
+ // function signature: (controls, descriptor, logger)
+ injector.injectPause(queryContext.getExecutionControls(), "pause-run-plan",
+ logger);
+ // ...code...
+
+To set controls:
+> ALTER SESSION SET `drill.exec.testing.controls`='{
+"injections":[ {
+"siteClass": "org.apache.drill.exec.work.fragment.FragmentExecutor",
+"desc": "fragment-execution-checked",
+"nSkip": 0,
+"nFire": 1,
+"type":"exception",
+"exceptionClass": "java.io.IOException"
+}, {
+"siteClass": "org.apache.drill.exec.work.foreman.Foreman",
+"desc": "pause-run-plan",
+"nSkip": 0,
+"nFire": 1,
+"address": "10.10.10.10",
+"port": 31019,
+"type":"pause"
+}
+] }';
+> SELECT * FROM sys.memory;
+...
+
+For the above query, an IOException with fragment-execution message will be thrown on all drillbits. Also, if the foreman is on 10.10.10.10:31019, it will pause at pause-run-plan. For other examples, see TestDrillbitResilience, TestExceptionInjection and TestPauseInjection.
+
+_NOTE:_
+Controls are fired only if assertions are enabled.
+Controls are fired only on one query after the alter session query.
+If controls are specified, they are passed to every fragment as part of options.
+address and port fields are optional.
+If these fields are set, controls will be fired only on specified drillbit.
+If they are not set, the injection will be fired on EVERY drillbit that reaches the site.
+
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 7997c37..167b675 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -889,7 +889,7 @@
<plugin> <!-- generate the parser (Parser.jj is itself generated wit fmpp above) -->
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
- <version>2.4</version>
+ <version>2.6</version>
<executions>
<execution>
<id>javacc</id>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index fdbb971..e5562dd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -832,7 +832,7 @@ public class DrillClient implements Closeable, ConnectionThrottle {
private final UserProtos.RunQuery query;
public ListHoldingResultsListener(UserProtos.RunQuery query) {
- logger.debug( "Listener created for query \"\"\"{}\"\"\"", query );
+ logger.debug( "Listener created for query \"{}\"", query );
this.query = query;
}
@@ -867,7 +867,8 @@ public class DrillClient implements Closeable, ConnectionThrottle {
@Override
public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
- logger.debug("Result arrived: Result: {}", result);
+ logger.debug("Result arrived: Row count: {}", result.getHeader().getRowCount());
+ logger.trace("Result batch: {}", result);
results.add(result);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index bfbad64..0ac2565 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -229,6 +229,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
@Override
public void close() {
+ logger.debug("Closing {}", getClass().getCanonicalName());
batchLoader.clear();
}
@@ -279,6 +280,8 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
@Override
public void interrupted(InterruptedException e) {
+ logger.debug("{} interrupted. shouldContinue value: {}", getClass().getCanonicalName(),
+ context.getExecutorState().shouldContinue());
if (context.getExecutorState().shouldContinue()) {
String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message";
logger.error(errMsg, e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 1eb2ac0..83e1e85 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -47,7 +47,7 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
/**
* Holds record batch loaded from record batch message.
*/
-public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapper<?>>{
+public class RecordBatchLoader implements VectorAccessible {
private final static Logger logger = LoggerFactory.getLogger(RecordBatchLoader.class);
private final BufferAllocator allocator;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
index aeb4475..723cd0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
@@ -20,6 +20,9 @@ package org.apache.drill.exec.server;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.local.LocalClusterCoordinator;
+/**
+ * It is necessary to start Drillbit. For more info check {@link ClusterCoordinator}
+ */
public class RemoteServiceSet implements AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class);
@@ -40,6 +43,9 @@ public class RemoteServiceSet implements AutoCloseable {
coordinator.close();
}
+ /**
+ * @return Use a non-null service set so that the drillbits can use port hunting
+ */
public static RemoteServiceSet getLocalServiceSet() {
return new RemoteServiceSet(new LocalClusterCoordinator());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
index 29a3a2a..3543253 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
@@ -123,7 +123,7 @@ public final class ExecutionControls {
/**
* The default value for controls.
*/
- public static final String DEFAULT_CONTROLS = "{}";
+ public static final String EMPTY_CONTROLS = "{\"injections\" : []}";
/**
* Caches the currently specified controls.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
index d8349c4..3414f24 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.testing;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import java.util.concurrent.atomic.AtomicInteger;
@@ -24,6 +26,8 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* The base class for all types of injections (currently, pause and exception).
*/
+@Slf4j
+@ToString
public abstract class Injection {
protected final String address; // the address of the drillbit on which to inject
@@ -67,6 +71,9 @@ public abstract class Injection {
* @return if the injection should be injected now
*/
protected boolean injectNow() {
+ if(logger.isDebugEnabled()) {
+ logger.debug(toString());
+ }
return nSkip.decrementAndGet() < 0 && nFire.decrementAndGet() >= 0;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index de49e28..5fcdac1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -282,7 +282,7 @@ public class WorkManager implements AutoCloseable {
final String originalName = currentThread.getName();
try {
currentThread.setName(queryIdString + ":foreman:cancel");
- logger.debug("Canceling foreman");
+ logger.debug("Canceling foreman. Thread: {}", originalName);
foreman.cancel();
} catch (Throwable t) {
logger.warn("Exception while canceling foreman", t);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 64df6c4..c12c713 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -201,6 +201,7 @@ public class Foreman implements Runnable {
* Query execution will be canceled once possible.
*/
public void cancel() {
+ logger.debug("Cancel Foreman");
queryStateProcessor.cancel();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
index 3bc75c3..17a7c5d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
@@ -239,10 +239,12 @@ public class QueryStateProcessor implements AutoCloseable {
return;
case COMPLETED:
wrapUpCompletion();
+ return;
case CANCELLATION_REQUESTED:
// since during starting state fragments are sent to the remote nodes,
// we don't want to cancel until they all are sent out
- addToEventQueue(QueryState.CANCELLATION_REQUESTED, null);
+ assert exception == null;
+ wrapUpCancellation();
return;
}
@@ -256,23 +258,13 @@ public class QueryStateProcessor implements AutoCloseable {
* cause this to be called recursively.
*/
switch (newState) {
- case CANCELLATION_REQUESTED: {
+ case CANCELLATION_REQUESTED:
assert exception == null;
- recordNewState(QueryState.CANCELLATION_REQUESTED);
- queryManager.cancelExecutingFragments(drillbitContext);
- foremanResult.setCompleted(QueryState.CANCELED);
- /*
- * We don't close the foremanResult until we've gotten
- * acknowledgments, which happens below in the case for current state
- * == CANCELLATION_REQUESTED.
- */
+ wrapUpCancellation();
return;
- }
-
- case COMPLETED: {
+ case COMPLETED:
wrapUpCompletion();
return;
- }
}
checkCommonStates(newState, exception);
}
@@ -303,7 +295,13 @@ public class QueryStateProcessor implements AutoCloseable {
private void wrapUpCancellation() {
recordNewState(QueryState.CANCELLATION_REQUESTED);
+ queryManager.cancelExecutingFragments(drillbitContext);
foremanResult.setCompleted(QueryState.CANCELED);
+ /*
+ * We don't close the foremanResult until we've gotten
+ * acknowledgments, which happens below in the case for current state
+ * == CANCELLATION_REQUESTED.
+ */
}
private void wrapUpCompletion() {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java b/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
index 65c900c..5532abe 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
@@ -68,7 +68,7 @@ public abstract class SingleRowListener implements UserResultsListener {
}
@Override
- public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+ public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
final QueryData queryData = result.getHeader();
if (result.hasData()) {
final int nRows = this.nRows.addAndGet(queryData.getRowCount());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestNestedDateTimeTimestamp.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestNestedDateTimeTimestamp.java
index 8d79ebe..05697aa 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestNestedDateTimeTimestamp.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestNestedDateTimeTimestamp.java
@@ -29,16 +29,19 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.apache.drill.categories.FlakyTest;
import org.apache.drill.exec.expr.fn.impl.DateUtility;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.test.TestBuilder;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
/**
* For DRILL-6242, output for Date, Time, Timestamp should use different classes
*/
+@Category(FlakyTest.class)
public class TestNestedDateTimeTimestamp extends BaseTestQuery {
private static final String DATAFILE = "cp.`datetime.parquet`";
private static final Map<String, Object> expectedRecord = new TreeMap<String, Object>();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index dbae776..887af0d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -641,7 +641,7 @@ public class TestParquetWriter extends BaseTestQuery {
* have written the correct type. For every CTAS operation we use both the readers to verify results.
*/
@Test
- public void testCTASWithIntervalTypes() throws Exception {
+ public void testCTASWithIntervalTypes() throws Exception { // TODO: investigate NPE errors during the test execution
test("use dfs.tmp");
String tableName = "drill_1980_t1";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 206221d..fed9c4d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -19,34 +19,40 @@ package org.apache.drill.exec.server;
import static org.apache.drill.exec.ExecConstants.SLICE_TARGET;
import static org.apache.drill.exec.ExecConstants.SLICE_TARGET_DEFAULT;
-import static org.apache.drill.exec.planner.physical.PlannerSettings.HASHAGG;
+import static org.apache.drill.exec.planner.physical.PlannerSettings.ENABLE_HASH_AGG_OPTION;
import static org.apache.drill.exec.planner.physical.PlannerSettings.PARTITION_SENDER_SET_THREADS;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import ch.qos.logback.classic.Level;
import org.apache.commons.math3.util.Pair;
+import org.apache.drill.categories.FlakyTest;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
+import org.apache.drill.exec.work.WorkManager;
+import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.foreman.ForemanException;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
import org.apache.drill.exec.work.foreman.FragmentsRunner;
+import org.apache.drill.exec.work.foreman.QueryStateProcessor;
import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.LogFixture;
import org.apache.drill.test.QueryTestUtil;
import org.apache.drill.SingleRowListener;
import org.apache.drill.common.DrillAutoCloseables;
import org.apache.drill.common.concurrent.ExtendedLatch;
-import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.util.RepeatTestRule.Repeat;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ZookeeperHelper;
import org.apache.drill.exec.ZookeeperTestUtil;
-import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.RootAllocatorFactory;
import org.apache.drill.exec.physical.impl.ScreenCreator;
@@ -56,10 +62,7 @@ import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
import org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
import org.apache.drill.exec.proto.UserBitShared.ExceptionWrapper;
import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -78,18 +81,18 @@ import org.apache.drill.exec.store.pojo.PojoRecordReader;
import org.apache.drill.exec.testing.Controls;
import org.apache.drill.exec.testing.ControlsInjectionUtil;
import org.apache.drill.exec.util.Pointer;
-import org.apache.drill.exec.work.foreman.Foreman;
-import org.apache.drill.exec.work.foreman.ForemanException;
-import org.apache.drill.exec.work.foreman.ForemanSetupException;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
-import org.apache.drill.test.DrillTest;
import org.apache.drill.categories.SlowTest;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.TestInstantiationException;
import org.slf4j.Logger;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -98,19 +101,19 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
* Test how resilient drillbits are to throwing exceptions during various phases of query
* execution by injecting exceptions at various points, and to cancellations in various phases.
*/
-@Category({SlowTest.class})
-public class TestDrillbitResilience extends DrillTest {
+@Tag(SlowTest.TAG)
+@Tag(FlakyTest.TAG)
+public class TestDrillbitResilience extends ClusterTest {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(TestDrillbitResilience.class);
-
- private static ZookeeperHelper zkHelper;
- private static RemoteServiceSet remoteServiceSet;
- private static final Map<String, Drillbit> drillbits = new HashMap<>();
- private static DrillClient drillClient;
+ protected static LogFixture logFixture;
/**
- * The number of times test (that are repeated) should be repeated.
+ * The number of times test should be repeated. For proper testing this functionality set this value to 1000
*/
private static final int NUM_RUNS = 3;
+ private static final int PROBLEMATIC_TEST_NUM_RUNS = 3;
+ private static final int TIMEOUT = 10;
+ private final static Level CURRENT_LOG_LEVEL = Level.INFO;
/**
* Note: Counting sys.memory executes a fragment on every drillbit. This is a better check in comparison to
@@ -118,48 +121,6 @@ public class TestDrillbitResilience extends DrillTest {
*/
private static final String TEST_QUERY = "select * from sys.memory";
- private static void startDrillbit(final String name, final RemoteServiceSet remoteServiceSet) {
- if (drillbits.containsKey(name)) {
- throw new IllegalStateException("Drillbit named \"" + name + "\" already exists");
- }
-
- try {
- final Drillbit drillbit = Drillbit.start(zkHelper.getConfig(), remoteServiceSet);
- drillbits.put(name, drillbit);
- } catch (final DrillbitStartupException e) {
- throw new RuntimeException("Failed to start Drillbit \"" + name + "\"", e);
- }
- }
-
- /**
- * Shutdown the specified drillbit.
- *
- * @param name name of the drillbit
- */
- private static void stopDrillbit(final String name) {
- final Drillbit drillbit = drillbits.get(name);
- if (drillbit == null) {
- throw new IllegalStateException("No Drillbit named \"" + name + "\" found");
- }
-
- try {
- drillbit.close();
- } catch (final Exception e) {
- final String message = "Error shutting down Drillbit \"" + name + "\"";
- logger.warn(message, e);
- }
- }
-
- /**
- * Shutdown all the drillbits.
- */
- private static void stopAllDrillbits() {
- for (String name : drillbits.keySet()) {
- stopDrillbit(name);
- }
- drillbits.clear();
- }
-
/*
* Canned drillbit names.
*/
@@ -167,71 +128,42 @@ public class TestDrillbitResilience extends DrillTest {
private final static String DRILLBIT_BETA = "beta";
private final static String DRILLBIT_GAMMA = "gamma";
- /**
- * Get the endpoint for the drillbit, if it is running
- * @param name name of the drillbit
- * @return endpoint of the drillbit
- */
- private static DrillbitEndpoint getEndpoint(final String name) {
- final Drillbit drillbit = drillbits.get(name);
- if (drillbit == null) {
- throw new IllegalStateException("No Drillbit named \"" + name + "\" found.");
- }
- return drillbit.getContext().getEndpoint();
- }
-
- @BeforeClass
+ @BeforeAll
public static void startSomeDrillbits() throws Exception {
- // turn off the HTTP server to avoid port conflicts between the drill bits
- System.setProperty(ExecConstants.HTTP_ENABLE, "false");
-
+ logFixture = LogFixture.builder()
+ .toConsole()
+ .logger(TestDrillbitResilience.class, CURRENT_LOG_LEVEL)
+ .logger(DrillClient.class, CURRENT_LOG_LEVEL)
+ .logger(QueryStateProcessor.class, CURRENT_LOG_LEVEL)
+ .logger(WorkManager.class, CURRENT_LOG_LEVEL)
+ .logger(UnorderedReceiverBatch.class, CURRENT_LOG_LEVEL)
+ .logger(ExtendedLatch.class, CURRENT_LOG_LEVEL)
+ .logger(Foreman.class, CURRENT_LOG_LEVEL)
+ .logger(QueryStateProcessor.class, CURRENT_LOG_LEVEL)
+ .logger(ExecutionControlsInjector.class, CURRENT_LOG_LEVEL)
+ .build();
ZookeeperTestUtil.setJaasTestConfigFile();
-
- // turn on error for failure in cancelled fragments
- zkHelper = new ZookeeperHelper(true, true);
- zkHelper.startZookeeper(1);
-
- // use a non-null service set so that the drillbits can use port hunting
- remoteServiceSet = RemoteServiceSet.getLocalServiceSet();
-
- // create name-addressable drillbits
- startDrillbit(DRILLBIT_ALPHA, remoteServiceSet);
- startDrillbit(DRILLBIT_BETA, remoteServiceSet);
- startDrillbit(DRILLBIT_GAMMA, remoteServiceSet);
-
- // create a client
- final DrillConfig drillConfig = zkHelper.getConfig();
- drillClient = QueryTestUtil.createClient(drillConfig, remoteServiceSet, 1, new Properties());
+ dirTestWatcher.start(TestDrillbitResilience.class); // until DirTestWatcher is implemented for JUnit5
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .configProperty(ExecConstants.HTTP_ENABLE, false) // turn off the HTTP server to avoid port conflicts between the drill bits
+ .withBits(DRILLBIT_ALPHA, DRILLBIT_BETA, DRILLBIT_GAMMA);
+ startCluster(builder);
clearAllInjections();
+ logger.debug("Start 3 drillbits Test Drill cluster: {}, {}, {}", DRILLBIT_ALPHA, DRILLBIT_BETA, DRILLBIT_GAMMA);
}
- @AfterClass
- public static void shutdownAllDrillbits() {
- if (drillClient != null) {
- drillClient.close();
- drillClient = null;
- }
-
- stopAllDrillbits();
-
- if (remoteServiceSet != null) {
- try {
- remoteServiceSet.close();
- } catch (Exception e) {
- logger.warn("Failure on close()", e);
- }
- remoteServiceSet = null;
- }
-
- zkHelper.stopZookeeper();
+ @AfterAll
+ public static void tearDownAfterClass() throws Exception {
+ logFixture.close();
}
/**
* Clear all injections.
*/
private static void clearAllInjections() {
- Preconditions.checkNotNull(drillClient);
- ControlsInjectionUtil.clearControls(drillClient);
+ logger.debug("Clear all injections");
+ Preconditions.checkNotNull(client);
+ ControlsInjectionUtil.clearControls(client.client());
}
/**
@@ -240,323 +172,128 @@ public class TestDrillbitResilience extends DrillTest {
* <p>The current implementation does this by counting the number of drillbits using a query.
*/
private static void assertDrillbitsOk() {
- final SingleRowListener listener = new SingleRowListener() {
- private final BufferAllocator bufferAllocator = RootAllocatorFactory.newRoot(zkHelper.getConfig());
- private final RecordBatchLoader loader = new RecordBatchLoader(bufferAllocator);
-
- @Override
- public void rowArrived(final QueryDataBatch queryResultBatch) {
- // load the single record
- final QueryData queryData = queryResultBatch.getHeader();
- // TODO: Clean: DRILL-2933: That load(...) no longer throws
- // SchemaChangeException.
- loader.load(queryData.getDef(), queryResultBatch.getData());
- assertEquals(1, loader.getRecordCount());
-
- // there should only be one column
- final BatchSchema batchSchema = loader.getSchema();
- assertEquals(1, batchSchema.getFieldCount());
-
- // the column should be an integer
- final MaterializedField countField = batchSchema.getColumn(0);
- final MinorType fieldType = countField.getType().getMinorType();
- assertEquals(MinorType.BIGINT, fieldType);
-
- // get the column value
- final VectorWrapper<?> vw = loader.iterator().next();
- final Object obj = vw.getValueVector().getAccessor().getObject(0);
- assertTrue(obj instanceof Long);
- final Long countValue = (Long) obj;
-
- // assume this means all the drillbits are still ok
- assertEquals(drillbits.size(), countValue.intValue());
-
- loader.clear();
- }
+ SingleRowListener listener = new SingleRowListener() {
+ private final BufferAllocator bufferAllocator = RootAllocatorFactory.newRoot(cluster.config());
+ private final RecordBatchLoader loader = new RecordBatchLoader(bufferAllocator);
- @Override
- public void cleanup() {
- DrillAutoCloseables.closeNoChecked(bufferAllocator);
- }
- };
+ @Override
+ public void rowArrived(QueryDataBatch queryResultBatch) {
+ // load the single record
+ final QueryData queryData = queryResultBatch.getHeader();
+ // TODO: Clean: DRILL-2933: That load(...) no longer throws
+ // SchemaChangeException.
+ loader.load(queryData.getDef(), queryResultBatch.getData());
+ assertEquals(1, loader.getRecordCount());
+
+ // there should only be one column
+ final BatchSchema batchSchema = loader.getSchema();
+ assertEquals(1, batchSchema.getFieldCount());
+
+ // the column should be an integer
+ final MaterializedField countField = batchSchema.getColumn(0);
+ final MinorType fieldType = countField.getType().getMinorType();
+ assertEquals(MinorType.BIGINT, fieldType);
+
+ // get the column value
+ final VectorWrapper<?> vw = loader.iterator().next();
+ final Object obj = vw.getValueVector().getAccessor().getObject(0);
+ assertTrue(obj instanceof Long);
+ final Long countValue = (Long) obj;
+
+ // assume this means all the drillbits are still ok
+ assertEquals(cluster.drillbits().size(), countValue.intValue());
+
+ loader.clear();
+ }
+
+ @Override
+ public void cleanup() {
+ loader.clear();
+ DrillAutoCloseables.closeNoChecked(bufferAllocator);
+ }
+ };
try {
- QueryTestUtil.testWithListener(drillClient, QueryType.SQL, "select count(*) from sys.memory", listener);
+ QueryTestUtil.testWithListener(client.client(), QueryType.SQL, "select count(*) from sys.memory", listener);
listener.waitForCompletion();
- final QueryState state = listener.getQueryState();
- assertTrue(String.format("QueryState should be COMPLETED (and not %s).", state), state == QueryState.COMPLETED);
+ QueryState state = listener.getQueryState();
+ assertSame(state, QueryState.COMPLETED, () -> String.format("QueryState should be COMPLETED (and not %s).", state));
+ assertTrue(listener.getErrorList().isEmpty(), "There should not be any errors when checking if Drillbits are OK");
} catch (final Exception e) {
throw new RuntimeException("Couldn't query active drillbits", e);
+ } finally {
+ logger.debug("Cleanup listener");
+ listener.cleanup();
}
+ logger.debug("Drillbits are ok.");
+ }
- final List<DrillPBError> errorList = listener.getErrorList();
- assertTrue("There should not be any errors when checking if Drillbits are OK.", errorList.isEmpty());
+ @BeforeEach
+ void setUp(TestInfo testInfo) {
+ String testName = testInfo.getTestMethod().orElseThrow(() -> new TestInstantiationException("Can't get method neme")).getName();
+ String testOrRepetitionName = testInfo.getDisplayName();
+ if(testOrRepetitionName.startsWith("repetition")) {
+ logger.debug("{} for {} test started", testOrRepetitionName, testName);
+ } else {
+ logger.debug("{} test started", testName);
+ }
}
- @After
- public void checkDrillbits() {
+ @AfterEach
+ public void checkDrillbits(TestInfo testInfo) {
clearAllInjections(); // so that the drillbit check itself doesn't trigger anything
assertDrillbitsOk(); // TODO we need a way to do this without using a query
- }
-
- /**
- * Set the given controls.
- */
- private static void setControls(final String controls) {
- ControlsInjectionUtil.setControls(drillClient, controls);
- }
-
- /**
- * Sets a session option.
- */
- private static void setSessionOption(final String option, final String value) {
- ControlsInjectionUtil.setSessionOption(drillClient, option, value);
- }
-
- private static void resetSessionOption(final String option) {
- try {
- final List<QueryDataBatch> results = drillClient.runQuery(
- UserBitShared.QueryType.SQL, String.format("ALTER session RESET `%s`",
- option));
- for (final QueryDataBatch data : results) {
- data.release();
- }
- } catch (final RpcException e) {
- fail("Could not reset option: " + e.toString());
+ String testName = testInfo.getTestMethod().orElseThrow(() -> new TestInstantiationException("Can't get method neme")).getName();
+ String testOrRepetitionName = testInfo.getDisplayName();
+ if(testOrRepetitionName.startsWith("repetition")) {
+ logger.debug("{} for {} test finished", testOrRepetitionName, testName);
+ } else {
+ logger.debug("{} test finished", testName);
}
}
- /**
- * Check that the injected exception is what we were expecting.
- *
- * @param throwable the throwable that was caught (by the test)
- * @param exceptionClass the expected exception class
- * @param desc the expected exception site description
- */
- private static void assertExceptionMessage(final Throwable throwable, final Class<? extends Throwable> exceptionClass,
- final String desc) {
- assertTrue("Throwable was not of UserException type.", throwable instanceof UserException);
- final ExceptionWrapper cause = ((UserException) throwable).getOrCreatePBError(false).getException();
- assertEquals("Exception class names should match.", exceptionClass.getName(), cause.getExceptionClass());
- assertEquals("Exception sites should match.", desc, cause.getMessage());
- }
-
@Test
+ @Timeout(value = TIMEOUT)
public void settingNoOpInjectionsAndQuery() {
final long before = countAllocatedMemory();
final String controls = Controls.newBuilder()
- .addExceptionOnBit(getClass(), "noop", RuntimeException.class, getEndpoint(DRILLBIT_BETA))
+ .addExceptionOnBit(getClass(), "noop", RuntimeException.class, cluster.drillbit(DRILLBIT_BETA).getContext().getEndpoint())
.build();
setControls(controls);
final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
- QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+ QueryTestUtil.testWithListener(client.client(), QueryType.SQL, TEST_QUERY, listener);
final Pair<QueryState, Exception> pair = listener.waitForCompletion();
assertStateCompleted(pair, QueryState.COMPLETED);
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
}
- /**
- * Test throwing exceptions from sites within the Foreman class, as specified by the site
- * description
- *
- * @param desc site description
- */
- private static void testForeman(final String desc) {
- final String controls = Controls.newBuilder()
- .addException(Foreman.class, desc, ForemanException.class)
- .build();
- assertFailsWithException(controls, ForemanException.class, desc);
- }
-
- @Test
- @Repeat(count = NUM_RUNS)
+ @RepeatedTest(NUM_RUNS)
+ @Timeout(value = TIMEOUT)
public void foreman_runTryBeginning() {
final long before = countAllocatedMemory();
testForeman("run-try-beginning");
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
}
- @Test
- @Ignore // TODO(DRILL-3163, DRILL-3167)
- //@Repeat(count = NUM_RUNS)
+ @Test // DRILL-3163, DRILL-3167
+ @Timeout(value = TIMEOUT)
public void foreman_runTryEnd() {
final long before = countAllocatedMemory();
testForeman("run-try-end");
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
- }
-
- /**
- * Tests can use this listener to wait, until the submitted query completes or fails, by
- * calling #waitForCompletion.
- */
- private static class WaitUntilCompleteListener implements UserResultsListener {
- private final ExtendedLatch latch = new ExtendedLatch(1); // to signal completion
- protected QueryId queryId = null;
- protected volatile Pointer<Exception> ex = new Pointer<>();
- protected volatile QueryState state = null;
-
- /**
- * Method that sets the exception if the condition is not met.
- */
- protected final void check(final boolean condition, final String format, final Object... args) {
- if (!condition) {
- ex.value = new IllegalStateException(String.format(format, args));
- }
- }
-
- /**
- * Method that cancels and resumes the query, in order.
- */
- protected final void cancelAndResume() {
- Preconditions.checkNotNull(queryId);
- final ExtendedLatch trigger = new ExtendedLatch(1);
- (new CancellingThread(queryId, ex, trigger)).start();
- (new ResumingThread(queryId, ex, trigger)).start();
- }
-
- @Override
- public void queryIdArrived(final QueryId queryId) {
- this.queryId = queryId;
- }
-
- @Override
- public void submissionFailed(final UserException ex) {
- this.ex.value = ex;
- state = QueryState.FAILED;
- latch.countDown();
- }
-
- @Override
- public void queryCompleted(final QueryState state) {
- this.state = state;
- latch.countDown();
- }
-
- @Override
- public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
- result.release();
- }
-
- public final Pair<QueryState, Exception> waitForCompletion() {
- latch.awaitUninterruptibly();
- return new Pair<>(state, ex.value);
- }
- }
-
- private static class ListenerThatCancelsQueryAfterFirstBatchOfData extends WaitUntilCompleteListener {
- private boolean cancelRequested = false;
-
- @Override
- public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
- if (!cancelRequested) {
- check(queryId != null, "Query id should not be null, since we have waited long enough.");
- (new CancellingThread(queryId, ex, null)).start();
- cancelRequested = true;
- }
- result.release();
- }
- }
-
- /**
- * Thread that cancels the given query id. After the cancel is acknowledged, the latch is counted down.
- */
- private static class CancellingThread extends Thread {
- private final QueryId queryId;
- private final Pointer<Exception> ex;
- private final ExtendedLatch latch;
-
- public CancellingThread(final QueryId queryId, final Pointer<Exception> ex, final ExtendedLatch latch) {
- this.queryId = queryId;
- this.ex = ex;
- this.latch = latch;
- }
-
- @Override
- public void run() {
- final DrillRpcFuture<Ack> cancelAck = drillClient.cancelQuery(queryId);
- try {
- cancelAck.checkedGet();
- } catch (final RpcException ex) {
- this.ex.value = ex;
- }
- if (latch != null) {
- latch.countDown();
- }
- }
- }
-
- /**
- * Thread that resumes the given query id. After the latch is counted down, the resume signal is sent, until then
- * the thread waits without interruption.
- */
- private static class ResumingThread extends Thread {
- private final QueryId queryId;
- private final Pointer<Exception> ex;
- private final ExtendedLatch latch;
-
- public ResumingThread(final QueryId queryId, final Pointer<Exception> ex, final ExtendedLatch latch) {
- this.queryId = queryId;
- this.ex = ex;
- this.latch = latch;
- }
-
- @Override
- public void run() {
- latch.awaitUninterruptibly();
- final DrillRpcFuture<Ack> resumeAck = drillClient.resumeQuery(queryId);
- try {
- resumeAck.checkedGet();
- } catch (final RpcException ex) {
- this.ex.value = ex;
- }
- }
- }
-
- /**
- * Given the result of {@link WaitUntilCompleteListener#waitForCompletion},
- * this method fails if the completed state is not as expected, or if an
- * exception is thrown. The completed state could be COMPLETED or CANCELED.
- * This state is set when {@link WaitUntilCompleteListener#queryCompleted} is
- * called.
- */
- private static void assertStateCompleted(final Pair<QueryState, Exception> result, final QueryState expectedState) {
- final QueryState actualState = result.getFirst();
- final Exception exception = result.getSecond();
- if (actualState != expectedState || exception != null) {
- fail(String.format("Query state is incorrect (expected: %s, actual: %s) AND/OR \nException thrown: %s",
- expectedState, actualState, exception == null ? "none." : exception));
- }
- }
-
- /**
- * Given a set of controls, this method ensures that the given query completes with a CANCELED state.
- */
- private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener,
- final String query) {
- setControls(controls);
-
- QueryTestUtil.testWithListener(drillClient, QueryType.SQL, query, listener);
- final Pair<QueryState, Exception> result = listener.waitForCompletion();
- assertStateCompleted(result, QueryState.CANCELED);
- }
-
- /**
- * Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state.
- */
- private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener) {
- assertCancelledWithoutException(controls, listener, TEST_QUERY);
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
}
@Test // To test pause and resume. Test hangs and times out if resume did not happen.
+ @Timeout(value = TIMEOUT)
public void passThrough() {
final long before = countAllocatedMemory();
@@ -575,20 +312,19 @@ public class TestDrillbitResilience extends DrillTest {
.build();
setControls(controls);
- QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+ QueryTestUtil.testWithListener(client.client(), QueryType.SQL, TEST_QUERY, listener);
final Pair<QueryState, Exception> result = listener.waitForCompletion();
assertStateCompleted(result, QueryState.COMPLETED);
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
}
// DRILL-3052: Since root fragment is waiting on data and leaf fragments are cancelled before they send any
// data to root, root will never run. This test will timeout if the root did not send the final state to Foreman.
// DRILL-2383: Cancellation TC 1: cancel before any result set is returned.
- @Test
- @Ignore // TODO(DRILL-3192)
- //@Repeat(count = NUM_RUNS)
+ @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // DRILL-3192
+ @Timeout(value = TIMEOUT)
public void cancelWhenQueryIdArrives() {
final long before = countAllocatedMemory();
@@ -597,7 +333,7 @@ public class TestDrillbitResilience extends DrillTest {
@Override
public void queryIdArrived(final QueryId queryId) {
super.queryIdArrived(queryId);
- cancelAndResume();
+ cancelAndResume(true);
}
};
@@ -607,12 +343,11 @@ public class TestDrillbitResilience extends DrillTest {
assertCancelledWithoutException(controls, listener);
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
}
- @Test // DRILL-2383: Cancellation TC 2: cancel in the middle of fetching result set
- @Repeat(count = NUM_RUNS)
- @Ignore("DRILL-6228")
+ @RepeatedTest(NUM_RUNS) // DRILL-6228
+ @Timeout(value = TIMEOUT) // DRILL-2383: Cancellation TC 2: cancel in the middle of fetching result set
public void cancelInMiddleOfFetchingResults() {
final long before = countAllocatedMemory();
@@ -623,7 +358,7 @@ public class TestDrillbitResilience extends DrillTest {
public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
if (!cancelRequested) {
check(queryId != null, "Query id should not be null, since we have waited long enough.");
- cancelAndResume();
+ cancelAndResume(false);
cancelRequested = true;
}
result.release();
@@ -637,13 +372,12 @@ public class TestDrillbitResilience extends DrillTest {
assertCancelledWithoutException(controls, listener);
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
}
- @Test // DRILL-2383: Cancellation TC 3: cancel after all result set are produced but not all are fetched
- @Repeat(count = NUM_RUNS)
- @Ignore("DRILL-6228")
+ @RepeatedTest(NUM_RUNS) // DRILL-6228
+ @Timeout(value = TIMEOUT) // DRILL-2383: Cancellation TC 3: cancel after all result set are produced but not all are fetched
public void cancelAfterAllResultsProduced() {
final long before = countAllocatedMemory();
@@ -652,9 +386,9 @@ public class TestDrillbitResilience extends DrillTest {
@Override
public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
- if (++count == drillbits.size()) {
+ if (lastDrillbit()) {
check(queryId != null, "Query id should not be null, since we have waited long enough.");
- cancelAndResume();
+ cancelAndResume(false);
}
result.release();
}
@@ -666,23 +400,23 @@ public class TestDrillbitResilience extends DrillTest {
assertCancelledWithoutException(controls, listener);
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
}
- @Test // DRILL-2383: Cancellation TC 4: cancel after everything is completed and fetched
- @Repeat(count = NUM_RUNS)
- @Ignore("DRILL-3967")
+ @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // DRILL-3967
+ @Timeout(value = TIMEOUT) // DRILL-2383: Cancellation TC 4: cancel after everything is completed and fetched
public void cancelAfterEverythingIsCompleted() {
final long before = countAllocatedMemory();
final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
- private int count = 0;
@Override
public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
- if (++count == drillbits.size()) {
+ if (lastDrillbit()) {
check(queryId != null, "Query id should not be null, since we have waited long enough.");
- cancelAndResume();
+ // need to wait until all batches are processed, since foreman-cleanup - the pause that happened earlier,
+ // than the client accepts queryCompleted() from UserResultsListener
+ cancelAndResume(true);
}
result.release();
}
@@ -691,45 +425,28 @@ public class TestDrillbitResilience extends DrillTest {
final String controls = Controls.newBuilder()
.addPause(Foreman.class, "foreman-cleanup")
.build();
- assertCancelledWithoutException(controls, listener);
+ assertCompletedWithoutException(controls, listener); // changed from CANCELLED
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
}
@Test // DRILL-2383: Completion TC 1: success
+ @Timeout(value = TIMEOUT)
public void successfullyCompletes() {
final long before = countAllocatedMemory();
final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
- QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+ QueryTestUtil.testWithListener(client.client(), QueryType.SQL, TEST_QUERY, listener);
final Pair<QueryState, Exception> result = listener.waitForCompletion();
assertStateCompleted(result, QueryState.COMPLETED);
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
- }
-
- /**
- * Given a set of controls, this method ensures TEST_QUERY fails with the given class and desc.
- */
- private static void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass,
- final String exceptionDesc, final String query) {
- setControls(controls);
- final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
- QueryTestUtil.testWithListener(drillClient, QueryType.SQL, query, listener);
- final Pair<QueryState, Exception> result = listener.waitForCompletion();
- final QueryState state = result.getFirst();
- assertTrue(String.format("Query state should be FAILED (and not %s).", state), state == QueryState.FAILED);
- assertExceptionMessage(result.getSecond(), exceptionClass, exceptionDesc);
- }
-
- private static void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass,
- final String exceptionDesc) {
- assertFailsWithException(controls, exceptionClass, exceptionDesc, TEST_QUERY);
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
}
@Test // DRILL-2383: Completion TC 2: failed query - before query is executed - while sql parsing
+ @Timeout(value = TIMEOUT)
public void failsWhenParsing() {
final long before = countAllocatedMemory();
@@ -743,26 +460,27 @@ public class TestDrillbitResilience extends DrillTest {
assertFailsWithException(controls, exceptionClass, exceptionDesc);
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
}
- @Test // DRILL-2383: Completion TC 3: failed query - before query is executed - while sending fragments to other
- // drillbits
+ @Test // DRILL-2383: Completion TC 3: failed query - before query is executed, while sending fragments to other drillbits
+ @Timeout(value = TIMEOUT)
public void failsWhenSendingFragments() {
final long before = countAllocatedMemory();
final String exceptionDesc = "send-fragments";
final Class<? extends Throwable> exceptionClass = ForemanException.class;
final String controls = Controls.newBuilder()
- .addException(FragmentsRunner.class, exceptionDesc, exceptionClass)
+ .addException(FragmentsRunner.class, exceptionDesc, exceptionClass)
.build();
assertFailsWithException(controls, exceptionClass, exceptionDesc);
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
}
@Test // DRILL-2383: Completion TC 4: failed query - during query execution
+ @Timeout(value = TIMEOUT)
public void failsDuringExecution() {
final long before = countAllocatedMemory();
@@ -774,15 +492,15 @@ public class TestDrillbitResilience extends DrillTest {
assertFailsWithException(controls, exceptionClass, exceptionDesc);
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
}
/**
* Test canceling query interrupts currently blocked FragmentExecutor threads waiting for some event to happen.
* Specifically tests canceling fragment which has {@link MergingRecordBatch} blocked waiting for data.
*/
- @Test
- @Repeat(count = NUM_RUNS)
+ @RepeatedTest(NUM_RUNS)
+ @Timeout(value = TIMEOUT)
public void interruptingBlockedMergingRecordBatch() {
final long before = countAllocatedMemory();
@@ -792,38 +510,26 @@ public class TestDrillbitResilience extends DrillTest {
interruptingBlockedFragmentsWaitingForData(control);
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
}
/**
* Test canceling query interrupts currently blocked FragmentExecutor threads waiting for some event to happen.
* Specifically tests canceling fragment which has {@link UnorderedReceiverBatch} blocked waiting for data.
*/
- @Test
- @Repeat(count = NUM_RUNS)
+ @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS)
+ @Timeout(value = TIMEOUT)
public void interruptingBlockedUnorderedReceiverBatch() {
final long before = countAllocatedMemory();
final String control = Controls.newBuilder()
.addPause(UnorderedReceiverBatch.class, "waiting-for-data", 1)
.build();
+ logger.debug("Start interruptingBlockedFragmentsWaitingForData");
interruptingBlockedFragmentsWaitingForData(control);
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
- }
-
- private static void interruptingBlockedFragmentsWaitingForData(final String control) {
- try {
- setSessionOption(SLICE_TARGET, "1");
- setSessionOption(HASHAGG.getOptionName(), "false");
-
- final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
- assertCancelledWithoutException(control, new ListenerThatCancelsQueryAfterFirstBatchOfData(), query);
- } finally {
- resetSessionOption(SLICE_TARGET);
- resetSessionOption(HASHAGG.getOptionName());
- }
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
}
/**
@@ -831,13 +537,13 @@ public class TestDrillbitResilience extends DrillTest {
* {@link PartitionSenderRootExec} spawns threads for partitioner. Interrupting fragment thread should also interrupt
* the partitioner threads.
*/
- @Test
- @Repeat(count = NUM_RUNS)
+ @RepeatedTest(NUM_RUNS)
+ @Timeout(value = TIMEOUT)
public void interruptingPartitionerThreadFragment() {
try {
- setSessionOption(SLICE_TARGET, "1");
- setSessionOption(HASHAGG.getOptionName(), "true");
- setSessionOption(PARTITION_SENDER_SET_THREADS.getOptionName(), "6");
+ client.alterSession(SLICE_TARGET, "1");
+ client.alterSession(ENABLE_HASH_AGG_OPTION, "true");
+ client.alterSession(PARTITION_SENDER_SET_THREADS.getOptionName(), "6");
final long before = countAllocatedMemory();
@@ -850,17 +556,16 @@ public class TestDrillbitResilience extends DrillTest {
assertCancelledWithoutException(controls, new ListenerThatCancelsQueryAfterFirstBatchOfData(), query);
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
} finally {
- resetSessionOption(SLICE_TARGET);
- resetSessionOption(HASHAGG.getOptionName());
- resetSessionOption(PARTITION_SENDER_SET_THREADS.getOptionName());
+ client.resetSession(SLICE_TARGET);
+ client.resetSession(ENABLE_HASH_AGG_OPTION);
+ client.resetSession(PARTITION_SENDER_SET_THREADS.getOptionName());
}
}
- @Test
- @Ignore // TODO(DRILL-3193)
- //@Repeat(count = NUM_RUNS)
+ @Test // DRILL-3193
+ @Timeout(value = TIMEOUT)
public void interruptingWhileFragmentIsBlockedInAcquiringSendingTicket() {
final long before = countAllocatedMemory();
@@ -870,13 +575,13 @@ public class TestDrillbitResilience extends DrillTest {
assertCancelledWithoutException(control, new ListenerThatCancelsQueryAfterFirstBatchOfData());
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
}
- @Test
- @Repeat(count = NUM_RUNS)
+ @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS)
+ @Timeout(value = TIMEOUT)
public void memoryLeaksWhenCancelled() {
- setSessionOption(SLICE_TARGET, "10");
+ client.alterSession(SLICE_TARGET, "10");
final long before = countAllocatedMemory();
@@ -889,7 +594,7 @@ public class TestDrillbitResilience extends DrillTest {
query = BaseTestQuery.getFile("queries/tpch/09.sql");
query = query.substring(0, query.length() - 1); // drop the ";"
} catch (final IOException e) {
- fail("Failed to get query file: " + e);
+ fail("Failed to get query file", e);
}
final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
@@ -899,7 +604,7 @@ public class TestDrillbitResilience extends DrillTest {
public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
if (!cancelRequested) {
check(queryId != null, "Query id should not be null, since we have waited long enough.");
- cancelAndResume();
+ cancelAndResume(false);
cancelRequested = true;
}
result.release();
@@ -909,17 +614,16 @@ public class TestDrillbitResilience extends DrillTest {
assertCancelledWithoutException(controls, listener, query);
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
} finally {
- setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+ client.alterSession(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
}
}
- @Test
- @Ignore // TODO(DRILL-3194)
- //@Repeat(count = NUM_RUNS)
+ @RepeatedTest(NUM_RUNS) // DRILL-3194
+ @Timeout(value = TIMEOUT)
public void memoryLeaksWhenFailed() {
- setSessionOption(SLICE_TARGET, "10");
+ client.alterSession(SLICE_TARGET, "10");
final long before = countAllocatedMemory();
@@ -941,14 +645,15 @@ public class TestDrillbitResilience extends DrillTest {
assertFailsWithException(controls, exceptionClass, exceptionDesc, query);
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
} finally {
- setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+ client.alterSession(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
}
}
- @Test // DRILL-3065
+ @Test
+ @Timeout(value = TIMEOUT) // DRILL-3065
public void failsAfterMSorterSorting() {
// Note: must use an input table that returns more than one
@@ -966,10 +671,11 @@ public class TestDrillbitResilience extends DrillTest {
assertFailsWithException(controls, typeOfException, ExternalSortBatch.INTERRUPTION_AFTER_SORT, query);
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
}
- @Test // DRILL-3085
+ @Test
+ @Timeout(value = TIMEOUT) // DRILL-3085
public void failsAfterMSorterSetup() {
// Note: must use an input table that returns more than one
@@ -987,22 +693,305 @@ public class TestDrillbitResilience extends DrillTest {
assertFailsWithException(controls, typeOfException, ExternalSortBatch.INTERRUPTION_AFTER_SETUP, query);
final long after = countAllocatedMemory();
- assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+ assertEquals(before, after, () -> String.format("We are leaking %d bytes", after - before));
+ }
+
+ /**
+ * Tests can use this listener to wait, until the submitted query completes or fails, by
+ * calling #waitForCompletion.
+ */
+ private static class WaitUntilCompleteListener implements UserResultsListener {
+ protected final ExtendedLatch latch = new ExtendedLatch(1); // to signal completion
+ protected QueryId queryId = null;
+ protected volatile Pointer<Exception> ex = new Pointer<>();
+ protected volatile QueryState state = null;
+ private int count = 0;
+
+ /**
+ * Method that sets the exception if the condition is not met.
+ */
+ protected final void check(final boolean condition, final String format, final Object... args) {
+ if (!condition) {
+ ex.value = new IllegalStateException(String.format(format, args));
+ }
+ }
+
+ /**
+ * Method that cancels and resumes the query, in order.
+ */
+ protected final void cancelAndResume(boolean sleepBeforeStart) {
+ Preconditions.checkNotNull(queryId);
+ final ExtendedLatch trigger = new ExtendedLatch(1);
+ (new CancellingThread(queryId, ex, trigger, sleepBeforeStart)).start();
+ (new ResumingThread(queryId, ex, trigger)).start();
+ }
+
+ @Override
+ public void queryIdArrived(final QueryId queryId) {
+ this.queryId = queryId;
+ }
+
+ @Override
+ public void submissionFailed(final UserException ex) {
+ this.ex.value = ex;
+ state = QueryState.FAILED;
+ latch.countDown();
+ }
+
+ @Override
+ public void queryCompleted(final QueryState state) {
+ this.state = state;
+ latch.countDown();
+ }
+
+ @Override
+ public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+ result.release();
+ }
+
+ public final Pair<QueryState, Exception> waitForCompletion() {
+ try {
+ logger.debug("Wait for completion. latch: {}", latch.getCount());
+ latch.await(); // awaitUninterruptibly method usage here prevents JUnit timeout work
+ } catch (InterruptedException e) {
+ logger.error("Interrupted while waiting for event latch");
+ }
+ logger.debug("Completed. Wait finished");
+ return new Pair<>(state, ex.value);
+ }
+
+ /**
+ * It is useful for queries returned the data from all drillbits. Use this method to make sure {@link #dataArrived}
+ * returns the data from every drillbit
+ *
+ * @return true, in case current listener methods return results from last drillbit already
+ */
+ boolean lastDrillbit() {
+ return ++count == cluster.drillbits().size();
+ }
+ }
+
+ private static class ListenerThatCancelsQueryAfterFirstBatchOfData extends WaitUntilCompleteListener {
+ private boolean cancelRequested = false;
+
+ @Override
+ public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+ if (!cancelRequested) {
+ logger.debug("First batch arrived, so cancelling thread started");
+ check(queryId != null, "Query id should not be null, since we have waited long enough.");
+ (new CancellingThread(queryId, ex, null, false)).start();
+ cancelRequested = true;
+ }
+ result.release();
+ }
+ }
+
+ /**
+ * Thread that cancels the given query id. After the cancel is acknowledged, the latch is counted down.
+ */
+ private static class CancellingThread extends Thread {
+ private final QueryId queryId;
+ private final Pointer<Exception> ex;
+ private final ExtendedLatch latch;
+ private final boolean sleepBeforeStart;
+
+ public CancellingThread(QueryId queryId, Pointer<Exception> ex, ExtendedLatch latch, boolean sleepBeforeStart) {
+ this.queryId = queryId;
+ this.ex = ex;
+ this.latch = latch;
+ this.sleepBeforeStart = sleepBeforeStart;
+ logger.debug("Cancelling thread created");
+ }
+
+ @Override
+ public void run() {
+ if(sleepBeforeStart) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ logger.debug("Sleep thread interrupted. Ignore it");
+ // just ignore
+ }
+ }
+ logger.debug("Cancelling {} query started", queryId);
+ final DrillRpcFuture<Ack> cancelAck = client.client().cancelQuery(queryId);
+ logger.debug("Check future: {}", cancelAck);
+ try {
+ Ack ack = cancelAck.checkedGet();
+ logger.debug("Sleep thread for 0.01 seconds");
+ Thread.sleep(10);
+ logger.debug("Ack: {}", ack);
+ } catch (RpcException ex) {
+ this.ex.value = ex;
+ logger.debug("The query wasn't canceled." + ex);
+ } catch (InterruptedException e) {
+ logger.debug("Sleep thread interrupted. Ignore it");
+ // just ignore
+ }
+ if (latch != null) {
+ latch.countDown();
+ }
+ logger.debug("Finish cancelling thread");
+ }
+ }
+
+ /**
+ * Thread that resumes the given query id. After the latch is counted down, the resume signal is sent, until then
+ * the thread waits without interruption.
+ */
+ private static class ResumingThread extends Thread {
+ private final QueryId queryId;
+ private final Pointer<Exception> ex;
+ private final ExtendedLatch latch;
+
+ public ResumingThread(final QueryId queryId, final Pointer<Exception> ex, final ExtendedLatch latch) {
+ this.queryId = queryId;
+ this.ex = ex;
+ this.latch = latch;
+ logger.debug("Cancelling thread created");
+ }
+
+ @Override
+ public void run() {
+ latch.awaitUninterruptibly();
+ final DrillRpcFuture<Ack> resumeAck = client.client().resumeQuery(queryId);
+ try {
+ resumeAck.checkedGet();
+ } catch (final RpcException ex) {
+ this.ex.value = ex;
+ }
+ }
}
- private static long countAllocatedMemory() {
+ /**
+ * Given the result of {@link WaitUntilCompleteListener#waitForCompletion},
+ * this method fails if the completed state is not as expected, or if an
+ * exception is thrown. The completed state could be COMPLETED or CANCELED.
+ * This state is set when {@link WaitUntilCompleteListener#queryCompleted} is
+ * called.
+ */
+ private void assertStateCompleted(final Pair<QueryState, Exception> result, final QueryState expectedState) {
+ final QueryState actualState = result.getFirst();
+ final Exception exception = result.getSecond();
+ if (actualState != expectedState || exception != null) {
+ fail(String.format("Query state is incorrect (expected: %s, actual: %s) AND/OR \nException thrown: %s",
+ expectedState, actualState, exception == null ? "none." : exception));
+ }
+ }
+
+ /**
+ * Given a set of controls, this method ensures that the given query completes with a {@link QueryState#CANCELED} state.</p>
+ */
+ private void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener,
+ final String query) {
+ setControls(controls);
+
+ QueryTestUtil.testWithListener(client.client(), QueryType.SQL, query, listener);
+ final Pair<QueryState, Exception> result = listener.waitForCompletion();
+ assertStateCompleted(result, QueryState.CANCELED);
+ }
+
+ /**
+ * Given a set of controls, this method ensures that the given query completes with a {@link QueryState#COMPLETED} state.</p>
+ * {@link QueryState#COMPLETED} is a terminal state and can't be canceled.
+ * See more: {@link org.apache.drill.exec.work.foreman.QueryStateProcessor#cancel}
+ */
+ private void assertCompletedWithoutException(final String controls, final WaitUntilCompleteListener listener) {
+ setControls(controls);
+
+ QueryTestUtil.testWithListener(client.client(), QueryType.SQL, TEST_QUERY, listener);
+ assertStateCompleted(listener.waitForCompletion(), QueryState.COMPLETED);
+ }
+
+ /**
+ * Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state.
+ */
+ private void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener) {
+ assertCancelledWithoutException(controls, listener, TEST_QUERY);
+ }
+
+ private long countAllocatedMemory() {
// wait to make sure all fragments finished cleaning up
try {
- Thread.sleep(2000);
- } catch (final InterruptedException e) {
+ logger.debug("Sleep thread for 0.05 seconds");
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ logger.debug("Sleep thread interrupted. Ignore it", e);
// just ignore
}
long allocated = 0;
- for (final String name : drillbits.keySet()) {
- allocated += drillbits.get(name).getContext().getAllocator().getAllocatedMemory();
+ for (Drillbit drillbit : cluster.drillbits()) {
+ allocated += drillbit.getContext().getAllocator().getAllocatedMemory();
}
-
+ logger.debug("Allocated memory: " + allocated);
return allocated;
}
+
+ private void interruptingBlockedFragmentsWaitingForData(final String control) {
+ try {
+ client.alterSession(SLICE_TARGET, "1");
+ client.alterSession(ENABLE_HASH_AGG_OPTION, "false");
+
+ final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
+ assertCancelledWithoutException(control, new ListenerThatCancelsQueryAfterFirstBatchOfData(), query);
+ } finally {
+ client.resetSession(SLICE_TARGET);
+ client.resetSession(ENABLE_HASH_AGG_OPTION);
+ }
+ }
+
+ /**
+ * Given a set of controls, this method ensures TEST_QUERY fails with the given class and desc.
+ */
+ private void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass,
+ final String exceptionDesc, final String query) {
+ setControls(controls);
+ final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
+ QueryTestUtil.testWithListener(client.client(), QueryType.SQL, query, listener);
+ final Pair<QueryState, Exception> result = listener.waitForCompletion();
+ final QueryState state = result.getFirst();
+ assertSame(state, QueryState.FAILED, () -> String.format("Query state should be FAILED (and not %s).", state));
+ assertExceptionMessage(result.getSecond(), exceptionClass, exceptionDesc);
+ }
+
+ private void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass,
+ final String exceptionDesc) {
+ assertFailsWithException(controls, exceptionClass, exceptionDesc, TEST_QUERY);
+ }
+
+ /**
+ * Test throwing exceptions from sites within the Foreman class, as specified by the site
+ * description
+ *
+ * @param desc site description
+ */
+ private void testForeman(final String desc) {
+ final String controls = Controls.newBuilder()
+ .addException(Foreman.class, desc, ForemanException.class)
+ .build();
+ assertFailsWithException(controls, ForemanException.class, desc);
+ }
+
+ /**
+ * Set the given controls.
+ */
+ private void setControls(final String controls) {
+ ControlsInjectionUtil.setControls(client.client(), controls);
+ }
+
+ /**
+ * Check that the injected exception is what we were expecting.
+ *
+ * @param throwable the throwable that was caught (by the test)
+ * @param exceptionClass the expected exception class
+ * @param desc the expected exception site description
+ */
+ private void assertExceptionMessage(final Throwable throwable, final Class<? extends Throwable> exceptionClass,
+ final String desc) {
+ assertTrue(throwable instanceof UserException, "Throwable was not of UserException type");
+ final ExceptionWrapper cause = ((UserException) throwable).getOrCreatePBError(false).getException();
+ assertEquals(exceptionClass.getName(), cause.getExceptionClass(), "Exception class names should match");
+ assertEquals(desc, cause.getMessage(), "Exception sites should match.");
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestExtendedTypes.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestExtendedTypes.java
index 7c46e02..ae69b46 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestExtendedTypes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestExtendedTypes.java
@@ -33,6 +33,7 @@ import java.util.TimeZone;
import static org.apache.drill.test.rowSet.RowSetUtilities.dec;
import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
+import org.apache.drill.categories.FlakyTest;
import org.apache.drill.categories.JsonTest;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -43,7 +44,7 @@ import org.apache.drill.test.rowSet.RowSetUtilities;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category(JsonTest.class)
+@Category({JsonTest.class, FlakyTest.class})
public class TestExtendedTypes extends BaseJsonLoaderTest {
@Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
index 610558a..853a022 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
@@ -21,58 +21,66 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import lombok.val;
public class PStoreTestUtil {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PStoreTestUtil.class);
- public static void test(PersistentStoreProvider provider) throws Exception{
- PersistentStore<String> store = provider.getOrCreateStore(PersistentStoreConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build());
- String[] keys = {"first", "second"};
- String[] values = {"value1", "value2"};
- Map<String, String> expectedMap = Maps.newHashMap();
+ public static void test(PersistentStoreProvider provider) throws Exception {
+ val store = provider.getOrCreateStore(
+ PersistentStoreConfig.newJacksonBuilder(new ObjectMapper(), String.class)
+ .name("sys.test")
+ .build()
+ );
+ try {
+ val expectedMap = new HashMap<String, String>() {{
+ put("first", "value1");
+ put("second", "value2");
+ }}; // todo: rewrite with Java11
+ expectedMap.forEach(store::put);
+ waitForNumProps(store, expectedMap.size());
- for(int i =0; i < keys.length; i++){
- expectedMap.put(keys[i], values[i]);
- store.put(keys[i], values[i]);
- }
+ Iterator<Map.Entry<String, String>> iter = store.getAll();
+ for(int i =0; i < expectedMap.size(); i++) {
+ Entry<String, String> storeEntry = iter.next();
- // Wait for store caches to update, this is necessary because ZookeeperClient caches can update asynchronously in some cases.
- waitForNumProps(store, keys.length);
+ assertTrue(String.format("This element wasn't added to PStore, storeEntry: %s", storeEntry),
+ expectedMap.containsKey(storeEntry.getKey()));
- {
- Iterator<Map.Entry<String, String>> iter = store.getAll();
- for(int i =0; i < keys.length; i++){
- Entry<String, String> e = iter.next();
- assertTrue(expectedMap.containsKey(e.getKey()));
- assertEquals(expectedMap.get(e.getKey()), e.getValue());
+ String expectedValue = expectedMap.get(storeEntry.getKey());
+ assertEquals(String.format("The value is different in PStore for this key: %s. Expected value: %s, Actual: %s",
+ storeEntry.getKey(), expectedValue, storeEntry.getValue()), expectedValue, storeEntry.getValue());
}
assertFalse(iter.hasNext());
- }
-
- {
+ } finally {
Iterator<Map.Entry<String, String>> iter = store.getAll();
- while(iter.hasNext()){
+ while(iter.hasNext()) {
final String key = iter.next().getKey();
store.delete(key);
}
- }
- // Wait for store caches to update, this is necessary because ZookeeperClient caches can update asynchronously in some cases.
- waitForNumProps(store, 0);
- assertFalse(store.getAll().hasNext());
+ waitForNumProps(store, 0);
+ assertFalse(store.getAll().hasNext());
+ }
}
+ /**
+ * Wait for store caches to update, this is necessary because ZookeeperClient caches can update asynchronously
+ * in some cases.
+ *
+ * @param store PersistentStore
+ * @param expected num props
+ * @throws InterruptedException will fail this test once arises
+ */
private static void waitForNumProps(PersistentStore store, int expected) throws InterruptedException {
- for (int numProps = getNumProps(store);
- numProps < expected;
- numProps = getNumProps(store)) {
+ while(getNumProps(store) < expected) {
Thread.sleep(100L);
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
index cab9877..db95c02 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
@@ -18,7 +18,9 @@
package org.apache.drill.exec.store.sys;
import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.val;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.drill.categories.FlakyTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.util.DrillFileUtils;
@@ -45,7 +47,7 @@ import java.io.File;
import static org.junit.Assert.assertTrue;
-@Category({SlowTest.class})
+@Category({SlowTest.class, FlakyTest.class})
public class TestPStoreProviders extends TestWithZookeeper {
@Rule
public BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
@@ -59,9 +61,9 @@ public class TestPStoreProviders extends TestWithZookeeper {
@Test
public void verifyZkStore() throws Exception {
- try(CuratorFramework curator = createCurator()){
+ try(CuratorFramework curator = createCurator()) {
curator.start();
- ZookeeperPersistentStoreProvider provider = new ZookeeperPersistentStoreProvider(zkHelper.getConfig(), curator);
+ val provider = new ZookeeperPersistentStoreProvider(zkHelper.getConfig(), curator);
PStoreTestUtil.test(provider);
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/Controls.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/Controls.java
index ec264c7..5add50c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/Controls.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/Controls.java
@@ -23,9 +23,9 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-public class Controls {
+import static org.apache.drill.exec.testing.ExecutionControls.EMPTY_CONTROLS;
- private static final String EMPTY_CONTROLS = "{\"injections\" : []}";
+public class Controls {
/**
* Returns a builder that can be used to add injections.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
index 576b101..28693be 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
@@ -57,7 +57,7 @@ public class ControlsInjectionUtil {
data.release();
}
} catch (final RpcException e) {
- fail("Could not set option: " + e.toString());
+ fail("Could not set option: " + e);
}
}
@@ -187,6 +187,6 @@ public class ControlsInjectionUtil {
* Clears all the controls.
*/
public static void clearControls(final DrillClient client) {
- setControls(client, ExecutionControls.DEFAULT_CONTROLS);
+ setControls(client, ExecutionControls.EMPTY_CONTROLS);
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
index f5e8064..da595cf 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
@@ -451,7 +451,7 @@ public class BaseTestQuery extends ExecTest {
errorMsgTestHelper(testSqlQuery, UserBitShared.DrillPBError.ErrorType.PARSE.name());
}
- public static String getFile(String resource) throws IOException{
+ public static String getFile(String resource) throws IOException {
URL url = Resources.getResource(resource);
if (url == null) {
throw new IOException(String.format("Unable to find path %s.", resource));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
index b62a188..acd1947 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
@@ -216,8 +216,6 @@ public class LogFixture implements AutoCloseable {
appender.setEncoder(ple);
appender.start();
- Logger root = (Logger)LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
- root.addAppender(appender);
drillLogger.addAppender(appender);
}
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestInformationSchemaColumns.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestInformationSchemaColumns.java
index 76f2553..218c618 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestInformationSchemaColumns.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestInformationSchemaColumns.java
@@ -33,6 +33,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
+import org.apache.drill.categories.FlakyTest;
import org.apache.drill.jdbc.Driver;
import org.apache.drill.jdbc.JdbcTestBase;
import org.apache.drill.categories.JdbcTest;
@@ -54,7 +55,7 @@ import org.junit.experimental.categories.Category;
/**
* Test class for Drill's INFORMATION_SCHEMA.COLUMNS implementation.
*/
-@Category(JdbcTest.class)
+@Category({JdbcTest.class, FlakyTest.class})
public class TestInformationSchemaColumns extends JdbcTestBase {
private static final String VIEW_NAME =
diff --git a/pom.xml b/pom.xml
index 31bbc9e..8292caf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,7 +73,7 @@
<commons.io.version>2.7</commons.io.version>
<commons.collections.version>4.4</commons.collections.version>
<hamcrest.version>2.2</hamcrest.version>
- <curator.version>5.1.0</curator.version>
+ <curator.version>5.2.0</curator.version>
<wiremock.standalone.version>2.23.2</wiremock.standalone.version>
<jmockit.version>1.47</jmockit.version>
<logback.version>1.2.3</logback.version>
@@ -98,7 +98,7 @@
<javax.validation.api>2.0.1.Final</javax.validation.api>
<asm.version>7.3.1</asm.version>
<excludedGroups />
- <memoryMb>4000</memoryMb>
+ <memoryMb>1800</memoryMb>
<directMemoryMb>3000</directMemoryMb>
<rat.skip>true</rat.skip>
<license.skip>true</license.skip>