You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@lucene.apache.org by gu...@apache.org on 2019/04/23 18:25:19 UTC

[lucene-solr] branch branch_8x updated: SOLR-13418 - safer synchronization and zk version checking for collection properties

This is an automated email from the ASF dual-hosted git repository.

gus pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new 0cfd85b  SOLR-13418 - safer synchronization and zk version checking for collection properties
0cfd85b is described below

commit 0cfd85baef7f6f6fb997330b9a14471d66a62889
Author: Gus Heck <gu...@apache.org>
AuthorDate: Tue Apr 23 12:29:08 2019 -0400

    SOLR-13418 - safer synchronization and zk version checking for collection properties
    
    (cherry picked from commit 80d3ac8709c6d93c4e4634dc7c10ef667a029cb1)
---
 .../apache/solr/common/cloud/ZkStateReader.java    | 136 ++++++++++++---------
 1 file changed, 76 insertions(+), 60 deletions(-)

diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index cd7e41c..2e98189 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -77,7 +77,7 @@ import static org.apache.solr.common.util.Utils.fromJSON;
 public class ZkStateReader implements SolrCloseable {
   public static final int STATE_UPDATE_DELAY = Integer.getInteger("solr.OverseerStateUpdateDelay", 2000);  // delay between cloud state updates
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  
+
   public static final String BASE_URL_PROP = "base_url";
   public static final String NODE_NAME_PROP = "node_name";
   public static final String CORE_NODE_NAME_PROP = "core_node_name";
@@ -140,7 +140,7 @@ public class ZkStateReader implements SolrCloseable {
   public static final String COLLECTION_DEF = "collectionDefaults";
 
   public static final String URL_SCHEME = "urlScheme";
-  
+
   public static final String REPLICA_TYPE = "type";
 
   /** A view of the current state of all collections; combines all the different state sources into a single view. */
@@ -167,7 +167,7 @@ public class ZkStateReader implements SolrCloseable {
   private final ConcurrentHashMap<String, LazyCollectionRef> lazyCollectionStates = new ConcurrentHashMap<>();
 
   /** Collection properties being actively watched */
-  private final ConcurrentHashMap<String, Map<String, String>> watchedCollectionProps = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, VersionedCollectionProps> watchedCollectionProps = new ConcurrentHashMap<>();
 
   private volatile SortedSet<String> liveNodes = emptySortedSet();
 
@@ -188,7 +188,7 @@ public class ZkStateReader implements SolrCloseable {
   private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches");
 
   private Set<LiveNodesListener> liveNodesListeners = ConcurrentHashMap.newKeySet();
-  
+
   /** Used to submit notifications to Collection Properties watchers in order **/
   private final ExecutorService collectionPropsNotifications = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrjNamedThreadFactory("collectionPropsNotifications"));
 
@@ -285,11 +285,11 @@ public class ZkStateReader implements SolrCloseable {
 
 
   private final SolrZkClient zkClient;
-  
+
   private final boolean closeClient;
 
   private volatile boolean closed = false;
-  
+
   private Set<CountDownLatch> waitLatches = ConcurrentHashMap.newKeySet();
 
   public ZkStateReader(SolrZkClient zkClient) {
@@ -327,7 +327,7 @@ public class ZkStateReader implements SolrCloseable {
     this.configManager = new ZkConfigManager(zkClient);
     this.closeClient = true;
     this.securityNodeListener = null;
-    
+
     assert ObjectReleaseTracker.track(this);
   }
 
@@ -339,7 +339,7 @@ public class ZkStateReader implements SolrCloseable {
    * Forcibly refresh cluster state from ZK. Do this only to avoid race conditions because it's expensive.
    *
    * It is cheaper to call {@link #forceUpdateCollection(String)} on a single collection if you must.
-   * 
+   *
    * @lucene.internal
    */
   public void forciblyRefreshAllClusterStateSlow() throws KeeperException, InterruptedException {
@@ -437,16 +437,16 @@ public class ZkStateReader implements SolrCloseable {
         collection = nu;
       }
     }
-    
+
     if (collection.getZNodeVersion() == version) {
       return null;
     }
-    
+
     log.debug("Wrong version from client [{}]!=[{}]", version, collection.getZNodeVersion());
-    
+
     return collection.getZNodeVersion();
   }
-  
+
   public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
       InterruptedException {
     // We need to fetch the current cluster state and the set of live nodes
@@ -818,7 +818,7 @@ public class ZkStateReader implements SolrCloseable {
     if (listener.onChange(new TreeSet<>(getClusterState().getLiveNodes()), new TreeSet<>(getClusterState().getLiveNodes()))) {
       removeLiveNodesListener(listener);
     }
-    
+
     liveNodesListeners.add(listener);
   }
 
@@ -832,18 +832,18 @@ public class ZkStateReader implements SolrCloseable {
   public ClusterState getClusterState() {
     return clusterState;
   }
-  
+
   public Object getUpdateLock() {
     return this;
   }
 
   public void close() {
     this.closed  = true;
-    
+
     notifications.shutdownNow();
-    
+
     waitLatches.parallelStream().forEach(c -> { c.countDown(); });
-    
+
     ExecutorUtil.shutdownAndAwaitTermination(notifications);
     ExecutorUtil.shutdownAndAwaitTermination(collectionPropsNotifications);
     if (closeClient) {
@@ -856,12 +856,12 @@ public class ZkStateReader implements SolrCloseable {
   public boolean isClosed() {
     return closed;
   }
-  
+
   public String getLeaderUrl(String collection, String shard, int timeout) throws InterruptedException {
     ZkCoreNodeProps props = new ZkCoreNodeProps(getLeaderRetry(collection, shard, timeout));
     return props.getCoreUrl();
   }
-  
+
   public Replica getLeader(Set<String> liveNodes, DocCollection docCollection, String shard) {
     Replica replica = docCollection != null ? docCollection.getLeader(shard) : null;
     if (replica != null && liveNodes.contains(replica.getNodeName())) {
@@ -934,18 +934,18 @@ public class ZkStateReader implements SolrCloseable {
   public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName) {
     return getReplicaProps(collection, shardId, thisCoreNodeName, null);
   }
-  
+
   public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
       Replica.State mustMatchStateFilter) {
     return getReplicaProps(collection, shardId, thisCoreNodeName, mustMatchStateFilter, null);
   }
-  
+
   public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
       Replica.State mustMatchStateFilter, Replica.State mustNotMatchStateFilter) {
     //TODO: We don't need all these getReplicaProps method overloading. Also, it's odd that the default is to return replicas of type TLOG and NRT only
     return getReplicaProps(collection, shardId, thisCoreNodeName, mustMatchStateFilter, null, EnumSet.of(Replica.Type.TLOG,  Replica.Type.NRT));
   }
-  
+
   public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
       Replica.State mustMatchStateFilter, Replica.State mustNotMatchStateFilter, final EnumSet<Replica.Type> acceptReplicaType) {
     assert thisCoreNodeName != null;
@@ -958,20 +958,20 @@ public class ZkStateReader implements SolrCloseable {
       throw new ZooKeeperException(ErrorCode.BAD_REQUEST,
           "Could not find collection in zk: " + collection);
     }
-    
+
     Map<String,Slice> slices = docCollection.getSlicesMap();
     Slice replicas = slices.get(shardId);
     if (replicas == null) {
       throw new ZooKeeperException(ErrorCode.BAD_REQUEST, "Could not find shardId in zk: " + shardId);
     }
-    
+
     Map<String,Replica> shardMap = replicas.getReplicasMap();
     List<ZkCoreNodeProps> nodes = new ArrayList<>(shardMap.size());
     for (Entry<String,Replica> entry : shardMap.entrySet().stream().filter((e)->acceptReplicaType.contains(e.getValue().getType())).collect(Collectors.toList())) {
       ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
-      
+
       String coreNodeName = entry.getValue().getName();
-      
+
       if (clusterState.liveNodesContain(nodeProps.getNodeName()) && !coreNodeName.equals(thisCoreNodeName)) {
         if (mustMatchStateFilter == null || mustMatchStateFilter == Replica.State.getState(nodeProps.getState())) {
           if (mustNotMatchStateFilter == null || mustNotMatchStateFilter != Replica.State.getState(nodeProps.getState())) {
@@ -1074,17 +1074,30 @@ public class ZkStateReader implements SolrCloseable {
    * otherwise fetch it directly from zookeeper.
    */
   public Map<String, String> getCollectionProperties(final String collection) {
-    Map<String, String> properties = watchedCollectionProps.get(collection);
-    if (properties == null) {
-      try {
-        properties = fetchCollectionProperties(collection, null);
-        // Not storing the value in watchedCollectionProps, because it can gat stale, since we have no watcher set.
-      } catch (Exception e) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading collection properties", SolrZkClient.checkInterrupted(e));
+    synchronized (watchedCollectionProps) { // making decisions based on the result of a get...
+      VersionedCollectionProps vprops = watchedCollectionProps.get(collection);
+      Map<String, String> properties = vprops != null ? vprops.props : null;
+      if (properties == null) {
+        try {
+          // todo: maybe we want to store/watch since if someone's calling this it's probably going to get called again?
+          // Not storing the value in watchedCollectionProps, because it can gat stale, since we have no watcher set.
+          properties = fetchCollectionProperties(collection, null ).props;
+        } catch (Exception e) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading collection properties", SolrZkClient.checkInterrupted(e));
+        }
       }
+      return properties;
+    }
+  }
+
+  private class VersionedCollectionProps {
+    public VersionedCollectionProps(int zkVersion, Map<String, String> props) {
+      this.zkVersion = zkVersion;
+      this.props = props;
     }
 
-    return properties;
+    int zkVersion;
+    Map<String,String> props;
   }
 
   static String getCollectionPropsPath(final String collection) {
@@ -1092,12 +1105,13 @@ public class ZkStateReader implements SolrCloseable {
   }
 
   @SuppressWarnings("unchecked")
-  private Map<String, String> fetchCollectionProperties(String collection, Watcher watcher) throws KeeperException, InterruptedException {
+  private VersionedCollectionProps fetchCollectionProperties(String collection, Watcher watcher) throws KeeperException, InterruptedException {
     final String znodePath = getCollectionPropsPath(collection);
     while (true) {
       try {
-        byte[] data = zkClient.getData(znodePath, watcher, null, true);
-        return (Map<String, String>) Utils.fromJSON(data);
+        Stat stat = new Stat();
+        byte[] data = zkClient.getData(znodePath, watcher, stat, true);
+        return new VersionedCollectionProps(stat.getVersion(),(Map<String, String>) Utils.fromJSON(data));
       } catch (ClassCastException e) {
         throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to parse collection properties for collection " + collection, e);
       } catch (KeeperException.NoNodeException e) {
@@ -1110,7 +1124,7 @@ public class ZkStateReader implements SolrCloseable {
             continue;
           }
         }
-        return Collections.emptyMap();
+        return new VersionedCollectionProps(-1, EMPTY_MAP);
       }
     }
   }
@@ -1272,18 +1286,17 @@ public class ZkStateReader implements SolrCloseable {
      */
     void refreshAndWatch(boolean notifyWatchers) {
       try {
-        synchronized (coll) { // We only have one PropsWatcher instance per collection, so it's fine to sync on coll
-          Map<String, String> properties = fetchCollectionProperties(coll, this);
-          watchedCollectionProps.put(coll, properties);
-          /*
-           * Note that if two events were fired close to each other and the second one arrived first, we would read the collectionprops.json
-           * twice for the same data and notify watchers (in case of notifyWatchers==true) twice for the same data, however it's guaranteed
-           * that after processing both events, watchedCollectionProps will have the latest data written to ZooKeeper and that the watchers
-           * won't be called with out of order data
-           * 
-           */
-          if (notifyWatchers) {
-            notifyPropsWatchers(coll, properties);
+        synchronized (watchedCollectionProps) { // making decisions based on the result of a get...
+          VersionedCollectionProps vcp = fetchCollectionProperties(coll, this);
+          Map<String, String> properties = vcp.props;
+          VersionedCollectionProps existingVcp = watchedCollectionProps.get(coll);
+          if (existingVcp == null ||                   // never called before, record what we found
+              vcp.zkVersion > existingVcp.zkVersion || // newer info we should update
+              vcp.zkVersion == -1) {                   // node was deleted start over
+            watchedCollectionProps.put(coll, vcp);
+            if (notifyWatchers) {
+              notifyPropsWatchers(coll, properties);
+            }
           }
         }
       } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
@@ -1306,7 +1319,7 @@ public class ZkStateReader implements SolrCloseable {
       if (ZkStateReader.this.closed) {
         return;
       }
-      
+
       // session events are not change events, and do not remove the watcher
       if (EventType.None.equals(event.getType())) {
         return;
@@ -1507,11 +1520,11 @@ public class ZkStateReader implements SolrCloseable {
    */
   public void waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
       throws InterruptedException, TimeoutException {
-    
+
     if (closed) {
       throw new AlreadyClosedException();
     }
-    
+
     final CountDownLatch latch = new CountDownLatch(1);
     waitLatches.add(latch);
     AtomicReference<DocCollection> docCollection = new AtomicReference<>();
@@ -1520,7 +1533,7 @@ public class ZkStateReader implements SolrCloseable {
       boolean matches = predicate.matches(n, c);
       if (matches)
         latch.countDown();
-      
+
       return matches;
     };
     registerCollectionStateWatcher(collection, watcher);
@@ -1551,22 +1564,22 @@ public class ZkStateReader implements SolrCloseable {
    */
   public void waitForLiveNodes(long wait, TimeUnit unit, LiveNodesPredicate predicate)
       throws InterruptedException, TimeoutException {
-    
+
     if (closed) {
       throw new AlreadyClosedException();
     }
-    
+
     final CountDownLatch latch = new CountDownLatch(1);
     waitLatches.add(latch);
 
-    
+
     LiveNodesListener listener = (o, n) -> {
       boolean matches = predicate.matches(o, n);
       if (matches)
         latch.countDown();
       return matches;
     };
-    
+
     registerLiveNodesListener(listener);
 
     try {
@@ -1581,7 +1594,7 @@ public class ZkStateReader implements SolrCloseable {
     }
   }
 
-  
+
   /**
    * Remove a watcher from a collection's watch list.
    *
@@ -1692,7 +1705,10 @@ public class ZkStateReader implements SolrCloseable {
         return null;
       v.stateWatchers.remove(watcher);
       if (v.canBeRemoved()) {
-        watchedCollectionProps.remove(collection);
+        // don't want this to happen in middle of other blocks that might add it back.
+        synchronized (watchedCollectionProps) {
+          watchedCollectionProps.remove(collection);
+        }
         return null;
       }
       return v;