You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2022/01/31 07:47:36 UTC
[druid] branch master updated: Fix load-drop-load sequence for same segment and historical in http loadqueue peon (#11717)
This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c4fa3cc Fix load-drop-load sequence for same segment and historical in http loadqueue peon (#11717)
c4fa3cc is described below
commit c4fa3ccfc4a8df0da3968b817dbb057e4ada79fc
Author: Rohan Garg <77...@users.noreply.github.com>
AuthorDate: Mon Jan 31 13:16:58 2022 +0530
Fix load-drop-load sequence for same segment and historical in http loadqueue peon (#11717)
Fixes an issue where a load-drop-load sequence for the same segment and historical doesn't work correctly with the HTTP-based load queue peon. The first load-drop cycle works fine; the problem arises when there is an attempt to reload the segment. The historical caches the load success for some recent segments and treats the reload as a no-op, but it doesn't consider the fact that the segment was also dropped in between the load requests.
This change invalidates the cached status when a SUCCESS result is served to a client.
---
.../coordination/SegmentLoadDropHandler.java | 5 +
.../coordination/SegmentLoadDropHandlerTest.java | 121 +++++++++++++++++++--
2 files changed, 118 insertions(+), 8 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 53013d1..e723d6d 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -555,6 +555,11 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
},
this::resolveWaitingFutures
);
+ } else if (status.get().getState() == Status.STATE.SUCCESS) {
+ // SUCCESS case, we'll clear up the cached success while serving it to this client
+ // Not doing this can lead to an incorrect response to upcoming clients for a reload
+ requestStatuses.invalidate(changeRequest);
+ return status;
}
return requestStatuses.getIfPresent(changeRequest);
}
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index 9e41197..d03bc52 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -87,6 +87,7 @@ public class SegmentLoadDropHandlerTest
private SegmentManager segmentManager;
private List<Runnable> scheduledRunnable;
private SegmentLoaderConfig segmentLoaderConfig;
+ private SegmentLoaderConfig noAnnouncerSegmentLoaderConfig;
private SegmentLoaderConfig segmentLoaderConfigNoLocations;
private ScheduledExecutorFactory scheduledExecutorFactory;
private List<StorageLocationConfig> locations;
@@ -194,6 +195,39 @@ public class SegmentLoadDropHandlerTest
}
};
+ noAnnouncerSegmentLoaderConfig = new SegmentLoaderConfig()
+ {
+ @Override
+ public File getInfoDir()
+ {
+ return testStorageLocation.getInfoDir();
+ }
+
+ @Override
+ public int getNumLoadingThreads()
+ {
+ return 5;
+ }
+
+ @Override
+ public int getAnnounceIntervalMillis()
+ {
+ return 0;
+ }
+
+ @Override
+ public List<StorageLocationConfig> getLocations()
+ {
+ return locations;
+ }
+
+ @Override
+ public int getDropSegmentDelayMillis()
+ {
+ return 0;
+ }
+ };
+
segmentLoaderConfigNoLocations = new SegmentLoaderConfig()
{
@Override
@@ -475,15 +509,8 @@ public class SegmentLoadDropHandlerTest
runnable.run();
}
- result = segmentLoadDropHandler.processBatch(batch).get();
+ result = segmentLoadDropHandler.processBatch(ImmutableList.of(new SegmentChangeRequestLoad(segment1))).get();
Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, result.get(0).getStatus());
- Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, result.get(1).getStatus());
-
-
- for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus e : segmentLoadDropHandler.processBatch(batch)
- .get()) {
- Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, e.getStatus());
- }
segmentLoadDropHandler.stop();
}
@@ -530,4 +557,82 @@ public class SegmentLoadDropHandlerTest
segmentLoadDropHandler.stop();
}
+
+ @Test(timeout = 60_000L)
+ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exception
+ {
+ final SegmentManager segmentManager = Mockito.mock(SegmentManager.class);
+ Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any()))
+ .thenReturn(true);
+ Mockito.doNothing().when(segmentManager).dropSegment(ArgumentMatchers.any());
+ final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
+ jsonMapper,
+ noAnnouncerSegmentLoaderConfig,
+ announcer,
+ Mockito.mock(DataSegmentServerAnnouncer.class),
+ segmentManager,
+ segmentCacheManager,
+ scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
+ new ServerTypeConfig(ServerType.HISTORICAL)
+ );
+
+ segmentLoadDropHandler.start();
+
+ DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01"));
+
+ List<DataSegmentChangeRequest> batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
+
+ // load the segment: run the scheduled runnables, then expect the request to report SUCCESS
+ ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future = segmentLoadDropHandler
+ .processBatch(batch);
+ for (Runnable runnable : scheduledRunnable) {
+ runnable.run();
+ }
+ List<DataSegmentChangeRequestAndStatus> result = future.get();
+ Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+ scheduledRunnable.clear();
+
+ // drop the same segment: again run the scheduled runnables, then expect SUCCESS
+ batch = ImmutableList.of(new SegmentChangeRequestDrop(segment1));
+ future = segmentLoadDropHandler.processBatch(batch);
+ for (Runnable runnable : scheduledRunnable) {
+ runnable.run();
+ }
+ result = future.get();
+ Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+ scheduledRunnable.clear();
+
+ // check invocations after a load-drop sequence
+ Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any());
+ Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
+
+ // request a reload - this one is answered with the cached SUCCESS of the first load, so nothing is scheduled
+ // and the segment manager is not called; serving that cached SUCCESS also invalidates it in the handler
+ batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
+ future = segmentLoadDropHandler.processBatch(batch);
+ Assert.assertEquals(0, scheduledRunnable.size());
+ result = future.get();
+ Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+
+ // check invocations - should stay the same
+ Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any());
+ Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
+
+ // request a reload again - the cached status is gone now, so this is handled as a fresh load request
+ // and the segment manager is asked to load the segment
+ batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
+ future = segmentLoadDropHandler.processBatch(batch);
+ for (Runnable runnable : scheduledRunnable) {
+ runnable.run();
+ }
+ result = future.get();
+ Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+ scheduledRunnable.clear();
+
+ // check invocations - the load segment counter should bump up
+ Mockito.verify(segmentManager, Mockito.times(2)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any());
+ Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
+
+ segmentLoadDropHandler.stop();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org