You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/03/11 18:28:32 UTC
[lucene-solr] 01/02: @1455 Write the collection we are interested
in.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit ec7a1da7468307de43c51e0412575e4922c6d9cb
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Mar 11 12:02:39 2021 -0600
@1455 Write the collection we are interested in.
Took 22 minutes
---
.../src/java/org/apache/solr/cloud/Overseer.java | 21 +-
.../solr/cloud/OverseerTaskExecutorTask.java | 6 +-
.../cloud/api/collections/CreateCollectionCmd.java | 2 +-
.../solr/cloud/api/collections/CreateShardCmd.java | 2 +-
.../cloud/api/collections/DeleteReplicaCmd.java | 4 +-
.../solr/cloud/api/collections/DeleteShardCmd.java | 2 +-
.../solr/cloud/api/collections/MoveReplicaCmd.java | 5 +-
.../OverseerCollectionMessageHandler.java | 4 +-
.../solr/cloud/api/collections/ReplaceNodeCmd.java | 6 +-
.../solr/cloud/api/collections/SplitShardCmd.java | 2 +-
.../apache/solr/cloud/overseer/ZkStateWriter.java | 233 ++++++++++-----------
11 files changed, 133 insertions(+), 154 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index dd665e2..f200fe7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -72,7 +72,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -712,8 +711,8 @@ public class Overseer implements SolrCloseable {
return future;
}
- public Future writePendingUpdates() {
- return ParWork.getRootSharedExecutor().submit(new OverseerTaskExecutorTask.WriteTask(getCoreContainer(), zkStateWriter));
+ public Future writePendingUpdates(String collection) {
+ return ParWork.getRootSharedExecutor().submit(new OverseerTaskExecutorTask.WriteTask(getCoreContainer(), collection));
}
private static abstract class QueueWatcher implements Watcher, Closeable {
@@ -766,7 +765,7 @@ public class Overseer implements SolrCloseable {
return;
}
- //ourLock.lock();
+ ourLock.lock();
try {
try {
List<String> items = getItems();
@@ -779,7 +778,7 @@ public class Overseer implements SolrCloseable {
log.error("Exception during overseer queue queue processing", e);
}
} finally {
- // ourLock.unlock();
+ ourLock.unlock();
}
}
@@ -820,7 +819,6 @@ public class Overseer implements SolrCloseable {
@Override
protected void processQueueItems(List<String> items, boolean onStart) {
List<String> fullPaths = new ArrayList<>(items.size());
- ourLock.lock();
try {
if (log.isDebugEnabled()) log.debug("Found state update queue items {}", items);
for (String item : items) {
@@ -856,9 +854,13 @@ public class Overseer implements SolrCloseable {
log.error("failed waiting for enqueued updates", e);
}
}
- overseer.writePendingUpdates();
+ Set<String> collections = overseer.zkStateWriter.getDirtyStateCollections();
+ for (String collection : collections) {
+ overseer.writePendingUpdates(collection);
+ }
} finally {
+
if (overseer.zkStateWriter != null) {
if (zkController.getZkClient().isAlive()) {
try {
@@ -867,8 +869,8 @@ public class Overseer implements SolrCloseable {
log.warn("Failed deleting processed items", e);
}
}
+
}
- ourLock.unlock();
}
}
@@ -909,7 +911,6 @@ public class Overseer implements SolrCloseable {
@Override
protected void processQueueItems(List<String> items, boolean onStart) {
List<String> fullPaths = new ArrayList<>(items.size());
- ourLock.lock();
try {
log.info("Found collection queue items {} onStart={}", items, onStart);
for (String item : items) {
@@ -940,9 +941,7 @@ public class Overseer implements SolrCloseable {
} catch (Exception e) {
log.warn("Delete items failed {}", e.getMessage());
}
- ourLock.unlock();
}
-
}
private void runAsync(Map.Entry<String,byte[]> entry, boolean onStart) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
index ac6a4cb..c118ac5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
@@ -82,16 +82,18 @@ public class OverseerTaskExecutorTask {
}
public static class WriteTask implements Runnable {
+ private final String collection;
CoreContainer coreContainer;
- public WriteTask(CoreContainer coreContainer, ZkStateWriter zkStateWriter) {
+ public WriteTask(CoreContainer coreContainer, String collection) {
+ this.collection = collection;
this.coreContainer = coreContainer;
}
@Override
public void run() {
try {
- coreContainer.getZkController().getOverseer().getZkStateWriter().writePendingUpdates();
+ coreContainer.getZkController().getOverseer().getZkStateWriter().writePendingUpdates(collection);
} catch (NullPointerException e) {
if (log.isDebugEnabled()) log.debug("Won't write pending updates, zkStateWriter=null");
} catch (Exception e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 4783d7b..0ba35e8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -321,7 +321,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
Future future = ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null, false);
future.get();
- writeFuture = ocmh.overseer.writePendingUpdates();
+ writeFuture = ocmh.overseer.writePendingUpdates(collectionName);
if (log.isDebugEnabled()) log.debug("Sending create call for {} replicas for {}", coresToCreate.size(), collectionName);
for (Map.Entry<String,ShardRequest> e : coresToCreate.entrySet()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
index c9a2708..91851f9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -188,7 +188,7 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
log.error("failure", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
- response.writeFuture = overseer.writePendingUpdates();
+ response.writeFuture = overseer.writePendingUpdates(collection);
if (resp.asyncFinalRunner != null) {
try {
resp.asyncFinalRunner.call();
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
index 4b8a170..29ea39c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -198,7 +198,7 @@ public class DeleteReplicaCmd implements Cmd {
if (waitForFinalState) {
try {
ocmh.overseer.getZkStateWriter().enqueueUpdate(finalClusterState.getCollection(finalCollectionName1), null, false).get();
- ocmh.overseer.writePendingUpdates();
+ ocmh.overseer.writePendingUpdates(finalCollectionName1);
waitForCoreNodeGone(finalCollectionName1, shard, replicaName, 5000); // MRM TODO: timeout
} catch (Exception e) {
log.error("Failed waiting for replica to be removed", e);
@@ -280,7 +280,7 @@ public class DeleteReplicaCmd implements Cmd {
for (Future future : futures) {
future.get();
}
- ocmh.overseer.writePendingUpdates();
+ ocmh.overseer.writePendingUpdates(collectionName);
} catch (Exception e) {
log.error("failed writing update to zkstatewriter", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
index 3a3be00..1ab2ebe 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -178,7 +178,7 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
}
if (waitForFinalState) {
ocmh.overseer.getZkStateWriter().enqueueUpdate(finalClusterState.getCollection(collectionName), null, false).get();
- ocmh.overseer.writePendingUpdates().get();
+ ocmh.overseer.writePendingUpdates(collectionName).get();
ocmh.overseer.getZkStateReader().waitForState(collectionName, 10, TimeUnit.SECONDS, (liveNodes, coll) -> {
if (coll == null) {
return true;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
index 8ae6612..e9f6aaa 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
@@ -308,8 +308,9 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
@SuppressWarnings({"rawtypes"}) NamedList deleteResult = new NamedList();
try {
response1.clusterState = ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult).clusterState;
- ocmh.overseer.getZkStateWriter().enqueueUpdate( response1.clusterState.getCollection(response1.clusterState.getCollectionsMap().keySet().iterator().next()), null,false).get();
- asyncResp.writeFuture = ocmh.overseer.writePendingUpdates();
+ String collection = response1.clusterState.getCollectionsMap().keySet().iterator().next();
+ ocmh.overseer.getZkStateWriter().enqueueUpdate( response1.clusterState.getCollection(collection), null,false).get();
+ asyncResp.writeFuture = ocmh.overseer.writePendingUpdates(collection);
} catch (SolrException e) {
deleteResult.add("failure", e.toString());
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 3126c0e..faa10df 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -333,7 +333,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
}
if (asyncResp == null || asyncResp.writeFuture == null) {
future.get();
- writeFuture2 = overseer.writePendingUpdates();
+ writeFuture2 = overseer.writePendingUpdates(collection);
}
} else {
@@ -356,7 +356,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
if (future != null) {
future.get();
}
- writeFuture = overseer.writePendingUpdates();
+ writeFuture = overseer.writePendingUpdates(collection);
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
index a694708..2b0fdec 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
@@ -145,9 +145,9 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
};
runners.add(runner);
}
-
- ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(clusterState.getCollectionStates().keySet().iterator().next()), null, false).get();
- ocmh.overseer.writePendingUpdates();
+ String collection = clusterState.getCollectionStates().keySet().iterator().next();
+ ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collection), null, false).get();
+ ocmh.overseer.writePendingUpdates(collection);
CollectionCmdResponse.Response response = new CollectionCmdResponse.Response();
response.results = results;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index ca636a8..793ac18 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -350,7 +350,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
}
ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null,false).get();
- ocmh.overseer.writePendingUpdates();
+ ocmh.overseer.writePendingUpdates(collectionName);
log.info("Clusterstate after adding new shard for split {}", clusterState);
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 92123bf..c039414 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -67,7 +67,6 @@ public class ZkStateWriter {
protected volatile Stats stats;
- AtomicReference<Exception> lastFailedException = new AtomicReference<>();
private final Map<String,Integer> trackVersions = new ConcurrentHashMap<>();
private final Map<String, ZkNodeProps> stateUpdates = new ConcurrentHashMap<>();
@@ -510,11 +509,11 @@ public class ZkStateWriter {
*/
// if additional updates too large, publish structure change
- public void writePendingUpdates() {
+ public void writePendingUpdates(String collection) {
do {
try {
- write();
+ write(collection);
break;
} catch (KeeperException.BadVersionException e) {
@@ -527,156 +526,129 @@ public class ZkStateWriter {
}
- private void write() throws KeeperException.BadVersionException {
- // writeLock.lock();
- // try {
- // log.info("Get our write lock");
-
- //ourLock.lock();
- try {
- // log.info("Got our write lock");
- if (log.isDebugEnabled()) {
- log.debug("writePendingUpdates {}", cs);
- }
+ private void write(String coll) throws KeeperException.BadVersionException {
+ if (log.isDebugEnabled()) {
+ log.debug("writePendingUpdates {}", cs);
+ }
+ AtomicInteger lastVersion = new AtomicInteger();
+ AtomicReference<KeeperException.BadVersionException> badVersionException = new AtomicReference();
- // if (failedUpdates.size() > 0) {
- // Exception lfe = lastFailedException.get();
- // log.warn("Some collection updates failed {} logging last exception", failedUpdates, lfe); // MRM TODO: expand
- // failedUpdates.clear();
- // lfe = null;
- // throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, lfe);
- // }
- // } finally {
- // ourLock.unlock();
- // }
-
- // wait to see our last publish version has propagated TODO don't wait on collections not hosted on overseer?
- // waitForStateWePublishedToComeBack();
-
- // ourLock.lock();
- AtomicInteger lastVersion = new AtomicInteger();
- AtomicReference<KeeperException.BadVersionException> badVersionException = new AtomicReference();
- List<String> removeCollections = new ArrayList<>();
- //log.info("writing out state, looking at collections count={} toWrite={} {} : {}", cs.getCollectionsMap().size(), collectionsToWrite.size(), cs.getCollectionsMap().keySet(), collectionsToWrite);
- //try {
- cs.values().forEach(collection -> {
-
-
- if (log.isDebugEnabled()) log.debug("check collection {} {} {}", collection, dirtyStructure, dirtyState);
- Integer version = null;
- if (dirtyStructure.contains(collection.getName()) || dirtyState.contains(collection.getName())) {
- log.info("process collection {}", collection);
- ColState collState = collLocks.compute(collection.getName(), (s, reentrantLock) -> {
- if (reentrantLock == null) {
- ColState colState = new ColState();
- return colState;
- }
- return reentrantLock;
- });
+ DocCollection collection = cs.get(coll);
- collState.collLock.lock();
- try {
- collState.throttle.minimumWaitBetweenActions();
- collState.throttle.markAttemptingAction();
- String name = collection.getName();
- String path = ZkStateReader.getCollectionPath(collection.getName());
- String pathSCN = ZkStateReader.getCollectionSCNPath(collection.getName());
- // log.info("process collection {} path {}", collection.getName(), path);
- Stat existsStat = null;
- if (log.isTraceEnabled()) log.trace("process {}", collection);
- try {
- // log.info("get data for {}", name);
- byte[] data = Utils.toJSON(singletonMap(name, collection));
- // log.info("got data for {} {}", name, data.length);
+ if (collection == null) {
+ return;
+ }
- try {
+ if (log.isDebugEnabled()) log.debug("check collection {} {} {}", collection, dirtyStructure, dirtyState);
+ Integer version = null;
+ if (dirtyStructure.contains(collection.getName()) || dirtyState.contains(collection.getName())) {
+ log.info("process collection {}", collection);
+ ColState collState = collLocks.compute(collection.getName(), (s, reentrantLock) -> {
+ if (reentrantLock == null) {
+ ColState colState = new ColState();
+ return colState;
+ }
+ return reentrantLock;
+ });
- if (dirtyStructure.contains(collection.getName())) {
- if (log.isDebugEnabled()) log.debug("structure change in {}", collection.getName());
+ collState.collLock.lock();
+ try {
+ collState.throttle.minimumWaitBetweenActions();
+ collState.throttle.markAttemptingAction();
+ String name = collection.getName();
+ String path = ZkStateReader.getCollectionPath(collection.getName());
+ String pathSCN = ZkStateReader.getCollectionSCNPath(collection.getName());
+ // log.info("process collection {} path {}", collection.getName(), path);
+ Stat existsStat = null;
+ if (log.isTraceEnabled()) log.trace("process {}", collection);
+ try {
+ // log.info("get data for {}", name);
+ byte[] data = Utils.toJSON(singletonMap(name, collection));
+ // log.info("got data for {} {}", name, data.length);
- Integer v = trackVersions.get(collection.getName());
+ try {
- if (v != null) {
- //log.info("got version from cache {}", v);
- version = v;
- } else {
- version = 0;
- }
- lastVersion.set(version);
- if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", version, data.length, collection);
+ if (dirtyStructure.contains(collection.getName())) {
+ if (log.isDebugEnabled()) log.debug("structure change in {}", collection.getName());
- reader.getZkClient().setData(path, data, version, true, false);
- if (log.isDebugEnabled()) log.debug("set new version {} {}", collection.getName(), version + 1);
- trackVersions.put(collection.getName(), version + 1);
+ Integer v = trackVersions.get(collection.getName());
- reader.getZkClient().setData(pathSCN, null, -1, true, false);
- dirtyStructure.remove(collection.getName());
+ if (v != null) {
+ //log.info("got version from cache {}", v);
+ version = v;
+ } else {
+ version = 0;
+ }
+ lastVersion.set(version);
+ if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", version, data.length, collection);
- ZkNodeProps updates = stateUpdates.get(collection.getName());
- if (updates != null) {
- updates.getProperties().clear();
- }
- }
+ reader.getZkClient().setData(path, data, version, true, false);
+ if (log.isDebugEnabled()) log.debug("set new version {} {}", collection.getName(), version + 1);
+ trackVersions.put(collection.getName(), version + 1);
- } catch (KeeperException.NoNodeException e) {
- if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
-
- lastVersion.set(-1);
- trackVersions.remove(collection.getName());
- stateUpdates.remove(collection.getName());
- cs.remove(collection);
- // likely deleted
-
- } catch (KeeperException.BadVersionException bve) {
- log.info("Tried to update state.json ({}) with bad version", collection);
- //lastFailedException.set(bve);
- //failedUpdates.put(collection.getName(), collection);
- // Stat estate = reader.getZkClient().exists(path, null);
- trackVersions.remove(collection.getName());
- Stat stat = reader.getZkClient().exists(path, null, false, false);
- log.info("Tried to update state.json ({}) with bad version {} \n {}", collection, version, stat != null ? stat.getVersion() : "null");
-
- if (!overseer.isClosed() && stat != null) {
- trackVersions.put(collection.getName(), stat.getVersion());
- }
- throw bve;
- }
+ reader.getZkClient().setData(pathSCN, null, -1, true, false);
+ dirtyStructure.remove(collection.getName());
- if (dirtyState.contains(collection.getName())) { //&& !dirtyStructure.contains(collection.getName())
- ZkNodeProps updates = stateUpdates.get(collection.getName());
- if (updates != null) {
- writeStateUpdates(collection, updates);
- }
+ ZkNodeProps updates = stateUpdates.get(collection.getName());
+ if (updates != null) {
+ updates.getProperties().clear();
}
+ }
- } catch (KeeperException.BadVersionException bve) {
- badVersionException.set(bve);
- } catch (InterruptedException | AlreadyClosedException e) {
- log.info("We have been closed or one of our resources has, bailing {}", e.getClass().getSimpleName() + ":" + e.getMessage());
-
- } catch (Exception e) {
- log.error("Failed processing update=" + collection, e);
+ } catch (KeeperException.NoNodeException e) {
+ if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
+
+ lastVersion.set(-1);
+ trackVersions.remove(collection.getName());
+ stateUpdates.remove(collection.getName());
+ cs.remove(collection);
+ // likely deleted
+
+ } catch (KeeperException.BadVersionException bve) {
+ log.info("Tried to update state.json ({}) with bad version", collection);
+ //lastFailedException.set(bve);
+ //failedUpdates.put(collection.getName(), collection);
+ // Stat estate = reader.getZkClient().exists(path, null);
+ trackVersions.remove(collection.getName());
+ Stat stat = reader.getZkClient().exists(path, null, false, false);
+ log.info("Tried to update state.json ({}) with bad version {} \n {}", collection, version, stat != null ? stat.getVersion() : "null");
+
+ if (!overseer.isClosed() && stat != null) {
+ trackVersions.put(collection.getName(), stat.getVersion());
}
- } finally {
- collState.collLock.unlock();
+ throw bve;
}
- }
- });
+ if (dirtyState.contains(collection.getName())) { //&& !dirtyStructure.contains(collection.getName())
+ ZkNodeProps updates = stateUpdates.get(collection.getName());
+ if (updates != null) {
+ writeStateUpdates(collection, updates);
+ }
+ }
- //removeCollections.forEach(c -> removeCollection(c));
+ } catch (KeeperException.BadVersionException bve) {
+ badVersionException.set(bve);
+ } catch (InterruptedException | AlreadyClosedException e) {
+ log.info("We have been closed or one of our resources has, bailing {}", e.getClass().getSimpleName() + ":" + e.getMessage());
- if (badVersionException.get() != null) {
- throw badVersionException.get();
+ } catch (Exception e) {
+ log.error("Failed processing update=" + collection, e);
+ }
+ } finally {
+ collState.collLock.unlock();
}
+ }
- //log.info("Done with successful cluster write out");
- } finally {
+ //removeCollections.forEach(c -> removeCollection(c));
+ if (badVersionException.get() != null) {
+ throw badVersionException.get();
}
+ //log.info("Done with successful cluster write out");
+
// } finally {
// writeLock.unlock();
// }
@@ -703,6 +675,11 @@ public class ZkStateWriter {
return ClusterState.getRefCS(cs, -2);
}
+ public Set<String> getDirtyStateCollections() {
+ return dirtyState;
+ }
+
+
public void removeCollection(String collection) {
log.info("Removing collection from zk state {}", collection);
ColState collState = collLocks.compute(collection, (s, reentrantLock) -> {