You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2022/01/31 07:47:36 UTC

[druid] branch master updated: Fix load-drop-load sequence for same segment and historical in http loadqueue peon (#11717)

This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c4fa3cc  Fix load-drop-load sequence for same segment and historical in http loadqueue peon (#11717)
c4fa3cc is described below

commit c4fa3ccfc4a8df0da3968b817dbb057e4ada79fc
Author: Rohan Garg <77...@users.noreply.github.com>
AuthorDate: Mon Jan 31 13:16:58 2022 +0530

    Fix load-drop-load sequence for same segment and historical in http loadqueue peon (#11717)
    
    Fixes an issue where a load-drop-load sequence for a segment and historical doesn't work correctly for the http based load queue peon. The first cycle of load-drop works fine - the problem comes when there is an attempt to reload the segment. The historical caches load success for some recent segments and makes the reload a no-op. But it doesn't consider the fact that the segment was also dropped in between the load requests.
    This change invalidates the cache after a client tries to fetch a success result.
---
 .../coordination/SegmentLoadDropHandler.java       |   5 +
 .../coordination/SegmentLoadDropHandlerTest.java   | 121 +++++++++++++++++++--
 2 files changed, 118 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 53013d1..e723d6d 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -555,6 +555,11 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
             },
             this::resolveWaitingFutures
         );
+      } else if (status.get().getState() == Status.STATE.SUCCESS) {
+        // SUCCESS case, we'll clear up the cached success while serving it to this client
+        // Not doing this can lead to an incorrect response to upcoming clients for a reload
+        requestStatuses.invalidate(changeRequest);
+        return status;
       }
       return requestStatuses.getIfPresent(changeRequest);
     }
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index 9e41197..d03bc52 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -87,6 +87,7 @@ public class SegmentLoadDropHandlerTest
   private SegmentManager segmentManager;
   private List<Runnable> scheduledRunnable;
   private SegmentLoaderConfig segmentLoaderConfig;
+  private SegmentLoaderConfig noAnnouncerSegmentLoaderConfig;
   private SegmentLoaderConfig segmentLoaderConfigNoLocations;
   private ScheduledExecutorFactory scheduledExecutorFactory;
   private List<StorageLocationConfig> locations;
@@ -194,6 +195,39 @@ public class SegmentLoadDropHandlerTest
       }
     };
 
+    noAnnouncerSegmentLoaderConfig = new SegmentLoaderConfig()
+    {
+      @Override
+      public File getInfoDir()
+      {
+        return testStorageLocation.getInfoDir();
+      }
+
+      @Override
+      public int getNumLoadingThreads()
+      {
+        return 5;
+      }
+
+      @Override
+      public int getAnnounceIntervalMillis()
+      {
+        return 0;
+      }
+
+      @Override
+      public List<StorageLocationConfig> getLocations()
+      {
+        return locations;
+      }
+
+      @Override
+      public int getDropSegmentDelayMillis()
+      {
+        return 0;
+      }
+    };
+
     segmentLoaderConfigNoLocations = new SegmentLoaderConfig()
     {
       @Override
@@ -475,15 +509,8 @@ public class SegmentLoadDropHandlerTest
       runnable.run();
     }
 
-    result = segmentLoadDropHandler.processBatch(batch).get();
+    result = segmentLoadDropHandler.processBatch(ImmutableList.of(new SegmentChangeRequestLoad(segment1))).get();
     Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, result.get(0).getStatus());
-    Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, result.get(1).getStatus());
-
-
-    for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus e : segmentLoadDropHandler.processBatch(batch)
-                                                                                            .get()) {
-      Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, e.getStatus());
-    }
 
     segmentLoadDropHandler.stop();
   }
@@ -530,4 +557,82 @@ public class SegmentLoadDropHandlerTest
 
     segmentLoadDropHandler.stop();
   }
+
+  @Test(timeout = 60_000L)
+  public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exception
+  {
+    final SegmentManager segmentManager = Mockito.mock(SegmentManager.class);
+    Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any()))
+           .thenReturn(true);
+    Mockito.doNothing().when(segmentManager).dropSegment(ArgumentMatchers.any());
+    final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
+        jsonMapper,
+        noAnnouncerSegmentLoaderConfig,
+        announcer,
+        Mockito.mock(DataSegmentServerAnnouncer.class),
+        segmentManager,
+        segmentCacheManager,
+        scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
+        new ServerTypeConfig(ServerType.HISTORICAL)
+    );
+
+    segmentLoadDropHandler.start();
+
+    DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01"));
+
+    List<DataSegmentChangeRequest> batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
+
+    // load the segment
+    ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future = segmentLoadDropHandler
+        .processBatch(batch);
+    for (Runnable runnable : scheduledRunnable) {
+      runnable.run();
+    }
+    List<DataSegmentChangeRequestAndStatus> result = future.get();
+    Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+    scheduledRunnable.clear();
+
+    // drop the segment
+    batch = ImmutableList.of(new SegmentChangeRequestDrop(segment1));
+    future = segmentLoadDropHandler.processBatch(batch);
+    for (Runnable runnable : scheduledRunnable) {
+      runnable.run();
+    }
+    result = future.get();
+    Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+    scheduledRunnable.clear();
+
+    // check invocations after a load-drop sequence
+    Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any());
+    Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
+
+    // try to reload the segment - this should be a no-op since it might be the case that this is the first load client
+    // with this request, we'll forget about the success of the load request
+    batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
+    future = segmentLoadDropHandler.processBatch(batch);
+    Assert.assertEquals(scheduledRunnable.size(), 0);
+    result = future.get();
+    Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+
+    // check invocations - should stay the same
+    Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any());
+    Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
+
+    // try to reload the segment - this time the loader will know that is a fresh request to load
+    // so, the segment manager will be asked to load
+    batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
+    future = segmentLoadDropHandler.processBatch(batch);
+    for (Runnable runnable : scheduledRunnable) {
+      runnable.run();
+    }
+    result = future.get();
+    Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+    scheduledRunnable.clear();
+
+    // check invocations - the load segment counter should bump up
+    Mockito.verify(segmentManager, Mockito.times(2)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any());
+    Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
+
+    segmentLoadDropHandler.stop();
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org