You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2019/04/26 23:12:16 UTC

[incubator-druid] branch master updated: Fix time-ordered scan queries on realtime segments (#7546)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 07dd742  Fix time-ordered scan queries on realtime segments (#7546)
07dd742 is described below

commit 07dd742e357b5731e8848c943c2a641705a4785a
Author: Justin Borromeo <jb...@edu.uwaterloo.ca>
AuthorDate: Fri Apr 26 16:12:10 2019 -0700

    Fix time-ordered scan queries on realtime segments (#7546)
    
    * Initial commit
    
    * Added test for int to long conversion
    
    * Add appenderator test for realtime scan query
    
    * get rid of todo
    
    * Fix forbidden apis
    
    * Jon's recommendations
    
    * Formatting
---
 .../org/apache/druid/query/SinkQueryRunners.java   | 60 +++++++++++++++
 .../druid/query/scan/ScanQueryRunnerFactory.java   | 70 +++++++++--------
 .../apache/druid/query/scan/ScanResultValue.java   | 10 +--
 .../query/scan/ScanQueryRunnerFactoryTest.java     | 22 +++---
 .../druid/query/scan/ScanResultValueTest.java      | 65 ++++++++++------
 .../appenderator/SinkQuerySegmentWalker.java       | 87 ++++++++++------------
 .../druid/segment/realtime/plumber/Sink.java       |  5 +-
 .../realtime/appenderator/AppenderatorTest.java    | 41 +++++++++-
 .../realtime/appenderator/AppenderatorTester.java  | 15 ++++
 9 files changed, 257 insertions(+), 118 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/SinkQueryRunners.java b/processing/src/main/java/org/apache/druid/query/SinkQueryRunners.java
new file mode 100644
index 0000000..9cec39a
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/SinkQueryRunners.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import org.apache.druid.java.util.common.Pair;
+import org.joda.time.Interval;
+
+import java.util.Iterator;
+
+public class SinkQueryRunners<T> implements Iterable<QueryRunner<T>>
+{
+  Iterable<Pair<Interval, QueryRunner<T>>> runners;
+
+  public SinkQueryRunners(Iterable<Pair<Interval, QueryRunner<T>>> runners)
+  {
+    this.runners = runners;
+  }
+
+  public Iterator<Pair<Interval, QueryRunner<T>>> runnerIntervalMappingIterator()
+  {
+    return runners.iterator();
+  }
+
+  @Override
+  public Iterator<QueryRunner<T>> iterator()
+  {
+    Iterator<Pair<Interval, QueryRunner<T>>> runnerIntervalIterator = runners.iterator();
+    return new Iterator<QueryRunner<T>>()
+    {
+      @Override
+      public boolean hasNext()
+      {
+        return runnerIntervalIterator.hasNext();
+      }
+
+      @Override
+      public QueryRunner<T> next()
+      {
+        return runnerIntervalIterator.next().rhs;
+      }
+    };
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
index 0a9f3b9..c34d58c 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
@@ -20,7 +20,6 @@
 package org.apache.druid.query.scan;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import com.google.inject.Inject;
@@ -39,6 +38,7 @@ import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.SinkQueryRunners;
 import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 import org.apache.druid.query.spec.QuerySegmentSpec;
 import org.apache.druid.query.spec.SpecificSegmentSpec;
@@ -114,11 +114,11 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
           return returnedRows;
         }
       } else {
-        List<SegmentDescriptor> descriptorsOrdered = getSegmentDescriptorsFromSpecificQuerySpec(query.getQuerySegmentSpec());
+        List<Interval> intervalsOrdered = getIntervalsFromSpecificQuerySpec(query.getQuerySegmentSpec());
         List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners);
 
         if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
-          descriptorsOrdered = Lists.reverse(descriptorsOrdered);
+          intervalsOrdered = Lists.reverse(intervalsOrdered);
           queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
         }
         int maxRowsQueuedForOrdering = (query.getMaxRowsQueuedForOrdering() == null
@@ -132,28 +132,29 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
                   input -> input.run(queryPlus, responseContext)
               )),
               query,
