Posted to commits@druid.apache.org by fj...@apache.org on 2018/08/11 05:03:39 UTC
[incubator-druid] branch master updated: Further optimize memory for Travis jobs (#6150)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new ecee3e0 Further optimize memory for Travis jobs (#6150)
ecee3e0 is described below
commit ecee3e0a2479eba734f992ca9ea086ecfb9dbaee
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Fri Aug 10 22:03:36 2018 -0700
Further optimize memory for Travis jobs (#6150)
* Further optimize memory for Travis jobs
* fix build
* sudo false
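
The theme across these changes: the buffer pools behind the query runner factories hold large direct-memory ByteBuffers, and the test-support factories now hand back a Closer alongside the factory (or use Closeable pool subclasses) so each test class can release that memory instead of keeping it for the life of the forked JVM. That is what lets the Surefire heap settings below shrink. A minimal sketch of the lifecycle, modeled on the QuantileSqlAggregatorTest change later in this diff (the test class itself is hypothetical):

    import io.druid.java.util.common.Pair;
    import io.druid.java.util.common.io.Closer;
    import io.druid.query.QueryRunnerFactoryConglomerate;
    import io.druid.sql.calcite.util.CalciteTests;
    import org.junit.AfterClass;
    import org.junit.BeforeClass;

    import java.io.IOException;

    public class ExampleConglomerateTest
    {
      private static QueryRunnerFactoryConglomerate conglomerate;
      private static Closer resourceCloser;

      @BeforeClass
      public static void setUpClass()
      {
        final Pair<QueryRunnerFactoryConglomerate, Closer> pair =
            CalciteTests.createQueryRunnerFactoryConglomerate();
        conglomerate = pair.lhs;   // build walkers and planner factories from this
        resourceCloser = pair.rhs; // owns the conglomerate's buffer pools
      }

      @AfterClass
      public static void tearDownClass() throws IOException
      {
        resourceCloser.close(); // frees the pooled direct memory for the next class
      }
    }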
---
.travis.yml | 89 +++----
.../io/druid/benchmark/query/SqlBenchmark.java | 15 +-
.../io/druid/collections/DefaultBlockingPool.java | 4 +-
.../main/java/io/druid/collections/StupidPool.java | 14 +-
.../io/druid/collections/BlockingPoolTest.java | 10 +-
.../collections/CloseableDefaultBlockingPool.java | 37 +++
.../io/druid/collections/CloseableStupidPool.java | 43 ++++
.../java/io/druid/collections/StupidPoolTest.java | 5 +-
extensions-contrib/distinctcount/pom.xml | 7 +
.../DistinctCountGroupByQueryTest.java | 27 +-
.../distinctcount/DistinctCountTopNQueryTest.java | 41 +--
extensions-contrib/time-min-max/pom.xml | 7 +
.../TimestampGroupByAggregationTest.java | 7 +
extensions-core/datasketches/pom.xml | 7 +
.../quantiles/DoublesSketchAggregatorTest.java | 8 +
.../datasketches/theta/SketchAggregationTest.java | 7 +
.../theta/SketchAggregationWithSimpleDataTest.java | 227 +++++++++--------
.../theta/oldapi/OldApiSketchAggregationTest.java | 7 +
.../tuple/ArrayOfDoublesSketchAggregationTest.java | 8 +
extensions-core/histogram/pom.xml | 7 +
.../ApproximateHistogramAggregationTest.java | 8 +
.../ApproximateHistogramGroupByQueryTest.java | 26 +-
.../ApproximateHistogramTopNQueryTest.java | 28 ++-
.../histogram/sql/QuantileSqlAggregatorTest.java | 30 ++-
extensions-core/stats/pom.xml | 7 +
.../variance/VarianceGroupByQueryTest.java | 7 +-
pom.xml | 12 +-
processing/pom.xml | 22 ++
.../io/druid/query/MultiValuedDimensionTest.java | 69 ++---
.../test/java/io/druid/query/TestQueryRunners.java | 41 +--
.../query/aggregation/AggregationTestHelper.java | 73 ++++--
.../hyperloglog/HyperUniquesAggregationTest.java | 226 +++++++++--------
.../FinalizingFieldAccessPostAggregatorTest.java | 119 ++++-----
...GroupByLimitPushDownInsufficientBufferTest.java | 25 +-
.../GroupByLimitPushDownMultiNodeMergeTest.java | 25 +-
.../query/groupby/GroupByMultiSegmentTest.java | 22 +-
.../query/groupby/GroupByQueryMergeBufferTest.java | 39 +--
.../groupby/GroupByQueryRunnerFactoryTest.java | 28 ++-
.../groupby/GroupByQueryRunnerFailureTest.java | 39 +--
.../query/groupby/GroupByQueryRunnerTest.java | 61 +++--
.../groupby/GroupByTimeseriesQueryRunnerTest.java | 23 +-
.../query/topn/TopNQueryQueryToolChestTest.java | 89 ++++---
.../io/druid/query/topn/TopNQueryRunnerTest.java | 26 +-
.../io/druid/query/topn/TopNUnionQueryTest.java | 36 ++-
.../src/test/java/io/druid/segment/AppendTest.java | 34 ++-
.../io/druid/segment/SchemalessTestSimpleTest.java | 10 +-
.../druid/segment/data/IncrementalIndexTest.java | 147 +++++------
.../IncrementalIndexStorageAdapterTest.java | 278 ++++++++++++---------
.../segment/incremental/IncrementalIndexTest.java | 54 ++--
server/pom.xml | 7 +
.../CachingClusteredClientFunctionalityTest.java | 42 +++-
.../druid/client/CachingClusteredClientTest.java | 118 ++++-----
.../client/CachingClusteredClientTestUtils.java | 118 +++++++++
.../segment/realtime/RealtimeManagerTest.java | 17 +-
sql/pom.xml | 7 +
.../druid/sql/avatica/DruidAvaticaHandlerTest.java | 33 ++-
.../io/druid/sql/avatica/DruidStatementTest.java | 33 ++-
.../io/druid/sql/calcite/CalciteQueryTest.java | 30 ++-
.../io/druid/sql/calcite/http/SqlResourceTest.java | 30 ++-
.../druid/sql/calcite/schema/DruidSchemaTest.java | 32 ++-
.../io/druid/sql/calcite/util/CalciteTests.java | 243 +++++++++---------
61 files changed, 1822 insertions(+), 1069 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 3c9f57e..67afc26 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -3,7 +3,7 @@ language: java
# On 12-12-2017, Travis updated their trusty image, which caused integration tests to fail.
# The group: config instructs Travis to use the previous trusty image.
# Please see https://github.com/druid-io/druid/pull/5155 for more information.
-sudo: required
+sudo: false
dist: trusty
group: deprecated-2017Q4
@@ -17,71 +17,76 @@ cache:
matrix:
include:
# strict compilation
- - sudo: false
- env:
+ - env:
- NAME="strict compilation"
install: true
# Strict compilation requires more than 2 GB
- script: echo "MAVEN_OPTS='-Xmx3000m'" > ~/.mavenrc && mvn clean -Pstrict -pl '!benchmarks' compile test-compile -B --fail-at-end
+ script: MAVEN_OPTS='-Xmx3000m' mvn clean -Pstrict -pl '!benchmarks' compile test-compile -B --fail-at-end
# processing module test
- - sudo: false
- env:
+ - env:
- NAME="processing module test"
- install: echo "MAVEN_OPTS='-Xmx3000m'" > ~/.mavenrc && mvn install -q -ff -DskipTests -B
- before_script:
- - unset _JAVA_OPTIONS
- script: echo "MAVEN_OPTS='-Xmx512m'" > ~/.mavenrc && mvn test -B -Pparallel-test -Dmaven.fork.count=2 -pl processing
+ install: MAVEN_OPTS='-Xmx3000m' mvn install -q -ff -DskipTests -B
+ before_script: unset _JAVA_OPTIONS
+ script:
+ # Set MAVEN_OPTS for Surefire launcher
+ - MAVEN_OPTS='-Xmx512m' mvn test -B -pl processing
+ - sh -c "dmesg | egrep -i '(oom|out of memory|kill process|killed).*' -C 1 || exit 0"
+ - free -m
# processing module tests with SQL Compatibility enabled
- - sudo: false
- env:
+ - env:
- NAME="processing module test with SQL Compatibility"
- install: echo "MAVEN_OPTS='-Xmx3000m'" > ~/.mavenrc && mvn install -q -ff -DskipTests -B
- before_script:
- - unset _JAVA_OPTIONS
- script: echo "MAVEN_OPTS='-Xmx512m'" > ~/.mavenrc && mvn test -B -Pparallel-test -Dmaven.fork.count=2 -Ddruid.generic.useDefaultValueForNull=false -pl processing
+ install: MAVEN_OPTS='-Xmx3000m' mvn install -q -ff -DskipTests -B
+ before_script: unset _JAVA_OPTIONS
+ script:
+ # Set MAVEN_OPTS for Surefire launcher
+ - MAVEN_OPTS='-Xmx512m' mvn test -B -Ddruid.generic.useDefaultValueForNull=false -pl processing
+ - sh -c "dmesg | egrep -i '(oom|out of memory|kill process|killed).*' -C 1 || exit 0"
+ - free -m
# server module test
- - sudo: false
- env:
+ - env:
- NAME="server module test"
- install: echo "MAVEN_OPTS='-Xmx3000m'" > ~/.mavenrc && mvn install -q -ff -DskipTests -B
- before_script:
- - unset _JAVA_OPTIONS
- # Server module test is run without the parallel-test option because it's memory sensitive and often fails with that option.
- script: echo "MAVEN_OPTS='-Xmx512m'" > ~/.mavenrc && mvn test -B -pl server
+ install: MAVEN_OPTS='-Xmx3000m' mvn install -q -ff -DskipTests -B
+ before_script: unset _JAVA_OPTIONS
+ script:
+ # Set MAVEN_OPTS for Surefire launcher
+ - MAVEN_OPTS='-Xmx512m' mvn test -B -pl server
# server module test with SQL Compatibility enabled
- - sudo: false
- env:
+ - env:
- NAME="server module test with SQL Compatibility enabled"
- install: echo "MAVEN_OPTS='-Xmx3000m'" > ~/.mavenrc && mvn install -q -ff -DskipTests -B
- before_script:
- - unset _JAVA_OPTIONS
- # Server module test is run without the parallel-test option because it's memory sensitive and often fails with that option.
- script: echo "MAVEN_OPTS='-Xmx512m'" > ~/.mavenrc && mvn test -B -pl server -Ddruid.generic.useDefaultValueForNull=false
+ install: MAVEN_OPTS='-Xmx3000m' mvn install -q -ff -DskipTests -B
+ before_script: unset _JAVA_OPTIONS
+ script:
+ # Set MAVEN_OPTS for Surefire launcher
+ - MAVEN_OPTS='-Xmx512m' mvn test -B -pl server -Ddruid.generic.useDefaultValueForNull=false
# other modules test
- - sudo: false
- env:
+ - env:
- NAME="other modules test"
- AWS_REGION=us-east-1 # set an AWS region for unit tests
- install: echo "MAVEN_OPTS='-Xmx3000m'" > ~/.mavenrc && mvn install -q -ff -DskipTests -B
- before_script:
- - unset _JAVA_OPTIONS
- script: echo "MAVEN_OPTS='-Xmx512m'" > ~/.mavenrc && mvn test -B -Pparallel-test -Dmaven.fork.count=2 -pl '!processing,!server'
+ install: MAVEN_OPTS='-Xmx3000m' mvn install -q -ff -DskipTests -B
+ before_script: unset _JAVA_OPTIONS
+ script:
+ # Set MAVEN_OPTS for Surefire launcher
+ - MAVEN_OPTS='-Xmx512m' mvn test -B -pl '!processing,!server'
+ - sh -c "dmesg | egrep -i '(oom|out of memory|kill process|killed).*' -C 1 || exit 0"
+ - free -m
# other modules test with SQL Compatibility enabled
- - sudo: false
- env:
+ - env:
- NAME="other modules test with SQL Compatibility"
- AWS_REGION=us-east-1 # set an AWS region for unit tests
- install: echo "MAVEN_OPTS='-Xmx3000m'" > ~/.mavenrc && mvn install -q -ff -DskipTests -B
- before_script:
- - unset _JAVA_OPTIONS
- script: echo "MAVEN_OPTS='-Xmx512m'" > ~/.mavenrc && mvn test -B -Pparallel-test -Dmaven.fork.count=2 -Ddruid.generic.useDefaultValueForNull=false -pl '!processing,!server'
+ install: MAVEN_OPTS='-Xmx3000m' mvn install -q -ff -DskipTests -B
+ before_script: unset _JAVA_OPTIONS
+ script:
+ # Set MAVEN_OPTS for Surefire launcher
+ - MAVEN_OPTS='-Xmx512m' mvn test -B -Ddruid.generic.useDefaultValueForNull=false -pl '!processing,!server'
+ - sh -c "dmesg | egrep -i '(oom|out of memory|kill process|killed).*' -C 1 || exit 0"
+ - free -m
# run integration tests
- sudo: required
diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java
index 256c93f..ccf78d4 100644
--- a/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java
+++ b/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java
@@ -26,8 +26,10 @@ import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.benchmark.datagen.SegmentGenerator;
import io.druid.data.input.Row;
import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.Pair;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequence;
+import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunnerFactoryConglomerate;
@@ -85,6 +87,7 @@ public class SqlBenchmark
private PlannerFactory plannerFactory;
private GroupByQuery groupByQuery;
private String sqlQuery;
+ private Closer resourceCloser;
@Setup(Level.Trial)
public void setup()
@@ -104,13 +107,15 @@ public class SqlBenchmark
this.segmentGenerator = new SegmentGenerator();
final QueryableIndex index = segmentGenerator.generate(dataSegment, schemaInfo, Granularities.NONE, rowsPerSegment);
- final QueryRunnerFactoryConglomerate conglomerate = CalciteTests.queryRunnerFactoryConglomerate();
+ final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
+ .createQueryRunnerFactoryConglomerate();
+ final QueryRunnerFactoryConglomerate conglomerate = conglomerateCloserPair.lhs;
final PlannerConfig plannerConfig = new PlannerConfig();
this.walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(dataSegment, index);
plannerFactory = new PlannerFactory(
- CalciteTests.createMockSchema(walker, plannerConfig),
- CalciteTests.createMockQueryLifecycleFactory(walker),
+ CalciteTests.createMockSchema(conglomerate, walker, plannerConfig),
+ CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
CalciteTests.createOperatorTable(),
CalciteTests.createExprMacroTable(),
plannerConfig,
@@ -147,6 +152,10 @@ public class SqlBenchmark
segmentGenerator = null;
}
+ if (resourceCloser != null) {
+ resourceCloser.close();
+ }
+
if (tmpDir != null) {
FileUtils.deleteDirectory(tmpDir);
}
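
The Closer used above (io.druid.java.util.common.io.Closer) follows the Guava Closer API. For orientation, a sketch limited to the three calls that appear in this commit (the resources here are placeholders):

    import io.druid.java.util.common.io.Closer;

    import java.io.Closeable;
    import java.io.IOException;

    class CloserSketch
    {
      static void demo(Closeable pool, Closeable walker) throws IOException
      {
        final Closer closer = Closer.create(); // start empty
        closer.register(pool);                 // remember each resource as it is created
        closer.register(walker);
        closer.close();                        // closes everything that was registered
      }
    }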
diff --git a/common/src/main/java/io/druid/collections/DefaultBlockingPool.java b/common/src/main/java/io/druid/collections/DefaultBlockingPool.java
index 7282799..ce45afa 100644
--- a/common/src/main/java/io/druid/collections/DefaultBlockingPool.java
+++ b/common/src/main/java/io/druid/collections/DefaultBlockingPool.java
@@ -41,7 +41,9 @@ public class DefaultBlockingPool<T> implements BlockingPool<T>
{
private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;
- private final ArrayDeque<T> objects;
+ @VisibleForTesting
+ final ArrayDeque<T> objects;
+
private final ReentrantLock lock;
private final Condition notEnough;
private final int maxSize;
diff --git a/common/src/main/java/io/druid/collections/StupidPool.java b/common/src/main/java/io/druid/collections/StupidPool.java
index ecafcfb..0262a49 100644
--- a/common/src/main/java/io/druid/collections/StupidPool.java
+++ b/common/src/main/java/io/druid/collections/StupidPool.java
@@ -39,9 +39,6 @@ public class StupidPool<T> implements NonBlockingPool<T>
{
private static final Logger log = new Logger(StupidPool.class);
- private final String name;
- private final Supplier<T> generator;
-
/**
* StupidPool Implementation Note
* It is assumed that StupidPools are never reclaimed by the GC, either stored in static fields or global singleton
@@ -50,13 +47,20 @@ public class StupidPool<T> implements NonBlockingPool<T>
* and registered in the global lifecycle), in this close() method all {@link ObjectResourceHolder}s should be drained
* from the {@code objects} queue, and notifier.disable() called for them.
*/
- private final Queue<ObjectResourceHolder> objects = new ConcurrentLinkedQueue<>();
+ @VisibleForTesting
+ final Queue<ObjectResourceHolder> objects = new ConcurrentLinkedQueue<>();
+
/**
* {@link ConcurrentLinkedQueue}'s size() is O(n) queue traversal apparently for the sake of being 100%
* wait-free, that is not required by {@code StupidPool}. In {@code poolSize} we account the queue size
* ourselves, to avoid traversal of {@link #objects} in {@link #tryReturnToPool}.
*/
- private final AtomicLong poolSize = new AtomicLong(0);
+ @VisibleForTesting
+ final AtomicLong poolSize = new AtomicLong(0);
+
+ private final String name;
+ private final Supplier<T> generator;
+
private final AtomicLong leakedObjectsCounter = new AtomicLong(0);
//note that this is just the max entries in the cache, pool can still create as many buffers as needed.
diff --git a/common/src/test/java/io/druid/collections/BlockingPoolTest.java b/common/src/test/java/io/druid/collections/BlockingPoolTest.java
index 3b48cff..46bd680 100644
--- a/common/src/test/java/io/druid/collections/BlockingPoolTest.java
+++ b/common/src/test/java/io/druid/collections/BlockingPoolTest.java
@@ -43,8 +43,8 @@ public class BlockingPoolTest
{
private ExecutorService service;
- private DefaultBlockingPool<Integer> pool;
- private BlockingPool<Integer> emptyPool;
+ private CloseableDefaultBlockingPool<Integer> pool;
+ private CloseableDefaultBlockingPool<Integer> emptyPool;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@@ -53,13 +53,15 @@ public class BlockingPoolTest
public void setup()
{
service = Execs.multiThreaded(2, "blocking-pool-test");
- pool = new DefaultBlockingPool<>(Suppliers.ofInstance(1), 10);
- emptyPool = new DefaultBlockingPool<>(Suppliers.ofInstance(1), 0);
+ pool = new CloseableDefaultBlockingPool<>(Suppliers.ofInstance(1), 10);
+ emptyPool = new CloseableDefaultBlockingPool<>(Suppliers.ofInstance(1), 0);
}
@After
public void teardown()
{
+ pool.close();
+ emptyPool.close();
service.shutdownNow();
}
diff --git a/common/src/test/java/io/druid/collections/CloseableDefaultBlockingPool.java b/common/src/test/java/io/druid/collections/CloseableDefaultBlockingPool.java
new file mode 100644
index 0000000..5776a91
--- /dev/null
+++ b/common/src/test/java/io/druid/collections/CloseableDefaultBlockingPool.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.collections;
+
+import com.google.common.base.Supplier;
+
+import java.io.Closeable;
+
+public class CloseableDefaultBlockingPool<T> extends DefaultBlockingPool<T> implements Closeable
+{
+ public CloseableDefaultBlockingPool(Supplier<T> generator, int limit)
+ {
+ super(generator, limit);
+ }
+
+ @Override
+ public void close()
+ {
+ objects.clear();
+ }
+}
diff --git a/common/src/test/java/io/druid/collections/CloseableStupidPool.java b/common/src/test/java/io/druid/collections/CloseableStupidPool.java
new file mode 100644
index 0000000..e69f8b1
--- /dev/null
+++ b/common/src/test/java/io/druid/collections/CloseableStupidPool.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.collections;
+
+import com.google.common.base.Supplier;
+
+import java.io.Closeable;
+
+public class CloseableStupidPool<T> extends StupidPool<T> implements Closeable
+{
+ public CloseableStupidPool(String name, Supplier<T> generator)
+ {
+ super(name, generator);
+ }
+
+ public CloseableStupidPool(String name, Supplier<T> generator, int initCount, int objectsCacheMaxCount)
+ {
+ super(name, generator, initCount, objectsCacheMaxCount);
+ }
+
+ @Override
+ public void close()
+ {
+ objects.clear();
+ poolSize.set(0);
+ }
+}
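
CloseableStupidPool drains the @VisibleForTesting fields widened above, so a test can bound a pool's lifetime either with an explicit close() in @After (as in StupidPoolTest below) or with try-with-resources. A sketch of the latter, modeled on the MultiValuedDimensionTest change later in this diff (the method is hypothetical):

    import io.druid.collections.CloseableStupidPool;
    import io.druid.collections.ResourceHolder;

    import java.nio.ByteBuffer;

    class PoolScopeSketch
    {
      void runWithScopedPool()
      {
        try (CloseableStupidPool<ByteBuffer> pool = new CloseableStupidPool<>(
            "test-bufferPool",
            () -> ByteBuffer.allocate(1024 * 1024)
        )) {
          final ResourceHolder<ByteBuffer> holder = pool.take(); // borrow a buffer
          // ... exercise the code under test ...
          holder.close(); // return the buffer to the pool
        } // pool.close() drains the cached buffers
      }
    }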
diff --git a/common/src/test/java/io/druid/collections/StupidPoolTest.java b/common/src/test/java/io/druid/collections/StupidPoolTest.java
index cf7d3ce..98a8de0 100644
--- a/common/src/test/java/io/druid/collections/StupidPoolTest.java
+++ b/common/src/test/java/io/druid/collections/StupidPoolTest.java
@@ -31,7 +31,7 @@ import org.junit.Test;
public class StupidPoolTest
{
private Supplier<String> generator;
- private StupidPool<String> poolOfString;
+ private CloseableStupidPool<String> poolOfString;
private ResourceHolder<String> resourceHolderObj;
private String defaultString = new String("test");
@@ -41,7 +41,7 @@ public class StupidPoolTest
generator = EasyMock.createMock(Supplier.class);
EasyMock.expect(generator.get()).andReturn(defaultString).anyTimes();
EasyMock.replay(generator);
- poolOfString = new StupidPool<>("poolOfString", generator);
+ poolOfString = new CloseableStupidPool<>("poolOfString", generator);
resourceHolderObj = poolOfString.take();
}
@@ -51,6 +51,7 @@ public class StupidPoolTest
if (resourceHolderObj != null) {
resourceHolderObj.close();
}
+ poolOfString.close();
}
@Test
diff --git a/extensions-contrib/distinctcount/pom.xml b/extensions-contrib/distinctcount/pom.xml
index 485748d..acccd20 100644
--- a/extensions-contrib/distinctcount/pom.xml
+++ b/extensions-contrib/distinctcount/pom.xml
@@ -45,6 +45,13 @@
<!-- Tests -->
<dependency>
<groupId>io.druid</groupId>
+ <artifactId>druid-common</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
diff --git a/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java
index c02325d..750cdcd 100644
--- a/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java
+++ b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java
@@ -23,7 +23,9 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Row;
+import io.druid.java.util.common.Pair;
import io.druid.java.util.common.granularity.Granularities;
+import io.druid.java.util.common.io.Closer;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
@@ -39,22 +41,41 @@ import io.druid.segment.Segment;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class DistinctCountGroupByQueryTest
{
+ private GroupByQueryRunnerFactory factory;
+ private Closer resourceCloser;
- @Test
- public void testGroupByWithDistinctCountAgg() throws Exception
+ @Before
+ public void setup()
{
final GroupByQueryConfig config = new GroupByQueryConfig();
config.setMaxIntermediateRows(10000);
- final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(config);
+ final Pair<GroupByQueryRunnerFactory, Closer> factoryCloserPair = GroupByQueryRunnerTest.makeQueryRunnerFactory(
+ config
+ );
+ factory = factoryCloserPair.lhs;
+ resourceCloser = factoryCloserPair.rhs;
+ }
+
+ @After
+ public void teardown() throws IOException
+ {
+ resourceCloser.close();
+ }
+ @Test
+ public void testGroupByWithDistinctCountAgg() throws Exception
+ {
IncrementalIndex index = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
diff --git a/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java
index 1669aee..dd02784 100644
--- a/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java
+++ b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java
@@ -22,7 +22,7 @@ package io.druid.query.aggregation.distinctcount;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import io.druid.collections.StupidPool;
+import io.druid.collections.CloseableStupidPool;
import io.druid.data.input.MapBasedInputRow;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.granularity.Granularities;
@@ -38,6 +38,8 @@ import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.joda.time.DateTime;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
@@ -48,23 +50,34 @@ import java.util.Map;
public class DistinctCountTopNQueryTest
{
+ private CloseableStupidPool<ByteBuffer> pool;
+
+ @Before
+ public void setup()
+ {
+ pool = new CloseableStupidPool<>(
+ "TopNQueryEngine-bufferPool",
+ new Supplier<ByteBuffer>()
+ {
+ @Override
+ public ByteBuffer get()
+ {
+ return ByteBuffer.allocate(1024 * 1024);
+ }
+ }
+ );
+ }
+
+ @After
+ public void teardown()
+ {
+ pool.close();
+ }
@Test
public void testTopNWithDistinctCountAgg() throws Exception
{
- TopNQueryEngine engine = new TopNQueryEngine(
- new StupidPool<ByteBuffer>(
- "TopNQueryEngine-bufferPool",
- new Supplier<ByteBuffer>()
- {
- @Override
- public ByteBuffer get()
- {
- return ByteBuffer.allocate(1024 * 1024);
- }
- }
- )
- );
+ TopNQueryEngine engine = new TopNQueryEngine(pool);
IncrementalIndex index = new IncrementalIndex.Builder()
.setIndexSchema(
diff --git a/extensions-contrib/time-min-max/pom.xml b/extensions-contrib/time-min-max/pom.xml
index 373ccc2..e3a3299 100644
--- a/extensions-contrib/time-min-max/pom.xml
+++ b/extensions-contrib/time-min-max/pom.xml
@@ -64,6 +64,13 @@
</dependency>
<dependency>
<groupId>io.druid</groupId>
+ <artifactId>druid-common</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
diff --git a/extensions-contrib/time-min-max/src/test/java/io/druid/query/aggregation/TimestampGroupByAggregationTest.java b/extensions-contrib/time-min-max/src/test/java/io/druid/query/aggregation/TimestampGroupByAggregationTest.java
index f90a945..d5db6cc 100644
--- a/extensions-contrib/time-min-max/src/test/java/io/druid/query/aggregation/TimestampGroupByAggregationTest.java
+++ b/extensions-contrib/time-min-max/src/test/java/io/druid/query/aggregation/TimestampGroupByAggregationTest.java
@@ -31,6 +31,7 @@ import io.druid.query.groupby.GroupByQueryRunnerTest;
import io.druid.segment.ColumnSelectorFactory;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -40,6 +41,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
+import java.io.IOException;
import java.sql.Timestamp;
import java.util.List;
import java.util.zip.ZipFile;
@@ -112,7 +114,12 @@ public class TimestampGroupByAggregationTest
selectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
EasyMock.expect(selectorFactory.makeColumnValueSelector("test")).andReturn(selector);
EasyMock.replay(selectorFactory);
+ }
+ @After
+ public void teardown() throws IOException
+ {
+ helper.close();
}
@Test
diff --git a/extensions-core/datasketches/pom.xml b/extensions-core/datasketches/pom.xml
index f144e57..ea2839d 100644
--- a/extensions-core/datasketches/pom.xml
+++ b/extensions-core/datasketches/pom.xml
@@ -119,6 +119,13 @@
</dependency>
<dependency>
<groupId>io.druid</groupId>
+ <artifactId>druid-common</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java
index 59f88b9..3def788 100644
--- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java
@@ -30,6 +30,7 @@ import io.druid.query.aggregation.AggregationTestHelper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryRunnerTest;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -38,6 +39,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
+import java.io.IOException;
import java.util.Collection;
import java.util.List;
@@ -73,6 +75,12 @@ public class DoublesSketchAggregatorTest
return constructors;
}
+ @After
+ public void teardown() throws IOException
+ {
+ helper.close();
+ }
+
// this is to test Json properties and equals
@Test
public void serializeDeserializeFactoryWithFieldName() throws Exception
diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java
index 3d20546..6a15269 100644
--- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java
+++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java
@@ -42,6 +42,7 @@ import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryRunnerTest;
import io.druid.query.groupby.epinephelinae.GrouperTestUtil;
import io.druid.query.groupby.epinephelinae.TestColumnSelectorFactory;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -88,6 +89,12 @@ public class SketchAggregationTest
return constructors;
}
+ @After
+ public void teardown() throws IOException
+ {
+ helper.close();
+ }
+
@Test
public void testSketchDataIngestAndGpByQuery() throws Exception
{
diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java
index a5b9445..8e48c83 100644
--- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java
+++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java
@@ -85,122 +85,127 @@ public class SketchAggregationWithSimpleDataTest
{
sm = new SketchModule();
sm.configure(null);
- AggregationTestHelper toolchest = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
- sm.getJacksonModules(),
- config,
- tempFolder
- );
-
- s1 = tempFolder.newFolder();
- toolchest.createIndex(
- new File(this.getClass().getClassLoader().getResource("simple_test_data.tsv").getFile()),
- readFileFromClasspathAsString("simple_test_data_record_parser.json"),
- readFileFromClasspathAsString("simple_test_data_aggregators.json"),
- s1,
- 0,
- Granularities.NONE,
- 5000
- );
-
- s2 = tempFolder.newFolder();
- toolchest.createIndex(
- new File(this.getClass().getClassLoader().getResource("simple_test_data.tsv").getFile()),
- readFileFromClasspathAsString("simple_test_data_record_parser.json"),
- readFileFromClasspathAsString("simple_test_data_aggregators.json"),
- s2,
- 0,
- Granularities.NONE,
- 5000
- );
+ try (
+ final AggregationTestHelper toolchest = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
+ sm.getJacksonModules(),
+ config,
+ tempFolder
+ )
+ ) {
+
+ s1 = tempFolder.newFolder();
+ toolchest.createIndex(
+ new File(this.getClass().getClassLoader().getResource("simple_test_data.tsv").getFile()),
+ readFileFromClasspathAsString("simple_test_data_record_parser.json"),
+ readFileFromClasspathAsString("simple_test_data_aggregators.json"),
+ s1,
+ 0,
+ Granularities.NONE,
+ 5000
+ );
+
+ s2 = tempFolder.newFolder();
+ toolchest.createIndex(
+ new File(this.getClass().getClassLoader().getResource("simple_test_data.tsv").getFile()),
+ readFileFromClasspathAsString("simple_test_data_record_parser.json"),
+ readFileFromClasspathAsString("simple_test_data_aggregators.json"),
+ s2,
+ 0,
+ Granularities.NONE,
+ 5000
+ );
+ }
}
-
@Test
public void testSimpleDataIngestAndGpByQuery() throws Exception
{
- AggregationTestHelper gpByQueryAggregationTestHelper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
- sm.getJacksonModules(),
- config,
- tempFolder
- );
-
- Sequence seq = gpByQueryAggregationTestHelper.runQueryOnSegments(
- ImmutableList.of(s1, s2),
- readFileFromClasspathAsString("simple_test_data_group_by_query.json")
- );
-
- List<Row> results = seq.toList();
- Assert.assertEquals(5, results.size());
- Assert.assertEquals(
- ImmutableList.of(
- new MapBasedRow(
- DateTimes.of("2014-10-19T00:00:00.000Z"),
- ImmutableMap
- .<String, Object>builder()
- .put("product", "product_3")
- .put("sketch_count", 38.0)
- .put("sketchEstimatePostAgg", 38.0)
- .put("sketchUnionPostAggEstimate", 38.0)
- .put("sketchIntersectionPostAggEstimate", 38.0)
- .put("sketchAnotBPostAggEstimate", 0.0)
- .put("non_existing_col_validation", 0.0)
- .build()
- ),
- new MapBasedRow(
- DateTimes.of("2014-10-19T00:00:00.000Z"),
- ImmutableMap
- .<String, Object>builder()
- .put("product", "product_1")
- .put("sketch_count", 42.0)
- .put("sketchEstimatePostAgg", 42.0)
- .put("sketchUnionPostAggEstimate", 42.0)
- .put("sketchIntersectionPostAggEstimate", 42.0)
- .put("sketchAnotBPostAggEstimate", 0.0)
- .put("non_existing_col_validation", 0.0)
- .build()
- ),
- new MapBasedRow(
- DateTimes.of("2014-10-19T00:00:00.000Z"),
- ImmutableMap
- .<String, Object>builder()
- .put("product", "product_2")
- .put("sketch_count", 42.0)
- .put("sketchEstimatePostAgg", 42.0)
- .put("sketchUnionPostAggEstimate", 42.0)
- .put("sketchIntersectionPostAggEstimate", 42.0)
- .put("sketchAnotBPostAggEstimate", 0.0)
- .put("non_existing_col_validation", 0.0)
- .build()
- ),
- new MapBasedRow(
- DateTimes.of("2014-10-19T00:00:00.000Z"),
- ImmutableMap
- .<String, Object>builder()
- .put("product", "product_4")
- .put("sketch_count", 42.0)
- .put("sketchEstimatePostAgg", 42.0)
- .put("sketchUnionPostAggEstimate", 42.0)
- .put("sketchIntersectionPostAggEstimate", 42.0)
- .put("sketchAnotBPostAggEstimate", 0.0)
- .put("non_existing_col_validation", 0.0)
- .build()
- ),
- new MapBasedRow(
- DateTimes.of("2014-10-19T00:00:00.000Z"),
- ImmutableMap
- .<String, Object>builder()
- .put("product", "product_5")
- .put("sketch_count", 42.0)
- .put("sketchEstimatePostAgg", 42.0)
- .put("sketchUnionPostAggEstimate", 42.0)
- .put("sketchIntersectionPostAggEstimate", 42.0)
- .put("sketchAnotBPostAggEstimate", 0.0)
- .put("non_existing_col_validation", 0.0)
- .build()
- )
- ),
- results
- );
+ try (
+ final AggregationTestHelper gpByQueryAggregationTestHelper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
+ sm.getJacksonModules(),
+ config,
+ tempFolder
+ )
+ ) {
+
+ Sequence seq = gpByQueryAggregationTestHelper.runQueryOnSegments(
+ ImmutableList.of(s1, s2),
+ readFileFromClasspathAsString("simple_test_data_group_by_query.json")
+ );
+
+ List<Row> results = seq.toList();
+ Assert.assertEquals(5, results.size());
+ Assert.assertEquals(
+ ImmutableList.of(
+ new MapBasedRow(
+ DateTimes.of("2014-10-19T00:00:00.000Z"),
+ ImmutableMap
+ .<String, Object>builder()
+ .put("product", "product_3")
+ .put("sketch_count", 38.0)
+ .put("sketchEstimatePostAgg", 38.0)
+ .put("sketchUnionPostAggEstimate", 38.0)
+ .put("sketchIntersectionPostAggEstimate", 38.0)
+ .put("sketchAnotBPostAggEstimate", 0.0)
+ .put("non_existing_col_validation", 0.0)
+ .build()
+ ),
+ new MapBasedRow(
+ DateTimes.of("2014-10-19T00:00:00.000Z"),
+ ImmutableMap
+ .<String, Object>builder()
+ .put("product", "product_1")
+ .put("sketch_count", 42.0)
+ .put("sketchEstimatePostAgg", 42.0)
+ .put("sketchUnionPostAggEstimate", 42.0)
+ .put("sketchIntersectionPostAggEstimate", 42.0)
+ .put("sketchAnotBPostAggEstimate", 0.0)
+ .put("non_existing_col_validation", 0.0)
+ .build()
+ ),
+ new MapBasedRow(
+ DateTimes.of("2014-10-19T00:00:00.000Z"),
+ ImmutableMap
+ .<String, Object>builder()
+ .put("product", "product_2")
+ .put("sketch_count", 42.0)
+ .put("sketchEstimatePostAgg", 42.0)
+ .put("sketchUnionPostAggEstimate", 42.0)
+ .put("sketchIntersectionPostAggEstimate", 42.0)
+ .put("sketchAnotBPostAggEstimate", 0.0)
+ .put("non_existing_col_validation", 0.0)
+ .build()
+ ),
+ new MapBasedRow(
+ DateTimes.of("2014-10-19T00:00:00.000Z"),
+ ImmutableMap
+ .<String, Object>builder()
+ .put("product", "product_4")
+ .put("sketch_count", 42.0)
+ .put("sketchEstimatePostAgg", 42.0)
+ .put("sketchUnionPostAggEstimate", 42.0)
+ .put("sketchIntersectionPostAggEstimate", 42.0)
+ .put("sketchAnotBPostAggEstimate", 0.0)
+ .put("non_existing_col_validation", 0.0)
+ .build()
+ ),
+ new MapBasedRow(
+ DateTimes.of("2014-10-19T00:00:00.000Z"),
+ ImmutableMap
+ .<String, Object>builder()
+ .put("product", "product_5")
+ .put("sketch_count", 42.0)
+ .put("sketchEstimatePostAgg", 42.0)
+ .put("sketchUnionPostAggEstimate", 42.0)
+ .put("sketchIntersectionPostAggEstimate", 42.0)
+ .put("sketchAnotBPostAggEstimate", 0.0)
+ .put("non_existing_col_validation", 0.0)
+ .build()
+ )
+ ),
+ results
+ );
+ }
}
@Test
diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchAggregationTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchAggregationTest.java
index 92e1fa9..07dfee2 100644
--- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchAggregationTest.java
+++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchAggregationTest.java
@@ -37,6 +37,7 @@ import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryRunnerTest;
import io.druid.query.groupby.epinephelinae.GrouperTestUtil;
import io.druid.query.groupby.epinephelinae.TestColumnSelectorFactory;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -82,6 +83,12 @@ public class OldApiSketchAggregationTest
return constructors;
}
+ @After
+ public void teardown() throws IOException
+ {
+ helper.close();
+ }
+
@Test
public void testSimpleDataIngestAndQuery() throws Exception
{
diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregationTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregationTest.java
index d5a2dec..7a8cdb7 100644
--- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregationTest.java
+++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregationTest.java
@@ -28,6 +28,7 @@ import io.druid.java.util.common.guava.Sequence;
import io.druid.query.aggregation.AggregationTestHelper;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryRunnerTest;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -36,6 +37,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
+import java.io.IOException;
import java.util.Collection;
import java.util.List;
@@ -66,6 +68,12 @@ public class ArrayOfDoublesSketchAggregationTest
return constructors;
}
+ @After
+ public void teardown() throws IOException
+ {
+ helper.close();
+ }
+
@Test
public void ingestingSketches() throws Exception
{
diff --git a/extensions-core/histogram/pom.xml b/extensions-core/histogram/pom.xml
index 21ea2c6..2051b1b 100644
--- a/extensions-core/histogram/pom.xml
+++ b/extensions-core/histogram/pom.xml
@@ -49,6 +49,13 @@
<!-- Tests -->
<dependency>
<groupId>io.druid</groupId>
+ <artifactId>druid-common</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
diff --git a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregationTest.java b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregationTest.java
index 261715d..0836acf 100644
--- a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregationTest.java
+++ b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregationTest.java
@@ -27,6 +27,7 @@ import io.druid.java.util.common.guava.Sequence;
import io.druid.query.aggregation.AggregationTestHelper;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryRunnerTest;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -34,6 +35,7 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.io.IOException;
import java.util.Collection;
import java.util.List;
@@ -68,6 +70,12 @@ public class ApproximateHistogramAggregationTest
return constructors;
}
+ @After
+ public void teardown() throws IOException
+ {
+ helper.close();
+ }
+
@Test
public void testIngestWithNullsIgnoredAndQuery() throws Exception
{
diff --git a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramGroupByQueryTest.java b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramGroupByQueryTest.java
index 4acf138..f10ee8d 100644
--- a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramGroupByQueryTest.java
+++ b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramGroupByQueryTest.java
@@ -22,7 +22,9 @@ package io.druid.query.aggregation.histogram;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.druid.data.input.Row;
+import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.io.Closer;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.dimension.DefaultDimensionSpec;
@@ -35,10 +37,12 @@ import io.druid.query.groupby.orderby.DefaultLimitSpec;
import io.druid.query.groupby.orderby.OrderByColumnSpec;
import io.druid.query.groupby.strategy.GroupByStrategySelector;
import io.druid.segment.TestHelper;
+import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
@@ -47,8 +51,10 @@ import java.util.List;
@RunWith(Parameterized.class)
public class ApproximateHistogramGroupByQueryTest
{
+ private static final Closer resourceCloser = Closer.create();
+
private final QueryRunner<Row> runner;
- private GroupByQueryRunnerFactory factory;
+ private final GroupByQueryRunnerFactory factory;
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
@@ -113,7 +119,11 @@ public class ApproximateHistogramGroupByQueryTest
);
for (GroupByQueryConfig config : configs) {
- final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(config);
+ final Pair<GroupByQueryRunnerFactory, Closer> factoryAndCloser = GroupByQueryRunnerTest.makeQueryRunnerFactory(
+ config
+ );
+ final GroupByQueryRunnerFactory factory = factoryAndCloser.lhs;
+ resourceCloser.register(factoryAndCloser.rhs);
for (QueryRunner<Row> runner : QueryRunnerTestHelper.makeQueryRunners(factory)) {
final String testName = StringUtils.format(
"config=%s, runner=%s",
@@ -127,7 +137,11 @@ public class ApproximateHistogramGroupByQueryTest
return constructors;
}
- public ApproximateHistogramGroupByQueryTest(String testName, GroupByQueryRunnerFactory factory, QueryRunner runner)
+ public ApproximateHistogramGroupByQueryTest(
+ String testName,
+ GroupByQueryRunnerFactory factory,
+ QueryRunner runner
+ )
{
this.factory = factory;
this.runner = runner;
@@ -136,6 +150,12 @@ public class ApproximateHistogramGroupByQueryTest
new ApproximateHistogramDruidModule().configure(null);
}
+ @After
+ public void teardown() throws IOException
+ {
+ resourceCloser.close();
+ }
+
@Test
public void testGroupByWithApproximateHistogramAgg()
{
diff --git a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramTopNQueryTest.java b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramTopNQueryTest.java
index aa383b1..71d9cbf 100644
--- a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramTopNQueryTest.java
+++ b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramTopNQueryTest.java
@@ -22,8 +22,9 @@ package io.druid.query.aggregation.histogram;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import io.druid.collections.StupidPool;
+import io.druid.collections.CloseableStupidPool;
import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.common.io.Closer;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
@@ -38,10 +39,12 @@ import io.druid.query.topn.TopNQueryQueryToolChest;
import io.druid.query.topn.TopNQueryRunnerFactory;
import io.druid.query.topn.TopNResultValue;
import io.druid.segment.TestHelper;
+import org.junit.AfterClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
@@ -52,14 +55,30 @@ import java.util.Map;
@RunWith(Parameterized.class)
public class ApproximateHistogramTopNQueryTest
{
+ private static final Closer resourceCloser = Closer.create();
+
+ @AfterClass
+ public static void teardown() throws IOException
+ {
+ resourceCloser.close();
+ }
+
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
{
+ final CloseableStupidPool<ByteBuffer> defaultPool = TestQueryRunners.createDefaultNonBlockingPool();
+ final CloseableStupidPool<ByteBuffer> customPool = new CloseableStupidPool<>(
+ "TopNQueryRunnerFactory-bufferPool",
+ () -> ByteBuffer.allocate(2000)
+ );
+ resourceCloser.register(defaultPool);
+ resourceCloser.register(customPool);
+
return QueryRunnerTestHelper.transformToConstructionFeeder(
Iterables.concat(
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
- TestQueryRunners.getPool(),
+ defaultPool,
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
@@ -69,10 +88,7 @@ public class ApproximateHistogramTopNQueryTest
),
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
- new StupidPool<ByteBuffer>(
- "TopNQueryRunnerFactory-bufferPool",
- () -> ByteBuffer.allocate(2000)
- ),
+ customPool,
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
diff --git a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
index e4d6faa..e17dbf0 100644
--- a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
+++ b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
@@ -24,9 +24,12 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.druid.common.config.NullHandling;
+import io.druid.java.util.common.Pair;
import io.druid.java.util.common.granularity.Granularities;
+import io.druid.java.util.common.io.Closer;
import io.druid.query.Druids;
import io.druid.query.QueryDataSource;
+import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory;
@@ -64,18 +67,39 @@ import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import java.io.IOException;
import java.util.List;
public class QuantileSqlAggregatorTest extends CalciteTestBase
{
private static final String DATA_SOURCE = "foo";
+ private static QueryRunnerFactoryConglomerate conglomerate;
+ private static Closer resourceCloser;
+
+ @BeforeClass
+ public static void setUpClass()
+ {
+ final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
+ .createQueryRunnerFactoryConglomerate();
+ conglomerate = conglomerateCloserPair.lhs;
+ resourceCloser = conglomerateCloserPair.rhs;
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws IOException
+ {
+ resourceCloser.close();
+ }
+
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -114,7 +138,7 @@ public class QuantileSqlAggregatorTest extends CalciteTestBase
.rows(CalciteTests.ROWS1)
.buildMMappedIndex();
- walker = new SpecificSegmentsQuerySegmentWalker(CalciteTests.queryRunnerFactoryConglomerate()).add(
+ walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(index.getDataInterval())
@@ -125,7 +149,7 @@ public class QuantileSqlAggregatorTest extends CalciteTestBase
);
final PlannerConfig plannerConfig = new PlannerConfig();
- final DruidSchema druidSchema = CalciteTests.createMockSchema(walker, plannerConfig);
+ final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
final DruidOperatorTable operatorTable = new DruidOperatorTable(
ImmutableSet.of(new QuantileSqlAggregator()),
ImmutableSet.of()
@@ -133,7 +157,7 @@ public class QuantileSqlAggregatorTest extends CalciteTestBase
plannerFactory = new PlannerFactory(
druidSchema,
- CalciteTests.createMockQueryLifecycleFactory(walker),
+ CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
CalciteTests.createExprMacroTable(),
plannerConfig,
diff --git a/extensions-core/stats/pom.xml b/extensions-core/stats/pom.xml
index aec0745..0958d55 100644
--- a/extensions-core/stats/pom.xml
+++ b/extensions-core/stats/pom.xml
@@ -48,6 +48,13 @@
<!-- Tests -->
<dependency>
<groupId>io.druid</groupId>
+ <artifactId>druid-common</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceGroupByQueryTest.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceGroupByQueryTest.java
index 5acfe77..ef3ca6f 100644
--- a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceGroupByQueryTest.java
+++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceGroupByQueryTest.java
@@ -62,7 +62,12 @@ public class VarianceGroupByQueryTest
return GroupByQueryRunnerTest.constructorFeeder();
}
- public VarianceGroupByQueryTest(String testName, GroupByQueryConfig config, GroupByQueryRunnerFactory factory, QueryRunner runner)
+ public VarianceGroupByQueryTest(
+ String testName,
+ GroupByQueryConfig config,
+ GroupByQueryRunnerFactory factory,
+ QueryRunner runner
+ )
{
this.testName = testName;
this.config = config;
diff --git a/pom.xml b/pom.xml
index 96f26be..641c552 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1015,9 +1015,15 @@
<version>2.19.1</version>
<configuration>
<!-- locale settings must be set on the command line before startup -->
- <!-- set heap size to work around https://github.com/travis-ci/travis-ci/issues/3396 -->
- <argLine>-Xmx3000m -Duser.language=en -Duser.country=US -Dfile.encoding=UTF-8
- -Duser.timezone=UTC -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
+ <!-- set default options -->
+ <argLine>
+ -Xmx1500m
+ -XX:MaxDirectMemorySize=512m
+ -Duser.language=en
+ -Duser.country=US
+ -Dfile.encoding=UTF-8
+ -Duser.timezone=UTC
+ -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
<!--@TODO After fixing https://github.com/druid-io/druid/issues/4964 remove this parameter-->
-Ddruid.indexing.doubleStorage=double
</argLine>
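
The heap cap can drop from -Xmx3000m to -Xmx1500m because the big consumers in these tests are direct (off-heap) buffers, which -Xmx does not govern; they are limited separately by -XX:MaxDirectMemorySize. An illustrative class (not part of this commit) showing that the two limits are independent:

    // Run with: java -Xmx1500m -XX:MaxDirectMemorySize=512m DirectMemorySketch
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    public class DirectMemorySketch
    {
      public static void main(String[] args)
      {
        final List<ByteBuffer> pinned = new ArrayList<>(); // keep buffers reachable
        for (int mb = 64; ; mb += 64) {
          // Allocated outside the Java heap: this hits the 512m direct limit
          // ("OutOfMemoryError: Direct buffer memory") while heap use stays tiny.
          pinned.add(ByteBuffer.allocateDirect(64 * 1024 * 1024));
          System.out.println("direct memory allocated: " + mb + " MB");
        }
      }
    }

Note that processing/pom.xml below inverts the ratio (-Xmx512m with -XX:MaxDirectMemorySize=1500m), since the processing tests are the heavy direct-buffer users.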
diff --git a/processing/pom.xml b/processing/pom.xml
index 30d1bfe..622b4a8 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -119,6 +119,13 @@
<!-- Tests -->
<dependency>
+ <groupId>io.druid</groupId>
+ <artifactId>druid-common</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
@@ -189,6 +196,21 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
+ <!-- locale settings must be set on the command line before startup -->
+ <!-- set default options -->
+ <argLine>
+ -Xmx512m
+ -XX:MaxDirectMemorySize=1500m
+ -Duser.language=en
+ -Duser.country=US
+ -Dfile.encoding=UTF-8
+ -Duser.timezone=UTC
+ -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
+ <!--@TODO After fixing https://github.com/druid-io/druid/issues/4964 remove this parameter-->
+ -Ddruid.indexing.doubleStorage=double
+ </argLine>
+ <!-- our tests are very verbose, let's keep the volume down -->
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
<excludedGroups>io.druid.collections.test.annotation.Benchmark</excludedGroups>
</configuration>
</plugin>
diff --git a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java
index 25980a3..2487ffe 100644
--- a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java
+++ b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
+import io.druid.collections.CloseableStupidPool;
import io.druid.data.input.Row;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
@@ -67,6 +68,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -145,6 +148,12 @@ public class MultiValuedDimensionTest
queryableIndex = TestHelper.getTestIndexIO(segmentWriteOutMediumFactory).loadIndex(persistedSegmentDir);
}
+ @After
+ public void teardown() throws IOException
+ {
+ helper.close();
+ }
+
@Test
public void testGroupByNoFilter()
{
@@ -256,35 +265,37 @@ public class MultiValuedDimensionTest
.threshold(5)
.filters(new SelectorDimFilter("tags", "t3", null)).build();
- QueryRunnerFactory factory = new TopNQueryRunnerFactory(
- TestQueryRunners.getPool(),
- new TopNQueryQueryToolChest(
- new TopNQueryConfig(),
- QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
- ),
- QueryRunnerTestHelper.NOOP_QUERYWATCHER
- );
- QueryRunner<Result<TopNResultValue>> runner = QueryRunnerTestHelper.makeQueryRunner(
- factory,
- new QueryableIndexSegment("sid1", queryableIndex),
- null
- );
- Map<String, Object> context = Maps.newHashMap();
- Sequence<Result<TopNResultValue>> result = runner.run(QueryPlus.wrap(query), context);
- List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
- new Result<TopNResultValue>(
- DateTimes.of("2011-01-12T00:00:00.000Z"),
- new TopNResultValue(
- Collections.<Map<String, Object>>singletonList(
- ImmutableMap.of(
- "tags", "t3",
- "count", 2L
- )
- )
- )
- )
- );
- TestHelper.assertExpectedObjects(expectedResults, result.toList(), "");
+ try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) {
+ QueryRunnerFactory factory = new TopNQueryRunnerFactory(
+ pool,
+ new TopNQueryQueryToolChest(
+ new TopNQueryConfig(),
+ QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+ ),
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER
+ );
+ QueryRunner<Result<TopNResultValue>> runner = QueryRunnerTestHelper.makeQueryRunner(
+ factory,
+ new QueryableIndexSegment("sid1", queryableIndex),
+ null
+ );
+ Map<String, Object> context = Maps.newHashMap();
+ Sequence<Result<TopNResultValue>> result = runner.run(QueryPlus.wrap(query), context);
+ List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
+ new Result<TopNResultValue>(
+ DateTimes.of("2011-01-12T00:00:00.000Z"),
+ new TopNResultValue(
+ Collections.<Map<String, Object>>singletonList(
+ ImmutableMap.of(
+ "tags", "t3",
+ "count", 2L
+ )
+ )
+ )
+ )
+ );
+ TestHelper.assertExpectedObjects(expectedResults, result.toList(), "");
+ }
}
@After
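The rewrite above is the recurring pattern in this commit: allocate the buffer pool inside try-with-resources so each test releases its buffers deterministically instead of leaking them across the Travis run. A minimal sketch of the idiom; the method name and 1 MB buffer size are illustrative, not values from the diff:

    import io.druid.collections.CloseableStupidPool;
    import java.nio.ByteBuffer;

    public class PoolScopeExample
    {
      static void runWithScopedPool()
      {
        try (CloseableStupidPool<ByteBuffer> pool = new CloseableStupidPool<>(
            "example-bufferPool",
            () -> ByteBuffer.allocate(1024 * 1024)
        )) {
          // build a QueryRunnerFactory around `pool` and run queries here;
          // leaving the block closes the pool and frees its buffers
        }
      }
    }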
diff --git a/processing/src/test/java/io/druid/query/TestQueryRunners.java b/processing/src/test/java/io/druid/query/TestQueryRunners.java
index e3488b7..44b209b 100644
--- a/processing/src/test/java/io/druid/query/TestQueryRunners.java
+++ b/processing/src/test/java/io/druid/query/TestQueryRunners.java
@@ -19,14 +19,13 @@
package io.druid.query;
-import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
+import io.druid.collections.CloseableStupidPool;
import io.druid.collections.NonBlockingPool;
-import io.druid.collections.StupidPool;
+import io.druid.query.search.SearchQueryConfig;
import io.druid.query.search.SearchQueryQueryToolChest;
import io.druid.query.search.SearchQueryRunnerFactory;
import io.druid.query.search.SearchStrategySelector;
-import io.druid.query.search.SearchQueryConfig;
import io.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
@@ -42,27 +41,17 @@ import java.nio.ByteBuffer;
*/
public class TestQueryRunners
{
- public static final NonBlockingPool<ByteBuffer> pool = new StupidPool<ByteBuffer>(
- "TestQueryRunners-bufferPool",
- new Supplier<ByteBuffer>()
- {
- @Override
- public ByteBuffer get()
- {
- return ByteBuffer.allocate(1024 * 1024 * 10);
- }
- }
- );
- public static final TopNQueryConfig topNConfig = new TopNQueryConfig();
+ private static final TopNQueryConfig topNConfig = new TopNQueryConfig();
- public static NonBlockingPool<ByteBuffer> getPool()
+ public static CloseableStupidPool<ByteBuffer> createDefaultNonBlockingPool()
{
- return pool;
+ return new CloseableStupidPool<>(
+ "TestQueryRunners-bufferPool",
+ () -> ByteBuffer.allocate(1024 * 1024 * 10)
+ );
}
- public static <T> QueryRunner<T> makeTopNQueryRunner(
- Segment adapter
- )
+ public static <T> QueryRunner<T> makeTopNQueryRunner(Segment adapter, NonBlockingPool<ByteBuffer> pool)
{
QueryRunnerFactory factory = new TopNQueryRunnerFactory(
pool,
@@ -78,9 +67,7 @@ public class TestQueryRunners
);
}
- public static <T> QueryRunner<T> makeTimeSeriesQueryRunner(
- Segment adapter
- )
+ public static <T> QueryRunner<T> makeTimeSeriesQueryRunner(Segment adapter)
{
QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(
@@ -95,9 +82,7 @@ public class TestQueryRunners
);
}
- public static <T> QueryRunner<T> makeSearchQueryRunner(
- Segment adapter
- )
+ public static <T> QueryRunner<T> makeSearchQueryRunner(Segment adapter)
{
final SearchQueryConfig config = new SearchQueryConfig();
QueryRunnerFactory factory = new SearchQueryRunnerFactory(
@@ -114,9 +99,7 @@ public class TestQueryRunners
);
}
- public static <T> QueryRunner<T> makeTimeBoundaryQueryRunner(
- Segment adapter
- )
+ public static <T> QueryRunner<T> makeTimeBoundaryQueryRunner(Segment adapter)
{
QueryRunnerFactory factory = new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER);
return new FinalizeResultsQueryRunner<T>(
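With the shared static pool gone, each caller of TestQueryRunners now creates a pool and passes it in, owning its lifetime. A sketch of the new calling convention, assuming a Segment is already built by surrounding test code (hypothetical method wrapper):

    import io.druid.collections.CloseableStupidPool;
    import io.druid.query.QueryRunner;
    import io.druid.query.Result;
    import io.druid.query.TestQueryRunners;
    import io.druid.query.topn.TopNResultValue;
    import io.druid.segment.Segment;
    import java.nio.ByteBuffer;

    static void runTopN(Segment segment)
    {
      try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) {
        QueryRunner<Result<TopNResultValue>> runner =
            TestQueryRunners.makeTopNQueryRunner(segment, pool);
        // run topN queries through `runner`; the pool is closed on exit
      }
    }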
diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java
index d738008..869d1ac 100644
--- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java
+++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java
@@ -34,19 +34,20 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.collections.StupidPool;
+import io.druid.collections.CloseableStupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.Pair;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.guava.Yielder;
import io.druid.java.util.common.guava.YieldingAccumulator;
-import io.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import io.druid.java.util.common.io.Closer;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
@@ -79,10 +80,12 @@ import io.druid.segment.TestHelper;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.junit.rules.TemporaryFolder;
+import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -101,7 +104,7 @@ import java.util.Map;
* It allows you to create an index from raw data and run a group-by query on it, simulating query processing inside
* a Druid cluster, exercising most aggregation features, and returning results that you can verify.
*/
-public class AggregationTestHelper
+public class AggregationTestHelper implements Closeable
{
private final ObjectMapper mapper;
private final IndexMerger indexMerger;
@@ -110,6 +113,7 @@ public class AggregationTestHelper
private final QueryRunnerFactory factory;
private final TemporaryFolder tempFolder;
+ private final Closer resourceCloser;
private AggregationTestHelper(
ObjectMapper mapper,
@@ -118,7 +122,8 @@ public class AggregationTestHelper
QueryToolChest toolchest,
QueryRunnerFactory factory,
TemporaryFolder tempFolder,
- List<? extends Module> jsonModulesToRegister
+ List<? extends Module> jsonModulesToRegister,
+ Closer resourceCloser
)
{
this.mapper = mapper;
@@ -127,20 +132,26 @@ public class AggregationTestHelper
this.toolChest = toolchest;
this.factory = factory;
this.tempFolder = tempFolder;
+ this.resourceCloser = resourceCloser;
for (Module mod : jsonModulesToRegister) {
mapper.registerModule(mod);
}
}
- public static final AggregationTestHelper createGroupByQueryAggregationTestHelper(
+ public static AggregationTestHelper createGroupByQueryAggregationTestHelper(
List<? extends Module> jsonModulesToRegister,
GroupByQueryConfig config,
TemporaryFolder tempFolder
)
{
- ObjectMapper mapper = TestHelper.makeJsonMapper();
- GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(mapper, config);
+ final ObjectMapper mapper = TestHelper.makeJsonMapper();
+ final Pair<GroupByQueryRunnerFactory, Closer> factoryAndCloser = GroupByQueryRunnerTest.makeQueryRunnerFactory(
+ mapper,
+ config
+ );
+ final GroupByQueryRunnerFactory factory = factoryAndCloser.lhs;
+ final Closer closer = factoryAndCloser.rhs;
IndexIO indexIO = new IndexIO(
mapper,
@@ -162,11 +173,12 @@ public class AggregationTestHelper
factory.getToolchest(),
factory,
tempFolder,
- jsonModulesToRegister
+ jsonModulesToRegister,
+ closer
);
}
- public static final AggregationTestHelper createSelectQueryAggregationTestHelper(
+ public static AggregationTestHelper createSelectQueryAggregationTestHelper(
List<? extends Module> jsonModulesToRegister,
TemporaryFolder tempFolder
)
@@ -218,11 +230,12 @@ public class AggregationTestHelper
toolchest,
factory,
tempFolder,
- jsonModulesToRegister
+ jsonModulesToRegister,
+ Closer.create()
);
}
- public static final AggregationTestHelper createTimeseriesQueryAggregationTestHelper(
+ public static AggregationTestHelper createTimeseriesQueryAggregationTestHelper(
List<? extends Module> jsonModulesToRegister,
TemporaryFolder tempFolder
)
@@ -259,11 +272,12 @@ public class AggregationTestHelper
toolchest,
factory,
tempFolder,
- jsonModulesToRegister
+ jsonModulesToRegister,
+ Closer.create()
);
}
- public static final AggregationTestHelper createTopNQueryAggregationTestHelper(
+ public static AggregationTestHelper createTopNQueryAggregationTestHelper(
List<? extends Module> jsonModulesToRegister,
TemporaryFolder tempFolder
)
@@ -275,18 +289,20 @@ public class AggregationTestHelper
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
);
+ final CloseableStupidPool<ByteBuffer> pool = new CloseableStupidPool<>(
+ "TopNQueryRunnerFactory-bufferPool",
+ new Supplier<ByteBuffer>()
+ {
+ @Override
+ public ByteBuffer get()
+ {
+ return ByteBuffer.allocate(10 * 1024 * 1024);
+ }
+ }
+ );
+ final Closer resourceCloser = Closer.create();
TopNQueryRunnerFactory factory = new TopNQueryRunnerFactory(
- new StupidPool<>(
- "TopNQueryRunnerFactory-bufferPool",
- new Supplier<ByteBuffer>()
- {
- @Override
- public ByteBuffer get()
- {
- return ByteBuffer.allocate(10 * 1024 * 1024);
- }
- }
- ),
+ pool,
toolchest,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
@@ -311,7 +327,8 @@ public class AggregationTestHelper
toolchest,
factory,
tempFolder,
- jsonModulesToRegister
+ jsonModulesToRegister,
+ resourceCloser
);
}
@@ -647,5 +664,11 @@ public class AggregationTestHelper
results[1] = (T) agg.get(newBuf, 7574);
return results;
}
+
+ @Override
+ public void close() throws IOException
+ {
+ resourceCloser.close();
+ }
}
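AggregationTestHelper now implements Closeable and carries a Closer, so whatever pools a factory method allocates are released by a single helper.close(). The Closer idiom itself, sketched with illustrative parameters (this class mirrors Guava's Closer, which closes in reverse registration order):

    import io.druid.java.util.common.io.Closer;
    import java.io.Closeable;
    import java.io.IOException;

    static void useAndClose(Closeable bufferPool, Closeable mergePool) throws IOException
    {
      final Closer resourceCloser = Closer.create();
      resourceCloser.register(bufferPool);   // e.g. a CloseableStupidPool
      resourceCloser.register(mergePool);    // e.g. a CloseableDefaultBlockingPool
      try {
        // ... exercise the pools ...
      } finally {
        // closes mergePool then bufferPool (reverse registration order)
        resourceCloser.close();
      }
    }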
diff --git a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregationTest.java b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregationTest.java
index c4df309..c2c945a 100644
--- a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregationTest.java
+++ b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregationTest.java
@@ -65,123 +65,129 @@ public class HyperUniquesAggregationTest
@Test
public void testIngestAndQuery() throws Exception
{
- AggregationTestHelper helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
- Collections.singletonList(new AggregatorsModule()),
- config,
- tempFolder
- );
-
- String metricSpec = "[{"
- + "\"type\": \"hyperUnique\","
- + "\"name\": \"index_hll\","
- + "\"fieldName\": \"market\""
- + "}]";
-
- String parseSpec = "{"
- + "\"type\" : \"string\","
- + "\"parseSpec\" : {"
- + " \"format\" : \"tsv\","
- + " \"timestampSpec\" : {"
- + " \"column\" : \"timestamp\","
- + " \"format\" : \"auto\""
- + "},"
- + " \"dimensionsSpec\" : {"
- + " \"dimensions\": [],"
- + " \"dimensionExclusions\" : [],"
- + " \"spatialDimensions\" : []"
- + " },"
- + " \"columns\": [\"timestamp\", \"market\", \"quality\", \"placement\", \"placementish\", \"index\"]"
- + " }"
- + "}";
-
- String query = "{"
- + "\"queryType\": \"groupBy\","
- + "\"dataSource\": \"test_datasource\","
- + "\"granularity\": \"ALL\","
- + "\"dimensions\": [],"
- + "\"aggregations\": ["
- + " { \"type\": \"hyperUnique\", \"name\": \"index_hll\", \"fieldName\": \"index_hll\" }"
- + "],"
- + "\"postAggregations\": ["
- + " { \"type\": \"hyperUniqueCardinality\", \"name\": \"index_unique_count\", \"fieldName\": \"index_hll\" }"
- + "],"
- + "\"intervals\": [ \"1970/2050\" ]"
- + "}";
-
- Sequence seq = helper.createIndexAndRunQueryOnSegment(
- new File(this.getClass().getClassLoader().getResource("druid.sample.tsv").getFile()),
- parseSpec,
- metricSpec,
- 0,
- Granularities.NONE,
- 50000,
- query
- );
-
- MapBasedRow row = (MapBasedRow) seq.toList().get(0);
- Assert.assertEquals(3.0, row.getMetric("index_hll").floatValue(), 0.1);
- Assert.assertEquals(3.0, row.getMetric("index_unique_count").floatValue(), 0.1);
+ try (
+ final AggregationTestHelper helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
+ Collections.singletonList(new AggregatorsModule()),
+ config,
+ tempFolder
+ )
+ ) {
+
+ String metricSpec = "[{"
+ + "\"type\": \"hyperUnique\","
+ + "\"name\": \"index_hll\","
+ + "\"fieldName\": \"market\""
+ + "}]";
+
+ String parseSpec = "{"
+ + "\"type\" : \"string\","
+ + "\"parseSpec\" : {"
+ + " \"format\" : \"tsv\","
+ + " \"timestampSpec\" : {"
+ + " \"column\" : \"timestamp\","
+ + " \"format\" : \"auto\""
+ + "},"
+ + " \"dimensionsSpec\" : {"
+ + " \"dimensions\": [],"
+ + " \"dimensionExclusions\" : [],"
+ + " \"spatialDimensions\" : []"
+ + " },"
+ + " \"columns\": [\"timestamp\", \"market\", \"quality\", \"placement\", \"placementish\", \"index\"]"
+ + " }"
+ + "}";
+
+ String query = "{"
+ + "\"queryType\": \"groupBy\","
+ + "\"dataSource\": \"test_datasource\","
+ + "\"granularity\": \"ALL\","
+ + "\"dimensions\": [],"
+ + "\"aggregations\": ["
+ + " { \"type\": \"hyperUnique\", \"name\": \"index_hll\", \"fieldName\": \"index_hll\" }"
+ + "],"
+ + "\"postAggregations\": ["
+ + " { \"type\": \"hyperUniqueCardinality\", \"name\": \"index_unique_count\", \"fieldName\": \"index_hll\" }"
+ + "],"
+ + "\"intervals\": [ \"1970/2050\" ]"
+ + "}";
+
+ Sequence seq = helper.createIndexAndRunQueryOnSegment(
+ new File(this.getClass().getClassLoader().getResource("druid.sample.tsv").getFile()),
+ parseSpec,
+ metricSpec,
+ 0,
+ Granularities.NONE,
+ 50000,
+ query
+ );
+
+ MapBasedRow row = (MapBasedRow) seq.toList().get(0);
+ Assert.assertEquals(3.0, row.getMetric("index_hll").floatValue(), 0.1);
+ Assert.assertEquals(3.0, row.getMetric("index_unique_count").floatValue(), 0.1);
+ }
}
@Test
public void testIngestAndQueryPrecomputedHll() throws Exception
{
- AggregationTestHelper helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
+ try (
+ final AggregationTestHelper helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
Collections.singletonList(new AggregatorsModule()),
config,
tempFolder
- );
-
- String metricSpec = "[{"
- + "\"type\": \"hyperUnique\","
- + "\"name\": \"index_hll\","
- + "\"fieldName\": \"preComputedHll\","
- + "\"isInputHyperUnique\": true"
- + "}]";
-
- String parseSpec = "{"
- + "\"type\" : \"string\","
- + "\"parseSpec\" : {"
- + " \"format\" : \"tsv\","
- + " \"timestampSpec\" : {"
- + " \"column\" : \"timestamp\","
- + " \"format\" : \"auto\""
- + "},"
- + " \"dimensionsSpec\" : {"
- + " \"dimensions\": [],"
- + " \"dimensionExclusions\" : [],"
- + " \"spatialDimensions\" : []"
- + " },"
- + " \"columns\": [\"timestamp\", \"market\", \"preComputedHll\"]"
- + " }"
- + "}";
-
- String query = "{"
- + "\"queryType\": \"groupBy\","
- + "\"dataSource\": \"test_datasource\","
- + "\"granularity\": \"ALL\","
- + "\"dimensions\": [],"
- + "\"aggregations\": ["
- + " { \"type\": \"hyperUnique\", \"name\": \"index_hll\", \"fieldName\": \"index_hll\" }"
- + "],"
- + "\"postAggregations\": ["
- + " { \"type\": \"hyperUniqueCardinality\", \"name\": \"index_unique_count\", \"fieldName\": \"index_hll\" }"
- + "],"
- + "\"intervals\": [ \"1970/2050\" ]"
- + "}";
-
- Sequence seq = helper.createIndexAndRunQueryOnSegment(
- new File(this.getClass().getClassLoader().getResource("druid.hll.sample.tsv").getFile()),
- parseSpec,
- metricSpec,
- 0,
- Granularities.DAY,
- 50000,
- query
- );
-
- MapBasedRow row = (MapBasedRow) seq.toList().get(0);
- Assert.assertEquals(4.0, row.getMetric("index_hll").floatValue(), 0.1);
- Assert.assertEquals(4.0, row.getMetric("index_unique_count").floatValue(), 0.1);
+ )
+ ) {
+
+ String metricSpec = "[{"
+ + "\"type\": \"hyperUnique\","
+ + "\"name\": \"index_hll\","
+ + "\"fieldName\": \"preComputedHll\","
+ + "\"isInputHyperUnique\": true"
+ + "}]";
+
+ String parseSpec = "{"
+ + "\"type\" : \"string\","
+ + "\"parseSpec\" : {"
+ + " \"format\" : \"tsv\","
+ + " \"timestampSpec\" : {"
+ + " \"column\" : \"timestamp\","
+ + " \"format\" : \"auto\""
+ + "},"
+ + " \"dimensionsSpec\" : {"
+ + " \"dimensions\": [],"
+ + " \"dimensionExclusions\" : [],"
+ + " \"spatialDimensions\" : []"
+ + " },"
+ + " \"columns\": [\"timestamp\", \"market\", \"preComputedHll\"]"
+ + " }"
+ + "}";
+
+ String query = "{"
+ + "\"queryType\": \"groupBy\","
+ + "\"dataSource\": \"test_datasource\","
+ + "\"granularity\": \"ALL\","
+ + "\"dimensions\": [],"
+ + "\"aggregations\": ["
+ + " { \"type\": \"hyperUnique\", \"name\": \"index_hll\", \"fieldName\": \"index_hll\" }"
+ + "],"
+ + "\"postAggregations\": ["
+ + " { \"type\": \"hyperUniqueCardinality\", \"name\": \"index_unique_count\", \"fieldName\": \"index_hll\" }"
+ + "],"
+ + "\"intervals\": [ \"1970/2050\" ]"
+ + "}";
+
+ Sequence seq = helper.createIndexAndRunQueryOnSegment(
+ new File(this.getClass().getClassLoader().getResource("druid.hll.sample.tsv").getFile()),
+ parseSpec,
+ metricSpec,
+ 0,
+ Granularities.DAY,
+ 50000,
+ query
+ );
+
+ MapBasedRow row = (MapBasedRow) seq.toList().get(0);
+ Assert.assertEquals(4.0, row.getMetric("index_hll").floatValue(), 0.1);
+ Assert.assertEquals(4.0, row.getMetric("index_unique_count").floatValue(), 0.1);
+ }
}
}
diff --git a/processing/src/test/java/io/druid/query/aggregation/post/FinalizingFieldAccessPostAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/post/FinalizingFieldAccessPostAggregatorTest.java
index 7c1e69c..7af6633 100644
--- a/processing/src/test/java/io/druid/query/aggregation/post/FinalizingFieldAccessPostAggregatorTest.java
+++ b/processing/src/test/java/io/druid/query/aggregation/post/FinalizingFieldAccessPostAggregatorTest.java
@@ -188,64 +188,67 @@ public class FinalizingFieldAccessPostAggregatorTest
@Test
public void testIngestAndQueryWithArithmeticPostAggregator() throws Exception
{
- AggregationTestHelper helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
- Collections.singletonList(new AggregatorsModule()),
- GroupByQueryRunnerTest.testConfigs().get(0),
- tempFoler
- );
-
- String metricSpec = "[{\"type\": \"hyperUnique\", \"name\": \"hll_market\", \"fieldName\": \"market\"},"
- + "{\"type\": \"hyperUnique\", \"name\": \"hll_quality\", \"fieldName\": \"quality\"}]";
-
- String parseSpec = "{"
- + "\"type\" : \"string\","
- + "\"parseSpec\" : {"
- + " \"format\" : \"tsv\","
- + " \"timestampSpec\" : {"
- + " \"column\" : \"timestamp\","
- + " \"format\" : \"auto\""
- + "},"
- + " \"dimensionsSpec\" : {"
- + " \"dimensions\": [],"
- + " \"dimensionExclusions\" : [],"
- + " \"spatialDimensions\" : []"
- + " },"
- + " \"columns\": [\"timestamp\", \"market\", \"quality\", \"placement\", \"placementish\", \"index\"]"
- + " }"
- + "}";
-
- String query = "{"
- + "\"queryType\": \"groupBy\","
- + "\"dataSource\": \"test_datasource\","
- + "\"granularity\": \"ALL\","
- + "\"dimensions\": [],"
- + "\"aggregations\": ["
- + " { \"type\": \"hyperUnique\", \"name\": \"hll_market\", \"fieldName\": \"hll_market\" },"
- + " { \"type\": \"hyperUnique\", \"name\": \"hll_quality\", \"fieldName\": \"hll_quality\" }"
- + "],"
- + "\"postAggregations\": ["
- + " { \"type\": \"arithmetic\", \"name\": \"uniq_add\", \"fn\": \"+\", \"fields\":["
- + " { \"type\": \"finalizingFieldAccess\", \"name\": \"uniq_market\", \"fieldName\": \"hll_market\" },"
- + " { \"type\": \"finalizingFieldAccess\", \"name\": \"uniq_quality\", \"fieldName\": \"hll_quality\" }]"
- + " }"
- + "],"
- + "\"intervals\": [ \"1970/2050\" ]"
- + "}";
-
- Sequence seq = helper.createIndexAndRunQueryOnSegment(
- new File(this.getClass().getClassLoader().getResource("druid.sample.tsv").getFile()),
- parseSpec,
- metricSpec,
- 0,
- Granularities.NONE,
- 50000,
- query
- );
-
- MapBasedRow row = (MapBasedRow) seq.toList().get(0);
- Assert.assertEquals(3.0, row.getMetric("hll_market").floatValue(), 0.1);
- Assert.assertEquals(9.0, row.getMetric("hll_quality").floatValue(), 0.1);
- Assert.assertEquals(12.0, row.getMetric("uniq_add").floatValue(), 0.1);
+ try (
+ final AggregationTestHelper helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
+ Collections.singletonList(new AggregatorsModule()),
+ GroupByQueryRunnerTest.testConfigs().get(0),
+ tempFoler
+ )
+ ) {
+
+ String metricSpec = "[{\"type\": \"hyperUnique\", \"name\": \"hll_market\", \"fieldName\": \"market\"},"
+ + "{\"type\": \"hyperUnique\", \"name\": \"hll_quality\", \"fieldName\": \"quality\"}]";
+
+ String parseSpec = "{"
+ + "\"type\" : \"string\","
+ + "\"parseSpec\" : {"
+ + " \"format\" : \"tsv\","
+ + " \"timestampSpec\" : {"
+ + " \"column\" : \"timestamp\","
+ + " \"format\" : \"auto\""
+ + "},"
+ + " \"dimensionsSpec\" : {"
+ + " \"dimensions\": [],"
+ + " \"dimensionExclusions\" : [],"
+ + " \"spatialDimensions\" : []"
+ + " },"
+ + " \"columns\": [\"timestamp\", \"market\", \"quality\", \"placement\", \"placementish\", \"index\"]"
+ + " }"
+ + "}";
+
+ String query = "{"
+ + "\"queryType\": \"groupBy\","
+ + "\"dataSource\": \"test_datasource\","
+ + "\"granularity\": \"ALL\","
+ + "\"dimensions\": [],"
+ + "\"aggregations\": ["
+ + " { \"type\": \"hyperUnique\", \"name\": \"hll_market\", \"fieldName\": \"hll_market\" },"
+ + " { \"type\": \"hyperUnique\", \"name\": \"hll_quality\", \"fieldName\": \"hll_quality\" }"
+ + "],"
+ + "\"postAggregations\": ["
+ + " { \"type\": \"arithmetic\", \"name\": \"uniq_add\", \"fn\": \"+\", \"fields\":["
+ + " { \"type\": \"finalizingFieldAccess\", \"name\": \"uniq_market\", \"fieldName\": \"hll_market\" },"
+ + " { \"type\": \"finalizingFieldAccess\", \"name\": \"uniq_quality\", \"fieldName\": \"hll_quality\" }]"
+ + " }"
+ + "],"
+ + "\"intervals\": [ \"1970/2050\" ]"
+ + "}";
+
+ Sequence seq = helper.createIndexAndRunQueryOnSegment(
+ new File(this.getClass().getClassLoader().getResource("druid.sample.tsv").getFile()),
+ parseSpec,
+ metricSpec,
+ 0,
+ Granularities.NONE,
+ 50000,
+ query
+ );
+
+ MapBasedRow row = (MapBasedRow) seq.toList().get(0);
+ Assert.assertEquals(3.0, row.getMetric("hll_market").floatValue(), 0.1);
+ Assert.assertEquals(9.0, row.getMetric("hll_quality").floatValue(), 0.1);
+ Assert.assertEquals(12.0, row.getMetric("uniq_add").floatValue(), 0.1);
+ }
}
@Test
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
index abf4825..11f0cd8 100644
--- a/processing/src/test/java/io/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
@@ -30,10 +30,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
-import io.druid.collections.BlockingPool;
-import io.druid.collections.DefaultBlockingPool;
-import io.druid.collections.NonBlockingPool;
-import io.druid.collections.StupidPool;
+import io.druid.collections.CloseableDefaultBlockingPool;
+import io.druid.collections.CloseableStupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Row;
@@ -46,6 +44,7 @@ import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
+import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.logger.Logger;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.BySegmentQueryRunner;
@@ -97,15 +96,18 @@ import java.util.function.Function;
public class GroupByLimitPushDownInsufficientBufferTest
{
+ public static final ObjectMapper JSON_MAPPER;
+
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
- public static final ObjectMapper JSON_MAPPER;
+
private File tmpDir;
private QueryRunnerFactory<Row, GroupByQuery> groupByFactory;
private QueryRunnerFactory<Row, GroupByQuery> tooSmallGroupByFactory;
private List<IncrementalIndex> incrementalIndices = Lists.newArrayList();
private List<QueryableIndex> groupByIndices = Lists.newArrayList();
private ExecutorService executorService;
+ private Closer resourceCloser;
static {
JSON_MAPPER = new DefaultObjectMapper();
@@ -259,6 +261,7 @@ public class GroupByLimitPushDownInsufficientBufferTest
QueryableIndex qindexB = INDEX_IO.loadIndex(fileB);
groupByIndices = Arrays.asList(qindexA, qindexB);
+ resourceCloser = Closer.create();
setupGroupByFactory();
}
@@ -266,7 +269,7 @@ public class GroupByLimitPushDownInsufficientBufferTest
{
executorService = Execs.multiThreaded(3, "GroupByThreadPool[%d]");
- NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
+ final CloseableStupidPool<ByteBuffer> bufferPool = new CloseableStupidPool<>(
"GroupByBenchmark-computeBufferPool",
new OffheapBufferGenerator("compute", 10_000_000),
0,
@@ -274,16 +277,20 @@ public class GroupByLimitPushDownInsufficientBufferTest
);
// limit of 2 is required since we simulate both historical merge and broker merge in the same process
- BlockingPool<ByteBuffer> mergePool = new DefaultBlockingPool<>(
+ final CloseableDefaultBlockingPool<ByteBuffer> mergePool = new CloseableDefaultBlockingPool<>(
new OffheapBufferGenerator("merge", 10_000_000),
2
);
// limit of 2 is required since we simulate both historical merge and broker merge in the same process
- BlockingPool<ByteBuffer> tooSmallMergePool = new DefaultBlockingPool<>(
+ final CloseableDefaultBlockingPool<ByteBuffer> tooSmallMergePool = new CloseableDefaultBlockingPool<>(
new OffheapBufferGenerator("merge", 255),
2
);
+ resourceCloser.register(bufferPool);
+ resourceCloser.register(mergePool);
+ resourceCloser.register(tooSmallMergePool);
+
final GroupByQueryConfig config = new GroupByQueryConfig()
{
@Override
@@ -412,6 +419,8 @@ public class GroupByLimitPushDownInsufficientBufferTest
queryableIndex.close();
}
+ resourceCloser.close();
+
if (tmpDir != null) {
FileUtils.deleteDirectory(tmpDir);
}
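The teardown sequence these push-down tests converge on releases resources from the inside out. A condensed sketch; the method name and surrounding fields are assumed from the hunks above, and closing of the incremental indices is elided:

    @After
    public void tearDown() throws Exception
    {
      for (QueryableIndex queryableIndex : groupByIndices) {
        queryableIndex.close();               // unmap segment files first
      }
      resourceCloser.close();                 // then free pooled compute/merge buffers
      if (tmpDir != null) {
        org.apache.commons.io.FileUtils.deleteDirectory(tmpDir);  // finally remove on-disk test data
      }
    }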
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
index d6809b5..98dd9e6 100644
--- a/processing/src/test/java/io/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
@@ -30,10 +30,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
-import io.druid.collections.BlockingPool;
-import io.druid.collections.DefaultBlockingPool;
-import io.druid.collections.NonBlockingPool;
-import io.druid.collections.StupidPool;
+import io.druid.collections.CloseableDefaultBlockingPool;
+import io.druid.collections.CloseableStupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Row;
@@ -47,6 +45,7 @@ import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.granularity.PeriodGranularity;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
+import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.logger.Logger;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.BySegmentQueryRunner;
@@ -107,15 +106,18 @@ import java.util.function.Function;
public class GroupByLimitPushDownMultiNodeMergeTest
{
+ public static final ObjectMapper JSON_MAPPER;
+
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
- public static final ObjectMapper JSON_MAPPER;
+
private File tmpDir;
private QueryRunnerFactory<Row, GroupByQuery> groupByFactory;
private QueryRunnerFactory<Row, GroupByQuery> groupByFactory2;
private List<IncrementalIndex> incrementalIndices = Lists.newArrayList();
private List<QueryableIndex> groupByIndices = Lists.newArrayList();
private ExecutorService executorService;
+ private Closer resourceCloser;
static {
JSON_MAPPER = new DefaultObjectMapper();
@@ -314,6 +316,7 @@ public class GroupByLimitPushDownMultiNodeMergeTest
QueryableIndex qindexD = INDEX_IO.loadIndex(fileD);
groupByIndices = Arrays.asList(qindexA, qindexB, qindexC, qindexD);
+ resourceCloser = Closer.create();
setupGroupByFactory();
}
@@ -321,7 +324,7 @@ public class GroupByLimitPushDownMultiNodeMergeTest
{
executorService = Execs.multiThreaded(3, "GroupByThreadPool[%d]");
- NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
+ final CloseableStupidPool<ByteBuffer> bufferPool = new CloseableStupidPool<>(
"GroupByBenchmark-computeBufferPool",
new OffheapBufferGenerator("compute", 10_000_000),
0,
@@ -329,16 +332,20 @@ public class GroupByLimitPushDownMultiNodeMergeTest
);
// limit of 2 is required since we simulate both historical merge and broker merge in the same process
- BlockingPool<ByteBuffer> mergePool = new DefaultBlockingPool<>(
+ final CloseableDefaultBlockingPool<ByteBuffer> mergePool = new CloseableDefaultBlockingPool<>(
new OffheapBufferGenerator("merge", 10_000_000),
2
);
// limit of 2 is required since we simulate both historical merge and broker merge in the same process
- BlockingPool<ByteBuffer> mergePool2 = new DefaultBlockingPool<>(
+ final CloseableDefaultBlockingPool<ByteBuffer> mergePool2 = new CloseableDefaultBlockingPool<>(
new OffheapBufferGenerator("merge", 10_000_000),
2
);
+ resourceCloser.register(bufferPool);
+ resourceCloser.register(mergePool);
+ resourceCloser.register(mergePool2);
+
final GroupByQueryConfig config = new GroupByQueryConfig()
{
@Override
@@ -444,6 +451,8 @@ public class GroupByLimitPushDownMultiNodeMergeTest
queryableIndex.close();
}
+ resourceCloser.close();
+
if (tmpDir != null) {
FileUtils.deleteDirectory(tmpDir);
}
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByMultiSegmentTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByMultiSegmentTest.java
index 6ec5875..37549dc 100644
--- a/processing/src/test/java/io/druid/query/groupby/GroupByMultiSegmentTest.java
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByMultiSegmentTest.java
@@ -28,10 +28,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
-import io.druid.collections.BlockingPool;
-import io.druid.collections.DefaultBlockingPool;
-import io.druid.collections.NonBlockingPool;
-import io.druid.collections.StupidPool;
+import io.druid.collections.CloseableDefaultBlockingPool;
+import io.druid.collections.CloseableStupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Row;
@@ -43,6 +41,7 @@ import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequence;
+import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.logger.Logger;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.BySegmentQueryRunner;
@@ -93,14 +92,17 @@ import java.util.concurrent.atomic.AtomicLong;
public class GroupByMultiSegmentTest
{
+ public static final ObjectMapper JSON_MAPPER;
+
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
- public static final ObjectMapper JSON_MAPPER;
+
private File tmpDir;
private QueryRunnerFactory<Row, GroupByQuery> groupByFactory;
private List<IncrementalIndex> incrementalIndices = Lists.newArrayList();
private List<QueryableIndex> groupByIndices = Lists.newArrayList();
private ExecutorService executorService;
+ private Closer resourceCloser;
static {
JSON_MAPPER = new DefaultObjectMapper();
@@ -200,6 +202,7 @@ public class GroupByMultiSegmentTest
QueryableIndex qindexB = INDEX_IO.loadIndex(fileB);
groupByIndices = Arrays.asList(qindexA, qindexB);
+ resourceCloser = Closer.create();
setupGroupByFactory();
}
@@ -207,7 +210,7 @@ public class GroupByMultiSegmentTest
{
executorService = Execs.multiThreaded(2, "GroupByThreadPool[%d]");
- NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
+ final CloseableStupidPool<ByteBuffer> bufferPool = new CloseableStupidPool<>(
"GroupByBenchmark-computeBufferPool",
new OffheapBufferGenerator("compute", 10_000_000),
0,
@@ -215,10 +218,13 @@ public class GroupByMultiSegmentTest
);
// limit of 2 is required since we simulate both historical merge and broker merge in the same process
- BlockingPool<ByteBuffer> mergePool = new DefaultBlockingPool<>(
+ final CloseableDefaultBlockingPool<ByteBuffer> mergePool = new CloseableDefaultBlockingPool<>(
new OffheapBufferGenerator("merge", 10_000_000),
2
);
+
+ resourceCloser.register(bufferPool);
+ resourceCloser.register(mergePool);
final GroupByQueryConfig config = new GroupByQueryConfig()
{
@Override
@@ -298,6 +304,8 @@ public class GroupByMultiSegmentTest
queryableIndex.close();
}
+ resourceCloser.close();
+
if (tmpDir != null) {
FileUtils.deleteDirectory(tmpDir);
}
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java
index 6ec7e8b..2a0e918 100644
--- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java
@@ -26,10 +26,9 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.collections.DefaultBlockingPool;
-import io.druid.collections.NonBlockingPool;
+import io.druid.collections.CloseableDefaultBlockingPool;
+import io.druid.collections.CloseableStupidPool;
import io.druid.collections.ReferenceCountingResourceHolder;
-import io.druid.collections.StupidPool;
import io.druid.data.input.Row;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.DruidProcessingConfig;
@@ -42,6 +41,7 @@ import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.groupby.strategy.GroupByStrategySelector;
import io.druid.query.groupby.strategy.GroupByStrategyV1;
import io.druid.query.groupby.strategy.GroupByStrategyV2;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -59,7 +59,7 @@ import static org.junit.Assert.assertEquals;
public class GroupByQueryMergeBufferTest
{
private static final long TIMEOUT = 5000;
- private static class TestBlockingPool extends DefaultBlockingPool<ByteBuffer>
+ private static class TestBlockingPool extends CloseableDefaultBlockingPool<ByteBuffer>
{
private int minRemainBufferNum;
@@ -136,17 +136,7 @@ public class GroupByQueryMergeBufferTest
)
{
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
- final NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
- "GroupByQueryEngine-bufferPool",
- new Supplier<ByteBuffer>()
- {
- @Override
- public ByteBuffer get()
- {
- return ByteBuffer.allocateDirect(PROCESSING_CONFIG.intermediateComputeSizeBytes());
- }
- }
- );
+
final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
configSupplier,
new GroupByStrategyV1(
@@ -174,6 +164,18 @@ public class GroupByQueryMergeBufferTest
);
}
+ private static final CloseableStupidPool<ByteBuffer> bufferPool = new CloseableStupidPool<>(
+ "GroupByQueryEngine-bufferPool",
+ new Supplier<ByteBuffer>()
+ {
+ @Override
+ public ByteBuffer get()
+ {
+ return ByteBuffer.allocateDirect(PROCESSING_CONFIG.intermediateComputeSizeBytes());
+ }
+ }
+ );
+
private static final TestBlockingPool mergeBufferPool = new TestBlockingPool(
new Supplier<ByteBuffer>()
{
@@ -200,6 +202,13 @@ public class GroupByQueryMergeBufferTest
private QueryRunner<Row> runner;
+ @AfterClass
+ public static void teardownClass()
+ {
+ bufferPool.close();
+ mergeBufferPool.close();
+ }
+
@Parameters(name = "{0}")
public static Collection<Object[]> constructorFeeder()
{
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
index 4394003..e54c09e 100644
--- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
@@ -27,14 +27,15 @@ import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
+import io.druid.java.util.common.Pair;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.MergeSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
+import io.druid.java.util.common.io.Closer;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
-import io.druid.query.QueryRunnerFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.spec.LegacySegmentSpec;
@@ -43,9 +44,12 @@ import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -57,6 +61,26 @@ public class GroupByQueryRunnerFactoryTest
@Rule
public CloserRule closerRule = new CloserRule(true);
+ private GroupByQueryRunnerFactory factory;
+ private Closer resourceCloser;
+
+ @Before
+ public void setup()
+ {
+ final Pair<GroupByQueryRunnerFactory, Closer> factoryAndCloser = GroupByQueryRunnerTest.makeQueryRunnerFactory(
+ new GroupByQueryConfig()
+ );
+
+ factory = factoryAndCloser.lhs;
+ resourceCloser = factoryAndCloser.rhs;
+ }
+
+ @After
+ public void teardown() throws IOException
+ {
+ resourceCloser.close();
+ }
+
@Test
public void testMergeRunnersEnsureGroupMerging()
{
@@ -69,8 +93,6 @@ public class GroupByQueryRunnerFactoryTest
.setAggregatorSpecs(new CountAggregatorFactory("count"))
.build();
- final QueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig());
-
QueryRunner mergedRunner = factory.getToolchest().mergeResults(
new QueryRunner()
{
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java
index 09d4074..9e55b58 100644
--- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java
@@ -26,11 +26,9 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.collections.BlockingPool;
-import io.druid.collections.DefaultBlockingPool;
-import io.druid.collections.NonBlockingPool;
+import io.druid.collections.CloseableDefaultBlockingPool;
+import io.druid.collections.CloseableStupidPool;
import io.druid.collections.ReferenceCountingResourceHolder;
-import io.druid.collections.StupidPool;
import io.druid.data.input.Row;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.DruidProcessingConfig;
@@ -47,6 +45,7 @@ import io.druid.query.groupby.strategy.GroupByStrategySelector;
import io.druid.query.groupby.strategy.GroupByStrategyV1;
import io.druid.query.groupby.strategy.GroupByStrategyV2;
import org.hamcrest.CoreMatchers;
+import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -100,17 +99,7 @@ public class GroupByQueryRunnerFailureTest
)
{
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
- final NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
- "GroupByQueryEngine-bufferPool",
- new Supplier<ByteBuffer>()
- {
- @Override
- public ByteBuffer get()
- {
- return ByteBuffer.allocateDirect(DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes());
- }
- }
- );
+
final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
configSupplier,
new GroupByStrategyV1(
@@ -138,7 +127,18 @@ public class GroupByQueryRunnerFailureTest
);
}
- private static final BlockingPool<ByteBuffer> mergeBufferPool = new DefaultBlockingPool<>(
+ private static final CloseableStupidPool<ByteBuffer> bufferPool = new CloseableStupidPool<>(
+ "GroupByQueryEngine-bufferPool",
+ new Supplier<ByteBuffer>()
+ {
+ @Override
+ public ByteBuffer get()
+ {
+ return ByteBuffer.allocateDirect(DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes());
+ }
+ }
+ );
+ private static final CloseableDefaultBlockingPool<ByteBuffer> mergeBufferPool = new CloseableDefaultBlockingPool<>(
new Supplier<ByteBuffer>()
{
@Override
@@ -164,6 +164,13 @@ public class GroupByQueryRunnerFailureTest
private QueryRunner<Row> runner;
+ @AfterClass
+ public static void teardownClass()
+ {
+ bufferPool.close();
+ mergeBufferPool.close();
+ }
+
@Parameters(name = "{0}")
public static Collection<Object[]> constructorFeeder()
{
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
index 01e67bb..eedb4df 100644
--- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -31,16 +31,15 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.collections.BlockingPool;
-import io.druid.collections.DefaultBlockingPool;
-import io.druid.collections.NonBlockingPool;
-import io.druid.collections.StupidPool;
+import io.druid.collections.CloseableDefaultBlockingPool;
+import io.druid.collections.CloseableStupidPool;
import io.druid.common.config.NullHandling;
import io.druid.data.input.Row;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.granularity.DurationGranularity;
import io.druid.java.util.common.granularity.Granularities;
@@ -48,6 +47,7 @@ import io.druid.java.util.common.granularity.PeriodGranularity;
import io.druid.java.util.common.guava.MergeSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
+import io.druid.java.util.common.io.Closer;
import io.druid.js.JavaScriptConfig;
import io.druid.query.BySegmentResultValue;
import io.druid.query.BySegmentResultValueClass;
@@ -127,6 +127,7 @@ import io.druid.segment.virtual.ExpressionVirtualColumn;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
@@ -135,6 +136,7 @@ import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -152,7 +154,7 @@ import java.util.concurrent.Executors;
public class GroupByQueryRunnerTest
{
public static final ObjectMapper DEFAULT_MAPPER = TestHelper.makeSmileMapper();
- public static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig()
+ private static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig()
{
@Override
public String getFormatString()
@@ -181,9 +183,11 @@ public class GroupByQueryRunnerTest
}
};
+ private static final Closer resourceCloser = Closer.create();
+
private final QueryRunner<Row> runner;
- private GroupByQueryRunnerFactory factory;
- private GroupByQueryConfig config;
+ private final GroupByQueryRunnerFactory factory;
+ private final GroupByQueryConfig config;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@@ -324,14 +328,14 @@ public class GroupByQueryRunnerTest
);
}
- public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
+ public static Pair<GroupByQueryRunnerFactory, Closer> makeQueryRunnerFactory(
final GroupByQueryConfig config
)
{
return makeQueryRunnerFactory(DEFAULT_MAPPER, config, DEFAULT_PROCESSING_CONFIG);
}
- public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
+ public static Pair<GroupByQueryRunnerFactory, Closer> makeQueryRunnerFactory(
final ObjectMapper mapper,
final GroupByQueryConfig config
)
@@ -339,14 +343,14 @@ public class GroupByQueryRunnerTest
return makeQueryRunnerFactory(mapper, config, DEFAULT_PROCESSING_CONFIG);
}
- public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
+ public static Pair<GroupByQueryRunnerFactory, Closer> makeQueryRunnerFactory(
final ObjectMapper mapper,
final GroupByQueryConfig config,
final DruidProcessingConfig processingConfig
)
{
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
- final NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
+ final CloseableStupidPool<ByteBuffer> bufferPool = new CloseableStupidPool<>(
"GroupByQueryEngine-bufferPool",
new Supplier<ByteBuffer>()
{
@@ -357,7 +361,7 @@ public class GroupByQueryRunnerTest
}
}
);
- final BlockingPool<ByteBuffer> mergeBufferPool = new DefaultBlockingPool<>(
+ final CloseableDefaultBlockingPool<ByteBuffer> mergeBufferPool = new CloseableDefaultBlockingPool<>(
new Supplier<ByteBuffer>()
{
@Override
@@ -389,9 +393,15 @@ public class GroupByQueryRunnerTest
strategySelector,
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
);
- return new GroupByQueryRunnerFactory(
- strategySelector,
- toolChest
+ final Closer closer = Closer.create();
+ closer.register(bufferPool);
+ closer.register(mergeBufferPool);
+ return Pair.of(
+ new GroupByQueryRunnerFactory(
+ strategySelector,
+ toolChest
+ ),
+ closer
);
}
@@ -400,7 +410,9 @@ public class GroupByQueryRunnerTest
{
final List<Object[]> constructors = Lists.newArrayList();
for (GroupByQueryConfig config : testConfigs()) {
- final GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(config);
+ final Pair<GroupByQueryRunnerFactory, Closer> factoryAndCloser = makeQueryRunnerFactory(config);
+ final GroupByQueryRunnerFactory factory = factoryAndCloser.lhs;
+ resourceCloser.register(factoryAndCloser.rhs);
for (QueryRunner<Row> runner : QueryRunnerTestHelper.makeQueryRunners(factory)) {
final String testName = StringUtils.format(
"config=%s, runner=%s",
@@ -414,8 +426,17 @@ public class GroupByQueryRunnerTest
return constructors;
}
+ @AfterClass
+ public static void teardown() throws IOException
+ {
+ resourceCloser.close();
+ }
+
public GroupByQueryRunnerTest(
- String testName, GroupByQueryConfig config, GroupByQueryRunnerFactory factory, QueryRunner runner
+ String testName,
+ GroupByQueryConfig config,
+ GroupByQueryRunnerFactory factory,
+ QueryRunner runner
)
{
this.config = config;
@@ -661,6 +682,9 @@ public class GroupByQueryRunnerTest
@Test
public void testGroupByWithSortDimsFirst()
{
+ if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) {
+ return;
+ }
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
@@ -668,7 +692,7 @@ public class GroupByQueryRunnerTest
.setDimensions(new DefaultDimensionSpec("quality", "alias"))
.setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index"))
.setGranularity(QueryRunnerTestHelper.dayGran)
- .setContext(ImmutableMap.of("sortByDimsFirst", true, "groupByStrategy", "v2"))
+ .setContext(ImmutableMap.of("sortByDimsFirst", true))
.build();
List<Row> expectedResults = Arrays.asList(
@@ -7526,7 +7550,6 @@ public class GroupByQueryRunnerTest
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
- System.out.println(results);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
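makeQueryRunnerFactory now returns the factory together with a Closer guarding its compute and merge buffer pools; callers split the Pair and close the right-hand side when done. A sketch of the convention (hypothetical wrapper method; test scaffolding around it assumed):

    import io.druid.java.util.common.Pair;
    import io.druid.java.util.common.io.Closer;
    import io.druid.query.groupby.GroupByQueryConfig;
    import io.druid.query.groupby.GroupByQueryRunnerFactory;
    import io.druid.query.groupby.GroupByQueryRunnerTest;
    import java.io.IOException;

    static void runWithFactory() throws IOException
    {
      final Pair<GroupByQueryRunnerFactory, Closer> factoryAndCloser =
          GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig());
      final GroupByQueryRunnerFactory factory = factoryAndCloser.lhs;
      final Closer resourceCloser = factoryAndCloser.rhs;
      try {
        // build runners from `factory` and run groupBy queries
      } finally {
        resourceCloser.close();  // frees the compute and merge buffer pools
      }
    }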
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
index 157a91a..f41b026 100644
--- a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
@@ -26,10 +26,12 @@ import com.google.common.util.concurrent.MoreExecutors;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
+import io.druid.java.util.common.io.Closer;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.QueryPlus;
@@ -43,12 +45,13 @@ import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryRunnerTest;
import io.druid.query.timeseries.TimeseriesResultValue;
import org.joda.time.DateTime;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.Map;
/**
@@ -56,20 +59,30 @@ import java.util.Map;
@RunWith(Parameterized.class)
public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
{
+ private static final Closer resourceCloser = Closer.create();
+
+ @AfterClass
+ public static void teardown() throws IOException
+ {
+ resourceCloser.close();
+ }
+
@SuppressWarnings("unchecked")
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
{
GroupByQueryConfig config = new GroupByQueryConfig();
config.setMaxIntermediateRows(10000);
-
- final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(config);
+ final Pair<GroupByQueryRunnerFactory, Closer> factoryAndCloser = GroupByQueryRunnerTest.makeQueryRunnerFactory(
+ config
+ );
+ final GroupByQueryRunnerFactory factory = factoryAndCloser.lhs;
+ resourceCloser.register(factoryAndCloser.rhs);
return QueryRunnerTestHelper.transformToConstructionFeeder(
Lists.transform(
QueryRunnerTestHelper.makeQueryRunners(factory),
new Function<QueryRunner<Row>, Object>()
{
- @Nullable
@Override
public Object apply(final QueryRunner<Row> input)
{
@@ -110,7 +123,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
{
MapBasedRow row = (MapBasedRow) input;
- return new Result<TimeseriesResultValue>(
+ return new Result<>(
row.getTimestamp(), new TimeseriesResultValue(row.getEvent())
);
}
diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java
index 5222d05..2847c89 100644
--- a/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java
+++ b/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import io.druid.collections.CloseableStupidPool;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.granularity.Granularities;
@@ -50,6 +51,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
@@ -69,11 +71,6 @@ public class TopNQueryQueryToolChestTest
}
@Test
- public void testCacheStrategyWithFloatDimension() throws Exception
- {
- }
-
- @Test
public void testComputeCacheKeyWithDifferentPostAgg()
{
final TopNQuery query1 = new TopNQuery(
@@ -140,46 +137,48 @@ public class TopNQueryQueryToolChestTest
config,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
);
- QueryRunnerFactory factory = new TopNQueryRunnerFactory(
- TestQueryRunners.getPool(),
- chest,
- QueryRunnerTestHelper.NOOP_QUERYWATCHER
- );
- QueryRunner<Result<TopNResultValue>> runner = QueryRunnerTestHelper.makeQueryRunner(
- factory,
- new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), segmentId),
- null
- );
-
- Map<String, Object> context = Maps.newHashMap();
- context.put("minTopNThreshold", 500);
-
- TopNQueryBuilder builder = new TopNQueryBuilder()
- .dataSource(QueryRunnerTestHelper.dataSource)
- .granularity(QueryRunnerTestHelper.allGran)
- .dimension(QueryRunnerTestHelper.placementishDimension)
- .metric(QueryRunnerTestHelper.indexMetric)
- .intervals(QueryRunnerTestHelper.fullOnInterval)
- .aggregators(QueryRunnerTestHelper.commonDoubleAggregators);
-
- TopNQuery query1 = builder.threshold(10).context(null).build();
- MockQueryRunner mockRunner = new MockQueryRunner(runner);
- new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config).run(
- QueryPlus.wrap(query1),
- ImmutableMap.of()
- );
- Assert.assertEquals(1000, mockRunner.query.getThreshold());
-
- TopNQuery query2 = builder.threshold(10).context(context).build();
-
- new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config)
- .run(QueryPlus.wrap(query2), ImmutableMap.of());
- Assert.assertEquals(500, mockRunner.query.getThreshold());
-
- TopNQuery query3 = builder.threshold(2000).context(context).build();
- new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config)
- .run(QueryPlus.wrap(query3), ImmutableMap.of());
- Assert.assertEquals(2000, mockRunner.query.getThreshold());
+ try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) {
+ QueryRunnerFactory factory = new TopNQueryRunnerFactory(
+ pool,
+ chest,
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER
+ );
+ QueryRunner<Result<TopNResultValue>> runner = QueryRunnerTestHelper.makeQueryRunner(
+ factory,
+ new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), segmentId),
+ null
+ );
+
+ Map<String, Object> context = Maps.newHashMap();
+ context.put("minTopNThreshold", 500);
+
+ TopNQueryBuilder builder = new TopNQueryBuilder()
+ .dataSource(QueryRunnerTestHelper.dataSource)
+ .granularity(QueryRunnerTestHelper.allGran)
+ .dimension(QueryRunnerTestHelper.placementishDimension)
+ .metric(QueryRunnerTestHelper.indexMetric)
+ .intervals(QueryRunnerTestHelper.fullOnInterval)
+ .aggregators(QueryRunnerTestHelper.commonDoubleAggregators);
+
+ TopNQuery query1 = builder.threshold(10).context(null).build();
+ MockQueryRunner mockRunner = new MockQueryRunner(runner);
+ new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config).run(
+ QueryPlus.wrap(query1),
+ ImmutableMap.of()
+ );
+ Assert.assertEquals(1000, mockRunner.query.getThreshold());
+
+ TopNQuery query2 = builder.threshold(10).context(context).build();
+
+ new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config)
+ .run(QueryPlus.wrap(query2), ImmutableMap.of());
+ Assert.assertEquals(500, mockRunner.query.getThreshold());
+
+ TopNQuery query3 = builder.threshold(2000).context(context).build();
+ new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config)
+ .run(QueryPlus.wrap(query3), ImmutableMap.of());
+ Assert.assertEquals(2000, mockRunner.query.getThreshold());
+ }
}
private void doTestCacheStrategy(final ValueType valueType, final Object dimValue) throws IOException
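Note: the rewrite above scopes the buffer pool to the test with try-with-resources, so its direct buffers are reclaimed as soon as the block exits instead of living for the whole JVM. A minimal sketch of the pattern, assuming take() hands back a closeable ResourceHolder as in StupidPool (names outside this diff are illustrative; imports as in the test files above):

    try (CloseableStupidPool<ByteBuffer> pool = new CloseableStupidPool<>(
        "sketch-bufferPool",
        () -> ByteBuffer.allocate(1024)
    )) {
      // each take() returns a holder that must itself be closed to give the buffer back
      try (ResourceHolder<ByteBuffer> holder = pool.take()) {
        ByteBuffer buf = holder.get();
        buf.putInt(0, 42); // ... use the buffer ...
      }
    } // pool.close() frees whatever is still sitting in the pool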
diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java
index 2f7aee6..e347b95 100644
--- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java
@@ -28,7 +28,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Longs;
-import io.druid.collections.StupidPool;
+import io.druid.collections.CloseableStupidPool;
import io.druid.common.config.NullHandling;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.IAE;
@@ -37,6 +37,7 @@ import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequence;
+import io.druid.java.util.common.io.Closer;
import io.druid.js.JavaScriptConfig;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.BySegmentResultValue;
@@ -90,6 +91,7 @@ import io.druid.segment.TestHelper;
import io.druid.segment.column.Column;
import io.druid.segment.column.ValueType;
import io.druid.segment.virtual.ExpressionVirtualColumn;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -97,6 +99,7 @@ import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -111,6 +114,14 @@ import java.util.stream.Collectors;
@RunWith(Parameterized.class)
public class TopNQueryRunnerTest
{
+ private static final Closer resourceCloser = Closer.create();
+
+ @AfterClass
+ public static void teardown() throws IOException
+ {
+ resourceCloser.close();
+ }
+
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
{
@@ -137,11 +148,17 @@ public class TopNQueryRunnerTest
public static List<QueryRunner<Result<TopNResultValue>>> queryRunners()
{
+ final CloseableStupidPool<ByteBuffer> defaultPool = TestQueryRunners.createDefaultNonBlockingPool();
+ final CloseableStupidPool<ByteBuffer> customPool = new CloseableStupidPool<>(
+ "TopNQueryRunnerFactory-bufferPool",
+ () -> ByteBuffer.allocate(20000)
+ );
+
List<QueryRunner<Result<TopNResultValue>>> retVal = Lists.newArrayList();
retVal.addAll(
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
- TestQueryRunners.getPool(),
+ defaultPool,
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
@@ -153,10 +170,7 @@ public class TopNQueryRunnerTest
retVal.addAll(
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
- new StupidPool<ByteBuffer>(
- "TopNQueryRunnerFactory-bufferPool",
- () -> ByteBuffer.allocate(20000)
- ),
+ customPool,
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
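Here the pools back parameterized runners built once in queryRunners(), so a per-test try-with-resources cannot own them; instead they are registered with a class-level Closer that is drained once in @AfterClass. A condensed sketch of that lifecycle (class name and test body are illustrative):

    import io.druid.collections.CloseableStupidPool;
    import io.druid.java.util.common.io.Closer;
    import org.junit.AfterClass;
    import org.junit.Test;

    import java.io.IOException;
    import java.nio.ByteBuffer;

    public class PoolLifecycleSketch
    {
      private static final Closer resourceCloser = Closer.create();
      private static final CloseableStupidPool<ByteBuffer> pool =
          new CloseableStupidPool<>("sketch-bufferPool", () -> ByteBuffer.allocate(1024));

      static {
        resourceCloser.register(pool); // owned by the class, not by any one test
      }

      @AfterClass
      public static void teardown() throws IOException
      {
        resourceCloser.close(); // closes everything registered above
      }

      @Test
      public void usesPool()
      {
        // runners built on `pool` stay valid across every test method
      }
    }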
diff --git a/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java
index c150745..b760d54 100644
--- a/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java
+++ b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java
@@ -22,8 +22,9 @@ package io.druid.query.topn;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import io.druid.collections.StupidPool;
+import io.druid.collections.CloseableStupidPool;
import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.common.io.Closer;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
@@ -32,10 +33,12 @@ import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.segment.TestHelper;
+import org.junit.AfterClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
@@ -46,23 +49,28 @@ import java.util.Map;
@RunWith(Parameterized.class)
public class TopNUnionQueryTest
{
- private final QueryRunner runner;
+ private static final Closer resourceCloser = Closer.create();
- public TopNUnionQueryTest(
- QueryRunner runner
- )
+ @AfterClass
+ public static void teardown() throws IOException
{
- this.runner = runner;
+ resourceCloser.close();
}
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
{
+ final CloseableStupidPool<ByteBuffer> defaultPool = TestQueryRunners.createDefaultNonBlockingPool();
+ final CloseableStupidPool<ByteBuffer> customPool = new CloseableStupidPool<>(
+ "TopNQueryRunnerFactory-bufferPool",
+ () -> ByteBuffer.allocate(2000)
+ );
+
return QueryRunnerTestHelper.cartesian(
Iterables.concat(
QueryRunnerTestHelper.makeUnionQueryRunners(
new TopNQueryRunnerFactory(
- TestQueryRunners.getPool(),
+ defaultPool,
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
@@ -72,10 +80,7 @@ public class TopNUnionQueryTest
),
QueryRunnerTestHelper.makeUnionQueryRunners(
new TopNQueryRunnerFactory(
- new StupidPool<ByteBuffer>(
- "TopNQueryRunnerFactory-bufferPool",
- () -> ByteBuffer.allocate(2000)
- ),
+ customPool,
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
@@ -87,6 +92,15 @@ public class TopNUnionQueryTest
);
}
+ private final QueryRunner runner;
+
+ public TopNUnionQueryTest(
+ QueryRunner runner
+ )
+ {
+ this.runner = runner;
+ }
+
@Test
public void testTopNUnionQuery()
{
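The reshuffle above (static @Parameters machinery first, instance constructor after) also reads naturally against JUnit's parameterized lifecycle, which is what makes the class-level Closer safe here:

    // JUnit 4 Parameterized lifecycle, for reference:
    //   1. @Parameters (constructorFeeder): once, while the runner is built
    //   2. @BeforeClass: once
    //   3. per parameter set and test method: constructor, @Before, test, @After
    //   4. @AfterClass: once, after all parameter sets have run
    // Pools created in step 1 outlive every test instance, so @AfterClass is
    // the natural place to release them.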
diff --git a/processing/src/test/java/io/druid/segment/AppendTest.java b/processing/src/test/java/io/druid/segment/AppendTest.java
index 15a9133..193c28e 100644
--- a/processing/src/test/java/io/druid/segment/AppendTest.java
+++ b/processing/src/test/java/io/druid/segment/AppendTest.java
@@ -22,6 +22,7 @@ package io.druid.segment;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import io.druid.collections.CloseableStupidPool;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.Pair;
@@ -63,6 +64,7 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -355,9 +357,11 @@ public class AppendTest
);
TopNQuery query = makeTopNQuery();
- QueryRunner runner = TestQueryRunners.makeTopNQueryRunner(segment);
- HashMap<String, Object> context = new HashMap<String, Object>();
- TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query), context));
+ try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) {
+ QueryRunner runner = TestQueryRunners.makeTopNQueryRunner(segment, pool);
+ HashMap<String, Object> context = new HashMap<String, Object>();
+ TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query), context));
+ }
}
@Test
@@ -401,9 +405,11 @@ public class AppendTest
);
TopNQuery query = makeTopNQuery();
- QueryRunner runner = TestQueryRunners.makeTopNQueryRunner(segment2);
- HashMap<String, Object> context = new HashMap<String, Object>();
- TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query), context));
+ try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) {
+ QueryRunner runner = TestQueryRunners.makeTopNQueryRunner(segment2, pool);
+ HashMap<String, Object> context = new HashMap<String, Object>();
+ TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query), context));
+ }
}
@Test
@@ -429,9 +435,11 @@ public class AppendTest
);
TopNQuery query = makeFilteredTopNQuery();
- QueryRunner runner = TestQueryRunners.makeTopNQueryRunner(segment);
- HashMap<String, Object> context = new HashMap<String, Object>();
- TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query), context));
+ try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) {
+ QueryRunner runner = TestQueryRunners.makeTopNQueryRunner(segment, pool);
+ HashMap<String, Object> context = new HashMap<String, Object>();
+ TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query), context));
+ }
}
@Test
@@ -447,9 +455,11 @@ public class AppendTest
);
TopNQuery query = makeFilteredTopNQuery();
- QueryRunner runner = TestQueryRunners.makeTopNQueryRunner(segment2);
- HashMap<String, Object> context = new HashMap<String, Object>();
- TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query), context));
+ try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) {
+ QueryRunner runner = TestQueryRunners.makeTopNQueryRunner(segment2, pool);
+ HashMap<String, Object> context = new HashMap<String, Object>();
+ TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query), context));
+ }
}
@Test
diff --git a/processing/src/test/java/io/druid/segment/SchemalessTestSimpleTest.java b/processing/src/test/java/io/druid/segment/SchemalessTestSimpleTest.java
index 09f92a4..e8718a2 100644
--- a/processing/src/test/java/io/druid/segment/SchemalessTestSimpleTest.java
+++ b/processing/src/test/java/io/druid/segment/SchemalessTestSimpleTest.java
@@ -22,6 +22,7 @@ package io.druid.segment;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import io.druid.collections.CloseableStupidPool;
import io.druid.common.config.NullHandling;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
@@ -60,6 +61,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -232,9 +234,11 @@ public class SchemalessTestSimpleTest
)
);
- QueryRunner runner = TestQueryRunners.makeTopNQueryRunner(segment);
- HashMap<String, Object> context = new HashMap<String, Object>();
- TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query), context));
+ try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) {
+ QueryRunner runner = TestQueryRunners.makeTopNQueryRunner(segment, pool);
+ HashMap<String, Object> context = new HashMap<String, Object>();
+ TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query), context));
+ }
}
@Test
diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java
index 0ee0d07..34bd1ce 100644
--- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java
+++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java
@@ -19,7 +19,6 @@
package io.druid.segment.data;
-import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -30,7 +29,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import io.druid.collections.StupidPool;
+import io.druid.collections.CloseableStupidPool;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
@@ -39,6 +38,7 @@ import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Accumulator;
import io.druid.java.util.common.guava.Sequence;
+import io.druid.java.util.common.io.Closer;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.QueryPlus;
@@ -64,15 +64,18 @@ import io.druid.segment.CloserRule;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndex.Builder;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import org.joda.time.Interval;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -99,14 +102,20 @@ public class IncrementalIndexTest
IncrementalIndex createIndex(AggregatorFactory[] aggregatorFactories);
}
+ private static final Closer resourceCloser = Closer.create();
+
+ @AfterClass
+ public static void teardown() throws IOException
+ {
+ resourceCloser.close();
+ }
+
private final IndexCreator indexCreator;
@Rule
- public final CloserRule closer = new CloserRule(false);
+ public final CloserRule closerRule = new CloserRule(false);
- public IncrementalIndexTest(
- IndexCreator indexCreator
- )
+ public IncrementalIndexTest(IndexCreator indexCreator)
{
this.indexCreator = indexCreator;
}
@@ -114,86 +123,42 @@ public class IncrementalIndexTest
@Parameterized.Parameters
public static Collection<?> constructorFeeder()
{
- return Arrays.asList(
- new Object[][]{
- {
- new IndexCreator()
- {
- @Override
- public IncrementalIndex createIndex(AggregatorFactory[] factories)
- {
- return IncrementalIndexTest.createIndex(factories);
- }
- }
- },
- {
- new IndexCreator()
- {
- @Override
- public IncrementalIndex createIndex(AggregatorFactory[] factories)
- {
- return new IncrementalIndex.Builder()
- .setSimpleTestingIndexSchema(factories)
- .setMaxRowCount(1000000)
- .buildOffheap(
- new StupidPool<ByteBuffer>(
- "OffheapIncrementalIndex-bufferPool",
- new Supplier<ByteBuffer>()
- {
- @Override
- public ByteBuffer get()
- {
- return ByteBuffer.allocate(256 * 1024);
- }
- }
- )
- );
- }
- }
- },
- {
- new IndexCreator()
- {
- @Override
- public IncrementalIndex createIndex(AggregatorFactory[] factories)
- {
- return IncrementalIndexTest.createNoRollupIndex(factories);
- }
- }
- },
- {
- new IndexCreator()
- {
- @Override
- public IncrementalIndex createIndex(AggregatorFactory[] factories)
- {
- return new IncrementalIndex.Builder()
- .setIndexSchema(
- new IncrementalIndexSchema.Builder()
- .withMetrics(factories)
- .withRollup(false)
- .build()
- )
- .setMaxRowCount(1000000)
- .buildOffheap(
- new StupidPool<ByteBuffer>(
- "OffheapIncrementalIndex-bufferPool",
- new Supplier<ByteBuffer>()
- {
- @Override
- public ByteBuffer get()
- {
- return ByteBuffer.allocate(256 * 1024);
- }
- }
- )
- );
- }
- }
- }
-
+ final List<Object[]> params = new ArrayList<>();
+ params.add(new Object[] {(IndexCreator) IncrementalIndexTest::createIndex});
+ final CloseableStupidPool<ByteBuffer> pool1 = new CloseableStupidPool<>(
+ "OffheapIncrementalIndex-bufferPool",
+ () -> ByteBuffer.allocate(256 * 1024)
+ );
+ resourceCloser.register(pool1);
+ params.add(
+ new Object[] {
+ (IndexCreator) factories -> new Builder()
+ .setSimpleTestingIndexSchema(factories)
+ .setMaxRowCount(1000000)
+ .buildOffheap(pool1)
}
);
+ params.add(new Object[] {(IndexCreator) IncrementalIndexTest::createNoRollupIndex});
+ final CloseableStupidPool<ByteBuffer> pool2 = new CloseableStupidPool<>(
+ "OffheapIncrementalIndex-bufferPool",
+ () -> ByteBuffer.allocate(256 * 1024)
+ );
+ resourceCloser.register(pool2);
+ params.add(
+ new Object[] {
+ (IndexCreator) factories -> new Builder()
+ .setIndexSchema(
+ new IncrementalIndexSchema.Builder()
+ .withMetrics(factories)
+ .withRollup(false)
+ .build()
+ )
+ .setMaxRowCount(1000000)
+ .buildOffheap(pool2)
+ }
+ );
+
+ return params;
}
public static AggregatorFactory[] getDefaultCombiningAggregatorFactories()
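The constructorFeeder() rewrite above collapses the anonymous IndexCreator classes into lambdas and method references. The explicit (IndexCreator) casts are load-bearing: inside a new Object[] {...} there is no target type, so the compiler needs the cast to pick the functional interface. A reduced illustration (the SAM interface below stands in for the test's IndexCreator):

    interface Creator
    {
      Object create(String[] inputs); // stand-in signature for illustration
    }

    class LambdaTargetTypeSketch
    {
      static Object[] params()
      {
        // without the cast this fails to compile: Object offers no single
        // abstract method for the lambda to implement
        return new Object[] {(Creator) inputs -> inputs.length};
      }
    }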
@@ -302,7 +267,7 @@ public class IncrementalIndexTest
public void testCaseSensitivity() throws Exception
{
long timestamp = System.currentTimeMillis();
- IncrementalIndex index = closer.closeLater(indexCreator.createIndex(defaultAggregatorFactories));
+ IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex(defaultAggregatorFactories));
populateIndex(timestamp, index);
Assert.assertEquals(Arrays.asList("dim1", "dim2"), index.getDimensionNames());
@@ -324,7 +289,7 @@ public class IncrementalIndexTest
public void testFilteredAggregators() throws Exception
{
long timestamp = System.currentTimeMillis();
- IncrementalIndex index = closer.closeLater(
+ IncrementalIndex index = closerRule.closeLater(
indexCreator.createIndex(new AggregatorFactory[]{
new CountAggregatorFactory("count"),
new FilteredAggregatorFactory(
@@ -420,7 +385,7 @@ public class IncrementalIndexTest
);
}
- final IncrementalIndex index = closer.closeLater(
+ final IncrementalIndex index = closerRule.closeLater(
indexCreator.createIndex(
ingestAggregatorFactories.toArray(
new AggregatorFactory[0]
@@ -535,7 +500,7 @@ public class IncrementalIndexTest
}
- final IncrementalIndex index = closer.closeLater(
+ final IncrementalIndex index = closerRule.closeLater(
indexCreator.createIndex(ingestAggregatorFactories.toArray(new AggregatorFactory[0]))
);
final int concurrentThreads = 2;
@@ -717,7 +682,7 @@ public class IncrementalIndexTest
@Test
public void testConcurrentAdd() throws Exception
{
- final IncrementalIndex index = closer.closeLater(indexCreator.createIndex(defaultAggregatorFactories));
+ final IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex(defaultAggregatorFactories));
final int threadCount = 10;
final int elementsPerThread = 200;
final int dimensionCount = 5;
@@ -778,7 +743,7 @@ public class IncrementalIndexTest
)
.setMaxRowCount(1000000)
.buildOnheap();
- closer.closeLater(incrementalIndex);
+ closerRule.closeLater(incrementalIndex);
Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames());
}
@@ -790,7 +755,7 @@ public class IncrementalIndexTest
.setSimpleTestingIndexSchema(/* empty */)
.setMaxRowCount(10)
.buildOnheap();
- closer.closeLater(index);
+ closerRule.closeLater(index);
index.add(
new MapBasedInputRow(
1481871600000L,
diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
index 69e493d..0a1a9c2 100644
--- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
+++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
@@ -19,12 +19,11 @@
package io.druid.segment.incremental;
-import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import io.druid.collections.StupidPool;
+import io.druid.collections.CloseableStupidPool;
import io.druid.common.config.NullHandling;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.MapBasedRow;
@@ -127,29 +126,49 @@ public class IncrementalIndexStorageAdapterTest
)
);
- GroupByQueryEngine engine = makeGroupByQueryEngine();
-
- final Sequence<Row> rows = engine.process(
- GroupByQuery.builder()
- .setDataSource("test")
- .setGranularity(Granularities.ALL)
- .setInterval(new Interval(DateTimes.EPOCH, DateTimes.nowUtc()))
- .addDimension("billy")
- .addDimension("sally")
- .addAggregator(new LongSumAggregatorFactory("cnt", "cnt"))
- .build(),
- new IncrementalIndexStorageAdapter(index)
- );
- final List<Row> results = rows.toList();
+ try (
+ CloseableStupidPool<ByteBuffer> pool = new CloseableStupidPool<>(
+ "GroupByQueryEngine-bufferPool",
+ () -> ByteBuffer.allocate(50000)
+ )
+ ) {
+ final GroupByQueryEngine engine = new GroupByQueryEngine(
+ Suppliers.ofInstance(
+ new GroupByQueryConfig()
+ {
+ @Override
+ public int getMaxIntermediateRows()
+ {
+ return 5;
+ }
+ }
+ ),
+ pool
+ );
- Assert.assertEquals(2, results.size());
+ final Sequence<Row> rows = engine.process(
+ GroupByQuery.builder()
+ .setDataSource("test")
+ .setGranularity(Granularities.ALL)
+ .setInterval(new Interval(DateTimes.EPOCH, DateTimes.nowUtc()))
+ .addDimension("billy")
+ .addDimension("sally")
+ .addAggregator(new LongSumAggregatorFactory("cnt", "cnt"))
+ .build(),
+ new IncrementalIndexStorageAdapter(index)
+ );
+
+ final List<Row> results = rows.toList();
- MapBasedRow row = (MapBasedRow) results.get(0);
- Assert.assertEquals(ImmutableMap.of("sally", "bo", "cnt", 1L), row.getEvent());
+ Assert.assertEquals(2, results.size());
- row = (MapBasedRow) results.get(1);
- Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1L), row.getEvent());
+ MapBasedRow row = (MapBasedRow) results.get(0);
+ Assert.assertEquals(ImmutableMap.of("sally", "bo", "cnt", 1L), row.getEvent());
+
+ row = (MapBasedRow) results.get(1);
+ Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1L), row.getEvent());
+ }
}
@Test
@@ -174,68 +193,63 @@ public class IncrementalIndexStorageAdapterTest
)
);
- GroupByQueryEngine engine = makeGroupByQueryEngine();
-
- final Sequence<Row> rows = engine.process(
- GroupByQuery.builder()
- .setDataSource("test")
- .setGranularity(Granularities.ALL)
- .setInterval(new Interval(DateTimes.EPOCH, DateTimes.nowUtc()))
- .addDimension("billy")
- .addDimension("sally")
- .addAggregator(
- new LongSumAggregatorFactory("cnt", "cnt")
- )
- .addAggregator(
- new JavaScriptAggregatorFactory(
- "fieldLength",
- Arrays.asList("sally", "billy"),
- "function(current, s, b) { return current + (s == null ? 0 : s.length) + (b == null ? 0 : b.length); }",
- "function() { return 0; }",
- "function(a,b) { return a + b; }",
- JavaScriptConfig.getEnabledInstance()
- )
- )
- .build(),
- new IncrementalIndexStorageAdapter(index)
- );
+ try (
+ CloseableStupidPool<ByteBuffer> pool = new CloseableStupidPool<>(
+ "GroupByQueryEngine-bufferPool",
+ () -> ByteBuffer.allocate(50000)
+ )
+ ) {
+ final GroupByQueryEngine engine = new GroupByQueryEngine(
+ Suppliers.ofInstance(
+ new GroupByQueryConfig()
+ {
+ @Override
+ public int getMaxIntermediateRows()
+ {
+ return 5;
+ }
+ }
+ ),
+ pool
+ );
- final List<Row> results = rows.toList();
+ final Sequence<Row> rows = engine.process(
+ GroupByQuery.builder()
+ .setDataSource("test")
+ .setGranularity(Granularities.ALL)
+ .setInterval(new Interval(DateTimes.EPOCH, DateTimes.nowUtc()))
+ .addDimension("billy")
+ .addDimension("sally")
+ .addAggregator(
+ new LongSumAggregatorFactory("cnt", "cnt")
+ )
+ .addAggregator(
+ new JavaScriptAggregatorFactory(
+ "fieldLength",
+ Arrays.asList("sally", "billy"),
+ "function(current, s, b) { return current + (s == null ? 0 : s.length) + (b == null ? 0 : b.length); }",
+ "function() { return 0; }",
+ "function(a,b) { return a + b; }",
+ JavaScriptConfig.getEnabledInstance()
+ )
+ )
+ .build(),
+ new IncrementalIndexStorageAdapter(index)
+ );
- Assert.assertEquals(2, results.size());
+ final List<Row> results = rows.toList();
- MapBasedRow row = (MapBasedRow) results.get(0);
- Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1L, "fieldLength", 2.0), row.getEvent());
+ Assert.assertEquals(2, results.size());
- row = (MapBasedRow) results.get(1);
- Assert.assertEquals(ImmutableMap.of("billy", "hip", "sally", "hop", "cnt", 1L, "fieldLength", 6.0), row.getEvent());
- }
+ MapBasedRow row = (MapBasedRow) results.get(0);
+ Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1L, "fieldLength", 2.0), row.getEvent());
- private static GroupByQueryEngine makeGroupByQueryEngine()
- {
- return new GroupByQueryEngine(
- Suppliers.ofInstance(
- new GroupByQueryConfig()
- {
- @Override
- public int getMaxIntermediateRows()
- {
- return 5;
- }
- }
- ),
- new StupidPool(
- "GroupByQueryEngine-bufferPool",
- new Supplier<ByteBuffer>()
- {
- @Override
- public ByteBuffer get()
- {
- return ByteBuffer.allocate(50000);
- }
- }
- )
- );
+ row = (MapBasedRow) results.get(1);
+ Assert.assertEquals(
+ ImmutableMap.of("billy", "hip", "sally", "hop", "cnt", 1L, "fieldLength", 6.0),
+ row.getEvent()
+ );
+ }
}
@Test
@@ -312,38 +326,33 @@ public class IncrementalIndexStorageAdapterTest
)
);
- TopNQueryEngine engine = new TopNQueryEngine(
- new StupidPool<ByteBuffer>(
+ try (
+ CloseableStupidPool<ByteBuffer> pool = new CloseableStupidPool<>(
"TopNQueryEngine-bufferPool",
- new Supplier<ByteBuffer>()
- {
- @Override
- public ByteBuffer get()
- {
- return ByteBuffer.allocate(50000);
- }
- }
- )
- );
-
- final Iterable<Result<TopNResultValue>> results = engine
- .query(
- new TopNQueryBuilder()
- .dataSource("test")
- .granularity(Granularities.ALL)
- .intervals(Collections.singletonList(new Interval(DateTimes.EPOCH, DateTimes.nowUtc())))
- .dimension("sally")
- .metric("cnt")
- .threshold(10)
- .aggregators(Collections.singletonList(new LongSumAggregatorFactory("cnt", "cnt")))
- .build(),
- new IncrementalIndexStorageAdapter(index),
- null
+ () -> ByteBuffer.allocate(50000)
)
- .toList();
+ ) {
+ TopNQueryEngine engine = new TopNQueryEngine(pool);
+
+ final Iterable<Result<TopNResultValue>> results = engine
+ .query(
+ new TopNQueryBuilder()
+ .dataSource("test")
+ .granularity(Granularities.ALL)
+ .intervals(Collections.singletonList(new Interval(DateTimes.EPOCH, DateTimes.nowUtc())))
+ .dimension("sally")
+ .metric("cnt")
+ .threshold(10)
+ .aggregators(Collections.singletonList(new LongSumAggregatorFactory("cnt", "cnt")))
+ .build(),
+ new IncrementalIndexStorageAdapter(index),
+ null
+ )
+ .toList();
- Assert.assertEquals(1, Iterables.size(results));
- Assert.assertEquals(1, results.iterator().next().getValue().getValue().size());
+ Assert.assertEquals(1, Iterables.size(results));
+ Assert.assertEquals(1, results.iterator().next().getValue().getValue().size());
+ }
}
@Test
@@ -365,27 +374,46 @@ public class IncrementalIndexStorageAdapterTest
)
);
- GroupByQueryEngine engine = makeGroupByQueryEngine();
-
- final Sequence<Row> rows = engine.process(
- GroupByQuery.builder()
- .setDataSource("test")
- .setGranularity(Granularities.ALL)
- .setInterval(new Interval(DateTimes.EPOCH, DateTimes.nowUtc()))
- .addDimension("billy")
- .addDimension("sally")
- .addAggregator(new LongSumAggregatorFactory("cnt", "cnt"))
- .setDimFilter(DimFilters.dimEquals("sally", (String) null))
- .build(),
- new IncrementalIndexStorageAdapter(index)
- );
+ try (
+ CloseableStupidPool<ByteBuffer> pool = new CloseableStupidPool<>(
+ "GroupByQueryEngine-bufferPool",
+ () -> ByteBuffer.allocate(50000)
+ )
+ ) {
+ final GroupByQueryEngine engine = new GroupByQueryEngine(
+ Suppliers.ofInstance(
+ new GroupByQueryConfig()
+ {
+ @Override
+ public int getMaxIntermediateRows()
+ {
+ return 5;
+ }
+ }
+ ),
+ pool
+ );
- final List<Row> results = rows.toList();
+ final Sequence<Row> rows = engine.process(
+ GroupByQuery.builder()
+ .setDataSource("test")
+ .setGranularity(Granularities.ALL)
+ .setInterval(new Interval(DateTimes.EPOCH, DateTimes.nowUtc()))
+ .addDimension("billy")
+ .addDimension("sally")
+ .addAggregator(new LongSumAggregatorFactory("cnt", "cnt"))
+ .setDimFilter(DimFilters.dimEquals("sally", (String) null))
+ .build(),
+ new IncrementalIndexStorageAdapter(index)
+ );
+
+ final List<Row> results = rows.toList();
- Assert.assertEquals(1, results.size());
+ Assert.assertEquals(1, results.size());
- MapBasedRow row = (MapBasedRow) results.get(0);
- Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1L), row.getEvent());
+ MapBasedRow row = (MapBasedRow) results.get(0);
+ Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1L), row.getEvent());
+ }
}
@Test
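The shared makeGroupByQueryEngine() helper had to go because it created a pool it could never close; each test now owns its pool via try-with-resources. If the three near-identical blocks above ever grow tiresome, a small helper could centralize the lifecycle. withPool below is hypothetical, not part of this commit:

    import io.druid.collections.CloseableStupidPool;

    import java.nio.ByteBuffer;
    import java.util.function.Function;

    final class PoolHelper
    {
      // Hypothetical utility: run `body` against a fresh pool, always closing it.
      static <T> T withPool(String name, int bufferSize, Function<CloseableStupidPool<ByteBuffer>, T> body)
      {
        try (CloseableStupidPool<ByteBuffer> pool =
                 new CloseableStupidPool<>(name, () -> ByteBuffer.allocate(bufferSize))) {
          return body.apply(pool);
        }
      }

      private PoolHelper() {}
    }

    // usage sketch; runQuery stands in for the engine.process(...) plumbing:
    //   List<Row> results = PoolHelper.withPool("GroupByQueryEngine-bufferPool", 50000, pool -> runQuery(pool));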
diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java
index 854c3af..4ac492e 100644
--- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java
+++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java
@@ -19,11 +19,10 @@
package io.druid.segment.incremental;
-import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import io.druid.collections.StupidPool;
+import io.druid.collections.CloseableStupidPool;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.DoubleDimensionSchema;
@@ -32,12 +31,14 @@ import io.druid.data.input.impl.LongDimensionSchema;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.granularity.Granularities;
+import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.CloserRule;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -45,6 +46,7 @@ import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
@@ -65,13 +67,21 @@ public class IncrementalIndexTest
public ExpectedException expectedException = ExpectedException.none();
@Rule
- public final CloserRule closer = new CloserRule(false);
+ public final CloserRule closerRule = new CloserRule(false);
private final IndexCreator indexCreator;
+ private final Closer resourceCloser;
- public IncrementalIndexTest(IndexCreator IndexCreator)
+ @After
+ public void teardown() throws IOException
+ {
+ resourceCloser.close();
+ }
+
+ public IncrementalIndexTest(IndexCreator IndexCreator, Closer resourceCloser)
{
this.indexCreator = IndexCreator;
+ this.resourceCloser = resourceCloser;
}
@Parameterized.Parameters
@@ -113,9 +123,16 @@ public class IncrementalIndexTest
.setMaxRowCount(1000)
.buildOnheap();
}
- }
+ },
+ Closer.create()
}
);
+ final Closer poolCloser = Closer.create();
+ final CloseableStupidPool<ByteBuffer> stupidPool = new CloseableStupidPool<>(
+ "OffheapIncrementalIndex-bufferPool",
+ () -> ByteBuffer.allocate(256 * 1024)
+ );
+ poolCloser.register(stupidPool);
constructors.add(
new Object[]{
new IndexCreator()
@@ -127,21 +144,10 @@ public class IncrementalIndexTest
.setIndexSchema(schema)
.setSortFacts(sortFacts)
.setMaxRowCount(1000000)
- .buildOffheap(
- new StupidPool<ByteBuffer>(
- "OffheapIncrementalIndex-bufferPool",
- new Supplier<ByteBuffer>()
- {
- @Override
- public ByteBuffer get()
- {
- return ByteBuffer.allocate(256 * 1024);
- }
- }
- )
- );
+ .buildOffheap(stupidPool);
}
- }
+ },
+ poolCloser
}
);
}
@@ -152,7 +158,7 @@ public class IncrementalIndexTest
@Test(expected = ISE.class)
public void testDuplicateDimensions() throws IndexSizeExceededException
{
- IncrementalIndex index = closer.closeLater(indexCreator.createIndex());
+ IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex());
index.add(
new MapBasedInputRow(
System.currentTimeMillis() - 1,
@@ -172,7 +178,7 @@ public class IncrementalIndexTest
@Test(expected = ISE.class)
public void testDuplicateDimensionsFirstOccurrence() throws IndexSizeExceededException
{
- IncrementalIndex index = closer.closeLater(indexCreator.createIndex());
+ IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex());
index.add(
new MapBasedInputRow(
System.currentTimeMillis() - 1,
@@ -185,7 +191,7 @@ public class IncrementalIndexTest
@Test
public void controlTest() throws IndexSizeExceededException
{
- IncrementalIndex index = closer.closeLater(indexCreator.createIndex());
+ IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex());
index.add(
new MapBasedInputRow(
System.currentTimeMillis() - 1,
@@ -212,7 +218,7 @@ public class IncrementalIndexTest
@Test
public void testUnparseableNumerics() throws IndexSizeExceededException
{
- IncrementalIndex<?> index = closer.closeLater(indexCreator.createIndex());
+ IncrementalIndex<?> index = closerRule.closeLater(indexCreator.createIndex());
IncrementalIndexAddResult result;
result = index.add(
@@ -278,7 +284,7 @@ public class IncrementalIndexTest
Lists.newArrayList("billy", "joe"),
ImmutableMap.of("billy", "A", "joe", "B")
);
- IncrementalIndex index = closer.closeLater(indexCreator.createIndex());
+ IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex());
index.add(row);
index.add(row);
index.add(row);
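This file takes a third approach: the Closer travels with the parameter set (constructorFeeder returns it alongside the IndexCreator) and is closed in @After rather than @AfterClass. Worth noting: @After fires after every test method, so a Closer created once in @Parameters is closed after the first test of its parameter set, and whatever it owns has to tolerate that. A stripped-down sketch of the shape:

    import io.druid.java.util.common.io.Closer;
    import org.junit.After;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Collection;

    @RunWith(Parameterized.class)
    public class PerParameterCloserSketch
    {
      private final Closer resourceCloser;

      public PerParameterCloserSketch(Closer resourceCloser)
      {
        this.resourceCloser = resourceCloser; // injected per parameter set
      }

      @Parameterized.Parameters
      public static Collection<Object[]> constructorFeeder()
      {
        // each parameter set carries the Closer that owns its resources
        return Arrays.asList(new Object[]{Closer.create()}, new Object[]{Closer.create()});
      }

      @After
      public void teardown() throws IOException
      {
        resourceCloser.close();
      }

      @Test
      public void usesResources()
      {
        // exercise whatever resourceCloser owns
      }
    }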
diff --git a/server/pom.xml b/server/pom.xml
index f81a6b8..2253c3c 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -211,6 +211,13 @@
</dependency>
<dependency>
<groupId>io.druid</groupId>
+ <artifactId>druid-common</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
index 67ac01c..228bb95 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -18,12 +18,15 @@
*/
package io.druid.client;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import io.druid.client.cache.Cache;
-import io.druid.client.cache.CachePopulator;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulator;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.ForegroundCachePopulator;
import io.druid.client.cache.MapCache;
@@ -31,13 +34,17 @@ import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.client.selector.TierSelectorStrategy;
import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.Pair;
import io.druid.java.util.common.guava.Sequence;
+import io.druid.java.util.common.io.Closer;
import io.druid.query.DataSource;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
+import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.select.SelectQueryConfig;
import io.druid.server.coordination.ServerType;
import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline;
@@ -46,10 +53,12 @@ import io.druid.timeline.partition.SingleElementPartitionChunk;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
import org.easymock.EasyMock;
import org.joda.time.Interval;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -62,12 +71,25 @@ import java.util.concurrent.Executor;
*/
public class CachingClusteredClientFunctionalityTest
{
-
- public CachingClusteredClient client;
-
- protected VersionedIntervalTimeline<String, ServerSelector> timeline;
- protected TimelineServerView serverView;
- protected Cache cache;
+ private static final ObjectMapper OBJECT_MAPPER = CachingClusteredClientTestUtils.createObjectMapper();
+ private static final Supplier<SelectQueryConfig> SELECT_CONFIG_SUPPLIER = Suppliers.ofInstance(
+ new SelectQueryConfig(true)
+ );
+ private static final Pair<QueryToolChestWarehouse, Closer> WAREHOUSE_AND_CLOSER = CachingClusteredClientTestUtils
+ .createWarehouse(OBJECT_MAPPER, SELECT_CONFIG_SUPPLIER);
+ private static final QueryToolChestWarehouse WAREHOUSE = WAREHOUSE_AND_CLOSER.lhs;
+ private static final Closer RESOURCE_CLOSER = WAREHOUSE_AND_CLOSER.rhs;
+
+ private CachingClusteredClient client;
+ private VersionedIntervalTimeline<String, ServerSelector> timeline;
+ private TimelineServerView serverView;
+ private Cache cache;
+
+ @AfterClass
+ public static void tearDownClass() throws IOException
+ {
+ RESOURCE_CLOSER.close();
+ }
@Before
public void setUp()
@@ -76,7 +98,7 @@ public class CachingClusteredClientFunctionalityTest
serverView = EasyMock.createNiceMock(TimelineServerView.class);
cache = MapCache.create(100000);
client = makeClient(
- new ForegroundCachePopulator(CachingClusteredClientTest.jsonMapper, new CachePopulatorStats(), -1)
+ new ForegroundCachePopulator(OBJECT_MAPPER, new CachePopulatorStats(), -1)
);
}
@@ -214,7 +236,7 @@ public class CachingClusteredClientFunctionalityTest
)
{
return new CachingClusteredClient(
- CachingClusteredClientTest.WAREHOUSE,
+ WAREHOUSE,
new TimelineServerView()
{
@Override
@@ -247,7 +269,7 @@ public class CachingClusteredClientFunctionalityTest
}
},
cache,
- CachingClusteredClientTest.jsonMapper,
+ OBJECT_MAPPER,
cachePopulator,
new CacheConfig()
{
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
index 6143dad..3e509ae 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
@@ -20,8 +20,8 @@
package io.druid.client;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
@@ -57,7 +57,6 @@ import io.druid.client.selector.ServerSelector;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.hll.HyperLogLogCollector;
-import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Intervals;
@@ -72,16 +71,15 @@ import io.druid.java.util.common.guava.MergeIterable;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.guava.nary.TrinaryFn;
+import io.druid.java.util.common.io.Closer;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.DataSource;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
-import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
-import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.Result;
import io.druid.query.SegmentDescriptor;
@@ -101,8 +99,6 @@ import io.druid.query.filter.InDimFilter;
import io.druid.query.filter.OrDimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.groupby.GroupByQuery;
-import io.druid.query.groupby.GroupByQueryConfig;
-import io.druid.query.groupby.GroupByQueryRunnerTest;
import io.druid.query.groupby.strategy.GroupByStrategySelector;
import io.druid.query.ordering.StringComparators;
import io.druid.query.search.SearchHit;
@@ -119,7 +115,6 @@ import io.druid.query.select.SelectResultValue;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.MultipleSpecificSegmentSpec;
import io.druid.query.timeboundary.TimeBoundaryQuery;
-import io.druid.query.timeboundary.TimeBoundaryQueryQueryToolChest;
import io.druid.query.timeboundary.TimeBoundaryResultValue;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
@@ -145,6 +140,7 @@ import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.Period;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -152,6 +148,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -172,19 +169,15 @@ import java.util.concurrent.Executor;
@RunWith(Parameterized.class)
public class CachingClusteredClientTest
{
- public static final ImmutableMap<String, Object> CONTEXT = ImmutableMap.of(
+ private static final ImmutableMap<String, Object> CONTEXT = ImmutableMap.of(
"finalize", false,
// GroupBy v2 won't cache on the broker, so test with v1.
"groupByStrategy", GroupByStrategySelector.STRATEGY_V1
);
- public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.of());
- public static final String DATA_SOURCE = "test";
- static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory());
-
- static {
- jsonMapper.getFactory().setCodec(jsonMapper);
- }
+ private static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.of());
+ private static final String DATA_SOURCE = "test";
+ private static final ObjectMapper JSON_MAPPER = CachingClusteredClientTestUtils.createObjectMapper();
/**
* We want a deterministic test, but we'd also like a bit of randomness for the distribution of segments
@@ -260,52 +253,22 @@ public class CachingClusteredClientTest
private static final DateTimeZone TIMEZONE = DateTimes.inferTzfromString("America/Los_Angeles");
private static final Granularity PT1H_TZ_GRANULARITY = new PeriodGranularity(new Period("PT1H"), null, TIMEZONE);
private static final String TOP_DIM = "a_dim";
-
- private static final Supplier<SelectQueryConfig> selectConfigSupplier = Suppliers.ofInstance(new SelectQueryConfig(true));
-
- static final QueryToolChestWarehouse WAREHOUSE = new MapQueryToolChestWarehouse(
- ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
- .put(
- TimeseriesQuery.class,
- new TimeseriesQueryQueryToolChest(
- QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
- )
- )
- .put(
- TopNQuery.class, new TopNQueryQueryToolChest(
- new TopNQueryConfig(),
- QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
- )
- )
- .put(
- SearchQuery.class, new SearchQueryQueryToolChest(
- new SearchQueryConfig(),
- QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
- )
- )
- .put(
- SelectQuery.class,
- new SelectQueryQueryToolChest(
- jsonMapper,
- QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator(),
- selectConfigSupplier
- )
- )
- .put(
- GroupByQuery.class,
- GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig()).getToolchest()
- )
- .put(TimeBoundaryQuery.class, new TimeBoundaryQueryQueryToolChest())
- .build()
+ private static final Supplier<SelectQueryConfig> SELECT_CONFIG_SUPPLIER = Suppliers.ofInstance(
+ new SelectQueryConfig(true)
);
+ private static final Pair<QueryToolChestWarehouse, Closer> WAREHOUSE_AND_CLOSER = CachingClusteredClientTestUtils
+ .createWarehouse(JSON_MAPPER, SELECT_CONFIG_SUPPLIER);
+ private static final QueryToolChestWarehouse WAREHOUSE = WAREHOUSE_AND_CLOSER.lhs;
+ private static final Closer RESOURCE_CLOSER = WAREHOUSE_AND_CLOSER.rhs;
+
private final Random random;
- public CachingClusteredClient client;
- private Runnable queryCompletedCallback;
- protected VersionedIntervalTimeline<String, ServerSelector> timeline;
- protected TimelineServerView serverView;
- protected Cache cache;
- DruidServer[] servers;
+ private CachingClusteredClient client;
+ private Runnable queryCompletedCallback;
+ private TimelineServerView serverView;
+ private VersionedIntervalTimeline<String, ServerSelector> timeline;
+ private Cache cache;
+ private DruidServer[] servers;
public CachingClusteredClientTest(int randomSeed)
{
@@ -328,13 +291,19 @@ public class CachingClusteredClientTest
);
}
+ @AfterClass
+ public static void tearDownClass() throws IOException
+ {
+ RESOURCE_CLOSER.close();
+ }
+
@Before
public void setUp()
{
timeline = new VersionedIntervalTimeline<>(Ordering.natural());
serverView = EasyMock.createNiceMock(TimelineServerView.class);
cache = MapCache.create(100000);
- client = makeClient(new ForegroundCachePopulator(jsonMapper, new CachePopulatorStats(), -1));
+ client = makeClient(new ForegroundCachePopulator(JSON_MAPPER, new CachePopulatorStats(), -1));
servers = new DruidServer[]{
new DruidServer("test1", "test1", null, 10, ServerType.HISTORICAL, "bye", 0),
@@ -429,7 +398,7 @@ public class CachingClusteredClientTest
client = makeClient(
new BackgroundCachePopulator(
randomizingExecutorService,
- jsonMapper,
+ JSON_MAPPER,
new CachePopulatorStats(),
-1
)
@@ -590,7 +559,7 @@ public class CachingClusteredClientTest
.andReturn(ImmutableMap.of())
.once();
EasyMock.replay(cache);
- client = makeClient(new ForegroundCachePopulator(jsonMapper, new CachePopulatorStats(), -1), cache, limit);
+ client = makeClient(new ForegroundCachePopulator(JSON_MAPPER, new CachePopulatorStats(), -1), cache, limit);
final DruidServer lastServer = servers[random.nextInt(servers.length)];
final DataSegment dataSegment = EasyMock.createNiceMock(DataSegment.class);
EasyMock.expect(dataSegment.getIdentifier()).andReturn(DATA_SOURCE).anyTimes();
@@ -615,7 +584,7 @@ public class CachingClusteredClientTest
.andReturn(ImmutableMap.of())
.once();
EasyMock.replay(cache);
- client = makeClient(new ForegroundCachePopulator(jsonMapper, new CachePopulatorStats(), -1), cache, 0);
+ client = makeClient(new ForegroundCachePopulator(JSON_MAPPER, new CachePopulatorStats(), -1), cache, 0);
getDefaultQueryRunner().run(QueryPlus.wrap(query), context);
EasyMock.verify(cache);
EasyMock.verify(dataSegment);
@@ -1337,9 +1306,9 @@ public class CachingClusteredClientTest
QueryRunner runner = new FinalizeResultsQueryRunner(
getDefaultQueryRunner(),
new SelectQueryQueryToolChest(
- jsonMapper,
+ JSON_MAPPER,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator(),
- selectConfigSupplier
+ SELECT_CONFIG_SUPPLIER
)
);
HashMap<String, Object> context = new HashMap<String, Object>();
@@ -1410,9 +1379,9 @@ public class CachingClusteredClientTest
QueryRunner runner = new FinalizeResultsQueryRunner(
getDefaultQueryRunner(),
new SelectQueryQueryToolChest(
- jsonMapper,
+ JSON_MAPPER,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator(),
- selectConfigSupplier
+ SELECT_CONFIG_SUPPLIER
)
);
HashMap<String, Object> context = new HashMap<String, Object>();
@@ -1482,9 +1451,11 @@ public class CachingClusteredClientTest
collector.add(hashFn.hashString("abc123", StandardCharsets.UTF_8).asBytes());
collector.add(hashFn.hashString("123abc", StandardCharsets.UTF_8).asBytes());
+ final GroupByQuery query = builder.build();
+
testQueryCaching(
getDefaultQueryRunner(),
- builder.build(),
+ query,
Intervals.of("2011-01-01/2011-01-02"),
makeGroupByResults(
DateTimes.of("2011-01-01"),
@@ -1528,7 +1499,7 @@ public class CachingClusteredClientTest
QueryRunner runner = new FinalizeResultsQueryRunner(
getDefaultQueryRunner(),
- GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig()).getToolchest()
+ WAREHOUSE.getToolChest(query)
);
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedObjects(
@@ -2686,7 +2657,7 @@ public class CachingClusteredClientTest
}
},
cache,
- jsonMapper,
+ JSON_MAPPER,
cachePopulator,
new CacheConfig()
{
@@ -2988,9 +2959,10 @@ public class CachingClusteredClientTest
.setAggregatorSpecs(AGGS)
.setContext(CONTEXT);
+ final GroupByQuery query1 = builder.build();
testQueryCaching(
getDefaultQueryRunner(),
- builder.build(),
+ query1,
Intervals.of("2011-01-01/2011-01-02"),
makeGroupByResults(
DateTimes.of("2011-01-01"),
@@ -3024,7 +2996,7 @@ public class CachingClusteredClientTest
QueryRunner runner = new FinalizeResultsQueryRunner(
getDefaultQueryRunner(),
- GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig()).getToolchest()
+ WAREHOUSE.getToolChest(query1)
);
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedObjects(
@@ -3044,7 +3016,7 @@ public class CachingClusteredClientTest
""
);
- GroupByQuery query = builder
+ final GroupByQuery query2 = builder
.setInterval("2011-01-05/2011-01-10").setDimensions(new DefaultDimensionSpec("a", "output2"))
.setAggregatorSpecs(RENAMED_AGGS)
.build();
@@ -3061,7 +3033,7 @@ public class CachingClusteredClientTest
DateTimes.of("2011-01-09T"), ImmutableMap.of("output2", "g", "rows", 7, "imps", 7, "impers2", 7),
DateTimes.of("2011-01-09T01"), ImmutableMap.of("output2", "g", "rows", 7, "imps", 7, "impers2", 7)
),
- runner.run(QueryPlus.wrap(query), context),
+ runner.run(QueryPlus.wrap(query2), context),
"renamed aggregators test"
);
}
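The switch from GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig()).getToolchest() to WAREHOUSE.getToolChest(query) is part of the same cleanup: after this commit makeQueryRunnerFactory returns a Pair bundling a Closer, so the old inline call would orphan that Closer (and no longer type-checks). Looking the toolchest up from the warehouse the class already owns avoids building a second factory entirely. In outline, within the test above:

    // before: spun up a whole GroupBy factory, plus its pooled buffers,
    // just to borrow the toolchest
    // after: reuse the toolchest registered in the class-level warehouse
    final QueryToolChest<Row, GroupByQuery> toolChest = WAREHOUSE.getToolChest(query);
    final QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), toolChest);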
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTestUtils.java b/server/src/test/java/io/druid/client/CachingClusteredClientTestUtils.java
new file mode 100644
index 0000000..190ecda
--- /dev/null
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientTestUtils.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableMap;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.java.util.common.Pair;
+import io.druid.java.util.common.io.Closer;
+import io.druid.query.MapQueryToolChestWarehouse;
+import io.druid.query.Query;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.QueryToolChest;
+import io.druid.query.QueryToolChestWarehouse;
+import io.druid.query.groupby.GroupByQuery;
+import io.druid.query.groupby.GroupByQueryConfig;
+import io.druid.query.groupby.GroupByQueryRunnerFactory;
+import io.druid.query.groupby.GroupByQueryRunnerTest;
+import io.druid.query.search.SearchQuery;
+import io.druid.query.search.SearchQueryConfig;
+import io.druid.query.search.SearchQueryQueryToolChest;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.select.SelectQueryConfig;
+import io.druid.query.select.SelectQueryQueryToolChest;
+import io.druid.query.timeboundary.TimeBoundaryQuery;
+import io.druid.query.timeboundary.TimeBoundaryQueryQueryToolChest;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
+import io.druid.query.topn.TopNQuery;
+import io.druid.query.topn.TopNQueryConfig;
+import io.druid.query.topn.TopNQueryQueryToolChest;
+
+public final class CachingClusteredClientTestUtils
+{
+ /**
+ * Returns a new {@link QueryToolChestWarehouse} for unit tests and a resourceCloser which should be closed at the end
+ * of the test.
+ */
+ public static Pair<QueryToolChestWarehouse, Closer> createWarehouse(
+ ObjectMapper objectMapper,
+ Supplier<SelectQueryConfig> selectConfigSupplier
+ )
+ {
+ final Pair<GroupByQueryRunnerFactory, Closer> factoryCloserPair = GroupByQueryRunnerTest.makeQueryRunnerFactory(
+ new GroupByQueryConfig()
+ );
+ final GroupByQueryRunnerFactory factory = factoryCloserPair.lhs;
+ final Closer resourceCloser = factoryCloserPair.rhs;
+ return Pair.of(
+ new MapQueryToolChestWarehouse(
+ ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
+ .put(
+ TimeseriesQuery.class,
+ new TimeseriesQueryQueryToolChest(
+ QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+ )
+ )
+ .put(
+ TopNQuery.class,
+ new TopNQueryQueryToolChest(
+ new TopNQueryConfig(),
+ QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+ )
+ )
+ .put(
+ SearchQuery.class,
+ new SearchQueryQueryToolChest(
+ new SearchQueryConfig(),
+ QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+ )
+ )
+ .put(
+ SelectQuery.class,
+ new SelectQueryQueryToolChest(
+ objectMapper,
+ QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator(),
+ selectConfigSupplier
+ )
+ )
+ .put(
+ GroupByQuery.class,
+ factory.getToolchest()
+ )
+ .put(TimeBoundaryQuery.class, new TimeBoundaryQueryQueryToolChest())
+ .build()
+ ),
+ resourceCloser
+ );
+ }
+
+ public static ObjectMapper createObjectMapper()
+ {
+ final SmileFactory factory = new SmileFactory();
+ final ObjectMapper objectMapper = new DefaultObjectMapper(factory);
+ factory.setCodec(objectMapper);
+ return objectMapper;
+ }
+
+ private CachingClusteredClientTestUtils() {}
+}
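A side note on createObjectMapper(): the factory.setCodec(objectMapper) call wires the SmileFactory back to the mapper, without which parsers created by that factory cannot bind values or build trees; this replaces the static initializer the old test carried. A quick round-trip with the resulting mapper (payload is illustrative):

    static void smileRoundTrip() throws IOException
    {
      final ObjectMapper smileMapper = CachingClusteredClientTestUtils.createObjectMapper();
      // Smile is a binary JSON encoding, so the bytes are not human-readable text
      final byte[] smileBytes = smileMapper.writeValueAsBytes(ImmutableMap.of("rows", 7));
      final Map<?, ?> decoded = smileMapper.readValue(smileBytes, Map.class);
      // decoded.get("rows") == 7
    }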
diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java
index 6cee954..b829895 100644
--- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java
@@ -41,7 +41,9 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.Pair;
import io.druid.java.util.common.granularity.Granularities;
+import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.query.BaseQuery;
import io.druid.query.Query;
@@ -82,6 +84,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -89,6 +92,7 @@ import org.junit.Test;
import javax.annotation.Nullable;
import java.io.File;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@@ -101,6 +105,7 @@ import java.util.concurrent.TimeUnit;
public class RealtimeManagerTest
{
private static QueryRunnerFactory factory;
+ private static Closer resourceCloser;
private static QueryRunnerFactoryConglomerate conglomerate;
private static final List<TestInputRowHolder> rows = Arrays.asList(
@@ -124,7 +129,9 @@ public class RealtimeManagerTest
@BeforeClass
public static void setupStatic()
{
- factory = initFactory();
+ final Pair<GroupByQueryRunnerFactory, Closer> factoryAndCloser = initFactory();
+ factory = factoryAndCloser.lhs;
+ resourceCloser = factoryAndCloser.rhs;
conglomerate = new QueryRunnerFactoryConglomerate()
{
@Override
@@ -135,6 +142,12 @@ public class RealtimeManagerTest
};
}
+ @AfterClass
+ public static void teardownStatic() throws IOException
+ {
+ resourceCloser.close();
+ }
+
@Before
public void setUp()
{
@@ -742,7 +755,7 @@ public class RealtimeManagerTest
}
- private static GroupByQueryRunnerFactory initFactory()
+ private static Pair<GroupByQueryRunnerFactory, Closer> initFactory()
{
final GroupByQueryConfig config = new GroupByQueryConfig();
config.setMaxIntermediateRows(10000);
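
Every test class below repeats the lifecycle RealtimeManagerTest adopts here: the factory/Closer pair is built once in @BeforeClass and the Closer is closed in @AfterClass, so merge buffers are freed deterministically instead of lingering until GC. The Closer involved is io.druid.java.util.common.io.Closer, which follows the contract of the Guava Closer it mirrors: register(...) takes ownership of a Closeable and returns it, and close() closes everything registered, last-registered first. A minimal sketch, assuming the (name, supplier) constructor of CloseableStupidPool used later in this patch:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import io.druid.collections.CloseableStupidPool;
    import io.druid.java.util.common.io.Closer;

    class CloserSketch
    {
      static void sketch() throws IOException
      {
        final Closer resourceCloser = Closer.create();
        // register(...) hands ownership of the pool to the Closer and returns it.
        final CloseableStupidPool<ByteBuffer> bufferPool = resourceCloser.register(
            new CloseableStupidPool<>("example-bufferPool", () -> ByteBuffer.allocate(1024))
        );
        // ... hand bufferPool to the factory under test, run the tests ...
        resourceCloser.close(); // closes every registered resource in reverse order
      }
    }
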
diff --git a/sql/pom.xml b/sql/pom.xml
index 5b55247..7c041cc 100644
--- a/sql/pom.xml
+++ b/sql/pom.xml
@@ -90,6 +90,13 @@
</dependency>
<dependency>
<groupId>io.druid</groupId>
+ <artifactId>druid-common</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
diff --git a/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java
index 1f84007..50ffd2c 100644
--- a/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++ b/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -39,7 +39,9 @@ import io.druid.initialization.Initialization;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.io.Closer;
import io.druid.math.expr.ExprMacroTable;
+import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.server.DruidNode;
import io.druid.server.security.AuthTestUtils;
import io.druid.server.security.AuthenticatorMapper;
@@ -62,13 +64,16 @@ import org.eclipse.jetty.server.Server;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@@ -106,6 +111,24 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
}
};
+ private static QueryRunnerFactoryConglomerate conglomerate;
+ private static Closer resourceCloser;
+
+ @BeforeClass
+ public static void setUpClass()
+ {
+ final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
+ .createQueryRunnerFactoryConglomerate();
+ conglomerate = conglomerateCloserPair.lhs;
+ resourceCloser = conglomerateCloserPair.rhs;
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws IOException
+ {
+ resourceCloser.close();
+ }
+
@Rule
public ExpectedException expectedException = ExpectedException.none();
@@ -127,9 +150,9 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
@Before
public void setUp() throws Exception
{
- walker = CalciteTests.createMockWalker(temporaryFolder.newFolder());
+ walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
final PlannerConfig plannerConfig = new PlannerConfig();
- final DruidSchema druidSchema = CalciteTests.createMockSchema(walker, plannerConfig);
+ final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
@@ -154,7 +177,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
druidMeta = new DruidMeta(
new PlannerFactory(
druidSchema,
- CalciteTests.createMockQueryLifecycleFactory(walker),
+ CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
macroTable,
plannerConfig,
@@ -728,14 +751,14 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
};
final PlannerConfig plannerConfig = new PlannerConfig();
- final DruidSchema druidSchema = CalciteTests.createMockSchema(walker, plannerConfig);
+ final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
final List<Meta.Frame> frames = new ArrayList<>();
DruidMeta smallFrameDruidMeta = new DruidMeta(
new PlannerFactory(
druidSchema,
- CalciteTests.createMockQueryLifecycleFactory(walker),
+ CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
macroTable,
plannerConfig,
diff --git a/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java b/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java
index 50bd53e..cb77e83 100644
--- a/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java
+++ b/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java
@@ -23,7 +23,10 @@ import com.google.common.base.Function;
import com.google.common.collect.Lists;
import io.druid.common.config.NullHandling;
import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.common.Pair;
+import io.druid.java.util.common.io.Closer;
import io.druid.math.expr.ExprMacroTable;
+import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.server.security.AllowAllAuthenticator;
import io.druid.server.security.AuthTestUtils;
import io.druid.sql.calcite.planner.DruidOperatorTable;
@@ -37,12 +40,15 @@ import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.avatica.Meta;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import java.io.IOException;
import java.util.List;
public class DruidStatementTest extends CalciteTestBase
@@ -53,23 +59,38 @@ public class DruidStatementTest extends CalciteTestBase
@Rule
public QueryLogHook queryLogHook = QueryLogHook.create();
+ private static QueryRunnerFactoryConglomerate conglomerate;
+ private static Closer resourceCloser;
+
+ @BeforeClass
+ public static void setUpClass()
+ {
+ final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
+ .createQueryRunnerFactoryConglomerate();
+ conglomerate = conglomerateCloserPair.lhs;
+ resourceCloser = conglomerateCloserPair.rhs;
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws IOException
+ {
+ resourceCloser.close();
+ }
+
private SpecificSegmentsQuerySegmentWalker walker;
private PlannerFactory plannerFactory;
@Before
public void setUp() throws Exception
{
- walker = CalciteTests.createMockWalker(temporaryFolder.newFolder());
+ walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
final PlannerConfig plannerConfig = new PlannerConfig();
- final DruidSchema druidSchema = CalciteTests.createMockSchema(
- walker,
- plannerConfig
- );
+ final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
plannerFactory = new PlannerFactory(
druidSchema,
- CalciteTests.createMockQueryLifecycleFactory(walker),
+ CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
macroTable,
plannerConfig,
diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
index c37a825..7a70341 100644
--- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
@@ -27,15 +27,18 @@ import io.druid.common.config.NullHandling;
import io.druid.hll.HLLCV1;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.granularity.PeriodGranularity;
+import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.logger.Logger;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryDataSource;
+import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
@@ -111,8 +114,10 @@ import org.joda.time.Interval;
import org.joda.time.Period;
import org.joda.time.chrono.ISOChronology;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@@ -120,6 +125,7 @@ import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -231,6 +237,24 @@ public class CalciteQueryTest extends CalciteTestBase
private static final PagingSpec FIRST_PAGING_SPEC = new PagingSpec(null, 1000, true);
+ private static QueryRunnerFactoryConglomerate conglomerate;
+ private static Closer resourceCloser;
+
+ @BeforeClass
+ public static void setUpClass()
+ {
+ final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
+ .createQueryRunnerFactoryConglomerate();
+ conglomerate = conglomerateCloserPair.lhs;
+ resourceCloser = conglomerateCloserPair.rhs;
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws IOException
+ {
+ resourceCloser.close();
+ }
+
@Rule
public ExpectedException expectedException = ExpectedException.none();
@@ -245,7 +269,7 @@ public class CalciteQueryTest extends CalciteTestBase
@Before
public void setUp() throws Exception
{
- walker = CalciteTests.createMockWalker(temporaryFolder.newFolder());
+ walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
}
@After
@@ -7252,13 +7276,13 @@ public class CalciteQueryTest extends CalciteTestBase
) throws Exception
{
final InProcessViewManager viewManager = new InProcessViewManager(CalciteTests.TEST_AUTHENTICATOR_ESCALATOR);
- final DruidSchema druidSchema = CalciteTests.createMockSchema(walker, plannerConfig, viewManager);
+ final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig, viewManager);
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
final PlannerFactory plannerFactory = new PlannerFactory(
druidSchema,
- CalciteTests.createMockQueryLifecycleFactory(walker),
+ CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
macroTable,
plannerConfig,
diff --git a/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java b/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java
index 61a0f05..849c8eb 100644
--- a/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java
+++ b/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java
@@ -28,8 +28,10 @@ import io.druid.common.config.NullHandling;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
+import io.druid.java.util.common.io.Closer;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.QueryInterruptedException;
+import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.ResourceLimitExceededException;
import io.druid.server.security.AllowAllAuthenticator;
import io.druid.server.security.AuthConfig;
@@ -48,8 +50,10 @@ import io.druid.sql.http.SqlResource;
import org.apache.calcite.tools.ValidationException;
import org.easymock.EasyMock;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -58,6 +62,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -65,6 +70,24 @@ public class SqlResourceTest extends CalciteTestBase
{
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
+ private static QueryRunnerFactoryConglomerate conglomerate;
+ private static Closer resourceCloser;
+
+ @BeforeClass
+ public static void setUpClass()
+ {
+ final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
+ .createQueryRunnerFactoryConglomerate();
+ conglomerate = conglomerateCloserPair.lhs;
+ resourceCloser = conglomerateCloserPair.rhs;
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws IOException
+ {
+ resourceCloser.close();
+ }
+
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -77,14 +100,13 @@ public class SqlResourceTest extends CalciteTestBase
private HttpServletRequest req;
-
@Before
public void setUp() throws Exception
{
- walker = CalciteTests.createMockWalker(temporaryFolder.newFolder());
+ walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
final PlannerConfig plannerConfig = new PlannerConfig();
- final DruidSchema druidSchema = CalciteTests.createMockSchema(walker, plannerConfig);
+ final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
req = EasyMock.createStrictMock(HttpServletRequest.class);
@@ -106,7 +128,7 @@ public class SqlResourceTest extends CalciteTestBase
JSON_MAPPER,
new PlannerFactory(
druidSchema,
- CalciteTests.createMockQueryLifecycleFactory(walker),
+ CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
macroTable,
plannerConfig,
diff --git a/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java
index 309722f..76ef6ce 100644
--- a/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -24,6 +24,9 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.Pair;
+import io.druid.java.util.common.io.Closer;
+import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
@@ -48,13 +51,16 @@ import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -62,18 +68,36 @@ public class DruidSchemaTest extends CalciteTestBase
{
private static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig();
- public static final List<InputRow> ROWS1 = ImmutableList.of(
+ private static final List<InputRow> ROWS1 = ImmutableList.of(
CalciteTests.createRow(ImmutableMap.of("t", "2000-01-01", "m1", "1.0", "dim1", "")),
CalciteTests.createRow(ImmutableMap.of("t", "2000-01-02", "m1", "2.0", "dim1", "10.1")),
CalciteTests.createRow(ImmutableMap.of("t", "2000-01-03", "m1", "3.0", "dim1", "2"))
);
- public static final List<InputRow> ROWS2 = ImmutableList.of(
+ private static final List<InputRow> ROWS2 = ImmutableList.of(
CalciteTests.createRow(ImmutableMap.of("t", "2001-01-01", "m1", "4.0", "dim2", ImmutableList.of("a"))),
CalciteTests.createRow(ImmutableMap.of("t", "2001-01-02", "m1", "5.0", "dim2", ImmutableList.of("abc"))),
CalciteTests.createRow(ImmutableMap.of("t", "2001-01-03", "m1", "6.0"))
);
+ private static QueryRunnerFactoryConglomerate conglomerate;
+ private static Closer resourceCloser;
+
+ @BeforeClass
+ public static void setUpClass()
+ {
+ final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
+ .createQueryRunnerFactoryConglomerate();
+ conglomerate = conglomerateCloserPair.lhs;
+ resourceCloser = conglomerateCloserPair.rhs;
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws IOException
+ {
+ resourceCloser.close();
+ }
+
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -112,7 +136,7 @@ public class DruidSchemaTest extends CalciteTestBase
.rows(ROWS2)
.buildMMappedIndex();
- walker = new SpecificSegmentsQuerySegmentWalker(CalciteTests.queryRunnerFactoryConglomerate()).add(
+ walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
DataSegment.builder()
.dataSource(CalciteTests.DATASOURCE1)
.interval(Intervals.of("2000/P1Y"))
@@ -140,7 +164,7 @@ public class DruidSchemaTest extends CalciteTestBase
schema = new DruidSchema(
- CalciteTests.createMockQueryLifecycleFactory(walker),
+ CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
new TestServerInventoryView(walker.getSegments()),
PLANNER_CONFIG_DEFAULT,
new NoopViewManager(),
diff --git a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java
index b9e60d9..45fd20e 100644
--- a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java
@@ -32,7 +32,7 @@ import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
-import io.druid.collections.StupidPool;
+import io.druid.collections.CloseableStupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
@@ -41,6 +41,8 @@ import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.guice.ExpressionModule;
import io.druid.guice.annotations.Json;
+import io.druid.java.util.common.Pair;
+import io.druid.java.util.common.io.Closer;
import io.druid.java.util.emitter.core.NoopEmitter;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.math.expr.ExprMacroTable;
@@ -62,6 +64,7 @@ import io.druid.query.expression.LookupEnabledTestExprMacroTable;
import io.druid.query.expression.LookupExprMacro;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
+import io.druid.query.groupby.GroupByQueryRunnerFactory;
import io.druid.query.groupby.GroupByQueryRunnerTest;
import io.druid.query.groupby.strategy.GroupByStrategySelector;
import io.druid.query.lookup.LookupReferencesManager;
@@ -234,109 +237,6 @@ public class CalciteTests
}
);
- private static final QueryRunnerFactoryConglomerate CONGLOMERATE = new DefaultQueryRunnerFactoryConglomerate(
- ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
- .put(
- SegmentMetadataQuery.class,
- new SegmentMetadataQueryRunnerFactory(
- new SegmentMetadataQueryQueryToolChest(
- new SegmentMetadataQueryConfig("P1W")
- ),
- QueryRunnerTestHelper.NOOP_QUERYWATCHER
- )
- )
- .put(
- ScanQuery.class,
- new ScanQueryRunnerFactory(
- new ScanQueryQueryToolChest(
- new ScanQueryConfig(),
- new DefaultGenericQueryMetricsFactory(TestHelper.makeJsonMapper())
- ),
- new ScanQueryEngine()
- )
- )
- .put(
- SelectQuery.class,
- new SelectQueryRunnerFactory(
- new SelectQueryQueryToolChest(
- TestHelper.makeJsonMapper(),
- QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator(),
- SELECT_CONFIG_SUPPLIER
- ),
- new SelectQueryEngine(),
- QueryRunnerTestHelper.NOOP_QUERYWATCHER
- )
- )
- .put(
- TimeseriesQuery.class,
- new TimeseriesQueryRunnerFactory(
- new TimeseriesQueryQueryToolChest(
- QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
- ),
- new TimeseriesQueryEngine(),
- QueryRunnerTestHelper.NOOP_QUERYWATCHER
- )
- )
- .put(
- TopNQuery.class,
- new TopNQueryRunnerFactory(
- new StupidPool<>(
- "TopNQueryRunnerFactory-bufferPool",
- new Supplier<ByteBuffer>()
- {
- @Override
- public ByteBuffer get()
- {
- return ByteBuffer.allocate(10 * 1024 * 1024);
- }
- }
- ),
- new TopNQueryQueryToolChest(
- new TopNQueryConfig(),
- QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
- ),
- QueryRunnerTestHelper.NOOP_QUERYWATCHER
- )
- )
- .put(
- GroupByQuery.class,
- GroupByQueryRunnerTest.makeQueryRunnerFactory(
- GroupByQueryRunnerTest.DEFAULT_MAPPER,
- new GroupByQueryConfig()
- {
- @Override
- public String getDefaultStrategy()
- {
- return GroupByStrategySelector.STRATEGY_V2;
- }
- },
- new DruidProcessingConfig()
- {
- @Override
- public String getFormatString()
- {
- return null;
- }
-
- @Override
- public int intermediateComputeSizeBytes()
- {
- return 10 * 1024 * 1024;
- }
-
- @Override
- public int getNumMergeBuffers()
- {
- // Need 3 buffers for CalciteQueryTest.testDoubleNestedGroupby.
- // Two buffers for the broker and one for the queryable
- return 3;
- }
- }
- )
- )
- .build()
- );
-
private static final InputRowParser<Map<String, Object>> PARSER = new MapInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec(TIMESTAMP_COLUMN, "iso", null),
@@ -394,12 +294,126 @@ public class CalciteTests
// No instantiation.
}
- public static QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate()
+ /**
+ * Returns a new {@link QueryRunnerFactoryConglomerate} and a {@link Closer} which should be closed at the end of the
+ * test.
+ */
+ public static Pair<QueryRunnerFactoryConglomerate, Closer> createQueryRunnerFactoryConglomerate()
{
- return CONGLOMERATE;
+ final Closer resourceCloser = Closer.create();
+ final CloseableStupidPool<ByteBuffer> stupidPool = new CloseableStupidPool<>(
+ "TopNQueryRunnerFactory-bufferPool",
+ new Supplier<ByteBuffer>()
+ {
+ @Override
+ public ByteBuffer get()
+ {
+ return ByteBuffer.allocate(10 * 1024 * 1024);
+ }
+ }
+ );
+ resourceCloser.register(stupidPool);
+ final Pair<GroupByQueryRunnerFactory, Closer> factoryCloserPair = GroupByQueryRunnerTest
+ .makeQueryRunnerFactory(
+ GroupByQueryRunnerTest.DEFAULT_MAPPER,
+ new GroupByQueryConfig()
+ {
+ @Override
+ public String getDefaultStrategy()
+ {
+ return GroupByStrategySelector.STRATEGY_V2;
+ }
+ },
+ new DruidProcessingConfig()
+ {
+ @Override
+ public String getFormatString()
+ {
+ return null;
+ }
+
+ @Override
+ public int intermediateComputeSizeBytes()
+ {
+ return 10 * 1024 * 1024;
+ }
+
+ @Override
+ public int getNumMergeBuffers()
+ {
+ // Need 3 buffers for CalciteQueryTest.testDoubleNestedGroupby.
+ // Two buffers for the broker and one for the queryable
+ return 3;
+ }
+ }
+ );
+ final GroupByQueryRunnerFactory factory = factoryCloserPair.lhs;
+ resourceCloser.register(factoryCloserPair.rhs);
+
+ final QueryRunnerFactoryConglomerate conglomerate = new DefaultQueryRunnerFactoryConglomerate(
+ ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
+ .put(
+ SegmentMetadataQuery.class,
+ new SegmentMetadataQueryRunnerFactory(
+ new SegmentMetadataQueryQueryToolChest(
+ new SegmentMetadataQueryConfig("P1W")
+ ),
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER
+ )
+ )
+ .put(
+ ScanQuery.class,
+ new ScanQueryRunnerFactory(
+ new ScanQueryQueryToolChest(
+ new ScanQueryConfig(),
+ new DefaultGenericQueryMetricsFactory(TestHelper.makeJsonMapper())
+ ),
+ new ScanQueryEngine()
+ )
+ )
+ .put(
+ SelectQuery.class,
+ new SelectQueryRunnerFactory(
+ new SelectQueryQueryToolChest(
+ TestHelper.makeJsonMapper(),
+ QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator(),
+ SELECT_CONFIG_SUPPLIER
+ ),
+ new SelectQueryEngine(),
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER
+ )
+ )
+ .put(
+ TimeseriesQuery.class,
+ new TimeseriesQueryRunnerFactory(
+ new TimeseriesQueryQueryToolChest(
+ QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+ ),
+ new TimeseriesQueryEngine(),
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER
+ )
+ )
+ .put(
+ TopNQuery.class,
+ new TopNQueryRunnerFactory(
+ stupidPool,
+ new TopNQueryQueryToolChest(
+ new TopNQueryConfig(),
+ QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+ ),
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER
+ )
+ )
+ .put(GroupByQuery.class, factory)
+ .build()
+ );
+ return Pair.of(conglomerate, resourceCloser);
}
- public static QueryLifecycleFactory createMockQueryLifecycleFactory(final QuerySegmentWalker walker)
+ public static QueryLifecycleFactory createMockQueryLifecycleFactory(
+ final QuerySegmentWalker walker,
+ final QueryRunnerFactoryConglomerate conglomerate
+ )
{
return new QueryLifecycleFactory(
new QueryToolChestWarehouse()
@@ -407,7 +421,7 @@ public class CalciteTests
@Override
public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest(final QueryType query)
{
- return CONGLOMERATE.findFactory(query).getToolchest();
+ return conglomerate.findFactory(query).getToolchest();
}
},
walker,
@@ -424,7 +438,10 @@ public class CalciteTests
return INJECTOR.getInstance(Key.get(ObjectMapper.class, Json.class));
}
- public static SpecificSegmentsQuerySegmentWalker createMockWalker(final File tmpDir)
+ public static SpecificSegmentsQuerySegmentWalker createMockWalker(
+ final QueryRunnerFactoryConglomerate conglomerate,
+ final File tmpDir
+ )
{
final QueryableIndex index1 = IndexBuilder
.create()
@@ -450,7 +467,7 @@ public class CalciteTests
.rows(FORBIDDEN_ROWS)
.buildMMappedIndex();
- return new SpecificSegmentsQuerySegmentWalker(queryRunnerFactoryConglomerate()).add(
+ return new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
DataSegment.builder()
.dataSource(DATASOURCE1)
.interval(index1.getDataInterval())
@@ -500,21 +517,23 @@ public class CalciteTests
}
public static DruidSchema createMockSchema(
+ final QueryRunnerFactoryConglomerate conglomerate,
final SpecificSegmentsQuerySegmentWalker walker,
final PlannerConfig plannerConfig
)
{
- return createMockSchema(walker, plannerConfig, new NoopViewManager());
+ return createMockSchema(conglomerate, walker, plannerConfig, new NoopViewManager());
}
public static DruidSchema createMockSchema(
+ final QueryRunnerFactoryConglomerate conglomerate,
final SpecificSegmentsQuerySegmentWalker walker,
final PlannerConfig plannerConfig,
final ViewManager viewManager
)
{
final DruidSchema schema = new DruidSchema(
- CalciteTests.createMockQueryLifecycleFactory(walker),
+ CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
new TestServerInventoryView(walker.getSegments()),
plannerConfig,
viewManager,
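
The CloseableStupidPool that replaces the old static StupidPool in CalciteTests keeps the same leasing API while adding a close() for the per-test-class Closer to invoke. A minimal sketch of the full life cycle, assuming the pool retains StupidPool's take()/ResourceHolder contract and implements Closeable (both implied by its registration with the Closer above):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import io.druid.collections.CloseableStupidPool;
    import io.druid.collections.ResourceHolder;

    class PoolSketch
    {
      static void sketch() throws IOException
      {
        try (CloseableStupidPool<ByteBuffer> pool = new CloseableStupidPool<>(
            "sketch-bufferPool",
            () -> ByteBuffer.allocate(10 * 1024 * 1024)
        )) {
          try (ResourceHolder<ByteBuffer> holder = pool.take()) {
            final ByteBuffer buffer = holder.get();
            // ... use the buffer; closing the holder returns it to the pool ...
          }
        } // closing the pool itself frees the buffers it still holds
      }
    }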