You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2016/06/12 07:41:44 UTC

lucene-solr git commit: SOLR-8744: Minimize the impact on ZK when there are a lot of blocked tasks

Repository: lucene-solr
Updated Branches:
  refs/heads/master 844ca4a34 -> 232b44e28


SOLR-8744: Minimize the impact on ZK when there are a lot of blocked tasks


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/232b44e2
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/232b44e2
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/232b44e2

Branch: refs/heads/master
Commit: 232b44e283dfd01f9ec01b4e68b09b3755a1b17a
Parents: 844ca4a
Author: Noble Paul <no...@apache.org>
Authored: Sun Jun 12 13:11:07 2016 +0530
Committer: Noble Paul <no...@apache.org>
Committed: Sun Jun 12 13:11:07 2016 +0530

----------------------------------------------------------------------
 .../cloud/OverseerCollectionMessageHandler.java |  4 ++
 .../solr/cloud/OverseerTaskProcessor.java       | 70 ++++++++++++++++++--
 .../apache/solr/cloud/OverseerTaskQueue.java    |  5 +-
 .../apache/solr/cloud/MultiThreadedOCPTest.java | 62 +++++++++++++++--
 ...verseerCollectionConfigSetProcessorTest.java |  3 +-
 .../solr/cloud/OverseerTaskQueueTest.java       |  4 +-
 .../cloud/AbstractFullDistribZkTestBase.java    | 10 ++-
 7 files changed, 139 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/232b44e2/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index 54c0697..2cd09d1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -82,6 +82,7 @@ import org.apache.solr.common.params.ShardParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.SuppressForbidden;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.handler.component.ShardHandlerFactory;
@@ -210,6 +211,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
   }
 
   @Override
+  @SuppressForbidden(reason = "Needs currentTimeMillis for mock requests")
   @SuppressWarnings("unchecked")
   public SolrResponse processMessage(ZkNodeProps message, String operation) {
     log.info("OverseerCollectionMessageHandler.processMessage : "+ operation + " , "+ message.toString());
@@ -289,6 +291,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
         case MOCK_REPLICA_TASK: {
           //only for test purposes
           Thread.sleep(message.getInt("sleep", 1));
+          log.info("MOCK_TASK_EXECUTED time {} data {}",System.currentTimeMillis(), Utils.toJSONString(message));
+          results.add("MOCK_FINISHED", System.currentTimeMillis());
           break;
         }
         default:

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/232b44e2/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index 092ed97..9c739c1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -19,15 +19,19 @@ package org.apache.solr.cloud;
 import java.io.Closeable;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
 import org.apache.solr.cloud.Overseer.LeaderStatus;
@@ -36,6 +40,7 @@ import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.handler.component.ShardHandlerFactory;
 import org.apache.solr.util.DefaultSolrThreadFactory;
@@ -63,6 +68,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
    * executed concurrently
    */
   public static final int MAX_PARALLEL_TASKS = 100;
+  public static final int MAX_BLOCKED_TASKS = 1000;
 
   public ExecutorService tpe;
 
@@ -74,7 +80,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
   private DistributedMap failureMap;
 
   // Set that maintains a list of all the tasks that are running. This is keyed on zk id of the task.
-  final private Set runningTasks;
+  final private Set<String> runningTasks;
 
   // List of completed tasks. This is used to clean up workQueue in zk.
   final private HashMap<String, QueueEvent> completedTasks;
@@ -91,6 +97,24 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
   // It may contain tasks that have completed execution, have been entered into the completed/failed map in zk but not
   // deleted from the work-queue as that is a batched operation.
   final private Set<String> runningZKTasks;
+  // This map may contain tasks which are read from work queue but could not
+  // be executed because they are blocked or the execution queue is full
+  // This is an optimization to ensure that we do not read the same tasks
+  // again and again from ZK.
+  final private Map<String, QueueEvent> blockedTasks = new LinkedHashMap<>();
+  final private Predicate<String> excludedTasks = new Predicate<String>() {
+    @Override
+    public boolean test(String s) {
+      return runningTasks.contains(s) || blockedTasks.containsKey(s);
+    }
+
+    @Override
+    public String toString() {
+      return StrUtils.join(ImmutableSet.of(runningTasks, blockedTasks.keySet()), ',');
+    }
+
+  };
+
   private final Object waitLock = new Object();
 
   private OverseerMessageHandlerSelector selector;
@@ -115,7 +139,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
     this.completedMap = completedMap;
     this.failureMap = failureMap;
     this.runningZKTasks = new HashSet<>();
-    this.runningTasks = new HashSet();
+    this.runningTasks = new HashSet<>();
     this.completedTasks = new HashMap<>();
   }
 
@@ -189,17 +213,46 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
           if (waited)
             cleanUpWorkQueue();
 