-              descriptorsOrdered
+              intervalsOrdered
           );
         } else {
-          Preconditions.checkState(
-              descriptorsOrdered.size() == queryRunnersOrdered.size(),
-              "Number of segment descriptors does not equal number of "
-              + "query runners...something went wrong!"
-          );
-
-          // Combine the two lists of segment descriptors and query runners into a single list of
-          // segment descriptors - query runner pairs.  This makes it easier to use stream operators.
-          List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> descriptorsAndRunnersOrdered = new ArrayList<>();
-          for (int i = 0; i < queryRunnersOrdered.size(); i++) {
-            descriptorsAndRunnersOrdered.add(new Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i)));
+          // Use n-way merge strategy
+          List<Pair<Interval, QueryRunner<ScanResultValue>>> intervalsAndRunnersOrdered = new ArrayList<>();
+          if (intervalsOrdered.size() == queryRunnersOrdered.size()) {
+            for (int i = 0; i < queryRunnersOrdered.size(); i++) {
+              intervalsAndRunnersOrdered.add(new Pair<>(intervalsOrdered.get(i), queryRunnersOrdered.get(i)));
+            }
+          } else if (queryRunners instanceof SinkQueryRunners) {
+            ((SinkQueryRunners<ScanResultValue>) queryRunners).runnerIntervalMappingIterator()
+                                                              .forEachRemaining(intervalsAndRunnersOrdered::add);
+          } else {
+            throw new ISE("Number of segment descriptors does not equal number of "
+                          + "query runners...something went wrong!");
           }
 
           // Group the list of pairs by interval.  The LinkedHashMap will have an interval paired with a list of all the
           // query runners for that segment
-          LinkedHashMap<Interval, List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
-              descriptorsAndRunnersOrdered.stream()
+          LinkedHashMap<Interval, List<Pair<Interval, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
+              intervalsAndRunnersOrdered.stream()
                                           .collect(Collectors.groupingBy(
-                                              x -> x.lhs.getInterval(),
+                                              x -> x.lhs,
                                               LinkedHashMap::new,
                                               Collectors.toList()
                                           ));
@@ -167,9 +168,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
                                          .max(Comparator.comparing(Integer::valueOf))
                                          .get();
 
