You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2015/02/17 18:10:43 UTC

svn commit: r1660449 - in /lucene/dev/trunk/solr: CHANGES.txt core/src/java/org/apache/solr/cloud/Overseer.java core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java

Author: shalin
Date: Tue Feb 17 17:10:43 2015
New Revision: 1660449

URL: http://svn.apache.org/r1660449
Log:
SOLR-6956: OverseerCollectionProcessor and replicas on the overseer node can sometimes operate on stale cluster state

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1660449&r1=1660448&r2=1660449&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Tue Feb 17 17:10:43 2015
@@ -135,6 +135,10 @@ Bug Fixes
 * SOLR-7084: FreeTextSuggester: Better error message when doing a lookup
   during dictionary build. Used to be nullpointer (janhoy)
 
+* SOLR-6956: OverseerCollectionProcessor and replicas on the overseer node can sometimes
+  operate on stale cluster state due to overseer holding the state update lock for a
+  long time. (Mark Miller, shalin)
+
 Optimizations
 ----------------------
  * SOLR-7049: Move work done by the LIST Collections API call to the Collections

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1660449&r1=1660448&r2=1660449&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java Tue Feb 17 17:10:43 2015
@@ -143,59 +143,7 @@ public class Overseer implements Closeab
         log.debug("am_i_leader unclear {}", isLeader);
         isLeader = amILeader();  // not a no, not a yes, try ask again
       }
-      if (!this.isClosed && LeaderStatus.YES == isLeader) {
-        // see if there's something left from the previous Overseer and re
-        // process all events that were not persisted into cloud state
-        synchronized (reader.getUpdateLock()) { // XXX this only protects
-                                                // against edits inside single
-                                                // node
-          try {
-            byte[] head = workQueue.peek();
-            
-            if (head != null) {
-              reader.updateClusterState(true);
-              ClusterState clusterState = reader.getClusterState();
-              log.info("Replaying operations from work queue.");
 
-              ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats);
-
-              while (head != null) {
-                isLeader = amILeader();
-                if (LeaderStatus.NO == isLeader) {
-                  break;
-                }
-                else if (LeaderStatus.YES == isLeader) {
-                  final ZkNodeProps message = ZkNodeProps.load(head);
-                  log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
-                  clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
-                  workQueue.poll(); // poll-ing removes the element we got by peek-ing
-                }
-                else {
-                  log.info("am_i_leader unclear {}", isLeader);                  
-                  // re-peek below in case our 'head' value is out-of-date by now
-                }
-                head = workQueue.peek();
-              }
-              // force flush at the end of the loop
-              clusterState = zkStateWriter.writePendingUpdates();
-            }
-          } catch (KeeperException e) {
-            if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
-              log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
-              return;
-            }
-            log.error("Exception in Overseer work queue loop", e);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            return;
-            
-          } catch (Exception e) {
-            log.error("Exception in Overseer work queue loop", e);
-          }
-        }
-        
-      }
-      
       log.info("Starting to work on the main queue");
       try {
         ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats);
@@ -210,6 +158,45 @@ public class Overseer implements Closeab
             log.debug("am_i_leader unclear {}", isLeader);
             continue; // not a no, not a yes, try ask again
           }
+
+          if (refreshClusterState) {
+            try {
+              reader.updateClusterState(true);
+              clusterState = reader.getClusterState();
+              refreshClusterState = false;
+
+              // if there were any errors while processing
+              // the state queue, items would have been left in the
+              // work queue so let's process those first
+              byte[] data = workQueue.peek();
+              boolean hadWorkItems = data != null;
+              while (data != null)  {
+                final ZkNodeProps message = ZkNodeProps.load(data);
+                log.info("processMessage: workQueueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
+                // force flush to ZK after each message because there is no fallback if workQueue items
+                // are removed from workQueue but fail to be written to ZK
+                clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
+                workQueue.poll(); // poll-ing removes the element we got by peek-ing
+                data = workQueue.peek();
+              }
+              // force flush at the end of the loop
+              if (hadWorkItems) {
+                clusterState = zkStateWriter.writePendingUpdates();
+              }
+            } catch (KeeperException e) {
+              if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
+                log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
+                return;
+              }
+              log.error("Exception in Overseer work queue loop", e);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              return;
+            } catch (Exception e) {
+              log.error("Exception in Overseer work queue loop", e);
+            }
+          }
+
           DistributedQueue.QueueEvent head = null;
           try {
             head = stateUpdateQueue.peek(true);
@@ -227,81 +214,53 @@ public class Overseer implements Closeab
           } catch (Exception e) {
             log.error("Exception in Overseer main queue loop", e);
           }
