Posted to dev@druid.apache.org by GitBox <gi...@apache.org> on 2018/07/09 18:23:23 UTC

[GitHub] gianm closed pull request #5960: [Backport] add 'stopped' check and handling to HttpLoadQueuePeon load and drop segment methods

URL: https://github.com/apache/incubator-druid/pull/5960
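
For context, the change guards loadSegment and dropSegment with the peon's
'stopped' flag so that requests arriving after stop() are rejected while their
callbacks still run, and stop() now fails (rather than "succeeds") any pending
requests. The following is a minimal, self-contained sketch of that guard
pattern only; StubLoadQueuePeon and SimpleCallback are hypothetical names
invented for illustration, not the actual Druid classes shown in the diff below.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical stand-in for HttpLoadQueuePeon, reduced to the 'stopped' guard.
    public class StubLoadQueuePeon
    {
      public interface SimpleCallback
      {
        void execute();
      }

      private final Object lock = new Object();
      private final Map<String, SimpleCallback> segmentsToLoad = new ConcurrentHashMap<>();
      private volatile boolean stopped = false;

      public void stop()
      {
        synchronized (lock) {
          stopped = true;
          // Pending requests are completed (as failures in the real class) so
          // that waiting callers are released rather than left hanging.
          for (SimpleCallback callback : segmentsToLoad.values()) {
            callback.execute();
          }
          segmentsToLoad.clear();
        }
      }

      public void loadSegment(String segmentId, SimpleCallback callback)
      {
        synchronized (lock) {
          if (stopped) {
            // The guard added by the PR: refuse new work after stop(), but
            // still invoke the callback so the caller does not wait forever.
            System.out.println("Cannot load segment[" + segmentId + "]: peon is stopped.");
            callback.execute();
            return;
          }
          segmentsToLoad.put(segmentId, callback);
        }
      }

      public static void main(String[] args)
      {
        StubLoadQueuePeon peon = new StubLoadQueuePeon();
        peon.loadSegment("seg1", () -> System.out.println("seg1 callback (run on stop)"));
        peon.stop();  // releases the pending seg1 request
        peon.loadSegment("seg2", () -> System.out.println("seg2 callback (run immediately)"));
      }
    }

In the real HttpLoadQueuePeon the same check appears in dropSegment, and stop()
marks pending holders with requestFailed("Stopping load queue peon.") instead of
requestSucceeded(), as the diff below shows.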
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the original
diff of a foreign (forked) pull request once it is merged, the diff is
reproduced below for the sake of provenance:

diff --git a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java
index dbeeb738601..ece1d4884fa 100644
--- a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java
+++ b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java
@@ -28,16 +28,16 @@
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.RE;
 import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.ScheduledExecutors;
 import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.java.util.http.client.HttpClient;
 import io.druid.java.util.http.client.Request;
 import io.druid.java.util.http.client.io.AppendableByteArrayInputStream;
 import io.druid.java.util.http.client.response.ClientResponse;
 import io.druid.java.util.http.client.response.InputStreamResponseHandler;
-import io.druid.java.util.common.ISE;
-import io.druid.java.util.common.concurrent.ScheduledExecutors;
 import io.druid.server.coordination.DataSegmentChangeCallback;
 import io.druid.server.coordination.DataSegmentChangeHandler;
 import io.druid.server.coordination.DataSegmentChangeRequest;
@@ -61,7 +61,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
@@ -261,6 +260,7 @@ public void onSuccess(InputStream result)
             public void onFailure(Throwable t)
             {
               try {
+                responseHandler.description = t.toString();
                 logRequestFailure(t);
               }
               finally {
@@ -333,20 +333,15 @@ public void start()
       ScheduledExecutors.scheduleAtFixedRate(
           processingExecutor,
           new Duration(config.getHttpLoadQueuePeonRepeatDelay()),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              if (!stopped) {
-                doSegmentManagement();
-              }
+          () -> {
+            if (!stopped) {
+              doSegmentManagement();
+            }
 
-              if (stopped) {
-                return ScheduledExecutors.Signal.STOP;
-              } else {
-                return ScheduledExecutors.Signal.REPEAT;
-              }
+            if (stopped) {
+              return ScheduledExecutors.Signal.STOP;
+            } else {
+              return ScheduledExecutors.Signal.REPEAT;
             }
           }
       );
@@ -364,11 +359,11 @@ public void stop()
       stopped = true;
 
       for (SegmentHolder holder : segmentsToDrop.values()) {
-        holder.requestSucceeded();
+        holder.requestFailed("Stopping load queue peon.");
       }
 
       for (SegmentHolder holder : segmentsToLoad.values()) {
-        holder.requestSucceeded();
+        holder.requestFailed("Stopping load queue peon.");
       }
 
       segmentsToDrop.clear();
@@ -382,6 +377,16 @@ public void stop()
   public void loadSegment(DataSegment segment, LoadPeonCallback callback)
   {
     synchronized (lock) {
+      if (stopped) {
+        log.warn(
+            "Server[%s] cannot load segment[%s] because load queue peon is stopped.",
+            serverId,
+            segment.getIdentifier()
+        );
+        callback.execute();
+        return;
+      }
+
       SegmentHolder holder = segmentsToLoad.get(segment);
 
       if (holder == null) {
@@ -398,6 +403,15 @@ public void loadSegment(DataSegment segment, LoadPeonCallback callback)
   public void dropSegment(DataSegment segment, LoadPeonCallback callback)
   {
     synchronized (lock) {
+      if (stopped) {
+        log.warn(
+            "Server[%s] cannot drop segment[%s] because load queue peon is stopped.",
+            serverId,
+            segment.getIdentifier()
+        );
+        callback.execute();
+        return;
+      }
       SegmentHolder holder = segmentsToDrop.get(segment);
 
       if (holder == null) {
diff --git a/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index 9447c7b0895..0ed48ac726d 100644
--- a/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -233,7 +233,7 @@ public void tearDown() throws Exception
     tearDownServerAndCurator();
   }
 
-  @Test(timeout = 5_000)
+  @Test(timeout = 10_000)
   public void testMoveSegment() throws Exception
   {
     segmentViewInitLatch = new CountDownLatch(1);
diff --git a/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java
index 72fb9a36a5d..c2388359a1e 100644
--- a/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java
@@ -24,14 +24,14 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import io.druid.java.util.http.client.HttpClient;
-import io.druid.java.util.http.client.Request;
-import io.druid.java.util.http.client.response.HttpResponseHandler;
 import io.druid.discovery.DiscoveryDruidNode;
 import io.druid.discovery.DruidNodeDiscovery;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.RE;
 import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.HttpResponseHandler;
 import io.druid.server.ServerTestHelper;
 import io.druid.server.coordination.DataSegmentChangeRequest;
 import io.druid.server.coordination.SegmentLoadDropHandler;
@@ -57,40 +57,92 @@
  */
 public class HttpLoadQueuePeonTest
 {
+  final DataSegment segment1 = new DataSegment(
+      "test1", Intervals.of("2014/2015"), "v1",
+      null, null, null, null, 0, 0
+  );
+
+  final DataSegment segment2 = new DataSegment(
+      "test2", Intervals.of("2014/2015"), "v1",
+      null, null, null, null, 0, 0
+  );
+
+  final DataSegment segment3 = new DataSegment(
+      "test3", Intervals.of("2014/2015"), "v1",
+      null, null, null, null, 0, 0
+  );
+
+  final DataSegment segment4 = new DataSegment(
+      "test4", Intervals.of("2014/2015"), "v1",
+      null, null, null, null, 0, 0
+  );
+
+  final TestDruidCoordinatorConfig config = new TestDruidCoordinatorConfig(
+      null,
+      null,
+      null,
+      null,
+      null,
+      null,
+      10,
+      null,
+      false,
+      false,
+      Duration.ZERO
+  )
+  {
+    @Override
+    public int getHttpLoadQueuePeonBatchSize()
+    {
+      return 2;
+    }
+  };
+
   @Test(timeout = 10000)
   public void testSimple() throws Exception
   {
-    final DataSegment segment1 = new DataSegment(
-        "test1", Intervals.of("2014/2015"), "v1",
-        null, null, null, null, 0, 0
+    HttpLoadQueuePeon httpLoadQueuePeon = new HttpLoadQueuePeon(
+        "http://dummy:4000",
+        ServerTestHelper.MAPPER,
+        new TestHttpClient(),
+        config,
+        Executors.newScheduledThreadPool(
+            2,
+            Execs.makeThreadFactory("HttpLoadQueuePeonTest-%s")
+        ),
+        Execs.singleThreaded("HttpLoadQueuePeonTest")
     );
 
-    final DataSegment segment2 = new DataSegment(
-        "test2", Intervals.of("2014/2015"), "v1",
-        null, null, null, null, 0, 0
-    );
+    httpLoadQueuePeon.start();
 
-    final DataSegment segment3 = new DataSegment(
-        "test3", Intervals.of("2014/2015"), "v1",
-        null, null, null, null, 0, 0
+    Map<String, CountDownLatch> latches = ImmutableMap.of(
+        segment1.getIdentifier(), new CountDownLatch(1),
+        segment2.getIdentifier(), new CountDownLatch(1),
+        segment3.getIdentifier(), new CountDownLatch(1),
+        segment4.getIdentifier(), new CountDownLatch(1)
     );
 
-    final DataSegment segment4 = new DataSegment(
-        "test4", Intervals.of("2014/2015"), "v1",
-        null, null, null, null, 0, 0
-    );
+    httpLoadQueuePeon.dropSegment(segment1, () -> latches.get(segment1.getIdentifier()).countDown());
+    httpLoadQueuePeon.loadSegment(segment2, () -> latches.get(segment2.getIdentifier()).countDown());
+    httpLoadQueuePeon.dropSegment(segment3, () -> latches.get(segment3.getIdentifier()).countDown());
+    httpLoadQueuePeon.loadSegment(segment4, () -> latches.get(segment4.getIdentifier()).countDown());
+
+    latches.get(segment1.getIdentifier()).await();
+    latches.get(segment2.getIdentifier()).await();
+    latches.get(segment3.getIdentifier()).await();
+    latches.get(segment4.getIdentifier()).await();
 
+    httpLoadQueuePeon.stop();
+  }
+
+  @Test(timeout = 10000)
+  public void testLoadDropAfterStop() throws Exception
+  {
     HttpLoadQueuePeon httpLoadQueuePeon = new HttpLoadQueuePeon(
         "http://dummy:4000",
         ServerTestHelper.MAPPER,
         new TestHttpClient(),
-        new TestDruidCoordinatorConfig(null, null, null, null, null, null, 10, null, false, false, Duration.ZERO) {
-          @Override
-          public int getHttpLoadQueuePeonBatchSize()
-          {
-            return 2;
-          }
-        },
+        config,
         Executors.newScheduledThreadPool(
             2,
             Execs.makeThreadFactory("HttpLoadQueuePeonTest-%s")
@@ -107,48 +159,16 @@ public int getHttpLoadQueuePeonBatchSize()
         segment4.getIdentifier(), new CountDownLatch(1)
     );
 
-    httpLoadQueuePeon.dropSegment(segment1, new LoadPeonCallback()
-    {
-      @Override
-      public void execute()
-      {
-        latches.get(segment1.getIdentifier()).countDown();
-      }
-    });
-
-    httpLoadQueuePeon.loadSegment(segment2, new LoadPeonCallback()
-    {
-      @Override
-      public void execute()
-      {
-        latches.get(segment2.getIdentifier()).countDown();
-      }
-    });
-
-    httpLoadQueuePeon.dropSegment(segment3, new LoadPeonCallback()
-    {
-      @Override
-      public void execute()
-      {
-        latches.get(segment3.getIdentifier()).countDown();
-      }
-    });
-
-    httpLoadQueuePeon.loadSegment(segment4, new LoadPeonCallback()
-    {
-      @Override
-      public void execute()
-      {
-        latches.get(segment4.getIdentifier()).countDown();
-      }
-    });
-
+    httpLoadQueuePeon.dropSegment(segment1, () -> latches.get(segment1.getIdentifier()).countDown());
+    httpLoadQueuePeon.loadSegment(segment2, () -> latches.get(segment2.getIdentifier()).countDown());
     latches.get(segment1.getIdentifier()).await();
     latches.get(segment2.getIdentifier()).await();
+    httpLoadQueuePeon.stop();
+    httpLoadQueuePeon.dropSegment(segment3, () -> latches.get(segment3.getIdentifier()).countDown());
+    httpLoadQueuePeon.loadSegment(segment4, () -> latches.get(segment4.getIdentifier()).countDown());
     latches.get(segment3.getIdentifier()).await();
     latches.get(segment4.getIdentifier()).await();
 
-    httpLoadQueuePeon.stop();
   }
 
   private static class TestDruidNodeDiscovery implements DruidNodeDiscovery
@@ -191,12 +211,17 @@ public void registerListener(Listener listener)
       httpResponseHandler.handleResponse(httpResponse);
       try {
         List<DataSegmentChangeRequest> changeRequests = ServerTestHelper.MAPPER.readValue(
-            request.getContent().array(), new TypeReference<List<DataSegmentChangeRequest>>() {}
+            request.getContent().array(), new TypeReference<List<DataSegmentChangeRequest>>()
+            {
+            }
         );
 
         List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> statuses = new ArrayList<>(changeRequests.size());
         for (DataSegmentChangeRequest cr : changeRequests) {
-          statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus(cr, SegmentLoadDropHandler.Status.SUCCESS));
+          statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus(
+              cr,
+              SegmentLoadDropHandler.Status.SUCCESS
+          ));
         }
         return (ListenableFuture) Futures.immediateFuture(
             new ByteArrayInputStream(


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@druid.apache.org
For additional commands, e-mail: dev-help@druid.apache.org