-          int segmentPartitionLimit = (query.getMaxSegmentPartitionsOrderedInMemory() == null
-                                       ? scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()
-                                       : query.getMaxSegmentPartitionsOrderedInMemory());
+          int segmentPartitionLimit = query.getMaxSegmentPartitionsOrderedInMemory() == null
+                                      ? scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()
+                                      : query.getMaxSegmentPartitionsOrderedInMemory();
           if (maxNumPartitionsInSegment <= segmentPartitionLimit) {
             // Use n-way merge strategy
 
@@ -205,7 +206,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
   Sequence<ScanResultValue> priorityQueueSortAndLimit(
       Sequence<ScanResultValue> inputSequence,
       ScanQuery scanQuery,
-      List<SegmentDescriptor> descriptorsOrdered
+      List<Interval> intervalsOrdered
   )
   {
     Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
@@ -254,9 +255,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
         // Finish scanning the interval containing the limit row
         if (numRowsScanned > limit && finalInterval == null) {
           long timestampOfLimitRow = srv.getFirstEventTimestamp(scanQuery.getResultFormat());
-          for (SegmentDescriptor descriptor : descriptorsOrdered) {
-            if (descriptor.getInterval().contains(timestampOfLimitRow)) {
-              finalInterval = descriptor.getInterval();
+          for (Interval interval : intervalsOrdered) {
+            if (interval.contains(timestampOfLimitRow)) {
+              finalInterval = interval;
             }
           }
           if (finalInterval == null) {
@@ -280,23 +281,28 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
   }
 
   @VisibleForTesting
-  List<SegmentDescriptor> getSegmentDescriptorsFromSpecificQuerySpec(QuerySegmentSpec spec)
+  List<Interval> getIntervalsFromSpecificQuerySpec(QuerySegmentSpec spec)
   {
     // Query segment spec must be an instance of MultipleSpecificSegmentSpec or SpecificSegmentSpec because
     // segment descriptors need to be present for a 1:1 matching of intervals with query runners.
     // The other types of segment spec condense the intervals (i.e. merge neighbouring intervals), eliminating
     // the 1:1 relationship between intervals and query runners.
-    List<SegmentDescriptor> descriptorsOrdered;
+    List<Interval> descriptorsOrdered;
 
     if (spec instanceof MultipleSpecificSegmentSpec) {
       // Ascending time order for both descriptors and query runners by default
-      descriptorsOrdered = ((MultipleSpecificSegmentSpec) spec).getDescriptors();
+      descriptorsOrdered = ((MultipleSpecificSegmentSpec) spec).getDescriptors()
+                                                               .stream()
+                                                               .map(SegmentDescriptor::getInterval)
+                                                               .collect(Collectors.toList());
     } else if (spec instanceof SpecificSegmentSpec) {
-      descriptorsOrdered = Collections.singletonList(((SpecificSegmentSpec) spec).getDescriptor());
+      descriptorsOrdered = Collections.singletonList(((SpecificSegmentSpec) spec).getDescriptor().getInterval());
     } else {
-      throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs"
-                    + "of type MultipleSpecificSegmentSpec or SpecificSegmentSpec...a [%s] was received instead.",
-                    spec.getClass().getSimpleName());
+      throw new UOE(
+          "Time-ordering on scan queries is only supported for queries with segment specs"
+          + "of type MultipleSpecificSegmentSpec or SpecificSegmentSpec...a [%s] was received instead.",
+          spec.getClass().getSimpleName()
+      );
     }
     return descriptorsOrdered;
   }
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java
index a05e715..8673b34 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.segment.DimensionHandlerUtils;
 import org.apache.druid.segment.column.ColumnHolder;
 
 import javax.annotation.Nullable;
@@ -79,18 +80,18 @@ public class ScanResultValue implements Comparable<ScanResultValue>
   public long getFirstEventTimestamp(ScanQuery.ResultFormat resultFormat)
   {
     if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
-      Long timestamp = (Long) ((Map<String, Object>) ((List<Object>) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
-      if (timestamp == null) {
+      Object timestampObj = ((Map<String, Object>) ((List<Object>) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
+      if (timestampObj == null) {
         throw new ISE("Unable to compare timestamp for rows without a time column");
       }
-      return timestamp;
+      return DimensionHandlerUtils.convertObjectToLong(timestampObj);
     } else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
       int timeColumnIndex = this.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME);
       if (timeColumnIndex == -1) {
         throw new ISE("Unable to compare timestamp for rows without a time column");
       }
       List<Object> firstEvent = (List<Object>) ((List<Object>) this.getEvents()).get(0);
-      return (Long) firstEvent.get(timeColumnIndex);
+      return DimensionHandlerUtils.convertObjectToLong(firstEvent.get(timeColumnIndex));
     }
     throw new UOE("Unable to get first event timestamp using result format of [%s]", resultFormat.toString());
   }
@@ -105,7 +106,6 @@ public class ScanResultValue implements Comparable<ScanResultValue>
     return singleEventScanResultValues;
   }
 
-
   @Override
   public boolean equals(Object o)
   {
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
index 6559c65..4b65f53 100644
--- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
@@ -140,10 +140,10 @@ public class ScanQueryRunnerFactoryTest
         List<ScanResultValue> output = factory.priorityQueueSortAndLimit(
             inputSequence,
             query,
-            ImmutableList.of(new SegmentDescriptor(new Interval(
+            ImmutableList.of(new Interval(
                 DateTimes.of("2010-01-01"),
                 DateTimes.of("2019-01-01").plusHours(1)
-            ), "1", 0))
+            ))
         ).toList();
         if (query.getLimit() > Integer.MAX_VALUE) {
           Assert.fail("Unsupported exception should have been thrown due to high limit");
@@ -275,7 +275,7 @@ public class ScanQueryRunnerFactoryTest
     ), "1", 0);
 
     @Test
-    public void testGetValidSegmentDescriptorsFromSpec()
+    public void testGetValidIntervalsFromSpec()
     {
       QuerySegmentSpec multiSpecificSpec = new MultipleSpecificSegmentSpec(
           Collections.singletonList(
@@ -284,13 +284,13 @@ public class ScanQueryRunnerFactoryTest
       );
       QuerySegmentSpec singleSpecificSpec = new SpecificSegmentSpec(descriptor);
 
-      List<SegmentDescriptor> descriptors = factory.getSegmentDescriptorsFromSpecificQuerySpec(multiSpecificSpec);
-      Assert.assertEquals(1, descriptors.size());
-      Assert.assertEquals(descriptor, descriptors.get(0));
+      List<Interval> intervals = factory.getIntervalsFromSpecificQuerySpec(multiSpecificSpec);
+      Assert.assertEquals(1, intervals.size());
+      Assert.assertEquals(descriptor.getInterval(), intervals.get(0));
 
-      descriptors = factory.getSegmentDescriptorsFromSpecificQuerySpec(singleSpecificSpec);
-      Assert.assertEquals(1, descriptors.size());
-      Assert.assertEquals(descriptor, descriptors.get(0));
+      intervals = factory.getIntervalsFromSpecificQuerySpec(singleSpecificSpec);
+      Assert.assertEquals(1, intervals.size());
+      Assert.assertEquals(descriptor.getInterval(), intervals.get(0));
     }
 
     @Test(expected = UOE.class)
@@ -304,7 +304,7 @@ public class ScanQueryRunnerFactoryTest
               )
           )
       );
-      factory.getSegmentDescriptorsFromSpecificQuerySpec(multiIntervalSpec);
+      factory.getIntervalsFromSpecificQuerySpec(multiIntervalSpec);
     }
 
     @Test(expected = UOE.class)
@@ -316,7 +316,7 @@ public class ScanQueryRunnerFactoryTest
               DateTimes.of("2019-01-01").plusHours(1)
           )
       );
