Posted to commits@druid.apache.org by "cryptoe (via GitHub)" <gi...@apache.org> on 2023/02/06 13:01:47 UTC

[GitHub] [druid] cryptoe commented on a diff in pull request #13706: Generate tombstones when running MSQ's replace

cryptoe commented on code in PR #13706:
URL: https://github.com/apache/druid/pull/13706#discussion_r1097290912


##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java:
##########
@@ -114,30 +111,6 @@ public void testInsertCannotOrderByDescendingFault()
                      .verifyResults();
   }
 
-  @Test
-  public void testInsertCannotReplaceExistingSegmentFault()
-  {

Review Comment:
   We should have a test case that covers tombstone segments. That would give us more confidence in the PR.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java:
##########
@@ -122,10 +125,105 @@ public List<Interval> computeTombstoneIntervals() throws IOException
     return retVal;
   }
 
-  private DataSegment createTombstoneForTimeChunkInterval(String dataSource, String version, ShardSpec shardSpec, Interval timeChunkInterval)
+  public Set<DataSegment> computeTombstonesForReplace(
+      List<Interval> intervalsToDrop,
+      List<Interval> intervalsToReplace,
+      String dataSource,
+      Granularity replaceGranularity
+  ) throws IOException
+  {
+    Set<Interval> tombstoneIntervals = computeTombstoneIntervalsForReplace(
+        intervalsToReplace,
+        intervalsToDrop,
+        dataSource,
+        replaceGranularity
+    );
+
+    final List<TaskLock> locks = taskActionClient.submit(new LockListAction());
+
+    Set<DataSegment> tombstones = new HashSet<>();
+    for (Interval tombstoneInterval : tombstoneIntervals) {
+      String version = null;
+      for (final TaskLock lock : locks) {
+        if (lock.getInterval().contains(tombstoneInterval)) {

Review Comment:
   Shouldn't we filter by data source here?
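
A minimal sketch of the kind of filter being asked about, not the PR's actual code. `Lock` is a hypothetical stand-in for `TaskLock` that keeps only the fields needed here, intervals are simplified to half-open millisecond ranges, and `findVersion` is an illustrative name. It assumes the lock exposes its data source.

```java
import java.util.Arrays;
import java.util.List;

public class LockVersionLookup
{
  // Hypothetical stand-in for TaskLock: only the fields this sketch needs.
  static final class Lock
  {
    final String dataSource;
    final long startMillis;
    final long endMillis;
    final String version;

    Lock(String dataSource, long startMillis, long endMillis, String version)
    {
      this.dataSource = dataSource;
      this.startMillis = startMillis;
      this.endMillis = endMillis;
      this.version = version;
    }

    // True if [start, end) lies entirely within this lock's interval.
    boolean contains(long start, long end)
    {
      return startMillis <= start && end <= endMillis;
    }
  }

  // Returns the version of the first lock that belongs to the given data source
  // and covers the interval, or null if there is none.
  static String findVersion(List<Lock> locks, String dataSource, long start, long end)
  {
    for (Lock lock : locks) {
      if (lock.dataSource.equals(dataSource) && lock.contains(start, end)) {
        return lock.version;
      }
    }
    return null;
  }

  public static void main(String[] args)
  {
    List<Lock> locks = Arrays.asList(
        new Lock("other", 0L, 100L, "v-other"),
        new Lock("foo", 0L, 100L, "v-foo")
    );
    // Without the data source check, the lock on "other" would match first.
    System.out.println(findVersion(locks, "foo", 10L, 20L));
  }
}
```

The point of the check is that a lock on a different data source covering the same interval would otherwise supply the tombstone's version.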



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java:
##########
@@ -413,12 +413,7 @@ public void testReplaceWhereClauseLargerThanData()
                              new Object[]{946771200000L, 2.0f}
                          )
                      )
-                     .setExpectedSegment(ImmutableSet.of(SegmentId.of(
-                         "foo",
-                         Intervals.of("2000-01-01T/P1M"),
-                         "test",
-                         0
-                     )))
+                     .setExpectedTombstoneIntervals(ImmutableSet.of(Intervals.of("2001-01-01/2001-02-01")))

Review Comment:
   How is this a tombstone, since we are generating data for this interval?



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java:
##########
@@ -1127,6 +1136,35 @@ public void verifyResults()
             Assert.assertTrue(segmentIdVsOutputRowsMap.get(diskSegment).contains(Arrays.asList(row)));
           }
         }
+        if (!testTaskActionClient.getPublishedSegments().isEmpty()) {
+          Set<SegmentId> expectedPublishedSegmentIds = segmentManager.getAllDataSegments()

Review Comment:
   Shouldn't we subtract the segments that are present as keys of segmentIdVsOutputRowsMap when we are asserting against tombstone segments?
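
A rough sketch of the subtraction suggested above, under simplifying assumptions: segment ids are plain strings rather than `SegmentId` objects, and `tombstoneCandidates` is a hypothetical helper name, not something in the test base.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class TombstoneIdDiff
{
  // Returns the published ids that have no output rows associated with them.
  // In this sketch those are the ids expected to be tombstones.
  static Set<String> tombstoneCandidates(Set<String> publishedIds, Set<String> idsWithRows)
  {
    Set<String> result = new HashSet<>(publishedIds); // copy so the inputs are not mutated
    result.removeAll(idsWithRows);
    return result;
  }

  public static void main(String[] args)
  {
    Set<String> published = new HashSet<>(Arrays.asList("seg-data-1", "seg-data-2", "seg-tombstone-1"));
    Set<String> withRows = new HashSet<>(Arrays.asList("seg-data-1", "seg-data-2"));
    System.out.println(tombstoneCandidates(published, withRows));
  }
}
```

Comparing only the remainder against the expected tombstone intervals keeps regular data segments out of the tombstone assertion.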



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java:
##########
@@ -40,18 +41,29 @@
 import org.apache.druid.timeline.SegmentId;
 import org.joda.time.Interval;
 
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 public class MSQTestTaskActionClient implements TaskActionClient
 {
 
-  private static final String VERSION = "test";
+  public static final String VERSION = "test";
   private final ObjectMapper mapper;
   private final ConcurrentHashMap<SegmentId, AtomicInteger> segmentIdPartitionIdMap = new ConcurrentHashMap<>();
+  private final Map<String, List<Interval>> usedIntervals = ImmutableMap.of(
+      "foo", ImmutableList.of(Intervals.of("2001-01-01/2001-01-04"), Intervals.of("2000-01-01/2000-01-04")),
+      "foo2", ImmutableList.of(Intervals.of("2000-01-01/P1D"))
+  );
+  private final Set<DataSegment> publishedSegments = new HashSet<>();
 
-  public MSQTestTaskActionClient(ObjectMapper mapper)
+  public MSQTestTaskActionClient(

Review Comment:
   Looks like we now have state in this client. It might be worth mentioning that somewhere. Does it work with the calciteTests for MSQ?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1247,48 +1244,33 @@ private void postResultPartitionBoundariesForStage(
   /**
    * Publish the list of segments. Additionally, if {@link DataSourceMSQDestination#isReplaceTimeChunks()},
    * also drop all other segments within the replacement intervals.
-   * <p>
-   * If any existing segments cannot be dropped because their intervals are not wholly contained within the
-   * replacement parameter, throws a {@link MSQException} with {@link InsertCannotReplaceExistingSegmentFault}.
    */
   private void publishAllSegments(final Set<DataSegment> segments) throws IOException
   {
     final DataSourceMSQDestination destination =
         (DataSourceMSQDestination) task.getQuerySpec().getDestination();
-    final Set<DataSegment> segmentsToDrop;
+    Set<DataSegment> segmentsWithTombstones = new HashSet<>(segments);

Review Comment:
   Nit: this can still be final.
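
For illustration only, a small sketch of why the nit holds, assuming the variable is only added to and never reassigned later in the method (the rest of the method is not shown in this hunk). Strings stand in for `DataSegment`, and `withTombstones` is a hypothetical helper.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class FinalSetSketch
{
  // Merges two sets of segment ids; a stand-in for adding tombstones to the segments to publish.
  static Set<String> withTombstones(Set<String> segments, Set<String> tombstones)
  {
    // 'final' fixes the reference only; the HashSet itself stays mutable.
    final Set<String> segmentsWithTombstones = new HashSet<>(segments);
    segmentsWithTombstones.addAll(tombstones);
    return segmentsWithTombstones;
  }

  public static void main(String[] args)
  {
    Set<String> segments = new HashSet<>(Arrays.asList("seg-1"));
    Set<String> tombstones = new HashSet<>(Arrays.asList("tombstone-1"));
    System.out.println(withTombstones(segments, tombstones).size());
  }
}
```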



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org