You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2015/08/17 22:34:58 UTC
svn commit: r1696335 -
/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
Author: shalin
Date: Mon Aug 17 20:34:58 2015
New Revision: 1696335
URL: http://svn.apache.org/r1696335
Log:
SOLR-5756: Fix for race condition between unwatch and watcher fire event
Modified:
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1696335&r1=1696334&r2=1696335&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Mon Aug 17 20:34:58 2015
@@ -21,6 +21,7 @@ import java.io.Closeable;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -254,7 +255,9 @@ public class ZkStateReader implements Cl
}
// No need to set watchers because we should already have watchers registered for everything.
refreshLegacyClusterState(null);
- for (String coll : watchedCollectionStates.keySet()) {
+ // Need a copy so we don't delete from what we're iterating over.
+ Collection<String> safeCopy = new ArrayList<>(watchedCollectionStates.keySet());
+ for (String coll : safeCopy) {
DocCollection newState = fetchCollectionState(coll, null);
updateWatchedCollection(coll, newState);
}
@@ -450,6 +453,13 @@ public class ZkStateReader implements Cl
}
this.clusterState = new ClusterState(liveNodes, result, legacyClusterStateVersion);
+ log.debug("clusterStateSet: version {} legacy {} interesting {} watched {} lazy {} total {}",
+ clusterState.getZkClusterStateVersion(),
+ legacyCollectionStates.keySet(),
+ interestingCollections,
+ watchedCollectionStates.keySet(),
+ lazyCollectionStates.keySet(),
+ clusterState.getCollections());
}
/**
@@ -1023,34 +1033,47 @@ public class ZkStateReader implements Cl
return;
}
- log.info("Updating data for {} to ver {} ", coll, newState.getZNodeVersion());
// CAS update loop
while (true) {
+ if (!interestingCollections.contains(coll)) {
+ break;
+ }
DocCollection oldState = watchedCollectionStates.get(coll);
if (oldState == null) {
if (watchedCollectionStates.putIfAbsent(coll, newState) == null) {
+ log.info("Add data for {} ver {} ", coll, newState.getZNodeVersion());
break;
}
} else {
if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
// Nothing to do, someone else updated same or newer.
- return;
+ break;
}
if (watchedCollectionStates.replace(coll, oldState, newState)) {
+ log.info("Updating data for {} from {} to {} ", coll, oldState.getZNodeVersion(), newState.getZNodeVersion());
break;
}
}
}
+
+ // Resolve race with removeZKWatch.
+ if (!interestingCollections.contains(coll)) {
+ watchedCollectionStates.remove(coll);
+ log.info("Removing uninteresting collection {}", coll);
+ }
}
/** This is not a public API. Only used by ZkController */
public void removeZKWatch(String coll) {
+ log.info("Removing watch for uninteresting collection {}", coll);
interestingCollections.remove(coll);
watchedCollectionStates.remove(coll);
ClusterState.CollectionRef lazyCollectionStateFormat2 = tryMakeLazyCollectionStateFormat2(coll);
synchronized (getUpdateLock()) {
if (lazyCollectionStateFormat2 != null) {
this.lazyCollectionStates.put(coll, lazyCollectionStateFormat2);
+ } else {
+ this.lazyCollectionStates.remove(coll);
}
constructState();
}