You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/29 04:18:33 UTC
[lucene-solr] 01/02: @1064 Harden.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 07243eb28bd271d9c3aab643a1e011d1eaff54a5
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Oct 28 22:21:33 2020 -0500
@1064 Harden.
---
.../src/java/org/apache/solr/cloud/Overseer.java | 18 ++++-----------
.../apache/solr/cloud/OverseerTaskProcessor.java | 2 +-
.../org/apache/solr/cloud/ZkDistributedQueue.java | 11 ++++++---
.../apache/solr/cloud/overseer/ZkStateWriter.java | 3 ++-
.../java/org/apache/solr/update/UpdateHandler.java | 13 +++++++++++
.../src/java/org/apache/solr/update/UpdateLog.java | 4 ++--
.../java/org/apache/solr/update/VersionInfo.java | 27 +++++++++++-----------
.../org/apache/solr/cloud/TestCloudRecovery2.java | 27 ++++++++++------------
.../org/apache/solr/cloud/TestPrepRecovery.java | 9 ++------
.../collections/CollectionTooManyReplicasTest.java | 15 ++++--------
.../apache/solr/common/cloud/ZkStateReader.java | 8 -------
.../solrj/impl/CloudHttp2SolrClientTest.java | 2 +-
.../apache/solr/cloud/MiniSolrCloudCluster.java | 14 ++++++++---
13 files changed, 73 insertions(+), 80 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index e13265f..287cd6b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -252,7 +252,7 @@ public class Overseer implements SolrCloseable {
// force flush to ZK after each message because there is no fallback if workQueue items
// are removed from workQueue but fail to be written to ZK
try {
- processQueueItem(message, clusterState, zkStateWriter, false, null);
+ processQueueItem(message, reader.getClusterState(), zkStateWriter, false, null);
} catch (InterruptedException | AlreadyClosedException e) {
ParWork.propagateInterrupt(e);
return;
@@ -306,12 +306,7 @@ public class Overseer implements SolrCloseable {
try {
// We do not need to filter any nodes here cause all processed nodes are removed once we flush clusterstate
- long wait = 10000;
-// if (zkStateWriter.getUpdatesToWrite().isEmpty()) {
-// wait = 100;
-// } else {
-// wait = 0;
-// }
+ long wait = 5000;
queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, wait, (x) -> true));
} catch (AlreadyClosedException e) {
if (isClosed()) {
@@ -358,14 +353,9 @@ public class Overseer implements SolrCloseable {
return;
}
// if an event comes in the next *ms batch it together
- int wait = 0;
-// if (zkStateWriter.getUpdatesToWrite().isEmpty()) {
-// wait = 10000;
-// } else {
-// wait = 0;
-// }
+ int wait = 10;
queue = new LinkedList<>(stateUpdateQueue.peekElements(100, wait, node -> !processedNodes.contains(node)));
- if (loopCnt >= 3) {
+ if (loopCnt >= 1) {
break;
}
loopCnt++;
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index 8be9b40..066d3aa 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -320,7 +320,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
Set<Map.Entry<String, QueueEvent>> entrySet = completedTasks.entrySet();
AtomicBoolean sessionExpired = new AtomicBoolean();
AtomicBoolean interrupted = new AtomicBoolean();
- try (ParWork work = new ParWork(this)) {
+ try (ParWork work = new ParWork(this, true, true)) {
for (Map.Entry<String, QueueEvent> entry : entrySet) {
work.collect("cleanWorkQueue", ()->{
try {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index 33cf8bf..c8da9c4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -478,9 +478,7 @@ public class ZkDistributedQueue implements DistributedQueue {
break;
}
- TreeSet<String> existingChildren = knownChildren;
-
- while (existingChildren == knownChildren && existingChildren.size() == 0) {
+ while (foundChildren.size() == 0) {
try {
changed.await(250, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
@@ -490,6 +488,13 @@ public class ZkDistributedQueue implements DistributedQueue {
if (timeout.hasTimedOut() || zookeeper.isClosed() || !zookeeper.isConnected()) {
return Collections.emptyList();
}
+
+ for (String child : knownChildren) {
+ if (acceptFilter.test(child)) {
+ foundChildren.add(child);
+ }
+ }
+
}
} finally {
if (updateLock.isHeldByCurrentThread()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 403d0cd..1233de6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -297,7 +297,7 @@ public class ZkStateWriter {
// }
if (col != null && col.getZNodeVersion() > finalPrevVersion) {
- if (log.isDebugEnabled()) log.debug("Waited for ver: {}", col.getZNodeVersion());
+ if (log.isDebugEnabled()) log.debug("Waited for ver: {}", col.getZNodeVersion() + 1);
// System.out.println("found the version");
return true;
}
@@ -315,6 +315,7 @@ public class ZkStateWriter {
if (failedUpdates.size() > 0) {
+ log.warn("Some collection updates failed {} logging last exception", failedUpdates, lastFailedException);
failedUpdates.clear();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, lastFailedException);
}
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
index bc9002d..bc41035 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
@@ -160,6 +160,19 @@ UpdateHandler implements SolrInfoBean, Closeable {
}
ourUpdateLog.init(ulogPluginInfo);
ourUpdateLog.init(this, core);
+ } else if (updateLog == null && !skipUpdateLog && core.getCoreContainer().isZooKeeperAware()) {
+ DirectoryFactory dirFactory = core.getDirectoryFactory();
+
+ ourUpdateLog = new UpdateLog();
+
+ if (!core.isReloaded() && !dirFactory.isPersistent()) {
+ ourUpdateLog.clearLog(core, ulogPluginInfo);
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info("Using UpdateLog implementation: {}", ourUpdateLog.getClass().getName());
+ }
+ ourUpdateLog.init(this, core);
} else {
ourUpdateLog = updateLog;
}
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index fb737f4..4f126be 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -203,8 +203,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
protected final int numDeletesToKeep = 1000;
protected final int numDeletesByQueryToKeep = 100;
protected int numRecordsToKeep;
- protected int maxNumLogsToKeep;
- protected int numVersionBuckets; // This should only be used to initialize VersionInfo... the actual number of buckets may be rounded up to a power of two.
+ protected volatile int maxNumLogsToKeep;
+ protected volatile int numVersionBuckets = 65536; // This should only be used to initialize VersionInfo... the actual number of buckets may be rounded up to a power of two.
protected Long maxVersionFromIndex = null;
protected boolean existOldBufferLog = false;
diff --git a/solr/core/src/java/org/apache/solr/update/VersionInfo.java b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
index efa3676..cbc9a4e 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionInfo.java
+++ b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.lucene.index.LeafReader;
@@ -199,35 +200,33 @@ public class VersionInfo {
// int h = hash + (hash >>> 8) + (hash >>> 16) + (hash >>> 24);
// Assume good hash codes for now.
int slot;
- buckUpdateLock.readLock().lock();
- VersionBucket bucket;
+ buckUpdateLock.writeLock().lock();
try {
+ VersionBucket bucket;
+
slot = hash & (buckets.length - 1);
bucket = buckets[slot];
- } finally {
- buckUpdateLock.readLock().unlock();
- }
- if (bucket == null) {
- buckUpdateLock.writeLock().lock();
- try {
+ if (bucket == null) {
+
bucket = buckets[slot];
if (bucket == null) {
if (versionBucketLockTimeoutMs > 0) {
- bucket= new TimedVersionBucket();
+ bucket = new TimedVersionBucket();
} else {
- bucket= new VersionBucket();
+ bucket = new VersionBucket();
}
bucket.updateHighest(highestVersion);
buckets[slot] = bucket;
}
- } finally {
- buckUpdateLock.writeLock().unlock();
+
}
- }
- return bucket;
+ return bucket;
+ } finally {
+ buckUpdateLock.writeLock().unlock();
+ }
}
public Long lookupVersion(BytesRef idBytes) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
index 3aa4c43..b355d63 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
@@ -48,7 +48,7 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
CollectionAdminRequest
.createCollection(COLLECTION, "config", 1,2)
- .setMaxShardsPerNode(2)
+ .setMaxShardsPerNode(100)
.process(cluster.getSolrClient());
}
@@ -59,8 +59,6 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
try (Http2SolrClient client1 = SolrTestCaseJ4.getHttpSolrClient(node1.getBaseUrl().toString())) {
node2.stop();
- cluster.waitForJettyToStop(node2);
- waitForState("", COLLECTION, (liveNodes, collectionState) -> liveNodes.size() == 1);
UpdateRequest req = new UpdateRequest();
for (int i = 0; i < 100; i++) {
@@ -69,8 +67,8 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
req.commit(client1, COLLECTION);
node2.start();
- cluster.waitForNode(node2, 10);
- waitForState("", COLLECTION, clusterShape(1, 2));
+
+ cluster.waitForActiveCollection(COLLECTION, 1, 2);
try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node2.getBaseUrl().toString())) {
long numFound = client.query(COLLECTION, new SolrQuery("q","*:*", "distrib", "false")).getResults().getNumFound();
@@ -91,8 +89,7 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
//
node2.stop();
- cluster.waitForJettyToStop(node2);
- waitForState("", COLLECTION, (liveNodes, collectionState) -> liveNodes.size() == 1);
+
new UpdateRequest().add("id", "1", "num", "20")
.commit(client1, COLLECTION);
@@ -100,7 +97,9 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
assertEquals("20", v.toString());
node2.start();
- cluster.waitForNode(node2, 10);
+
+ cluster.waitForActiveCollection(COLLECTION, 1, 2);
+
waitForState("", COLLECTION, clusterShape(1, 2));
try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node2.getBaseUrl().toString())) {
v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
@@ -108,8 +107,6 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
}
node2.stop();
- cluster.waitForJettyToStop(node2);
- waitForState("", COLLECTION, (liveNodes, collectionState) -> liveNodes.size() == 1);
new UpdateRequest().add("id", "1", "num", "30")
.commit(client1, COLLECTION);
@@ -117,8 +114,8 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
SolrTestCaseJ4. assertEquals("30", v.toString());
node2.start();
- cluster.waitForNode(node2, 10);
- waitForState("", COLLECTION, clusterShape(1, 2));
+
+ cluster.waitForActiveCollection(COLLECTION, 1, 2);
try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node2.getBaseUrl().toString())) {
v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
@@ -129,15 +126,15 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
}
node1.stop();
- cluster.waitForJettyToStop(node1);
waitForState("", COLLECTION, (liveNodes, collectionState) -> {
Replica leader = collectionState.getLeader("shard1");
return leader != null && leader.getNodeName().equals(node2.getNodeName());
});
node1.start();
- cluster.waitForNode(node1, 10);
- waitForState("", COLLECTION, clusterShape(1, 2));
+
+ cluster.waitForActiveCollection(COLLECTION, 1, 2);
+
try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node1.getBaseUrl().toString())) {
Object v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
assertEquals("30", v.toString());
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPrepRecovery.java b/solr/core/src/test/org/apache/solr/cloud/TestPrepRecovery.java
index f6fae16..8f90b77 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestPrepRecovery.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPrepRecovery.java
@@ -17,17 +17,13 @@
package org.apache.solr.cloud;
-import java.util.concurrent.TimeUnit;
-
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.util.TestInjection;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
/**
@@ -62,11 +58,9 @@ public class TestPrepRecovery extends SolrCloudTestCase {
String collectionName = "testLeaderUnloaded";
CollectionAdminRequest.createCollection(collectionName, 1, 2)
+ .setMaxShardsPerNode(100)
.process(solrClient);
- waitForState("Expected collection: testLeaderUnloaded to be live with 1 shard and 2 replicas",
- collectionName, clusterShape(1, 2));
-
JettySolrRunner newNode = cluster.startJettySolrRunner();
String newNodeName = newNode.getNodeName();
@@ -97,6 +91,7 @@ public class TestPrepRecovery extends SolrCloudTestCase {
String collectionName = "testLeaderNotResponding";
CollectionAdminRequest.createCollection(collectionName, 1, 1)
+ .setMaxShardsPerNode(100)
.process(solrClient);
TestInjection.prepRecoveryOpPauseForever = "true:100";
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java
index 4352bf8..99fa3d2 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java
@@ -16,10 +16,6 @@
*/
package org.apache.solr.cloud.api.collections;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -28,11 +24,13 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.zookeeper.KeeperException;
-import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
@Slow
public class CollectionTooManyReplicasTest extends SolrCloudTestCase {
@@ -43,11 +41,6 @@ public class CollectionTooManyReplicasTest extends SolrCloudTestCase {
.configure();
}
- @Before
- public void deleteCollections() throws Exception {
- cluster.deleteAllCollections();
- }
-
@Test
public void testAddShard() throws Exception {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index f7a011a..3437007 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -838,14 +838,6 @@ public class ZkStateReader implements SolrCloseable {
} catch (NullPointerException e) {
// okay
}
- if (notifications != null) {
- try {
- boolean success = notifications.awaitTermination(1, TimeUnit.SECONDS);
- if (!success) notifications.shutdownNow();
- } catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- }
- }
} finally {
assert ObjectReleaseTracker.release(this);
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
index 95c0c16..f6001ee 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
@@ -914,7 +914,7 @@ public class CloudHttp2SolrClientTest extends SolrCloudTestCase {
// NOTE: don't use our stale_client for this -- don't tip it off of a collection change
.process(cluster.getSolrClient()).getStatus());
- cluster.waitForActiveCollection(COL, 1, 1);
+ cluster.waitForActiveCollection(COL, 1, 1, true);
// attempt a (direct) update that should succeed in spite of cached cluster state
// pointing solely to a node that's no longer part of our collection...
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index 23d9394..db32fa2 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -920,10 +920,14 @@ public class MiniSolrCloudCluster {
}
throw new SolrException(ErrorCode.NOT_FOUND, "No open Overseer found");
}
-
+
public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas) {
+ waitForActiveCollection(collection, wait, unit, shards, totalReplicas, false);
+ }
+
+ public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas, boolean exact) {
log.info("waitForActiveCollection: {}", collection);
- CollectionStatePredicate predicate = BaseCloudSolrClient.expectedShardsAndActiveReplicas(shards, totalReplicas, false);
+ CollectionStatePredicate predicate = BaseCloudSolrClient.expectedShardsAndActiveReplicas(shards, totalReplicas, exact);
AtomicReference<DocCollection> state = new AtomicReference<>();
AtomicReference<Set<String>> liveNodesLastSeen = new AtomicReference<>();
@@ -944,7 +948,11 @@ public class MiniSolrCloudCluster {
public void waitForActiveCollection(String collection, int shards, int totalReplicas) {
waitForActiveCollection(collection, 10, TimeUnit.SECONDS, shards, totalReplicas);
}
-
+
+ public void waitForActiveCollection(String collection, int shards, int totalReplicas, boolean exact) {
+ waitForActiveCollection(collection, 10, TimeUnit.SECONDS, shards, totalReplicas, exact);
+ }
+
public void waitForRemovedCollection(String collection) {
try {
getSolrClient().waitForState(collection, 10, TimeUnit.SECONDS, (n, c) -> {