You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2015/02/17 18:17:14 UTC
svn commit: r1660450 - in /lucene/dev/branches/branch_5x: ./ solr/ solr/CHANGES.txt solr/core/ solr/core/src/java/org/apache/solr/cloud/Overseer.java solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
Author: shalin
Date: Tue Feb 17 17:17:13 2015
New Revision: 1660450
URL: http://svn.apache.org/r1660450
Log:
SOLR-6956: OverseerCollectionProcessor and replicas on the overseer node can sometimes operate on stale cluster state
Modified:
lucene/dev/branches/branch_5x/ (props changed)
lucene/dev/branches/branch_5x/solr/ (props changed)
lucene/dev/branches/branch_5x/solr/CHANGES.txt (contents, props changed)
lucene/dev/branches/branch_5x/solr/core/ (props changed)
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1660450&r1=1660449&r2=1660450&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Tue Feb 17 17:17:13 2015
@@ -87,6 +87,10 @@ Bug Fixes
* SOLR-7084: FreeTextSuggester: Better error message when doing a lookup
during dictionary build. Used to be nullpointer (janhoy)
+* SOLR-6956: OverseerCollectionProcessor and replicas on the overseer node can sometimes
+ operate on stale cluster state due to overseer holding the state update lock for a
+ long time. (Mark Miller, shalin)
+
Optimizations
----------------------
* SOLR-7049: Move work done by the LIST Collections API call to the Collections
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1660450&r1=1660449&r2=1660450&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java Tue Feb 17 17:17:13 2015
@@ -143,59 +143,7 @@ public class Overseer implements Closeab
log.debug("am_i_leader unclear {}", isLeader);
isLeader = amILeader(); // not a no, not a yes, try ask again
}
- if (!this.isClosed && LeaderStatus.YES == isLeader) {
- // see if there's something left from the previous Overseer and re
- // process all events that were not persisted into cloud state
- synchronized (reader.getUpdateLock()) { // XXX this only protects
- // against edits inside single
- // node
- try {
- byte[] head = workQueue.peek();
-
- if (head != null) {
- reader.updateClusterState(true);
- ClusterState clusterState = reader.getClusterState();
- log.info("Replaying operations from work queue.");
- ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats);
-
- while (head != null) {
- isLeader = amILeader();
- if (LeaderStatus.NO == isLeader) {
- break;
- }
- else if (LeaderStatus.YES == isLeader) {
- final ZkNodeProps message = ZkNodeProps.load(head);
- log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
- clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
- workQueue.poll(); // poll-ing removes the element we got by peek-ing
- }
- else {
- log.info("am_i_leader unclear {}", isLeader);
- // re-peek below in case our 'head' value is out-of-date by now
- }
- head = workQueue.peek();
- }
- // force flush at the end of the loop
- clusterState = zkStateWriter.writePendingUpdates();
- }
- } catch (KeeperException e) {
- if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
- log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
- return;
- }
- log.error("Exception in Overseer work queue loop", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
-
- } catch (Exception e) {
- log.error("Exception in Overseer work queue loop", e);
- }
- }
-
- }
-
log.info("Starting to work on the main queue");
try {
ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats);
@@ -210,6 +158,45 @@ public class Overseer implements Closeab
log.debug("am_i_leader unclear {}", isLeader);
continue; // not a no, not a yes, try ask again
}
+
+ if (refreshClusterState) {
+ try {
+ reader.updateClusterState(true);
+ clusterState = reader.getClusterState();
+ refreshClusterState = false;
+
+ // if there were any errors while processing
+ // the state queue, items would have been left in the
+ // work queue so let's process those first
+ byte[] data = workQueue.peek();
+ boolean hadWorkItems = data != null;
+ while (data != null) {
+ final ZkNodeProps message = ZkNodeProps.load(data);
+ log.info("processMessage: workQueueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
+ // force flush to ZK after each message because there is no fallback if workQueue items
+ // are removed from workQueue but fail to be written to ZK
+ clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
+ workQueue.poll(); // poll-ing removes the element we got by peek-ing
+ data = workQueue.peek();
+ }
+ // force flush at the end of the loop
+ if (hadWorkItems) {
+ clusterState = zkStateWriter.writePendingUpdates();
+ }
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
+ log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
+ return;
+ }
+ log.error("Exception in Overseer work queue loop", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ } catch (Exception e) {
+ log.error("Exception in Overseer work queue loop", e);
+ }
+ }
+
DistributedQueue.QueueEvent head = null;
try {
head = stateUpdateQueue.peek(true);
@@ -227,81 +214,53 @@ public class Overseer implements Closeab
} catch (Exception e) {
log.error("Exception in Overseer main queue loop", e);
}
- synchronized (reader.getUpdateLock()) {
- try {
- if (refreshClusterState) {
- reader.updateClusterState(true);
- clusterState = reader.getClusterState();
- refreshClusterState = false;
-
- // if there were any errors while processing
- // the state queue, items would have been left in the
- // work queue so let's process those first
- byte[] data = workQueue.peek();
- boolean hadWorkItems = data != null;
- while (data != null) {
- final ZkNodeProps message = ZkNodeProps.load(data);
- log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
- // force flush to ZK after each message because there is no fallback if workQueue items
- // are removed from workQueue but fail to be written to ZK
- clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
- workQueue.poll(); // poll-ing removes the element we got by peek-ing
- data = workQueue.peek();
+ try {
+ while (head != null) {
+ final byte[] data = head.getBytes();
+ final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
+ log.info("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
+ // we can batch here because workQueue is our fallback in case a ZK write failed
+ clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() {
+ @Override
+ public void onEnqueue() throws Exception {
+ workQueue.offer(data);
}
- // force flush at the end of the loop
- if (hadWorkItems) {
- clusterState = zkStateWriter.writePendingUpdates();
+
+ @Override
+ public void onWrite() throws Exception {
+ // remove everything from workQueue
+ while (workQueue.poll() != null);
}
- }
+ });
- while (head != null) {
- final byte[] data = head.getBytes();
- final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
- log.info("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
- // we can batch here because workQueue is our fallback in case a ZK write failed
- clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() {
- @Override
- public void onEnqueue() throws Exception {
- workQueue.offer(data);
- }
-
- @Override
- public void onWrite() throws Exception {
- // remove everything from workQueue
- while (workQueue.poll() != null);
- }
- });
-
- // it is safer to keep this poll here because an invalid message might never be queued
- // and therefore we can't rely on the ZkWriteCallback to remove the item
- stateUpdateQueue.poll();
-
- if (isClosed) break;
- // if an event comes in the next 100ms batch it together
- head = stateUpdateQueue.peek(100);
- }
- // we should force write all pending updates because the next iteration might sleep until there
- // are more items in the main queue
- clusterState = zkStateWriter.writePendingUpdates();
- // clean work queue
- while (workQueue.poll() != null);
+ // it is safer to keep this poll here because an invalid message might never be queued
+ // and therefore we can't rely on the ZkWriteCallback to remove the item
+ stateUpdateQueue.poll();
+
+ if (isClosed) break;
+ // if an event comes in the next 100ms batch it together
+ head = stateUpdateQueue.peek(100);
+ }
+ // we should force write all pending updates because the next iteration might sleep until there
+ // are more items in the main queue
+ clusterState = zkStateWriter.writePendingUpdates();
+ // clean work queue
+ while (workQueue.poll() != null);
- } catch (KeeperException e) {
- if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
- log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
- return;
- }
- log.error("Exception in Overseer main queue loop", e);
- refreshClusterState = true; // it might have been a bad version error
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
+ log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
return;
- } catch (Exception e) {
- log.error("Exception in Overseer main queue loop", e);
- refreshClusterState = true; // it might have been a bad version error
}
+ log.error("Exception in Overseer main queue loop", e);
+ refreshClusterState = true; // it might have been a bad version error
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ } catch (Exception e) {
+ log.error("Exception in Overseer main queue loop", e);
+ refreshClusterState = true; // it might have been a bad version error
}
-
}
} finally {
log.info("Overseer Loop exiting : {}", LeaderElector.getNodeName(myId));
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1660450&r1=1660449&r2=1660450&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Tue Feb 17 17:17:13 2015
@@ -579,6 +579,8 @@ public class OverseerCollectionProcessor
NamedList results = new NamedList();
try {
+ // force update the cluster state
+ zkStateReader.updateClusterState(true);
CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(operation);
if (action == null) {
// back-compat because we used strings different than enum values before SOLR-6115