-          List<QueueEvent> heads = workQueue.peekTopN(MAX_PARALLEL_TASKS, runningZKTasks, 2000L);
+
+          ArrayList<QueueEvent> heads = new ArrayList<>(blockedTasks.size() + MAX_PARALLEL_TASKS);
+          heads.addAll(blockedTasks.values());
+
+          //If we have enough items in the blocked tasks already, it makes
+          // no sense to read more items from the work queue. it makes sense
+          // to clear out at least a few items in the queue before we read more items
+          if (heads.size() < MAX_BLOCKED_TASKS) {
+            //instead of reading MAX_PARALLEL_TASKS items always, we should only fetch as much as we can execute
+            int toFetch = Math.min(MAX_BLOCKED_TASKS - heads.size(), MAX_PARALLEL_TASKS - runningTasks.size());
+            List<QueueEvent> newTasks = workQueue.peekTopN(toFetch, excludedTasks, 2000L);
+            log.debug("Got {} tasks from work-queue : [{}]", newTasks.size(), newTasks);
+            heads.addAll(newTasks);
+          } else {
+            // Prevent free-spinning this loop.
+            Thread.sleep(1000);
+          }
+
+          if (isClosed) break;
+
           if (heads.isEmpty()) {
             continue;
           }
 
-          log.debug("Got {} tasks from work-queue : [{}]", heads.size(), heads.toString());
-
-          if (isClosed) break;
+          blockedTasks.clear(); // clear it now; may get refilled below.
 
           taskBatch.batchId++;
+          boolean tooManyTasks = false;
           for (QueueEvent head : heads) {
+            if (!tooManyTasks) {
+              synchronized (runningTasks) {
+                tooManyTasks = runningTasks.size() >= MAX_PARALLEL_TASKS;
+              }
+            }
+            if (tooManyTasks) {
+              // Too many tasks are running, just shove the rest into the "blocked" queue.
+              if(blockedTasks.size() < MAX_BLOCKED_TASKS)
+                blockedTasks.put(head.getId(), head);
+              continue;
+            }
             if (runningZKTasks.contains(head.getId())) continue;
             final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
             OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message);
@@ -217,6 +270,9 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
             OverseerMessageHandler.Lock lock = messageHandler.lockTask(message, taskBatch);
             if (lock == null) {
               log.debug("Exclusivity check failed for [{}]", message.toString());
+              //we may end crossing the size of the MAX_BLOCKED_TASKS. They are fine
+              if (blockedTasks.size() < MAX_BLOCKED_TASKS)
+                blockedTasks.put(head.getId(), head);
               continue;
             }
             try {
@@ -370,7 +426,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
       runningTasks.add(head.getId());
     }
 
-//    messageHandler.markExclusiveTask(taskKey, message);
 
     if (asyncId != null)
       runningMap.put(asyncId, null);
@@ -512,6 +567,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
       synchronized (runningTasks) {
         log.debug("RunningTasks: {}", runningTasks.toString());
       }
