Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2019/03/12 07:13:51 UTC

[GitHub] [incubator-druid] samarthjain commented on a change in pull request #7088: Improve parallelism of zookeeper based segment change processing

samarthjain commented on a change in pull request #7088: Improve parallelism of zookeeper based segment change processing
URL: https://github.com/apache/incubator-druid/pull/7088#discussion_r264544577
 
 

 ##########
 File path: server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
 ##########
 @@ -290,128 +307,119 @@ private void processSegmentChangeRequest()
         // We do not create the existence watcher first, because then it will fire when we create the
         // node and we'll have the same race when trying to refresh that watcher.
         curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);
-
-        entryRemoved(path);
+        entryRemoved(segmentHolder, path);
       }
     }
+    catch (KeeperException.NodeExistsException ne) {
+      // This is expected when historicals haven't yet picked up processing this segment and coordinator
+      // tries reassigning it to the same node.
+      log.warn(ne, "ZK node already exists because segment change request hasn't yet been processed");
+      failAssign(segmentHolder);
+    }
     catch (Exception e) {
-      failAssign(e);
+      failAssign(segmentHolder, e);
     }
   }
 
-  private void actionCompleted()
+  private void actionCompleted(SegmentHolder segmentHolder)
   {
-    if (currentlyProcessing != null) {
-      switch (currentlyProcessing.getType()) {
-        case LOAD:
-          segmentsToLoad.remove(currentlyProcessing.getSegment());
-          queuedSize.addAndGet(-currentlyProcessing.getSegmentSize());
-          break;
-        case DROP:
-          segmentsToDrop.remove(currentlyProcessing.getSegment());
-          break;
-        default:
-          throw new UnsupportedOperationException();
-      }
-
-      final List<LoadPeonCallback> callbacks = currentlyProcessing.getCallbacks();
-      currentlyProcessing = null;
-      callBackExecutor.execute(
-          () -> executeCallbacks(callbacks)
-      );
+    switch (segmentHolder.getType()) {
+      case LOAD:
+        segmentsToLoad.remove(segmentHolder.getSegment());
+        queuedSize.addAndGet(-segmentHolder.getSegmentSize());
+        break;
+      case DROP:
+        segmentsToDrop.remove(segmentHolder.getSegment());
+        break;
+      default:
+        throw new UnsupportedOperationException();
     }
+
+    callBackExecutor.execute(
+        () -> executeCallbacks(segmentHolder)
+    );
   }
 
+
   @Override
   public void start()
   {
-    ScheduledExecutors.scheduleAtFixedRate(
-        processingExecutor,
-        config.getLoadQueuePeonRepeatDelay(),
-        config.getLoadQueuePeonRepeatDelay(),
-        () -> {
-          processSegmentChangeRequest();
-
-          if (stopped) {
-            return ScheduledExecutors.Signal.STOP;
-          } else {
-            return ScheduledExecutors.Signal.REPEAT;
-          }
-        }
-    );
+    for (int i = 0; i < numProcessingQueues; i++) {
+      processingExecutor.scheduleAtFixedRate(
+          new SegmentChangeProcessor(segmentProcessingQueues[i]),
+          0,
+          config.getCuratorCreateZkNodesRepeatDelay().getMillis(),
 
 Review comment:
   Every thread gets its own queue of requests. This reduces contention between threads that would otherwise all be calling drainTo(Collection) on the same queue.
   
   The idea behind the repeated runs is to introduce batching, so that ZooKeeper is not flooded with nodes. The delay between runs also gives historicals some time to service the segment load/drop requests. Without the batching and delay, historicals might never get to finish processing a segment change request, which would cause the coordinator to repeatedly delete and create ZK nodes.
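
To illustrate the two points above, here is a minimal standalone sketch, not the code in this PR. It assumes a hypothetical `BatchProcessor` class, string request ids, a made-up batch size and repeat delay, and round-robin assignment of requests to queues; none of these names or values come from `CuratorLoadQueuePeon`. Each worker owns one queue, and each scheduled run drains at most one bounded batch from it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PerThreadQueueSketch
{
  /** Hypothetical stand-in for a per-thread processor; not Druid's actual class. */
  static class BatchProcessor implements Runnable
  {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final int maxBatchSize;
    // Each inner list is one batch handled during a single scheduled run.
    final List<List<String>> batches = new ArrayList<>();

    BatchProcessor(int maxBatchSize)
    {
      this.maxBatchSize = maxBatchSize;
    }

    void enqueue(String request)
    {
      queue.add(request);
    }

    int pending()
    {
      return queue.size();
    }

    @Override
    public void run()
    {
      // Only this processor drains this queue, so no other worker competes for it.
      List<String> batch = new ArrayList<>();
      queue.drainTo(batch, maxBatchSize);
      if (!batch.isEmpty()) {
        // Real code would create the ZK nodes for this batch here.
        batches.add(batch);
      }
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    int numQueues = 2;            // assumed value, for illustration only
    long repeatDelayMillis = 50;  // assumed value, for illustration only
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(numQueues);
    List<BatchProcessor> processors = new ArrayList<>();
    for (int i = 0; i < numQueues; i++) {
      BatchProcessor processor = new BatchProcessor(3);
      processors.add(processor);
      // One recurring task per queue; the period is the pause between batches.
      executor.scheduleAtFixedRate(processor, 0, repeatDelayMillis, TimeUnit.MILLISECONDS);
    }
    for (int i = 0; i < 10; i++) {
      // Round-robin assignment is an assumption made for this sketch.
      processors.get(i % numQueues).enqueue("segment-" + i);
    }
    Thread.sleep(500);
    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.SECONDS);
    for (BatchProcessor processor : processors) {
      System.out.println(processor.batches);
    }
  }
}
```

With a bounded batch per run, requests left in the queue simply wait for the next run, which is where the breathing room for the consumers of those nodes comes from.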
