Posted to commits@druid.apache.org by jo...@apache.org on 2019/04/26 23:12:16 UTC
[incubator-druid] branch master updated: Fix time-ordered scan queries on realtime segments (#7546)
This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 07dd742 Fix time-ordered scan queries on realtime segments (#7546)
07dd742 is described below
commit 07dd742e357b5731e8848c943c2a641705a4785a
Author: Justin Borromeo <jb...@edu.uwaterloo.ca>
AuthorDate: Fri Apr 26 16:12:10 2019 -0700
Fix time-ordered scan queries on realtime segments (#7546)
* Initial commit
* Added test for int to long conversion
* Add appenderator test for realtime scan query
* get rid of todo
* Fix forbidden apis
* Jon's recommendations
* Formatting
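
For context, the fixed query shape is a time-ordered scan over realtime
(not-yet-persisted) segments. A minimal sketch, assuming a datasource named
"foo" and a descriptor list like the one the new AppenderatorTest case below
builds:

    ScanQuery query = Druids.newScanQueryBuilder()
                            .dataSource("foo")
                            .intervals(new MultipleSpecificSegmentSpec(descriptors))
                            .order(ScanQuery.Order.ASCENDING)
                            .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
                            .build();

Before this patch, such a query could fail in two ways addressed below: the
descriptor-count-equals-runner-count precondition tripped (a realtime Sink
fans out into one runner per FireHydrant), and in-memory rows could carry
Integer timestamps that broke a hard (Long) cast.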
---
.../org/apache/druid/query/SinkQueryRunners.java | 60 +++++++++++++++
.../druid/query/scan/ScanQueryRunnerFactory.java | 70 +++++++++--------
.../apache/druid/query/scan/ScanResultValue.java | 10 +--
.../query/scan/ScanQueryRunnerFactoryTest.java | 22 +++---
.../druid/query/scan/ScanResultValueTest.java | 65 ++++++++++------
.../appenderator/SinkQuerySegmentWalker.java | 87 ++++++++++------------
.../druid/segment/realtime/plumber/Sink.java | 5 +-
.../realtime/appenderator/AppenderatorTest.java | 41 +++++++++-
.../realtime/appenderator/AppenderatorTester.java | 15 ++++
9 files changed, 257 insertions(+), 118 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/query/SinkQueryRunners.java b/processing/src/main/java/org/apache/druid/query/SinkQueryRunners.java
new file mode 100644
index 0000000..9cec39a
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/SinkQueryRunners.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import org.apache.druid.java.util.common.Pair;
+import org.joda.time.Interval;
+
+import java.util.Iterator;
+
+public class SinkQueryRunners<T> implements Iterable<QueryRunner<T>>
+{
+ Iterable<Pair<Interval, QueryRunner<T>>> runners;
+
+ public SinkQueryRunners(Iterable<Pair<Interval, QueryRunner<T>>> runners)
+ {
+ this.runners = runners;
+ }
+
+ public Iterator<Pair<Interval, QueryRunner<T>>> runnerIntervalMappingIterator()
+ {
+ return runners.iterator();
+ }
+
+ @Override
+ public Iterator<QueryRunner<T>> iterator()
+ {
+ Iterator<Pair<Interval, QueryRunner<T>>> runnerIntervalIterator = runners.iterator();
+ return new Iterator<QueryRunner<T>>()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return runnerIntervalIterator.hasNext();
+ }
+
+ @Override
+ public QueryRunner<T> next()
+ {
+ return runnerIntervalIterator.next().rhs;
+ }
+ };
+ }
+}
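
SinkQueryRunners keeps each query runner paired with the interval it covers
while still satisfying the plain Iterable<QueryRunner<T>> contract that
QueryRunnerFactory.mergeRunners expects. A minimal usage sketch (variable
names are illustrative, not from the patch; imports as added in the diff
above):

    List<Pair<Interval, QueryRunner<ScanResultValue>>> pairs = new ArrayList<>();
    pairs.add(new Pair<>(Intervals.of("2019-01-01/2019-01-02"), perHydrantRunner));
    SinkQueryRunners<ScanResultValue> runners = new SinkQueryRunners<>(pairs);

    // Plain runner view, e.g. for factory.mergeRunners(...):
    for (QueryRunner<ScanResultValue> runner : runners) { /* run it */ }

    // Interval-aware view, used by ScanQueryRunnerFactory for time ordering:
    runners.runnerIntervalMappingIterator()
           .forEachRemaining(pair -> handle(pair.lhs, pair.rhs));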
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
index 0a9f3b9..c34d58c 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
@@ -20,7 +20,6 @@
package org.apache.druid.query.scan;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
@@ -39,6 +38,7 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.SinkQueryRunners;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
@@ -114,11 +114,11 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
return returnedRows;
}
} else {
- List<SegmentDescriptor> descriptorsOrdered = getSegmentDescriptorsFromSpecificQuerySpec(query.getQuerySegmentSpec());
+ List<Interval> intervalsOrdered = getIntervalsFromSpecificQuerySpec(query.getQuerySegmentSpec());
List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners);
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
- descriptorsOrdered = Lists.reverse(descriptorsOrdered);
+ intervalsOrdered = Lists.reverse(intervalsOrdered);
queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
}
int maxRowsQueuedForOrdering = (query.getMaxRowsQueuedForOrdering() == null
@@ -132,28 +132,29 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
input -> input.run(queryPlus, responseContext)
)),
query,
- descriptorsOrdered
+ intervalsOrdered
);
} else {
- Preconditions.checkState(
- descriptorsOrdered.size() == queryRunnersOrdered.size(),
- "Number of segment descriptors does not equal number of "
- + "query runners...something went wrong!"
- );
-
- // Combine the two lists of segment descriptors and query runners into a single list of
- // segment descriptors - query runner pairs. This makes it easier to use stream operators.
- List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> descriptorsAndRunnersOrdered = new ArrayList<>();
- for (int i = 0; i < queryRunnersOrdered.size(); i++) {
- descriptorsAndRunnersOrdered.add(new Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i)));
+ // Use n-way merge strategy
+ List<Pair<Interval, QueryRunner<ScanResultValue>>> intervalsAndRunnersOrdered = new ArrayList<>();
+ if (intervalsOrdered.size() == queryRunnersOrdered.size()) {
+ for (int i = 0; i < queryRunnersOrdered.size(); i++) {
+ intervalsAndRunnersOrdered.add(new Pair<>(intervalsOrdered.get(i), queryRunnersOrdered.get(i)));
+ }
+ } else if (queryRunners instanceof SinkQueryRunners) {
+ ((SinkQueryRunners<ScanResultValue>) queryRunners).runnerIntervalMappingIterator()
+ .forEachRemaining(intervalsAndRunnersOrdered::add);
+ } else {
+ throw new ISE("Number of segment descriptors does not equal number of "
+ + "query runners...something went wrong!");
}
// Group the list of pairs by interval. The LinkedHashMap will have an interval paired with a list of all the
// query runners for that segment
- LinkedHashMap<Interval, List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
- descriptorsAndRunnersOrdered.stream()
+ LinkedHashMap<Interval, List<Pair<Interval, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
+ intervalsAndRunnersOrdered.stream()
.collect(Collectors.groupingBy(
- x -> x.lhs.getInterval(),
+ x -> x.lhs,
LinkedHashMap::new,
Collectors.toList()
));
@@ -167,9 +168,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
.max(Comparator.comparing(Integer::valueOf))
.get();
- int segmentPartitionLimit = (query.getMaxSegmentPartitionsOrderedInMemory() == null
- ? scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()
- : query.getMaxSegmentPartitionsOrderedInMemory());
+ int segmentPartitionLimit = query.getMaxSegmentPartitionsOrderedInMemory() == null
+ ? scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()
+ : query.getMaxSegmentPartitionsOrderedInMemory();
if (maxNumPartitionsInSegment <= segmentPartitionLimit) {
// Use n-way merge strategy
@@ -205,7 +206,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
Sequence<ScanResultValue> priorityQueueSortAndLimit(
Sequence<ScanResultValue> inputSequence,
ScanQuery scanQuery,
- List<SegmentDescriptor> descriptorsOrdered
+ List<Interval> intervalsOrdered
)
{
Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
@@ -254,9 +255,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
// Finish scanning the interval containing the limit row
if (numRowsScanned > limit && finalInterval == null) {
long timestampOfLimitRow = srv.getFirstEventTimestamp(scanQuery.getResultFormat());
- for (SegmentDescriptor descriptor : descriptorsOrdered) {
- if (descriptor.getInterval().contains(timestampOfLimitRow)) {
- finalInterval = descriptor.getInterval();
+ for (Interval interval : intervalsOrdered) {
+ if (interval.contains(timestampOfLimitRow)) {
+ finalInterval = interval;
}
}
if (finalInterval == null) {
@@ -280,23 +281,28 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
}
@VisibleForTesting
- List<SegmentDescriptor> getSegmentDescriptorsFromSpecificQuerySpec(QuerySegmentSpec spec)
+ List<Interval> getIntervalsFromSpecificQuerySpec(QuerySegmentSpec spec)
{
// Query segment spec must be an instance of MultipleSpecificSegmentSpec or SpecificSegmentSpec because
// segment descriptors need to be present for a 1:1 matching of intervals with query runners.
// The other types of segment spec condense the intervals (i.e. merge neighbouring intervals), eliminating
// the 1:1 relationship between intervals and query runners.
- List<SegmentDescriptor> descriptorsOrdered;
+ List<Interval> descriptorsOrdered;
if (spec instanceof MultipleSpecificSegmentSpec) {
// Ascending time order for both descriptors and query runners by default
- descriptorsOrdered = ((MultipleSpecificSegmentSpec) spec).getDescriptors();
+ descriptorsOrdered = ((MultipleSpecificSegmentSpec) spec).getDescriptors()
+ .stream()
+ .map(SegmentDescriptor::getInterval)
+ .collect(Collectors.toList());
} else if (spec instanceof SpecificSegmentSpec) {
- descriptorsOrdered = Collections.singletonList(((SpecificSegmentSpec) spec).getDescriptor());
+ descriptorsOrdered = Collections.singletonList(((SpecificSegmentSpec) spec).getDescriptor().getInterval());
} else {
- throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs"
- + "of type MultipleSpecificSegmentSpec or SpecificSegmentSpec...a [%s] was received instead.",
- spec.getClass().getSimpleName());
+ throw new UOE(
+ "Time-ordering on scan queries is only supported for queries with segment specs"
+ + "of type MultipleSpecificSegmentSpec or SpecificSegmentSpec...a [%s] was received instead.",
+ spec.getClass().getSimpleName()
+ );
}
return descriptorsOrdered;
}
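
The refactor above trades SegmentDescriptor lists for plain Interval lists,
since only the intervals matter for time ordering. A minimal sketch of the
helper's contract, mirroring the updated tests below:

    SegmentDescriptor descriptor = new SegmentDescriptor(
        Intervals.of("2019-01-01/2019-01-02"), "1", 0);
    List<Interval> intervals =
        factory.getIntervalsFromSpecificQuerySpec(new SpecificSegmentSpec(descriptor));
    // intervals holds exactly descriptor.getInterval(); spec types that
    // condense neighbouring intervals (e.g. MultipleIntervalSegmentSpec)
    // throw UOE instead, as the two @Test(expected = UOE.class) cases check.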
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java
index a05e715..8673b34 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ColumnHolder;
import javax.annotation.Nullable;
@@ -79,18 +80,18 @@ public class ScanResultValue implements Comparable<ScanResultValue>
public long getFirstEventTimestamp(ScanQuery.ResultFormat resultFormat)
{
if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
- Long timestamp = (Long) ((Map<String, Object>) ((List<Object>) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
- if (timestamp == null) {
+ Object timestampObj = ((Map<String, Object>) ((List<Object>) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
+ if (timestampObj == null) {
throw new ISE("Unable to compare timestamp for rows without a time column");
}
- return timestamp;
+ return DimensionHandlerUtils.convertObjectToLong(timestampObj);
} else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
int timeColumnIndex = this.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME);
if (timeColumnIndex == -1) {
throw new ISE("Unable to compare timestamp for rows without a time column");
}
List<Object> firstEvent = (List<Object>) ((List<Object>) this.getEvents()).get(0);
- return (Long) firstEvent.get(timeColumnIndex);
+ return DimensionHandlerUtils.convertObjectToLong(firstEvent.get(timeColumnIndex));
}
throw new UOE("Unable to get first event timestamp using result format of [%s]", resultFormat.toString());
}
@@ -105,7 +106,6 @@ public class ScanResultValue implements Comparable<ScanResultValue>
return singleEventScanResultValues;
}
-
@Override
public boolean equals(Object o)
{
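
This is the int-to-long conversion named in the commit message. Rows served
from a realtime segment can surface the __time column as an Integer (hence
the new Integer-timestamp fixtures in ScanResultValueTest below), which the
old hard cast rejected. A minimal sketch of the failure mode, assuming
DimensionHandlerUtils.convertObjectToLong's usual Number handling:

    Object ts = event.get(ColumnHolder.TIME_COLUMN_NAME);
    long broken = (Long) ts;  // ClassCastException when ts is an Integer
    long fixed = DimensionHandlerUtils.convertObjectToLong(ts);  // widens any Number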
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
index 6559c65..4b65f53 100644
--- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
@@ -140,10 +140,10 @@ public class ScanQueryRunnerFactoryTest
List<ScanResultValue> output = factory.priorityQueueSortAndLimit(
inputSequence,
query,
- ImmutableList.of(new SegmentDescriptor(new Interval(
+ ImmutableList.of(new Interval(
DateTimes.of("2010-01-01"),
DateTimes.of("2019-01-01").plusHours(1)
- ), "1", 0))
+ ))
).toList();
if (query.getLimit() > Integer.MAX_VALUE) {
Assert.fail("Unsupported exception should have been thrown due to high limit");
@@ -275,7 +275,7 @@ public class ScanQueryRunnerFactoryTest
), "1", 0);
@Test
- public void testGetValidSegmentDescriptorsFromSpec()
+ public void testGetValidIntervalsFromSpec()
{
QuerySegmentSpec multiSpecificSpec = new MultipleSpecificSegmentSpec(
Collections.singletonList(
@@ -284,13 +284,13 @@ public class ScanQueryRunnerFactoryTest
);
QuerySegmentSpec singleSpecificSpec = new SpecificSegmentSpec(descriptor);
- List<SegmentDescriptor> descriptors = factory.getSegmentDescriptorsFromSpecificQuerySpec(multiSpecificSpec);
- Assert.assertEquals(1, descriptors.size());
- Assert.assertEquals(descriptor, descriptors.get(0));
+ List<Interval> intervals = factory.getIntervalsFromSpecificQuerySpec(multiSpecificSpec);
+ Assert.assertEquals(1, intervals.size());
+ Assert.assertEquals(descriptor.getInterval(), intervals.get(0));
- descriptors = factory.getSegmentDescriptorsFromSpecificQuerySpec(singleSpecificSpec);
- Assert.assertEquals(1, descriptors.size());
- Assert.assertEquals(descriptor, descriptors.get(0));
+ intervals = factory.getIntervalsFromSpecificQuerySpec(singleSpecificSpec);
+ Assert.assertEquals(1, intervals.size());
+ Assert.assertEquals(descriptor.getInterval(), intervals.get(0));
}
@Test(expected = UOE.class)
@@ -304,7 +304,7 @@ public class ScanQueryRunnerFactoryTest
)
)
);
- factory.getSegmentDescriptorsFromSpecificQuerySpec(multiIntervalSpec);
+ factory.getIntervalsFromSpecificQuerySpec(multiIntervalSpec);
}
@Test(expected = UOE.class)
@@ -316,7 +316,7 @@ public class ScanQueryRunnerFactoryTest
DateTimes.of("2019-01-01").plusHours(1)
)
);
- factory.getSegmentDescriptorsFromSpecificQuerySpec(legacySpec);
+ factory.getIntervalsFromSpecificQuerySpec(legacySpec);
}
}
}
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTest.java
index 47f82ad..90566aa 100644
--- a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTest.java
+++ b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTest.java
@@ -36,11 +36,14 @@ import java.util.Map;
public class ScanResultValueTest
{
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
- private static final long TIME_1 = 1234567890000L;
- private static final long TIME_2 = 9876543210000L;
+ private static final long TIME_1_LONG = 1234567890000L;
+ private static final long TIME_2_LONG = 9876543210000L;
+ private static final int TIME_3_INT = Integer.MAX_VALUE;
- private static ScanResultValue compactedListSRV;
- private static ScanResultValue listSRV;
+ private static ScanResultValue compactedListSRVLongTimestamp;
+ private static ScanResultValue listSRVLongTimestamp;
+ private static ScanResultValue compactedListSRVIntegerTimestamp;
+ private static ScanResultValue listSRVIntegerTimestamp;
@BeforeClass
public static void setup()
@@ -48,73 +51,93 @@ public class ScanResultValueTest
String segmentId = "some_segment_id";
List<String> columns = new ArrayList<>(Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, "name", "count"));
List<Object> event = new ArrayList<>(Arrays.asList(
- TIME_1,
+ TIME_1_LONG,
"Feridun",
4
));
List<Object> event2 = new ArrayList<>(Arrays.asList(
- TIME_2,
+ TIME_2_LONG,
"Justin",
6
));
List<List<Object>> events = Arrays.asList(event, event2);
- compactedListSRV = new ScanResultValue(segmentId, columns, events);
+ compactedListSRVLongTimestamp = new ScanResultValue(segmentId, columns, events);
+
+ List<Object> eventInt = new ArrayList<>(Arrays.asList(
+ TIME_3_INT,
+ "Feridun",
+ 4
+ ));
+
+ List<List<Object>> eventsInt = Arrays.asList(eventInt, event2);
+ compactedListSRVIntegerTimestamp = new ScanResultValue(segmentId, columns, eventsInt);
Map<String, Object> eventMap1 = new HashMap<>();
- eventMap1.put(ColumnHolder.TIME_COLUMN_NAME, TIME_1);
+ eventMap1.put(ColumnHolder.TIME_COLUMN_NAME, TIME_1_LONG);
eventMap1.put("name", "Feridun");
eventMap1.put("count", 4);
Map<String, Object> eventMap2 = new HashMap<>();
- eventMap2.put(ColumnHolder.TIME_COLUMN_NAME, TIME_2);
+ eventMap2.put(ColumnHolder.TIME_COLUMN_NAME, TIME_2_LONG);
eventMap2.put("name", "Justin");
eventMap2.put("count", 6);
List<Map<String, Object>> eventMaps = Arrays.asList(eventMap1, eventMap2);
- listSRV = new ScanResultValue(segmentId, columns, eventMaps);
+ listSRVLongTimestamp = new ScanResultValue(segmentId, columns, eventMaps);
+
+ Map<String, Object> eventMap3 = new HashMap<>();
+ eventMap3.put(ColumnHolder.TIME_COLUMN_NAME, TIME_3_INT);
+ eventMap3.put("name", "Justin");
+ eventMap3.put("count", 6);
+ List<Map<String, Object>> eventMapsInt = Arrays.asList(eventMap3, eventMap2);
+ listSRVIntegerTimestamp = new ScanResultValue(segmentId, columns, eventMapsInt);
}
@Test
public void testSerdeScanResultValueCompactedList() throws IOException
{
- String serialized = JSON_MAPPER.writeValueAsString(compactedListSRV);
+ String serialized = JSON_MAPPER.writeValueAsString(compactedListSRVLongTimestamp);
ScanResultValue deserialized = JSON_MAPPER.readValue(serialized, ScanResultValue.class);
- Assert.assertEquals(compactedListSRV, deserialized);
+ Assert.assertEquals(compactedListSRVLongTimestamp, deserialized);
}
@Test
public void testSerdeScanResultValueNonCompactedList() throws IOException
{
- String serialized = JSON_MAPPER.writeValueAsString(listSRV);
+ String serialized = JSON_MAPPER.writeValueAsString(listSRVLongTimestamp);
ScanResultValue deserialized = JSON_MAPPER.readValue(serialized, ScanResultValue.class);
- Assert.assertEquals(listSRV, deserialized);
+ Assert.assertEquals(listSRVLongTimestamp, deserialized);
}
@Test
public void testGetFirstEventTimestampCompactedList()
{
- long timestamp = compactedListSRV.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST);
- Assert.assertEquals(TIME_1, timestamp);
+ long timestamp = compactedListSRVLongTimestamp.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST);
+ Assert.assertEquals(TIME_1_LONG, timestamp);
+ long timestampInt = compactedListSRVIntegerTimestamp.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST);
+ Assert.assertEquals(TIME_3_INT, timestampInt);
}
@Test
public void testGetFirstEventTimestampNonCompactedList()
{
- long timestamp = listSRV.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_LIST);
- Assert.assertEquals(TIME_1, timestamp);
+ long timestamp = listSRVLongTimestamp.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_LIST);
+ Assert.assertEquals(TIME_1_LONG, timestamp);
+ long timestampInt = listSRVIntegerTimestamp.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_LIST);
+ Assert.assertEquals(TIME_3_INT, timestampInt);
}
@Test
public void testToSingleEventScanResultValues()
{
- List<ScanResultValue> compactedListScanResultValues = compactedListSRV.toSingleEventScanResultValues();
+ List<ScanResultValue> compactedListScanResultValues = compactedListSRVLongTimestamp.toSingleEventScanResultValues();
for (ScanResultValue srv : compactedListScanResultValues) {
List<Object> events = (List<Object>) srv.getEvents();
Assert.assertEquals(1, events.size());
}
- List<ScanResultValue> listScanResultValues = listSRV.toSingleEventScanResultValues();
- for (ScanResultValue srv : compactedListScanResultValues) {
+ List<ScanResultValue> listScanResultValues = listSRVLongTimestamp.toSingleEventScanResultValues();
+ for (ScanResultValue srv : listScanResultValues) {
List<Object> events = (List<Object>) srv.getEvents();
Assert.assertEquals(1, events.size());
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 127c563..7d08c0f 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -29,6 +29,7 @@ import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.CloseQuietly;
@@ -49,6 +50,7 @@ import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.SinkQueryRunners;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
@@ -180,47 +182,40 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
FunctionalIterable
.create(specs)
.transform(
- new Function<SegmentDescriptor, QueryRunner<T>>()
- {
- @Override
- public QueryRunner<T> apply(final SegmentDescriptor descriptor)
- {
- final PartitionHolder<Sink> holder = sinkTimeline.findEntry(
- descriptor.getInterval(),
- descriptor.getVersion()
- );
- if (holder == null) {
- return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
- }
+ descriptor -> {
+ final PartitionHolder<Sink> holder = sinkTimeline.findEntry(
+ descriptor.getInterval(),
+ descriptor.getVersion()
+ );
+ if (holder == null) {
+ return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
+ }
- final PartitionChunk<Sink> chunk = holder.getChunk(descriptor.getPartitionNumber());
- if (chunk == null) {
- return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
- }
+ final PartitionChunk<Sink> chunk = holder.getChunk(descriptor.getPartitionNumber());
+ if (chunk == null) {
+ return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
+ }
- final Sink theSink = chunk.getObject();
- final SegmentId sinkSegmentId = theSink.getSegment().getId();
+ final Sink theSink = chunk.getObject();
+ final SegmentId sinkSegmentId = theSink.getSegment().getId();
- return new SpecificSegmentQueryRunner<>(
- withPerSinkMetrics(
- new BySegmentQueryRunner<>(
- sinkSegmentId,
- descriptor.getInterval().getStart(),
- factory.mergeRunners(
- Execs.directExecutor(),
- Iterables.transform(
- theSink,
- new Function<FireHydrant, QueryRunner<T>>()
- {
- @Override
- public QueryRunner<T> apply(final FireHydrant hydrant)
- {
+ return new SpecificSegmentQueryRunner<>(
+ withPerSinkMetrics(
+ new BySegmentQueryRunner<>(
+ sinkSegmentId,
+ descriptor.getInterval().getStart(),
+ factory.mergeRunners(
+ Execs.directExecutor(),
+ new SinkQueryRunners<>(
+ Iterables.transform(
+ theSink,
+ hydrant -> {
// Hydrant might swap at any point, but if it's swapped at the start
// then we know it's *definitely* swapped.
final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();
if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
- return new NoopQueryRunner<>();
+ return new Pair<>(Intervals.ETERNITY, new NoopQueryRunner<>());
}
// Prevent the underlying segment from swapping when its being iterated
@@ -234,7 +229,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
// 1) Only use caching if data is immutable
// 2) Hydrants are not the same between replicas, make sure cache is local
if (hydrantDefinitelySwapped && cache.isLocal()) {
- return new CachingQueryRunner<>(
+ QueryRunner<T> cachingRunner = new CachingQueryRunner<>(
makeHydrantCacheIdentifier(hydrant),
descriptor,
objectMapper,
@@ -249,8 +244,9 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
),
cacheConfig
);
+ return new Pair<>(segment.lhs.getDataInterval(), cachingRunner);
} else {
- return baseRunner;
+ return new Pair<>(segment.lhs.getDataInterval(), baseRunner);
}
}
catch (RuntimeException e) {
@@ -258,17 +254,16 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
throw e;
}
}
- }
- )
- )
- ),
- toolChest,
- sinkSegmentId,
- cpuTimeAccumulator
- ),
- new SpecificSegmentSpec(descriptor)
- );
- }
+ )
+ )
+ )
+ ),
+ toolChest,
+ sinkSegmentId,
+ cpuTimeAccumulator
+ ),
+ new SpecificSegmentSpec(descriptor)
+ );
}
)
)
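
With this rewrite the walker tags every per-hydrant runner with the interval
it covers before handing the set to mergeRunners, which is what lets
ScanQueryRunnerFactory recover a runner-to-interval mapping when descriptor
and runner counts differ. Condensed, the pairing logic above is:

    // One Sink (one SegmentDescriptor) fans out into one runner per
    // FireHydrant, so each runner carries its own data interval.
    if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
        return new Pair<>(Intervals.ETERNITY, new NoopQueryRunner<>());
    }
    return new Pair<>(segment.lhs.getDataInterval(), baseRunner);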
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
index d2d72ba..4e0c596 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
@@ -54,7 +54,8 @@ import java.util.concurrent.atomic.AtomicInteger;
public class Sink implements Iterable<FireHydrant>
{
- private static final IncrementalIndexAddResult ALREADY_SWAPPED = new IncrementalIndexAddResult(-1, -1, null, "write after index swapped");
+ private static final IncrementalIndexAddResult ALREADY_SWAPPED =
+ new IncrementalIndexAddResult(-1, -1, null, "write after index swapped");
private final Object hydrantLock = new Object();
private final Interval interval;
@@ -64,7 +65,7 @@ public class Sink implements Iterable<FireHydrant>
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final boolean reportParseExceptions;
- private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<FireHydrant>();
+ private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<>();
private final LinkedHashSet<String> dimOrder = new LinkedHashSet<>();
private final AtomicInteger numRowsExcludingCurrIndex = new AtomicInteger();
private volatile FireHydrant currHydrant;
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
index 334214d..e9a168d 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
@@ -37,6 +37,8 @@ import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
@@ -49,6 +51,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -730,7 +733,7 @@ public class AppenderatorTest
final List<Result<TimeseriesResultValue>> results3 =
QueryPlus.wrap(query3).run(appenderator, ImmutableMap.of()).toList();
Assert.assertEquals(
- "query2",
+ "query3",
ImmutableList.of(
new Result<>(
DateTimes.of("2001"),
@@ -739,6 +742,42 @@ public class AppenderatorTest
),
results3
);
+
+ final ScanQuery query4 = Druids.newScanQueryBuilder()
+ .dataSource(AppenderatorTester.DATASOURCE)
+ .intervals(
+ new MultipleSpecificSegmentSpec(
+ ImmutableList.of(
+ new SegmentDescriptor(
+ Intervals.of("2001/PT1H"),
+ IDENTIFIERS.get(2).getVersion(),
+ IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
+ ),
+ new SegmentDescriptor(
+ Intervals.of("2001T03/PT1H"),
+ IDENTIFIERS.get(2).getVersion(),
+ IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
+ )
+ )
+ )
+ )
+ .order(ScanQuery.Order.ASCENDING)
+ .batchSize(10)
+ .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+ .build();
+ final List<ScanResultValue> results4 =
+ QueryPlus.wrap(query4).run(appenderator, new HashMap<>()).toList();
+ Assert.assertEquals(2, results4.size()); // 2 segments, 1 row per segment
+ Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4.get(0).getColumns().toArray());
+ Assert.assertArrayEquals(
+ new Object[]{DateTimes.of("2001").getMillis(), "foo", 1L, 8L},
+ ((List<Object>) ((List<Object>) results4.get(0).getEvents()).get(0)).toArray()
+ );
+ Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4.get(0).getColumns().toArray());
+ Assert.assertArrayEquals(
+ new Object[]{DateTimes.of("2001T03").getMillis(), "foo", 1L, 64L},
+ ((List<Object>) ((List<Object>) results4.get(1).getEvents()).get(0)).toArray()
+ );
}
}
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
index 3b6a9d9..af706dd 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
@@ -35,12 +35,18 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.scan.ScanQueryConfig;
+import org.apache.druid.query.scan.ScanQueryEngine;
+import org.apache.druid.query.scan.ScanQueryQueryToolChest;
+import org.apache.druid.query.scan.ScanQueryRunnerFactory;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
@@ -48,6 +54,7 @@ import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
@@ -241,6 +248,14 @@ public class AppenderatorTester implements AutoCloseable
),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
+ ),
+ ScanQuery.class, new ScanQueryRunnerFactory(
+ new ScanQueryQueryToolChest(
+ new ScanQueryConfig(),
+ new DefaultGenericQueryMetricsFactory(TestHelper.makeJsonMapper())
+ ),
+ new ScanQueryEngine(),
+ new ScanQueryConfig()
)
)
),