-      factory.getSegmentDescriptorsFromSpecificQuerySpec(legacySpec);
+      factory.getIntervalsFromSpecificQuerySpec(legacySpec);
     }
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTest.java
index 47f82ad..90566aa 100644
--- a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTest.java
+++ b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTest.java
@@ -36,11 +36,14 @@ import java.util.Map;
 public class ScanResultValueTest
 {
   private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
-  private static final long TIME_1 = 1234567890000L;
-  private static final long TIME_2 = 9876543210000L;
+  private static final long TIME_1_LONG = 1234567890000L;
+  private static final long TIME_2_LONG = 9876543210000L;
+  private static final int TIME_3_INT = Integer.MAX_VALUE;
 
-  private static ScanResultValue compactedListSRV;
-  private static ScanResultValue listSRV;
+  private static ScanResultValue compactedListSRVLongTimestamp;
+  private static ScanResultValue listSRVLongTimestamp;
+  private static ScanResultValue compactedListSRVIntegerTimestamp;
+  private static ScanResultValue listSRVIntegerTimestamp;
 
   @BeforeClass
   public static void setup()
@@ -48,73 +51,93 @@ public class ScanResultValueTest
     String segmentId = "some_segment_id";
     List<String> columns = new ArrayList<>(Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, "name", "count"));
     List<Object> event = new ArrayList<>(Arrays.asList(
-        TIME_1,
+        TIME_1_LONG,
         "Feridun",
         4
     ));
     List<Object> event2 = new ArrayList<>(Arrays.asList(
-        TIME_2,
+        TIME_2_LONG,
         "Justin",
         6
     ));
 
     List<List<Object>> events = Arrays.asList(event, event2);
-    compactedListSRV = new ScanResultValue(segmentId, columns, events);
+    compactedListSRVLongTimestamp = new ScanResultValue(segmentId, columns, events);
+
+    List<Object> eventInt = new ArrayList<>(Arrays.asList(
+        TIME_3_INT,
+        "Feridun",
+        4
+    ));
+
+    List<List<Object>> eventsInt = Arrays.asList(eventInt, event2);
+    compactedListSRVIntegerTimestamp = new ScanResultValue(segmentId, columns, eventsInt);
 
     Map<String, Object> eventMap1 = new HashMap<>();
-    eventMap1.put(ColumnHolder.TIME_COLUMN_NAME, TIME_1);
+    eventMap1.put(ColumnHolder.TIME_COLUMN_NAME, TIME_1_LONG);
     eventMap1.put("name", "Feridun");
     eventMap1.put("count", 4);
     Map<String, Object> eventMap2 = new HashMap<>();
-    eventMap2.put(ColumnHolder.TIME_COLUMN_NAME, TIME_2);
+    eventMap2.put(ColumnHolder.TIME_COLUMN_NAME, TIME_2_LONG);
     eventMap2.put("name", "Justin");
     eventMap2.put("count", 6);
     List<Map<String, Object>> eventMaps = Arrays.asList(eventMap1, eventMap2);
