You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by gu...@apache.org on 2019/06/11 18:36:32 UTC

[lucene-solr] branch branch_8x updated: SOLR-13439 - Adds ability to locally cache collection properties for a specified duration.

This is an automated email from the ASF dual-hosted git repository.

gus pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new fbae72c  SOLR-13439 - Adds ability to locally cache collection properties for a specified duration.
fbae72c is described below

commit fbae72c4cc86c83ee1b6fd5fd4a6b1f6cdb391ae
Author: Gus Heck <gu...@apache.org>
AuthorDate: Tue Jun 11 14:36:04 2019 -0400

    SOLR-13439 - Adds ability to locally cache collection properties for a specified duration.
---
 .../overseer/ZkCollectionPropsCachingTest.java     | 100 ++++++++++++++
 .../apache/solr/common/cloud/ZkStateReader.java    | 151 +++++++++++++++++----
 2 files changed, 226 insertions(+), 25 deletions(-)

diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkCollectionPropsCachingTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkCollectionPropsCachingTest.java
new file mode 100644
index 0000000..a765ada
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkCollectionPropsCachingTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.overseer;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.CollectionProperties;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@LuceneTestCase.Slow
+@SolrTestCaseJ4.SuppressSSL
+public class ZkCollectionPropsCachingTest extends SolrCloudTestCase {
+  //
+  // NOTE: This class can only have one test because our test for caching is to nuke the SolrZkClient to
+  // verify that a cached load is going to hit the cache, not try to talk to zk. Any other ZK related test
+  // method in this class will fail if it runs after testReadWriteCached, so don't add one! :)
+  //
+  private String collectionName;
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    Boolean useLegacyCloud = rarely();
+    log.info("Using legacyCloud?: {}", useLegacyCloud);
+
+    configureCluster(4)
+        .withProperty(ZkStateReader.LEGACY_CLOUD, String.valueOf(useLegacyCloud))
+        .addConfig("conf", configset("cloud-minimal"))
+        .configure();
+  }
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+
+    collectionName = "CollectionPropsTest" + System.nanoTime();
+
+    CollectionAdminRequest.Create request = CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2);
+    CollectionAdminResponse response = request.process(cluster.getSolrClient());
+    assertTrue("Unable to create collection: " + response.toString(), response.isSuccess());
+  }
+
+  @Test
+  public void testReadWriteCached() throws InterruptedException, IOException {
+    ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+
+    CollectionProperties collectionProps = new CollectionProperties(zkClient());
+
+    collectionProps.setCollectionProperty(collectionName, "property1", "value1");
+    checkValue("property1", "value1"); //Should be no cache, so the change should take effect immediately
+
+    zkStateReader.getCollectionProperties(collectionName,9000);
+    zkStateReader.getZkClient().close();
+    assertFalse(zkStateReader.isClosed());
+    checkValue("property1", "value1"); //Should be cached, so this read should not try to hit zk
+
+    Thread.sleep(10000); // test the timeout feature
+    try {
+      checkValue("property1", "value1"); //Should not be cached anymore
+      fail("cache should have expired, prev line should throw an exception trying to access zookeeper after closed");
+    } catch (Exception e) {
+      // expected, because we killed the client in zkStateReader.
+    }
+  }
+
+  private void checkValue(String propertyName, String expectedValue) throws InterruptedException {
+    final Object value = cluster.getSolrClient().getZkStateReader().getCollectionProperties(collectionName).get(propertyName);
+    assertEquals("Unexpected value for collection property: " + propertyName, expectedValue, value);
+  }
+
+
+
+}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index f00cdff..e09f89e 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -35,6 +35,7 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -171,6 +172,9 @@ public class ZkStateReader implements SolrCloseable {
   /** Collection properties being actively watched */
   private final ConcurrentHashMap<String, VersionedCollectionProps> watchedCollectionProps = new ConcurrentHashMap<>();
 
+  /** ZooKeeper watchers for collection properties, keyed by collection name */
+  private final ConcurrentHashMap<String, PropsWatcher> collectionPropsWatchers = new ConcurrentHashMap<>();
+
   private volatile SortedSet<String> liveNodes = emptySortedSet();
 
   private volatile Map<String, Object> clusterProperties = Collections.emptyMap();
@@ -183,7 +187,8 @@ public class ZkStateReader implements SolrCloseable {
 
   private ConcurrentHashMap<String, CollectionWatch<CollectionStateWatcher>> collectionWatches = new ConcurrentHashMap<>();
 
-  private ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>> collectionPropsWatches = new ConcurrentHashMap<>();
+  // named this observers so there's less confusion between CollectionPropsWatcher map and the PropsWatcher map.
+  private ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>> collectionPropsObservers = new ConcurrentHashMap<>();
 
   private Set<CloudCollectionsListener> cloudCollectionsListeners = ConcurrentHashMap.newKeySet();
 
@@ -198,6 +203,8 @@ public class ZkStateReader implements SolrCloseable {
 
   private static final long LAZY_CACHE_TIME = TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
 
+  private Future<?> collectionPropsCacheCleaner; // only kept to identify if the cleaner has already been started.
+
   /**
    * Get current {@link AutoScalingConfig}.
    * @return current configuration from <code>autoscaling.json</code>. NOTE:
@@ -484,8 +491,8 @@ public class ZkStateReader implements SolrCloseable {
       securityData = getSecurityProps(true);
     }
 
-    collectionPropsWatches.forEach((k,v) -> {
-      new PropsWatcher(k).refreshAndWatch(true);
+    collectionPropsObservers.forEach((k, v) -> {
+      collectionPropsWatchers.computeIfAbsent(k, PropsWatcher::new).refreshAndWatch(true);
     });
   }
 
@@ -1094,17 +1101,60 @@ public class ZkStateReader implements SolrCloseable {
 
   /**
    * Get collection properties for a given collection. If the collection is watched, simply return it from the cache,
-   * otherwise fetch it directly from zookeeper.
+   * otherwise fetch it directly from zookeeper. This is a convenience for {@code getCollectionProperties(collection,0)}.
+   *
+   * @param collection the collection for which properties are desired
+   * @return a map representing the key/value properties for the collection.
    */
   public Map<String, String> getCollectionProperties(final String collection) {
+    return getCollectionProperties(collection,0);
+  }
+
+  /**
+   * Get and cache collection properties for a given collection. If the collection is watched, or still cached,
+   * simply return it from the cache; otherwise fetch it directly from zookeeper and retain the value for at
+   * least cacheForMillis milliseconds. Cached properties are watched in zookeeper and updated automatically.
+   * This version of {@code getCollectionProperties} should be used when properties need to be consulted
+   * frequently in the absence of an active {@link CollectionPropsWatcher}.
+   *
+   * @param collection The collection for which properties are desired
+   * @param cacheForMillis The minimum number of milliseconds to maintain a cache for the specified collection's
+   *                       properties. Setting a {@code CollectionPropsWatcher} will override this value and retain
+   *                       the cache for the life of the watcher. A lack of changes in zookeeper may allow the
+   *                       caching to remain for a greater duration up to the cycle time of {@link CacheCleaner}.
+   *                       Passing zero for this value will explicitly remove the cached copy if and only if it is
+   *                       due to expire and no watch exists. Any positive value will extend the expiration time
+   *                       if required.
+   * @return a map representing the key/value properties for the collection.
+   */
+  public Map<String, String> getCollectionProperties(final String collection, long cacheForMillis) {
     synchronized (watchedCollectionProps) { // making decisions based on the result of a get...
+      Watcher watcher = null;
+      if (cacheForMillis > 0) {
+        watcher = collectionPropsWatchers.compute(collection,
+            (c, w) -> w == null ? new PropsWatcher(c, cacheForMillis) : w.renew(cacheForMillis));
+      }
       VersionedCollectionProps vprops = watchedCollectionProps.get(collection);
-      Map<String, String> properties = vprops != null ? vprops.props : null;
-      if (properties == null) {
+      boolean haveUnexpiredProps = vprops != null && vprops.cacheUntilNs > System.nanoTime();
+      long untilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(cacheForMillis, TimeUnit.MILLISECONDS);
+      Map<String, String> properties;
+      if (haveUnexpiredProps) {
+        properties = vprops.props;
+        vprops.cacheUntilNs = Math.max(vprops.cacheUntilNs, untilNs);
+      } else {
         try {
-          // todo: maybe we want to store/watch since if someone's calling this it's probably going to get called again?
-          // Not storing the value in watchedCollectionProps, because it can gat stale, since we have no watcher set.
-          properties = fetchCollectionProperties(collection, null ).props;
+          VersionedCollectionProps vcp = fetchCollectionProperties(collection, watcher);
+          properties = vcp.props;
+          if (cacheForMillis > 0) {
+            vcp.cacheUntilNs = untilNs;
+            watchedCollectionProps.put(collection,vcp);
+          } else {
+            // we're synchronized on watchedCollectionProps and we can only get here if we have found an expired
+            // vprops above, so it is safe to remove the cached value and let the GC free up some mem a bit sooner.
+            if (!collectionPropsObservers.containsKey(collection)) {
+              watchedCollectionProps.remove(collection);
+            }
+          }
         } catch (Exception e) {
           throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading collection properties", SolrZkClient.checkInterrupted(e));
         }
@@ -1114,13 +1164,14 @@ public class ZkStateReader implements SolrCloseable {
   }
 
   private class VersionedCollectionProps {
-    public VersionedCollectionProps(int zkVersion, Map<String, String> props) {
+    int zkVersion;
+    Map<String,String> props;
+    long cacheUntilNs = 0;
+
+    VersionedCollectionProps(int zkVersion, Map<String, String> props) {
       this.zkVersion = zkVersion;
       this.props = props;
     }
-
-    int zkVersion;
-    Map<String,String> props;
   }
 
   static String getCollectionPropsPath(final String collection) {
@@ -1130,6 +1181,14 @@ public class ZkStateReader implements SolrCloseable {
   @SuppressWarnings("unchecked")
   private VersionedCollectionProps fetchCollectionProperties(String collection, Watcher watcher) throws KeeperException, InterruptedException {
     final String znodePath = getCollectionPropsPath(collection);
+    // lazy init cache cleaner once we know someone is using collection properties.
+    if (collectionPropsCacheCleaner == null) {
+      synchronized (this) { // There can be only one! :)
+        if (collectionPropsCacheCleaner == null) {
+          collectionPropsCacheCleaner = notifications.submit(new CacheCleaner());
+        }
+      }
+    }
     while (true) {
       try {
         Stat stat = new Stat();
@@ -1279,9 +1338,21 @@ public class ZkStateReader implements SolrCloseable {
   /** Watches collection properties */
   class PropsWatcher implements Watcher {
     private final String coll;
+    private long watchUntilNs;
 
     PropsWatcher(String coll) {
       this.coll = coll;
+      watchUntilNs = 0;
+    }
+
+    PropsWatcher(String coll, long forMillis) {
+      this.coll = coll;
+      watchUntilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(forMillis,TimeUnit.MILLISECONDS);
+    }
+
+    public PropsWatcher renew(long forMillis) {
+      watchUntilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(forMillis,TimeUnit.MILLISECONDS);
+      return this;
     }
 
     @Override
@@ -1291,7 +1362,8 @@ public class ZkStateReader implements SolrCloseable {
         return;
       }
 
-      if (!collectionPropsWatches.containsKey(coll)) {
+      boolean expired = System.nanoTime() > watchUntilNs;
+      if (!collectionPropsObservers.containsKey(coll) && expired) {
         // No one can be notified of the change, we can ignore it and "unset" the watch
         log.debug("Ignoring property change for collection {}", coll);
         return;
@@ -1320,6 +1392,19 @@ public class ZkStateReader implements SolrCloseable {
             if (notifyWatchers) {
               notifyPropsWatchers(coll, properties);
             }
+            if (vcp.zkVersion == -1 && existingVcp != null) { // Collection DELETE detected
+
+              // We should not be caching a collection that has been deleted.
+              watchedCollectionProps.remove(coll);
+
+              // core ref counting not relevant here, don't need canRemove(), we just sent
+              // a notification of an empty set of properties, no reason to watch what doesn't exist.
+              collectionPropsObservers.remove(coll);
+
+              // This is the one time we know it's safe to throw this out. We just failed to set the watch
+              // due to a NoNodeException, so it isn't held by ZK and can't re-set itself due to an update.
+              collectionPropsWatchers.remove(coll);
+            }
           }
         }
       } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
@@ -1708,7 +1793,7 @@ public class ZkStateReader implements SolrCloseable {
 
   public void registerCollectionPropsWatcher(final String collection, CollectionPropsWatcher propsWatcher) {
     AtomicBoolean watchSet = new AtomicBoolean(false);
-    collectionPropsWatches.compute(collection, (k, v) -> {
+    collectionPropsObservers.compute(collection, (k, v) -> {
       if (v == null) {
         v = new CollectionWatch<>();
         watchSet.set(true);
@@ -1718,12 +1803,12 @@ public class ZkStateReader implements SolrCloseable {
     });
 
     if (watchSet.get()) {
-      new PropsWatcher(collection).refreshAndWatch(false);
+      collectionPropsWatchers.computeIfAbsent(collection, PropsWatcher::new).refreshAndWatch(false);
     }
   }
 
   public void removeCollectionPropsWatcher(String collection, CollectionPropsWatcher watcher) {
-    collectionPropsWatches.compute(collection, (k, v) -> {
+    collectionPropsObservers.compute(collection, (k, v) -> {
       if (v == null)
         return null;
       v.stateWatchers.remove(watcher);
@@ -1983,30 +2068,46 @@ public class ZkStateReader implements SolrCloseable {
 
   private class PropsNotification implements Runnable {
 
-    final String collection;
-    final Map<String, String> collectionProperties;
+    private final String collection;
+    private final Map<String, String> collectionProperties;
+    private final List<CollectionPropsWatcher> watchers = new ArrayList<>();
 
     private PropsNotification(String collection, Map<String, String> collectionProperties) {
       this.collection = collection;
       this.collectionProperties = collectionProperties;
-    }
-
-    @Override
-    public void run() {
-      List<CollectionPropsWatcher> watchers = new ArrayList<>();
-      collectionPropsWatches.compute(collection, (k, v) -> {
+      // guarantee delivery of notification regardless of what happens to collectionPropsObservers
+      // while we wait our turn in the executor by capturing the list on creation.
+      collectionPropsObservers.compute(collection, (k, v) -> {
         if (v == null)
           return null;
         watchers.addAll(v.stateWatchers);
         return v;
       });
+    }
+
+    @Override
+    public void run() {
       for (CollectionPropsWatcher watcher : watchers) {
         if (watcher.onStateChanged(collectionProperties)) {
           removeCollectionPropsWatcher(collection, watcher);
         }
       }
     }
+  }
 
+  private class CacheCleaner implements Runnable {
+    public void run() {
+      while (!Thread.interrupted()) {
+        try {
+          Thread.sleep(60000);
+        } catch (InterruptedException e) {
+          // Executor shutdown will send us an interrupt
+          break;
+        }
+        watchedCollectionProps.entrySet().removeIf(entry ->
+            entry.getValue().cacheUntilNs < System.nanoTime() && !collectionPropsObservers.containsKey(entry.getKey()));
+      }
+    }
   }
 
 }