Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/17 15:22:44 UTC

[lucene-solr] 10/18: @845 Non fair locks, too many max connections per host for http2 client.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit d3561af65783cdf8dd331d3a6b0df3f6bf33df5c
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Sep 16 21:45:06 2020 -0500

    @845 Non fair locks, too many max connections per host for http2 client.
---
 .../client/solrj/embedded/JettySolrRunner.java     |   1 +
 .../src/java/org/apache/solr/core/PluginBag.java   |  10 +-
 .../org/apache/solr/update/CdcrTransactionLog.java |  15 ++-
 .../java/org/apache/solr/update/CommitTracker.java |  43 ++++++---
 .../org/apache/solr/update/HdfsTransactionLog.java |   5 +-
 .../org/apache/solr/update/TransactionLog.java     | 103 ++++++++++++++++-----
 .../org/apache/solr/update/UpdateShardHandler.java |   3 +-
 .../solr/client/solrj/impl/Http2SolrClient.java    |   2 +-
 .../apache/solr/common/cloud/ZkStateReader.java    |  23 +++--
 9 files changed, 149 insertions(+), 56 deletions(-)

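A note on the recurring pattern, for readers skimming the diff: most of the hunks below apply the same mechanical conversion, replacing a synchronized (fosLock) block on a plain Object monitor (or a synchronized method/this block in CommitTracker) with an explicit java.util.concurrent.locks.ReentrantLock held around the identical critical section. A minimal standalone sketch of that conversion follows; the class name and the size field are illustrative and not part of the patch, while the lock construction mirrors TransactionLog.fosLock as changed below.

    import java.util.concurrent.locks.ReentrantLock;

    class FosLockSketch {
        // The patch constructs the lock as new ReentrantLock(true), i.e. with fairness enabled.
        private final ReentrantLock fosLock = new ReentrantLock(true);
        private long size;

        long position() {
            fosLock.lock();        // replaces: synchronized (fosLock) {
            try {
                return size;       // same critical section as before
            } finally {
                fosLock.unlock();  // unlock in finally so an exception cannot leave the lock held
            }
        }
    }
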
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index 0679ff5..68285e8 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -341,6 +341,7 @@ public class JettySolrRunner implements Closeable {
           HTTP2ServerConnectionFactory http2ConnectionFactory = new HTTP2ServerConnectionFactory(configuration);
 
           http2ConnectionFactory.setMaxConcurrentStreams(256);
+
           http2ConnectionFactory.setInputBufferSize(16384);
 
           ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory(
diff --git a/solr/core/src/java/org/apache/solr/core/PluginBag.java b/solr/core/src/java/org/apache/solr/core/PluginBag.java
index 5cb2c540..4f04b9c 100644
--- a/solr/core/src/java/org/apache/solr/core/PluginBag.java
+++ b/solr/core/src/java/org/apache/solr/core/PluginBag.java
@@ -86,7 +86,7 @@ public class PluginBag<T> implements AutoCloseable {
     // TODO: since reads will dominate writes, we could also think about creating a new instance of a map each time it changes.
     // Not sure how much benefit this would have over ConcurrentHashMap though
     // We could also perhaps make this constructor into a factory method to return different implementations depending on thread safety needs.
-    this.registry = new ConcurrentHashMap<>(64, 0.75f, 4);
+    this.registry = new ConcurrentHashMap<>(128, 0.75f, 3);
     this.immutableRegistry = Collections.unmodifiableMap(registry);
     meta = SolrConfig.classVsSolrPluginInfo.get(klass.getName());
     if (meta == null) {
@@ -364,10 +364,10 @@ public class PluginBag<T> implements AutoCloseable {
   @Override
   public void close() {
     try (ParWork worker = new ParWork(this)) {
-      for (Map.Entry<String,PluginHolder<T>> e : registry.entrySet()) {
-        worker.collect(e.getValue());
-      }
-      worker.addCollect();
+      registry.forEach((s, tPluginHolder) -> {
+        worker.collect(tPluginHolder);
+      });
+
     }
   }
 
diff --git a/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java b/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java
index a41d7eb..416e459 100644
--- a/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java
@@ -202,13 +202,16 @@ public class CdcrTransactionLog extends TransactionLog {
         codec.writePrimitive(false);
       }
 
-      synchronized (fosLock) {
+      fosLock.lock();
+      try {
         long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
         assert pos != 0;
         out.writeAll(fos);
         endRecord(pos);
         // fos.flushBuffer();  // flush later
         return pos;
+      } finally {
+        fosLock.unlock();
       }
 
     } catch (IOException e) {
@@ -236,12 +239,15 @@ public class CdcrTransactionLog extends TransactionLog {
       } else {
         codec.writePrimitive(false);
       }
-      synchronized (fosLock) {
+      fosLock.lock();
+      try {
         long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
         out.writeAll(fos);
         endRecord(pos);
         // fos.flushBuffer();  // flush later
         return pos;
+      } finally {
+        fosLock.unlock();
       }
     } catch (IOException e) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -251,7 +257,8 @@ public class CdcrTransactionLog extends TransactionLog {
   @Override
   public long writeCommit(CommitUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
-    synchronized (fosLock) {
+    fosLock.lock();
+    try {
       try {
         long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
 
@@ -278,6 +285,8 @@ public class CdcrTransactionLog extends TransactionLog {
       } catch (IOException e) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       }
+    } finally {
+      fosLock.unlock();
     }
   }
 
diff --git a/solr/core/src/java/org/apache/solr/update/CommitTracker.java b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
index 1db1f74..48664e4 100644
--- a/solr/core/src/java/org/apache/solr/update/CommitTracker.java
+++ b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Helper class for tracking autoCommit state.
@@ -62,6 +63,8 @@ public final class CommitTracker implements Runnable, Closeable {
   private long timeUpperBound;
   private long tLogFileSizeUpperBound;
 
+  private ReentrantLock lock = new ReentrantLock(true);
+
   // note: can't use ExecutorsUtil because it doesn't have a *scheduled* ExecutorService.
   //  Not a big deal but it means we must take care of MDC logging here.
   private final ScheduledThreadPoolExecutor scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, new SolrNamedThreadFactory("commitScheduler"));
@@ -105,15 +108,21 @@ public final class CommitTracker implements Runnable, Closeable {
     return openSearcher;
   }
   
-  public synchronized void close() {
-    this.closed = true;
+  public void close() {
+
+    lock.lock();
     try {
-      pending.cancel(true);
-    } catch (NullPointerException e) {
-      // okay
+      this.closed = true;
+      try {
+        pending.cancel(true);
+      } catch (NullPointerException e) {
+        // okay
+      }
+      pending = null;
+      ParWork.close(scheduler);
+    } finally {
+      lock.unlock();
     }
-    pending = null;
-    ParWork.close(scheduler);
     assert ObjectReleaseTracker.release(this);
   }
   
@@ -123,13 +132,16 @@ public final class CommitTracker implements Runnable, Closeable {
   }
 
   public void cancelPendingCommit() {
-    synchronized (this) {
+    lock.lock();
+    try {
       if (pending != null) {
         boolean canceled = pending.cancel(false);
         if (canceled) {
           pending = null;
         }
       }
+    } finally {
+      lock.unlock();
     }
   }
   
@@ -143,7 +155,8 @@ public final class CommitTracker implements Runnable, Closeable {
 
   private void _scheduleCommitWithin(long commitMaxTime) {
     if (commitMaxTime <= 0) return;
-    synchronized (this) {
+    lock.lock();
+    try {
       if (pending != null && pending.getDelay(TimeUnit.MILLISECONDS) <= commitMaxTime) {
         // There is already a pending commit that will happen first, so
         // nothing else to do here.
@@ -172,6 +185,8 @@ public final class CommitTracker implements Runnable, Closeable {
       if (!closed) {
         pending = scheduler.schedule(this, commitMaxTime, TimeUnit.MILLISECONDS);
       }
+    } finally {
+      lock.unlock();
     }
   }
   
@@ -248,21 +263,27 @@ public final class CommitTracker implements Runnable, Closeable {
   
   /** Inform tracker that a rollback has occurred, cancel any pending commits */
   public void didRollback() {
-    synchronized (this) {
+    lock.lock();
+    try {
       if (pending != null) {
         pending.cancel(false);
         pending = null; // let it start another one
       }
       docsSinceCommit.set(0);
+    } finally {
+      lock.unlock();
     }
   }
   
   /** This is the worker part for the ScheduledFuture **/
   @Override
   public void run() {
-    synchronized (this) {
+    lock.lock();
+    try {
       // log.info("###start commit. pending=null");
       pending = null;  // allow a new commit to be scheduled
+    } finally {
+      lock.unlock();
     }
 
     MDCLoggingContext.setCore(core);
diff --git a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
index cae45f6..48fef21 100644
--- a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
@@ -265,7 +265,8 @@ public class HdfsTransactionLog extends TransactionLog {
   }
 
   private void doCloseOutput() throws IOException {
-    synchronized (fosLock) {
+    fosLock.lock();
+    try {
       if (fos == null) return;
       if (debug) {
         log.debug("Closing output for {}", tlogFile);
@@ -273,6 +274,8 @@ public class HdfsTransactionLog extends TransactionLog {
       fos.flushBuffer();
       finalLogSize = fos.size();
       fos = null;
+    } finally {
+      fosLock.unlock();
     }
 
     tlogOutStream.hflush();
diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
index a7f5acb..ed69d85 100644
--- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.solr.common.util.FastInputStream;
 import org.apache.solr.common.util.FastOutputStream;
@@ -77,7 +78,7 @@ public class TransactionLog implements Closeable {
   FileChannel channel;
   OutputStream os;
   FastOutputStream fos;    // all accesses to this stream should be synchronized on "this" (The TransactionLog)
-  final Object fosLock = new Object();
+  final ReentrantLock fosLock = new ReentrantLock(true);
   volatile int numRecords;
   boolean isBuffer;
 
@@ -239,9 +240,12 @@ public class TransactionLog implements Closeable {
 
   public boolean endsWithCommit() throws IOException {
     long size;
-    synchronized (fosLock) {
+    fosLock.lock();
+    try {
       fos.flush();
       size = fos.size();
+    } finally {
+      fosLock.unlock();
     }
 
     // the end of the file should have the end message (added during a commit) plus a 4 byte size
@@ -326,12 +330,15 @@ public class TransactionLog implements Closeable {
     // rollback() is the only function that can reset to zero, and it blocks updates.
     if (fos.size() != 0) return;
 
-    synchronized (fosLock) {
+    fosLock.lock();
+    try {
       if (fos.size() != 0) return;  // check again while synchronized
       if (optional != null) {
         addGlobalStrings(optional.getFieldNames());
       }
       writeLogHeader(codec);
+    } finally {
+      fosLock.unlock();
     }
   }
 
@@ -390,7 +397,8 @@ public class TransactionLog implements Closeable {
       }
 
 
-      synchronized (fosLock) {
+      fosLock.lock();
+      try {
         lastAddSize = (int) out.size();
 
         long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
@@ -407,6 +415,8 @@ public class TransactionLog implements Closeable {
         endRecord(pos);
         // fos.flushBuffer();  // flush later
         return pos;
+      } finally {
+        fosLock.unlock();
       }
 
     } catch (IOException e) {
@@ -430,13 +440,16 @@ public class TransactionLog implements Closeable {
       codec.writeLong(cmd.getVersion());
       codec.writeByteArray(br.bytes, br.offset, br.length);
 
-      synchronized (fosLock) {
+      fosLock.lock();
+      try {
         long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
         assert pos != 0;
         out.writeAll(fos);
         endRecord(pos);
         // fos.flushBuffer();  // flush later
         return pos;
+      } finally {
+        fosLock.unlock();
       }
 
     } catch (IOException e) {
@@ -457,12 +470,15 @@ public class TransactionLog implements Closeable {
       codec.writeLong(cmd.getVersion());
       codec.writeStr(cmd.query);
 
-      synchronized (fosLock) {
+      fosLock.lock();
+      try {
         long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
         out.writeAll(fos);
         endRecord(pos);
         // fos.flushBuffer();  // flush later
         return pos;
+      } finally {
+        fosLock.unlock();
       }
     } catch (IOException e) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -473,7 +489,8 @@ public class TransactionLog implements Closeable {
 
   public long writeCommit(CommitUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
-    synchronized (fosLock) {
+    fosLock.lock();
+    try {
       try {
         long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
 
@@ -496,6 +513,8 @@ public class TransactionLog implements Closeable {
       } catch (IOException e) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       }
+    } finally {
+      fosLock.unlock();
     }
   }
 
@@ -509,7 +528,8 @@ public class TransactionLog implements Closeable {
 
     try {
       // make sure any unflushed buffer has been flushed
-      synchronized (fosLock) {
+      fosLock.lock();
+      try {
         // TODO: optimize this by keeping track of what we have flushed up to
         fos.flushBuffer();
         /***
@@ -518,6 +538,8 @@ public class TransactionLog implements Closeable {
          throw new RuntimeException("ERROR" + "###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
          }
          ***/
+      } finally {
+        fosLock.unlock();
       }
 
       ChannelFastInputStream fis = new ChannelFastInputStream(channel, pos);
@@ -548,8 +570,11 @@ public class TransactionLog implements Closeable {
 
   /** returns the current position in the log file */
   public long position() {
-    synchronized (fosLock) {
+    fosLock.lock();
+    try {
       return fos.size();
+    } finally {
+      fosLock.unlock();
     }
   }
 
@@ -561,8 +586,11 @@ public class TransactionLog implements Closeable {
   public void finish(UpdateLog.SyncLevel syncLevel) {
     if (syncLevel == UpdateLog.SyncLevel.NONE) return;
     try {
-      synchronized (fosLock) {
+      fosLock.lock();
+      try {
         fos.flushBuffer();
+      } finally {
+        fosLock.unlock();
       }
 
       if (syncLevel == UpdateLog.SyncLevel.FSYNC) {
@@ -583,9 +611,12 @@ public class TransactionLog implements Closeable {
         log.debug("Closing tlog {}", this);
       }
 
-      synchronized (fosLock) {
+      fosLock.lock();
+      try {
         fos.flush();
         fos.close();
+      } finally {
+        fosLock.unlock();
       }
 
       if (deleteOnClose) {
@@ -665,7 +696,8 @@ public class TransactionLog implements Closeable {
      */
     public Object next() throws IOException, InterruptedException {
       long pos;
-      synchronized (fosLock) {
+      fosLock.lock();
+      try {
         pos = fis.position();
         if (trace) {
           log.trace("Reading log record.  pos={} currentSize={}", pos, fos.size());
@@ -696,6 +728,8 @@ public class TransactionLog implements Closeable {
         assert size == fis.position() - pos - 4;
 
         return o;
+      } finally {
+        fosLock.unlock();
       }
     }
 
@@ -737,7 +771,8 @@ public class TransactionLog implements Closeable {
 
     @Override
     public Object next() throws IOException, InterruptedException {
-      synchronized (fosLock) {
+      fosLock.lock();
+      try {
         if (versionToPos == null) {
           versionToPos = new TreeMap<>();
           Object o;
@@ -767,6 +802,8 @@ public class TransactionLog implements Closeable {
           if (pos != currentPos()) fis.seek(pos);
           return super.next();
         }
+      } finally {
+        fosLock.unlock();
       }
     }
   }
@@ -808,7 +845,8 @@ public class TransactionLog implements Closeable {
       incref();
 
       long sz;
-      synchronized (fosLock) {
+      fosLock.lock();
+      try {
         fos.flushBuffer();
         sz = fos.size();
         assert sz == channel.size();
@@ -820,6 +858,8 @@ public class TransactionLog implements Closeable {
           fis.seek(prevPos);
           nextLength = fis.readInt();
         }
+      } finally {
+        fosLock.unlock();
       }
     }
 
@@ -830,7 +870,8 @@ public class TransactionLog implements Closeable {
      */
     public Object next() throws IOException {
       Object o = null;
-      synchronized (fosLock) {
+      fosLock.lock();
+      try {
         if (prevPos <= 0) return null;
 
         long endOfThisRecord = prevPos;
@@ -862,6 +903,8 @@ public class TransactionLog implements Closeable {
         o = codec.readVal(fis);
 
         // assert fis.position() == prevPos + 4 + thisLength;  // this is only true if we read all the data (and we currently skip reading SolrInputDocument
+      } finally {
+        fosLock.unlock();
       }
       return o;
     }
@@ -877,8 +920,11 @@ public class TransactionLog implements Closeable {
 
     @Override
     public String toString() {
-      synchronized (fosLock) {
+      fosLock.lock();
+      try {
         return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + fos.size() + "}";
+      } finally {
+        fosLock.unlock();
       }
     }
 
@@ -898,14 +944,18 @@ public class TransactionLog implements Closeable {
     @Override
     public int readWrappedStream(byte[] target, int offset, int len) throws IOException {
       ByteBuffer bb = ByteBuffer.wrap(target, offset, len);
-      synchronized (fosLock) {
+      fosLock.lock();
+      try {
         int ret = ch.read(bb, readFromStream);
         return ret;
+      } finally {
+        fosLock.unlock();
       }
     }
 
     public void seek(long position) throws IOException {
-      synchronized (fosLock) {
+      fosLock.lock();
+      try {
         if (position <= readFromStream && position >= getBufferPos()) {
           // seek within buffer
           pos = (int) (position - getBufferPos());
@@ -916,26 +966,37 @@ public class TransactionLog implements Closeable {
           end = pos = 0;
         }
         assert position() == position;
+      } finally {
+        fosLock.unlock();
       }
     }
 
   /** where is the start of the buffer relative to the whole file */
     public long getBufferPos() {
-      synchronized (fosLock) {
+      fosLock.lock();
+      try {
         return readFromStream - end;
+      } finally {
+        fosLock.unlock();
       }
     }
 
     public int getBufferSize() {
-      synchronized (fosLock) {
+      fosLock.lock();
+      try {
         return buf.length;
+      } finally {
+        fosLock.unlock();
       }
     }
 
     @Override
     public void close() throws IOException {
-      synchronized (fosLock) {
+      fosLock.lock();
+      try {
         ch.close();
+      } finally {
+        fosLock.unlock();
       }
     }
 
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index 089c719..29906b1 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -104,8 +104,7 @@ public class UpdateShardHandler implements SolrInfoBean {
     if (cfg != null) {
       updateOnlyClientBuilder
           .connectionTimeout(cfg.getDistributedConnectionTimeout())
-          .idleTimeout(cfg.getDistributedSocketTimeout())
-          .maxConnectionsPerHost(cfg.getMaxUpdateConnectionsPerHost());
+          .idleTimeout(cfg.getDistributedSocketTimeout());
     }
     updateOnlyClient = updateOnlyClientBuilder.markInternalRequest().build();
     updateOnlyClient.enableCloseLock();
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 38959f7..ef0ca18 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -933,7 +933,7 @@ public class Http2SolrClient extends SolrClient {
     private SSLConfig sslConfig = defaultSSLConfig;
     private Integer idleTimeout = Integer.getInteger("solr.http2solrclient.default.idletimeout", 30000);
     private Integer connectionTimeout;
-    private Integer maxConnectionsPerHost;
+    private Integer maxConnectionsPerHost = 4;
     private boolean useHttp1_1 = Boolean.getBoolean("solr.http1");
     protected String baseSolrUrl;
     protected Map<String,String> headers = new ConcurrentHashMap<>();
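 
 Usage note (illustrative, not part of the patch): with the Builder change above, maxConnectionsPerHost now defaults to 4, which is why the UpdateShardHandler hunk earlier drops its explicit setter. A caller that wants a different per-host cap can still override the default; the URL and values below are made up for the sketch.
 
     Http2SolrClient client = new Http2SolrClient.Builder("http://localhost:8983/solr")
         .idleTimeout(30000)          // milliseconds, illustrative value
         .maxConnectionsPerHost(8)    // override the new default of 4 if needed
         .build();
 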
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 20e5c38..d534f5e 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -199,10 +199,10 @@ public class ZkStateReader implements SolrCloseable {
 
   private final Runnable securityNodeListener;
 
-  private final ConcurrentHashMap<String, CollectionWatch<DocCollectionWatcher>> collectionWatches = new ConcurrentHashMap<>(16, 0.75f, 5);
+  private final ConcurrentHashMap<String, CollectionWatch<DocCollectionWatcher>> collectionWatches = new ConcurrentHashMap<>(32, 0.75f, 3);
 
   // named this observers so there's less confusion between CollectionPropsWatcher map and the PropsWatcher map.
-  private final ConcurrentHashMap<String, CollectionPropsWatcher> collectionPropsObservers = new ConcurrentHashMap<>(16, 0.75f, 5);
+  private final ConcurrentHashMap<String, CollectionPropsWatcher> collectionPropsObservers = new ConcurrentHashMap<>(32, 0.75f, 3);
 
   private Set<CloudCollectionsListener> cloudCollectionsListeners = ConcurrentHashMap.newKeySet();
 
@@ -1677,16 +1677,15 @@ public class ZkStateReader implements SolrCloseable {
    */
   public void registerDocCollectionWatcher(String collection, DocCollectionWatcher stateWatcher) {
     AtomicBoolean watchSet = new AtomicBoolean(false);
-    synchronized (collectionWatches) {
-      collectionWatches.compute(collection, (k, v) -> {
-        if (v == null) {
-          v = new CollectionWatch<>();
-          watchSet.set(true);
-        }
-        v.stateWatchers.add(stateWatcher);
-        return v;
-      });
-    }
+
+    collectionWatches.compute(collection, (k, v) -> {
+      if (v == null) {
+        v = new CollectionWatch<>();
+        watchSet.set(true);
+      }
+      v.stateWatchers.add(stateWatcher);
+      return v;
+    });
 
     if (watchSet.get()) {
       new StateWatcher(collection).refreshAndWatch();