+      log.debug("BlockedTasks: {}", blockedTasks.keySet().toString());
       synchronized (completedTasks) {
         log.debug("CompletedTasks: {}", completedTasks.keySet().toString());
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/232b44e2/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index aae7df2..5719aa9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.function.Predicate;
 
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -222,7 +223,7 @@ public class OverseerTaskQueue extends DistributedQueue {
   }
 
 
-  public List<QueueEvent> peekTopN(int n, Set<String> excludeSet, long waitMillis)
+  public List<QueueEvent> peekTopN(int n, Predicate<String> excludeSet, long waitMillis)
       throws KeeperException, InterruptedException {
     ArrayList<QueueEvent> topN = new ArrayList<>();
 
@@ -232,7 +233,7 @@ public class OverseerTaskQueue extends DistributedQueue {
     else time = stats.time(dir + "_peekTopN_wait" + waitMillis);
 
     try {
-      for (Pair<String, byte[]> element : peekElements(n, waitMillis, child -> !excludeSet.contains(dir + "/" + child))) {
+      for (Pair<String, byte[]> element : peekElements(n, waitMillis, child -> !excludeSet.test(dir + "/" + child))) {
         topN.add(new QueueEvent(dir + "/" + element.first(),
             element.second(), null));
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/232b44e2/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
index c18b330..ec8a6c4 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
@@ -38,6 +38,11 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
+import static org.apache.solr.cloud.OverseerTaskProcessor.MAX_PARALLEL_TASKS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_COLL_TASK;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
 /**
  * Tests the Multi threaded Collections API.
  */
@@ -55,11 +60,58 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
   @Test
   @ShardsFixed(num = 4)
   public void test() throws Exception {
-
     testParallelCollectionAPICalls();
     testTaskExclusivity();
     testDeduplicationOfSubmittedTasks();
     testLongAndShortRunningParallelApiCalls();
+    testFillWorkQueue();
+  }
+
+  private void testFillWorkQueue() throws Exception {
+    try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) {
+      DistributedQueue distributedQueue = new DistributedQueue(cloudClient.getZkStateReader().getZkClient(),
+          "/overseer/collection-queue-work", new Overseer.Stats());
+      //fill the work queue with blocked tasks by adding more than the no:of parallel tasks
+      for (int i = 0; i < MAX_PARALLEL_TASKS+5; i++) {
+        distributedQueue.offer(Utils.toJSON(Utils.makeMap(
+            "collection", "A_COLL",
+            QUEUE_OPERATION, MOCK_COLL_TASK.toLower(),
+            ASYNC, String.valueOf(i),
+
+            "sleep", (i == 0 ? "1000" : "1") //first task waits for 1 second, and thus blocking
+            // all other tasks. Subsequent tasks only wait for 1ms
+        )));
+        log.info("MOCK task added {}", i);
+
+      }
+      Thread.sleep(10);//wait and post the next message
+
+      //this is not going to be blocked because it operates on another collection
+      distributedQueue.offer(Utils.toJSON(Utils.makeMap(
+          "collection", "B_COLL",
+          QUEUE_OPERATION, MOCK_COLL_TASK.toLower(),
+          ASYNC, "200",
+          "sleep", "1"
+      )));
+
+
+      Long acoll = null, bcoll = null;
+      for (int i = 0; i < 100; i++) {
+        if (bcoll == null) {
+          CollectionAdminResponse statusResponse = getStatusResponse("200", client);
+          bcoll = (Long) statusResponse.getResponse().get("MOCK_FINISHED");
+        }
+        if (acoll == null) {
+          CollectionAdminResponse statusResponse = getStatusResponse("2", client);
+          acoll = (Long) statusResponse.getResponse().get("MOCK_FINISHED");
+        }
+        if (acoll != null && bcoll != null) break;
+        Thread.sleep(100);
+      }
+      assertTrue(acoll != null && bcoll != null);
+      assertTrue(acoll > bcoll);
+    }
+
   }
 
   private void testParallelCollectionAPICalls() throws IOException, SolrServerException {
@@ -116,14 +168,14 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
 
       distributedQueue.offer(Utils.toJSON(Utils.makeMap(
           "collection", "ocptest_shardsplit",
-          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(),
-          CommonAdminParams.ASYNC, "1001",
+          QUEUE_OPERATION, MOCK_COLL_TASK.toLower(),
+          ASYNC, "1001",
           "sleep", "100"
       )));
       distributedQueue.offer(Utils.toJSON(Utils.makeMap(
           "collection", "ocptest_shardsplit",
-          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(),
-          CommonAdminParams.ASYNC, "1002",
+          QUEUE_OPERATION, MOCK_COLL_TASK.toLower(),
+          ASYNC, "1002",
           "sleep", "100"
       )));
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/232b44e2/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index 4658367..8f4ee1e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -21,6 +21,7 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.SolrResponse;
@@ -163,7 +164,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
       log.info("SHARDHANDLER");
       return shardHandlerMock;
     }).anyTimes();
-    workQueueMock.peekTopN(EasyMock.anyInt(), anyObject(Set.class), EasyMock.anyLong());
+    workQueueMock.peekTopN(EasyMock.anyInt(), anyObject(Predicate.class), EasyMock.anyLong());
     expectLastCall().andAnswer(() -> {
       Object result;
       int count = 0;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/232b44e2/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
index 95cdd40..6380aac 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
@@ -20,6 +20,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Predicate;
 
 import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.response.SolrResponseBase;
@@ -75,7 +77,7 @@ public class OverseerTaskQueueTest extends DistributedQueueTest {
     tq.createRequestNode(Utils.toJSON(props), watchID);
 
     // Set a SolrResponse as the response node by removing the QueueEvent, as done in OverseerTaskProcessor
-    List<OverseerTaskQueue.QueueEvent> queueEvents = tq.peekTopN(2, Collections.emptySet(), 1000);
+    List<OverseerTaskQueue.QueueEvent> queueEvents = tq.peekTopN(2, s -> false, 1000);
     OverseerTaskQueue.QueueEvent requestId2Event = null;
     for (OverseerTaskQueue.QueueEvent queueEvent : queueEvents) {
       Map<String, Object> eventProps = (Map<String, Object>) Utils.fromJSON(queueEvent.getBytes());

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/232b44e2/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index 5439841..4b0f7ab 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -1988,12 +1988,16 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
   }
 
   static RequestStatusState getRequestState(String requestId, SolrClient client) throws IOException, SolrServerException {
-    CollectionAdminRequest.RequestStatus requestStatusRequest = new CollectionAdminRequest.RequestStatus();
-    requestStatusRequest.setRequestId(requestId);
-    CollectionAdminResponse response = requestStatusRequest.process(client);
+    CollectionAdminResponse response = getStatusResponse(requestId, client);
 
     NamedList innerResponse = (NamedList) response.getResponse().get("status");
     return RequestStatusState.fromKey((String) innerResponse.get("state"));
   }
 
+  static CollectionAdminResponse getStatusResponse(String requestId, SolrClient client) throws SolrServerException, IOException {
+    CollectionAdminRequest.RequestStatus requestStatusRequest = new CollectionAdminRequest.RequestStatus();
+    requestStatusRequest.setRequestId(requestId);
+    return requestStatusRequest.process(client);
+  }
+
 }