-    listSRV = new ScanResultValue(segmentId, columns, eventMaps);
+    listSRVLongTimestamp = new ScanResultValue(segmentId, columns, eventMaps);
+
+    Map<String, Object> eventMap3 = new HashMap<>();
+    eventMap3.put(ColumnHolder.TIME_COLUMN_NAME, TIME_3_INT);
+    eventMap3.put("name", "Justin");
+    eventMap3.put("count", 6);
+    List<Map<String, Object>> eventMapsInt = Arrays.asList(eventMap3, eventMap2);
+    listSRVIntegerTimestamp = new ScanResultValue(segmentId, columns, eventMapsInt);
   }
 
   @Test
   public void testSerdeScanResultValueCompactedList() throws IOException
   {
 
-    String serialized = JSON_MAPPER.writeValueAsString(compactedListSRV);
+    String serialized = JSON_MAPPER.writeValueAsString(compactedListSRVLongTimestamp);
     ScanResultValue deserialized = JSON_MAPPER.readValue(serialized, ScanResultValue.class);
-    Assert.assertEquals(compactedListSRV, deserialized);
+    Assert.assertEquals(compactedListSRVLongTimestamp, deserialized);
   }
 
   @Test
   public void testSerdeScanResultValueNonCompactedList() throws IOException
   {
 
-    String serialized = JSON_MAPPER.writeValueAsString(listSRV);
+    String serialized = JSON_MAPPER.writeValueAsString(listSRVLongTimestamp);
     ScanResultValue deserialized = JSON_MAPPER.readValue(serialized, ScanResultValue.class);
-    Assert.assertEquals(listSRV, deserialized);
+    Assert.assertEquals(listSRVLongTimestamp, deserialized);
   }
 
   @Test
   public void testGetFirstEventTimestampCompactedList()
   {
-    long timestamp = compactedListSRV.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST);
-    Assert.assertEquals(TIME_1, timestamp);
+    long timestamp = compactedListSRVLongTimestamp.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST);
+    Assert.assertEquals(TIME_1_LONG, timestamp);
+    long timestampInt = compactedListSRVIntegerTimestamp.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST);
+    Assert.assertEquals(TIME_3_INT, timestampInt);
   }
 
   @Test
   public void testGetFirstEventTimestampNonCompactedList()
   {
-    long timestamp = listSRV.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_LIST);
-    Assert.assertEquals(TIME_1, timestamp);
+    long timestamp = listSRVLongTimestamp.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_LIST);
+    Assert.assertEquals(TIME_1_LONG, timestamp);
+    long timestampInt = listSRVIntegerTimestamp.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_LIST);
+    Assert.assertEquals(TIME_3_INT, timestampInt);
   }
 
   @Test
   public void testToSingleEventScanResultValues()
   {
-    List<ScanResultValue> compactedListScanResultValues = compactedListSRV.toSingleEventScanResultValues();
+    List<ScanResultValue> compactedListScanResultValues = compactedListSRVLongTimestamp.toSingleEventScanResultValues();
     for (ScanResultValue srv : compactedListScanResultValues) {
       List<Object> events = (List<Object>) srv.getEvents();
       Assert.assertEquals(1, events.size());
     }
-    List<ScanResultValue> listScanResultValues = listSRV.toSingleEventScanResultValues();
-    for (ScanResultValue srv : compactedListScanResultValues) {
+    List<ScanResultValue> listScanResultValues = listSRVLongTimestamp.toSingleEventScanResultValues();
+    for (ScanResultValue srv : listScanResultValues) {
       List<Object> events = (List<Object>) srv.getEvents();
       Assert.assertEquals(1, events.size());
     }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 127c563..7d08c0f 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -29,6 +29,7 @@ import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
 import org.apache.druid.client.cache.ForegroundCachePopulator;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.CloseQuietly;
@@ -49,6 +50,7 @@ import org.apache.druid.query.QuerySegmentWalker;
 import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.SinkQueryRunners;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
 import org.apache.druid.query.spec.SpecificSegmentSpec;
@@ -180,47 +182,40 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
                 FunctionalIterable
                     .create(specs)
                     .transform(
-                        new Function<SegmentDescriptor, QueryRunner<T>>()
-                        {
-                          @Override
-                          public QueryRunner<T> apply(final SegmentDescriptor descriptor)
-                          {
-                            final PartitionHolder<Sink> holder = sinkTimeline.findEntry(
-                                descriptor.getInterval(),
-                                descriptor.getVersion()
-                            );
-                            if (holder == null) {
-                              return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
-                            }
+                        descriptor -> {
+                          final PartitionHolder<Sink> holder = sinkTimeline.findEntry(
+                              descriptor.getInterval(),
+                              descriptor.getVersion()
+                          );
+                          if (holder == null) {
+                            return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
+                          }
 
-                            final PartitionChunk<Sink> chunk = holder.getChunk(descriptor.getPartitionNumber());
-                            if (chunk == null) {
-                              return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
-                            }
+                          final PartitionChunk<Sink> chunk = holder.getChunk(descriptor.getPartitionNumber());
+                          if (chunk == null) {
+                            return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
+                          }
 
-                            final Sink theSink = chunk.getObject();
-                            final SegmentId sinkSegmentId = theSink.getSegment().getId();
+                          final Sink theSink = chunk.getObject();
+                          final SegmentId sinkSegmentId = theSink.getSegment().getId();
 
-                            return new SpecificSegmentQueryRunner<>(
-                                withPerSinkMetrics(
-                                    new BySegmentQueryRunner<>(
-                                        sinkSegmentId,
-                                        descriptor.getInterval().getStart(),
-                                        factory.mergeRunners(
-                                            Execs.directExecutor(),
-                                            Iterables.transform(
-                                                theSink,
-                                                new Function<FireHydrant, QueryRunner<T>>()
-                                                {
-                                                  @Override
-                                                  public QueryRunner<T> apply(final FireHydrant hydrant)
-                                                  {
+                          return new SpecificSegmentQueryRunner<>(
+                              withPerSinkMetrics(
+                                  new BySegmentQueryRunner<>(
+                                      sinkSegmentId,
+                                      descriptor.getInterval().getStart(),
+                                      factory.mergeRunners(
+                                          Execs.directExecutor(),
+                                          new SinkQueryRunners<>(
+                                              Iterables.transform(
+                                                  theSink,
+                                                  hydrant -> {
                                                     // Hydrant might swap at any point, but if it's swapped at the start
                                                     // then we know it's *definitely* swapped.
                                                     final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();
 
                                                     if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
-                                                      return new NoopQueryRunner<>();
+                                                      return new Pair<>(Intervals.ETERNITY, new NoopQueryRunner<>());
                                                     }
 
                                                     // Prevent the underlying segment from swapping when its being iterated
@@ -234,7 +229,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
                                                       // 1) Only use caching if data is immutable
                                                       // 2) Hydrants are not the same between replicas, make sure cache is local
                                                       if (hydrantDefinitelySwapped && cache.isLocal()) {
-                                                        return new CachingQueryRunner<>(
+                                                        QueryRunner<T> cachingRunner = new CachingQueryRunner<>(
                                                             makeHydrantCacheIdentifier(hydrant),
                                                             descriptor,
                                                             objectMapper,
@@ -249,8 +244,9 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
                                                             ),
                                                             cacheConfig
                                                         );
+                                                        return new Pair<>(segment.lhs.getDataInterval(), cachingRunner);
                                                       } else {
-                                                        return baseRunner;
+                                                        return new Pair<>(segment.lhs.getDataInterval(), baseRunner);
                                                       }
                                                     }
                                                     catch (RuntimeException e) {
@@ -258,17 +254,16 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
                                                       throw e;
                                                     }
                                                   }
-                                                }
-                                            )
-                                        )
-                                    ),
-                                    toolChest,
-                                    sinkSegmentId,
-                                    cpuTimeAccumulator
-                                ),
-                                new SpecificSegmentSpec(descriptor)
-                            );
-                          }
+                                              )
+                                          )
+                                      )
+                                  ),
+                                  toolChest,
+                                  sinkSegmentId,
+                                  cpuTimeAccumulator
+                              ),
+                              new SpecificSegmentSpec(descriptor)
+                          );
                         }
                     )
             )
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
index d2d72ba..4e0c596 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
@@ -54,7 +54,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class Sink implements Iterable<FireHydrant>
 {
-  private static final IncrementalIndexAddResult ALREADY_SWAPPED = new IncrementalIndexAddResult(-1, -1, null, "write after index swapped");
+  private static final IncrementalIndexAddResult ALREADY_SWAPPED =
+      new IncrementalIndexAddResult(-1, -1, null, "write after index swapped");
 
   private final Object hydrantLock = new Object();
   private final Interval interval;
@@ -64,7 +65,7 @@ public class Sink implements Iterable<FireHydrant>
   private final int maxRowsInMemory;
   private final long maxBytesInMemory;
   private final boolean reportParseExceptions;
-  private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<FireHydrant>();
+  private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<>();
   private final LinkedHashSet<String> dimOrder = new LinkedHashSet<>();
   private final AtomicInteger numRowsExcludingCurrIndex = new AtomicInteger();
   private volatile FireHydrant currHydrant;
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
index 334214d..e9a168d 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
@@ -37,6 +37,8 @@ import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.Result;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.scan.ScanResultValue;
 import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesResultValue;
@@ -49,6 +51,7 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -730,7 +733,7 @@ public class AppenderatorTest
       final List<Result<TimeseriesResultValue>> results3 =
           QueryPlus.wrap(query3).run(appenderator, ImmutableMap.of()).toList();
       Assert.assertEquals(
-          "query2",
+          "query3",
           ImmutableList.of(
               new Result<>(
                   DateTimes.of("2001"),
@@ -739,6 +742,42 @@ public class AppenderatorTest
           ),
           results3
       );
+
+      final ScanQuery query4 = Druids.newScanQueryBuilder()
+                                     .dataSource(AppenderatorTester.DATASOURCE)
+                                     .intervals(
+                                         new MultipleSpecificSegmentSpec(
+                                             ImmutableList.of(
+                                                 new SegmentDescriptor(
+                                                     Intervals.of("2001/PT1H"),
+                                                     IDENTIFIERS.get(2).getVersion(),
+                                                     IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
+                                                 ),
+                                                 new SegmentDescriptor(
+                                                     Intervals.of("2001T03/PT1H"),
+                                                     IDENTIFIERS.get(2).getVersion(),
+                                                     IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
+                                                 )
+                                             )
+                                         )
+                                     )
+                                     .order(ScanQuery.Order.ASCENDING)
+                                     .batchSize(10)
+                                     .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                                     .build();
+      final List<ScanResultValue> results4 =
+          QueryPlus.wrap(query4).run(appenderator, new HashMap<>()).toList();
+      Assert.assertEquals(2, results4.size()); // 2 segments, 1 row per segment
+      Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4.get(0).getColumns().toArray());
+      Assert.assertArrayEquals(
+          new Object[]{DateTimes.of("2001").getMillis(), "foo", 1L, 8L},
+          ((List<Object>) ((List<Object>) results4.get(0).getEvents()).get(0)).toArray()
+      );
+      Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4.get(0).getColumns().toArray());
+      Assert.assertArrayEquals(
+          new Object[]{DateTimes.of("2001T03").getMillis(), "foo", 1L, 64L},
+          ((List<Object>) ((List<Object>) results4.get(1).getEvents()).get(0)).toArray()
+      );
     }
   }
 
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
index 3b6a9d9..af706dd 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
@@ -35,12 +35,18 @@ import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.core.NoopEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
 import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
 import org.apache.druid.query.QueryRunnerTestHelper;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.scan.ScanQueryConfig;
+import org.apache.druid.query.scan.ScanQueryEngine;
+import org.apache.druid.query.scan.ScanQueryQueryToolChest;
+import org.apache.druid.query.scan.ScanQueryRunnerFactory;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
 import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
@@ -48,6 +54,7 @@ import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMerger;
 import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
@@ -241,6 +248,14 @@ public class AppenderatorTester implements AutoCloseable
                     ),
                     new TimeseriesQueryEngine(),
                     QueryRunnerTestHelper.NOOP_QUERYWATCHER
+                ),
+                ScanQuery.class, new ScanQueryRunnerFactory(
+                    new ScanQueryQueryToolChest(
+                        new ScanQueryConfig(),
+                        new DefaultGenericQueryMetricsFactory(TestHelper.makeJsonMapper())
+                    ),
+                    new ScanQueryEngine(),
+                    new ScanQueryConfig()
                 )
             )
         ),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org