-          synchronized (reader.getUpdateLock()) {
-            try {
-              if (refreshClusterState) {
-                reader.updateClusterState(true);
-                clusterState = reader.getClusterState();
-                refreshClusterState = false;
-
-                // if there were any errors while processing
-                // the state queue, items would have been left in the
-                // work queue so let's process those first
-                byte[] data = workQueue.peek();
-                boolean hadWorkItems = data != null;
-                while (data != null)  {
-                  final ZkNodeProps message = ZkNodeProps.load(data);
-                  log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
-                  // force flush to ZK after each message because there is no fallback if workQueue items
-                  // are removed from workQueue but fail to be written to ZK
-                  clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
-                  workQueue.poll(); // poll-ing removes the element we got by peek-ing
-                  data = workQueue.peek();
+          try {
+            while (head != null) {
+              final byte[] data = head.getBytes();
+              final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
+              log.info("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
+              // we can batch here because workQueue is our fallback in case a ZK write failed
+              clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() {
+                @Override
+                public void onEnqueue() throws Exception {
+                  workQueue.offer(data);
                 }
-                // force flush at the end of the loop
-                if (hadWorkItems) {
-                  clusterState = zkStateWriter.writePendingUpdates();
+
+                @Override
+                public void onWrite() throws Exception {
+                  // remove everything from workQueue
+                  while (workQueue.poll() != null);
                 }
-              }
+              });
 
-              while (head != null) {
-                final byte[] data = head.getBytes();
-                final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
-                log.info("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
-                // we can batch here because workQueue is our fallback in case a ZK write failed
-                clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() {
-                  @Override
-                  public void onEnqueue() throws Exception {
-                    workQueue.offer(data);
-                  }
-
-                  @Override
-                  public void onWrite() throws Exception {
-                    // remove everything from workQueue
-                    while (workQueue.poll() != null);
-                  }
-                });
-
-                // it is safer to keep this poll here because an invalid message might never be queued
-                // and therefore we can't rely on the ZkWriteCallback to remove the item
-                stateUpdateQueue.poll();
-
-                if (isClosed) break;
-                // if an event comes in the next 100ms batch it together
-                head = stateUpdateQueue.peek(100);
-              }
-              // we should force write all pending updates because the next iteration might sleep until there
-              // are more items in the main queue
-              clusterState = zkStateWriter.writePendingUpdates();
-              // clean work queue
-              while (workQueue.poll() != null);
+              // it is safer to keep this poll here because an invalid message might never be queued
+              // and therefore we can't rely on the ZkWriteCallback to remove the item
+              stateUpdateQueue.poll();
+
+              if (isClosed) break;
+              // if an event comes in the next 100ms batch it together
+              head = stateUpdateQueue.peek(100);
+            }
+            // we should force write all pending updates because the next iteration might sleep until there
+            // are more items in the main queue
+            clusterState = zkStateWriter.writePendingUpdates();
+            // clean work queue
+            while (workQueue.poll() != null);
 
-            } catch (KeeperException e) {
-              if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
-                log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
-                return;
-              }
-              log.error("Exception in Overseer main queue loop", e);
-              refreshClusterState = true; // it might have been a bad version error
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
+          } catch (KeeperException e) {
+            if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
+              log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
               return;
-            } catch (Exception e) {
-              log.error("Exception in Overseer main queue loop", e);
-              refreshClusterState = true; // it might have been a bad version error
             }
+            log.error("Exception in Overseer main queue loop", e);
+            refreshClusterState = true; // it might have been a bad version error
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return;
+          } catch (Exception e) {
+            log.error("Exception in Overseer main queue loop", e);
+            refreshClusterState = true; // it might have been a bad version error
           }
-
         }
       } finally {
         log.info("Overseer Loop exiting : {}", LeaderElector.getNodeName(myId));

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1660449&r1=1660448&r2=1660449&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Tue Feb 17 17:10:43 2015
@@ -580,6 +580,8 @@ public class OverseerCollectionProcessor
 
     NamedList results = new NamedList();
     try {
+      // force update the cluster state
+      zkStateReader.updateClusterState(true);
       CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(operation);
       if (action == null) {
         // back-compat because we used strings different than enum values before SOLR-6115