You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/01/24 02:25:06 UTC
[lucene-solr] 01/01: wip
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 5619dc6a0608885fb580dea43ecfe60b00ed1d9f
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Jan 23 20:24:43 2021 -0600
wip
---
.../apache/lucene/store/NRTCachingDirectory.java | 4 +-
solr/bin/solr | 2 +-
.../java/org/apache/solr/cloud/LeaderElector.java | 2 +
.../src/java/org/apache/solr/cloud/Overseer.java | 27 +--
.../org/apache/solr/cloud/OverseerTaskQueue.java | 2 +
.../org/apache/solr/cloud/RecoveryStrategy.java | 35 ++--
.../java/org/apache/solr/cloud/ZkController.java | 49 +----
.../java/org/apache/solr/cloud/ZkShardTerms.java | 42 ++--
.../apache/solr/cloud/overseer/ZkStateWriter.java | 218 ++++++++++++---------
.../apache/solr/core/CachingDirectoryFactory.java | 2 +-
.../java/org/apache/solr/core/CoreContainer.java | 97 ++++-----
.../apache/solr/core/CorePropertiesLocator.java | 4 +-
.../src/java/org/apache/solr/core/SolrCore.java | 24 +--
.../src/java/org/apache/solr/core/SolrCores.java | 2 +-
.../java/org/apache/solr/handler/IndexFetcher.java | 15 +-
.../solr/handler/admin/CoreAdminOperation.java | 8 +-
.../apache/solr/handler/admin/PrepRecoveryOp.java | 4 +-
.../handler/component/RealTimeGetComponent.java | 6 +-
.../apache/solr/schema/ZkIndexSchemaReader.java | 2 +
.../org/apache/solr/servlet/SolrQoSFilter.java | 86 ++++----
.../apache/solr/update/DefaultSolrCoreState.java | 4 +-
.../src/java/org/apache/solr/update/PeerSync.java | 74 +++----
.../org/apache/solr/update/PeerSyncWithLeader.java | 9 +-
.../java/org/apache/solr/update/SolrCoreState.java | 4 +-
.../org/apache/solr/update/SolrIndexSplitter.java | 2 +-
.../org/apache/solr/update/SolrIndexWriter.java | 8 +-
.../src/java/org/apache/solr/update/UpdateLog.java | 12 +-
.../org/apache/solr/update/UpdateShardHandler.java | 2 +-
.../processor/DistributedZkUpdateProcessor.java | 8 +-
.../solr/cloud/MissingSegmentRecoveryTest.java | 2 +
solr/server/etc/jetty-https.xml | 2 +-
solr/server/etc/jetty-https8.xml | 5 +
solr/server/etc/jetty.xml | 4 -
solr/server/resources/log4j2.xml | 4 +-
.../solr/client/solrj/impl/Http2SolrClient.java | 8 +-
.../src/java/org/apache/solr/common/ParWork.java | 16 +-
.../apache/solr/common/PerThreadExecService.java | 20 +-
.../apache/solr/common/cloud/SolrZooKeeper.java | 36 ++--
.../apache/solr/common/cloud/ZkStateReader.java | 96 +++++----
.../java/org/apache/solr/common/util/SysStats.java | 23 ++-
.../org/apache/zookeeper/ZooKeeperExposed.java | 2 +-
41 files changed, 474 insertions(+), 498 deletions(-)
diff --git a/lucene/core/src/java/org/apache/lucene/store/NRTCachingDirectory.java b/lucene/core/src/java/org/apache/lucene/store/NRTCachingDirectory.java
index ec7e5b9..fee7f51 100644
--- a/lucene/core/src/java/org/apache/lucene/store/NRTCachingDirectory.java
+++ b/lucene/core/src/java/org/apache/lucene/store/NRTCachingDirectory.java
@@ -212,7 +212,7 @@ public class NRTCachingDirectory extends FilterDirectory implements Accountable
// it for defensive reasons... or in case the app is
// doing something custom (creating outputs directly w/o
// using IndexWriter):
- if (Boolean.getBoolean("solr.nrtDirSync")) { // nocommit)
+ // if (Boolean.getBoolean("solr.nrtDirSync")) { // nocommit)
IOUtils.close(() -> {
if (!closed.getAndSet(true)) {
for (String fileName : cacheDirectory.listAll()) {
@@ -220,7 +220,7 @@ public class NRTCachingDirectory extends FilterDirectory implements Accountable
}
}
}, cacheDirectory, in);
- }
+ // }
}
/** Subclass can override this to customize logic; return
diff --git a/solr/bin/solr b/solr/bin/solr
index 3216d78..49cfa83 100755
--- a/solr/bin/solr
+++ b/solr/bin/solr
@@ -1904,7 +1904,7 @@ if [ -z ${GC_LOG_OPTS+x} ]; then
'-XX:+PrintGCDateStamps' '-XX:+PrintGCTimeStamps' '-XX:+PrintTenuringDistribution' \
'-XX:+PrintGCApplicationStoppedTime')
else
- GC_LOG_OPTS=('-Xlog:gc*')
+ GC_LOG_OPTS=('')
fi
else
GC_LOG_OPTS=($GC_LOG_OPTS)
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index 21b2a41..184356c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -573,6 +573,8 @@ public class LeaderElector implements Closeable {
SolrZooKeeper zk = zkClient.getSolrZooKeeper();
try {
zk.removeWatches(watchedNode, this, WatcherType.Any, true);
+ } catch (KeeperException.NoWatcherException e) {
+
} catch (Exception e) {
log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 6dac590..754a970 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -30,6 +30,7 @@ import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.cloud.overseer.ZkStateWriter;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
+import org.apache.solr.common.ParWorkExecutor;
import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrThread;
@@ -45,6 +46,7 @@ import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.Pair;
+import org.apache.solr.common.util.SysStats;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.admin.CollectionsHandler;
@@ -53,7 +55,6 @@ import org.apache.solr.update.UpdateShardHandler;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,7 +72,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
@@ -158,7 +160,7 @@ public class Overseer implements SolrCloseable {
private volatile boolean initedHttpClient = false;
private volatile QueueWatcher queueWatcher;
private volatile WorkQueueWatcher.CollectionWorkQueueWatcher collectionQueueWatcher;
- private volatile ExecutorService taskExecutor;
+ private volatile ParWorkExecutor taskExecutor;
public boolean isDone() {
return closeAndDone;
@@ -285,14 +287,10 @@ public class Overseer implements SolrCloseable {
//
// stateManagmentExecutor = ParWork.getParExecutorService("stateManagmentExecutor",
// 1, 1, 3000, new SynchronousQueue());
- taskExecutor = ParWork.getParExecutorService("overseerTaskExecutor",
- 3, 32, 1000, new SynchronousQueue());
+ taskExecutor = (ParWorkExecutor) ParWork.getParExecutorService("overseerTaskExecutor",
+ SysStats.PROC_COUNT, SysStats.PROC_COUNT * 2, 3000, new LinkedBlockingQueue<>(1024));
+ taskExecutor.prestartAllCoreThreads();
-// try {
-// if (context != null) context.close();
-// } catch (Exception e) {
-// log.error("", e);
-// }
if (overseerOnlyClient == null && !closeAndDone && !initedHttpClient) {
overseerOnlyClient = new Http2SolrClient.Builder().idleTimeout(60000).connectionTimeout(5000).markInternalRequest().build();
overseerOnlyClient.enableCloseLock();
@@ -336,7 +334,7 @@ public class Overseer implements SolrCloseable {
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
- this.zkStateWriter = new ZkStateWriter(zkController.getZkStateReader(), stats);
+ this.zkStateWriter = new ZkStateWriter(zkController.getZkStateReader(), stats, this);
//systemCollectionCompatCheck(new StringBiConsumer());
queueWatcher = new WorkQueueWatcher(getCoreContainer());
@@ -538,7 +536,10 @@ public class Overseer implements SolrCloseable {
}
if (taskExecutor != null) {
- taskExecutor.shutdownNow();
+ try {
+ taskExecutor.awaitTermination(5000, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ }
}
}
@@ -813,6 +814,8 @@ public class Overseer implements SolrCloseable {
this.closed = true;
try {
zkController.getZkClient().getSolrZooKeeper().removeWatches(path, this, WatcherType.Data, true);
+ } catch (KeeperException.NoWatcherException e) {
+
} catch (Exception e) {
log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index 778336a..5eca2f4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -226,6 +226,8 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
this.closed = true;
try {
zkClient.getSolrZooKeeper().removeWatches(path, this, WatcherType.Data, true);
+ } catch (KeeperException.NoWatcherException e) {
+
} catch (Exception e) {
log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index ee085b2..cce536d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -224,15 +224,14 @@ public class RecoveryStrategy implements Runnable, Closeable {
return leaderprops.getCoreUrl();
}
- final private IndexFetcher.IndexFetchResult replicate(Replica leaderprops)
+ final private IndexFetcher.IndexFetchResult replicate(Replica leader)
throws SolrServerException, IOException {
- log.info("Attempting to replicate from [{}].", leaderprops);
+ log.info("Attempting to replicate from [{}].", leader);
String leaderUrl;
// send commit
try {
- Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), 1500, false);
leaderUrl = leader.getCoreUrl();
commitOnLeader(leaderUrl);
} catch (Exception e) {
@@ -350,7 +349,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
// expected
}
- Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), 1500, false);
+ Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), 3000, false);
if (leader != null && leader.getName().equals(coreName)) {
log.info("We are the leader, STOP recovery");
@@ -365,10 +364,10 @@ public class RecoveryStrategy implements Runnable, Closeable {
boolean successfulRecovery;
if (this.coreDescriptor.getCloudDescriptor().requiresTransactionLog()) {
if (log.isDebugEnabled()) log.debug("Sync or replica recovery");
- successfulRecovery = doSyncOrReplicateRecovery(core);
+ successfulRecovery = doSyncOrReplicateRecovery(core, leader);
} else {
if (log.isDebugEnabled()) log.debug("Replicate only recovery");
- successfulRecovery = doReplicateOnlyRecovery(core);
+ successfulRecovery = doReplicateOnlyRecovery(core, leader);
}
if (successfulRecovery) {
@@ -384,7 +383,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
}
- final private boolean doReplicateOnlyRecovery(SolrCore core) throws Exception {
+ final private boolean doReplicateOnlyRecovery(SolrCore core, Replica leader) throws Exception {
boolean successfulRecovery = false;
// if (core.getUpdateHandler().getUpdateLog() != null) {
@@ -397,15 +396,18 @@ public class RecoveryStrategy implements Runnable, Closeable {
log.info("Publishing state of core [{}] as recovering {}", coreName, "doReplicateOnlyRecovery");
zkController.publish(this.coreDescriptor, Replica.State.RECOVERING);
-
+ int cnt = 0;
while (!successfulRecovery && !isClosed()) { // don't use interruption or
// it will close channels
// though
+ cnt++;
try {
CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
- Replica leader;
+
try {
- leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 1500, false);
+ if (cnt > 1) {
+ leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 3000, false);
+ }
if (leader != null && leader.getName().equals(coreName)) {
log.info("We are the leader, STOP recovery");
@@ -503,16 +505,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
// TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
- public final boolean doSyncOrReplicateRecovery(SolrCore core) throws Exception {
+ public final boolean doSyncOrReplicateRecovery(SolrCore core, Replica leader) throws Exception {
log.info("Do peersync or replication recovery core={} collection={}", coreName, coreDescriptor.getCollectionName());
- Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), 1500);
- if (leader != null && leader.getName().equals(coreName)) {
- log.info("We are the leader, STOP recovery");
- close = true;
- throw new AlreadyClosedException();
- }
-
log.info("Publishing state of core [{}] as recovering {}", coreName, "doSyncOrReplicateRecovery");
zkController.publish(this.coreDescriptor, Replica.State.RECOVERING);
@@ -602,8 +597,10 @@ public class RecoveryStrategy implements Runnable, Closeable {
while (!successfulRecovery && !isClosed()) {
try {
CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
- leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 1500);
+ if (!firstTime) {
+ leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 3000, false);
+ }
if (leader != null && leader.getName().equals(coreName)) {
log.info("We are the leader, STOP recovery");
close = true;
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index b2abf8f..e5dd8d3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -170,7 +170,7 @@ public class ZkController implements Closeable, Runnable {
@Override
public void run() {
- disconnect(false);
+ disconnect(true);
}
public boolean isDcCalled() {
@@ -628,7 +628,7 @@ public class ZkController implements Closeable, Runnable {
}
public void disconnect(boolean publishDown) {
- if (log.isDebugEnabled()) log.debug("disconnect");
+ log.info("disconnect");
this.dcCalled = true;
try {
@@ -650,7 +650,6 @@ public class ZkController implements Closeable, Runnable {
}
return "PublishDown";
});
- closer.collect();
}
closer.collect(leaderElectors);
@@ -1432,7 +1431,7 @@ public class ZkController implements Closeable, Runnable {
// throw new AlreadyClosedException();
// }
- leader = zkStateReader.getLeaderRetry(collection, shardId, 1500, false);
+ leader = zkStateReader.getLeaderRetry(collection, shardId, 3000, false);
} catch (TimeoutException timeoutException) {
log.info("Timeout waiting to see leader, retry");
@@ -1569,44 +1568,6 @@ public class ZkController implements Closeable, Runnable {
}
}
- // timeoutms is the timeout for the first call to get the leader - there is then
- // a longer wait to make sure that leader matches our local state
- private String getLeader(final CloudDescriptor cloudDesc, int timeoutms) {
-
- String collection = cloudDesc.getCollectionName();
- String shardId = cloudDesc.getShardId();
- // rather than look in the cluster state file, we go straight to the zknodes
- // here, because on cluster restart there could be stale leader info in the
- // cluster state node that won't be updated for a moment
- String leaderUrl;
- try {
- leaderUrl = getLeaderProps(collection, cloudDesc.getShardId(), timeoutms)
- .getCoreUrl();
-
- zkStateReader.waitForState(collection, timeoutms * 2, TimeUnit.MILLISECONDS, (n, c) -> checkLeaderUrl(cloudDesc, leaderUrl, collection, shardId, leaderConflictResolveWait));
-
- } catch (Exception e) {
- ParWork.propagateInterrupt(e);
- throw new SolrException(ErrorCode.SERVER_ERROR, "Error getting leader from zk", e);
- }
- return leaderUrl;
- }
-
- private boolean checkLeaderUrl(CloudDescriptor cloudDesc, String leaderUrl, String collection, String shardId,
- int timeoutms) {
- // now wait until our currently cloud state contains the latest leader
- String clusterStateLeaderUrl;
- try {
- clusterStateLeaderUrl = zkStateReader.getLeaderUrl(collection, shardId, 10000);
-
- // leaderUrl = getLeaderProps(collection, cloudDesc.getShardId(), timeoutms).getCoreUrl();
- } catch (Exception e) {
- ParWork.propagateInterrupt(e);
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
- return clusterStateLeaderUrl != null;
- }
-
/**
* Get leader props directly from zk nodes.
* @throws SessionExpiredException on zk session expiration.
@@ -1656,6 +1617,7 @@ public class ZkController implements Closeable, Runnable {
Map<String, Object> props = new HashMap<>();
// we only put a subset of props into the leader node
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+ props.put(CORE_NAME_PROP, cd.getName());
Replica replica = new Replica(cd.getName(), props, collection, shardId, zkStateReader);
LeaderElector leaderElector;
@@ -1751,9 +1713,6 @@ public class ZkController implements Closeable, Runnable {
props.put(ZkStateReader.COLLECTION_PROP, collection);
props.put(ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().toString());
- if (numShards != null) {
- props.put(ZkStateReader.NUM_SHARDS_PROP, numShards.toString());
- }
try (SolrCore core = cc.getCore(cd.getName())) {
if (core != null && core.getDirectoryFactory().isSharedStorage()) {
// nocommit
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index d9eb9a3..2adfe5e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -99,7 +98,7 @@ public class ZkShardTerms implements Closeable {
void close();
}
- public ZkShardTerms(String collection, String shard, SolrZkClient zkClient) throws IOException{
+ public ZkShardTerms(String collection, String shard, SolrZkClient zkClient) throws IOException, KeeperException {
this.znodePath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/terms/" + shard;
this.collection = collection;
this.shard = shard;
@@ -223,7 +222,7 @@ public class ZkShardTerms implements Closeable {
*/
void registerTerm(String coreNodeName) throws KeeperException, InterruptedException {
ShardTerms newTerms;
- while ( (newTerms = terms.get().registerTerm(coreNodeName)) != null) {
+ while ((newTerms = terms.get().registerTerm(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break;
}
}
@@ -324,6 +323,7 @@ public class ZkShardTerms implements Closeable {
*/
private boolean saveTerms(ShardTerms newTerms) throws KeeperException, InterruptedException {
byte[] znodeData = Utils.toJSON(newTerms);
+
try {
Stat stat = zkClient.setData(znodePath, znodeData, newTerms.getVersion(), true);
ShardTerms newShardTerms = new ShardTerms(newTerms, stat.getVersion());
@@ -331,7 +331,12 @@ public class ZkShardTerms implements Closeable {
if (log.isDebugEnabled()) log.debug("Successful update of terms at {} to {}", znodePath, newTerms);
return true;
} catch (KeeperException.BadVersionException e) {
- log.info("Failed to save terms, version is not a match, retrying version={}", newTerms.getVersion());
+ int foundVersion = -1;
+ Stat stat = zkClient.exists(znodePath, null);
+ if (stat != null) {
+ foundVersion = stat.getVersion();
+ }
+ log.info("Failed to save terms, version is not a match, retrying version={} found={}", newTerms.getVersion(), foundVersion);
if (isClosed.get()) {
throw new AlreadyClosedException();
@@ -353,8 +358,11 @@ public class ZkShardTerms implements Closeable {
if (Watcher.Event.EventType.None == event.getType()) {
return;
}
- if (event.getType() == Watcher.Event.EventType.NodeCreated || event.getType() == Watcher.Event.EventType.NodeDataChanged) {
+
+ try {
retryRegisterWatcher();
+ } catch (KeeperException e) {
+ log.warn("Exception refreshing terms on watcher event", e);
}
};
Stat stat = new Stat();
@@ -365,7 +373,7 @@ public class ZkShardTerms implements Closeable {
} catch (KeeperException.NoNodeException e) {
log.warn("No node found for shard terms", e);
// we have likely been deleted
- return;
+ throw new AlreadyClosedException("No node found for shard terms");
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection: " + collection, e);
@@ -377,26 +385,8 @@ public class ZkShardTerms implements Closeable {
/**
* Retry register a watcher to the correspond ZK term node
*/
- private void retryRegisterWatcher() {
- while (!isClosed.get()) {
- try {
- refreshTerms(true);
- return;
- } catch (KeeperException.AuthFailedException e) {
- isClosed.set(true);
- log.error("Failed watching shard term for collection: {} due to unrecoverable exception", collection, e);
- return;
- } catch (KeeperException e) {
- log.warn("Failed watching shard term for collection: {}, retrying!", collection, e);
- try {
- zkClient.getConnectionManager().waitForConnected(zkClient.getZkClientTimeout());
- } catch (TimeoutException | InterruptedException te) {
- if (Thread.interrupted()) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error watching shard term for collection: " + collection, te);
- }
- }
- }
- }
+ private void retryRegisterWatcher() throws KeeperException {
+ refreshTerms(true);
}
/**
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 80013b4..f5acb9a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -56,6 +56,7 @@ public class ZkStateWriter {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final ZkStateReader reader;
+ private final Overseer overseer;
/**
* Represents a no-op {@link ZkWriteCommand} which will result in no modification to cluster state
@@ -85,9 +86,9 @@ public class ZkStateWriter {
private Set<String> dirtyStructure = new HashSet<>();
private Set<String> dirtyState = new HashSet<>();
- public ZkStateWriter(ZkStateReader zkStateReader, Stats stats) {
+ public ZkStateWriter(ZkStateReader zkStateReader, Stats stats, Overseer overseer) {
assert zkStateReader != null;
-
+ this.overseer = overseer;
this.reader = zkStateReader;
this.stats = stats;
@@ -225,7 +226,7 @@ public class ZkStateWriter {
List<Replica> replicas = docColl.getReplicas();
for (Replica replica : replicas) {
if (replica.getState() != Replica.State.DOWN && replica.getNodeName().equals(entry.getValue())) {
- log.info("set downnode for replica {}", replica);
+ if (log.isDebugEnabled()) log.debug("set downnode for replica {}", replica);
// nocommit
Slice slice = docColl.getSlice(replica.getSlice());
slice.setLeader(null);
@@ -297,6 +298,7 @@ public class ZkStateWriter {
docColl.getSlice(replica).setLeader(null);
}
updates.getProperties().put(replica.getName(), Replica.State.getShortState(state));
+ updates.getProperties().remove("leader");
// log.info("set state {} {}", state, replica);
replica.setState(state);
dirtyState.add(collection);
@@ -352,130 +354,154 @@ public class ZkStateWriter {
// if additional updates too large, publish structure changew
public void writePendingUpdates() {
- // writeLock.lock();
- // try {
- // log.info("Get our write lock");
- ourLock.lock();
+ do {
try {
- // log.info("Got our write lock");
+ write();
+ } catch (KeeperException.BadVersionException e) {
+ continue;
+ }
+ break;
+ } while (!overseer.isClosed());
- throttle.minimumWaitBetweenActions();
- throttle.markAttemptingAction();
+ }
- if (log.isTraceEnabled()) {
- log.trace("writePendingUpdates {}", cs);
- }
+ private void write() throws KeeperException.BadVersionException {
+ // writeLock.lock();
+ // try {
+ // log.info("Get our write lock");
+ ourLock.lock();
+ try {
+ // log.info("Got our write lock");
- if (failedUpdates.size() > 0) {
- log.warn("Some collection updates failed {} logging last exception", failedUpdates, lastFailedException); // nocommit expand
- failedUpdates.clear();
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, lastFailedException.get());
- }
+ throttle.minimumWaitBetweenActions();
+ throttle.markAttemptingAction();
+
+ if (log.isTraceEnabled()) {
+ log.trace("writePendingUpdates {}", cs);
+ }
+
+ if (failedUpdates.size() > 0) {
+ log.warn("Some collection updates failed {} logging last exception", failedUpdates, lastFailedException); // nocommit expand
+ failedUpdates.clear();
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, lastFailedException.get());
+ }
// } finally {
// ourLock.unlock();
// }
- // wait to see our last publish version has propagated TODO don't wait on collections not hosted on overseer?
- // waitForStateWePublishedToComeBack();
-
- // ourLock.lock();
- AtomicInteger lastVersion = new AtomicInteger();
- //log.info("writing out state, looking at collections count={} toWrite={} {} : {}", cs.getCollectionsMap().size(), collectionsToWrite.size(), cs.getCollectionsMap().keySet(), collectionsToWrite);
- //try {
- cs.forEachCollection(collection -> {
- // log.info("check collection {}", collection);
- if (dirtyStructure.contains(collection.getName()) || dirtyState.contains(collection.getName())) {
- // log.info("process collection {}", collection);
- String name = collection.getName();
- String path = ZkStateReader.getCollectionPath(collection.getName());
- String pathSCN = ZkStateReader.getCollectionSCNPath(collection.getName());
- // log.info("process collection {} path {}", collection.getName(), path);
- Stat existsStat = null;
- if (log.isTraceEnabled()) log.trace("process {}", collection);
+ // wait to see our last publish version has propagated TODO don't wait on collections not hosted on overseer?
+ // waitForStateWePublishedToComeBack();
+
+ // ourLock.lock();
+ AtomicInteger lastVersion = new AtomicInteger();
+ AtomicReference<KeeperException.BadVersionException> badVersionException = new AtomicReference();
+ //log.info("writing out state, looking at collections count={} toWrite={} {} : {}", cs.getCollectionsMap().size(), collectionsToWrite.size(), cs.getCollectionsMap().keySet(), collectionsToWrite);
+ //try {
+ cs.forEachCollection(collection -> {
+ // log.info("check collection {}", collection);
+ Integer version = null;
+ if (dirtyStructure.contains(collection.getName()) || dirtyState.contains(collection.getName())) {
+ // log.info("process collection {}", collection);
+ String name = collection.getName();
+ String path = ZkStateReader.getCollectionPath(collection.getName());
+ String pathSCN = ZkStateReader.getCollectionSCNPath(collection.getName());
+ // log.info("process collection {} path {}", collection.getName(), path);
+ Stat existsStat = null;
+ if (log.isTraceEnabled()) log.trace("process {}", collection);
+ try {
+ // log.info("get data for {}", name);
+ byte[] data = Utils.toJSON(singletonMap(name, collection));
+ // log.info("got data for {} {}", name, data.length);
+
try {
- // log.info("get data for {}", name);
- byte[] data = Utils.toJSON(singletonMap(name, collection));
- // log.info("got data for {} {}", name, data.length);
+ Integer v = trackVersions.get(collection.getName());
- try {
- Integer version = null;
- Integer v = trackVersions.get(collection.getName());
+ if (v != null) {
+ //log.info("got version from cache {}", v);
+ version = v;
+ } else {
+ version = 0;
+ }
+ lastVersion.set(version);
+ if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", version, data.length, collection);
- if (v != null) {
- //log.info("got version from cache {}", v);
- version = v;
- } else {
- version = 0;
- }
- lastVersion.set(version);
- if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", version, data.length, collection);
-
- reader.getZkClient().setData(path, data, version, true);
- trackVersions.put(collection.getName(), version + 1);
- if (dirtyStructure.contains(collection.getName())) {
- if (log.isDebugEnabled()) log.debug("structure change in {}", collection.getName());
- dirtyStructure.remove(collection.getName());
- reader.getZkClient().setData(pathSCN, null, -1, true);
-
- ZkNodeProps updates = stateUpdates.get(collection.getName());
- if (updates != null) {
- updates.getProperties().clear();
- }
- }
+ reader.getZkClient().setData(path, data, version, true);
+ trackVersions.put(collection.getName(), version + 1);
+ if (dirtyStructure.contains(collection.getName())) {
+ if (log.isDebugEnabled()) log.debug("structure change in {}", collection.getName());
+ dirtyStructure.remove(collection.getName());
+ reader.getZkClient().setData(pathSCN, null, -1, true);
- } catch (KeeperException.NoNodeException e) {
- if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
+ ZkNodeProps updates = stateUpdates.get(collection.getName());
+ if (updates != null) {
+ updates.getProperties().clear();
+ }
+ }
- lastVersion.set(-1);
- // trackVersions.remove(collection.getName());
- // likely deleted
- return;
+ } catch (KeeperException.NoNodeException e) {
+ if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
- } catch (KeeperException.BadVersionException bve) {
- //lastFailedException.set(bve);
- //failedUpdates.put(collection.getName(), collection);
- // Stat estate = reader.getZkClient().exists(path, null);
- trackVersions.remove(collection.getName());
- throw bve;
+ lastVersion.set(-1);
+ // trackVersions.remove(collection.getName());
+ // likely deleted
+ return;
- }
+ } catch (KeeperException.BadVersionException bve) {
+ //lastFailedException.set(bve);
+ //failedUpdates.put(collection.getName(), collection);
+ // Stat estate = reader.getZkClient().exists(path, null);
+ trackVersions.remove(collection.getName());
+ Stat stat = reader.getZkClient().exists(path, null);
+ log.error("Tried to update state.json with bad version {} \n {}", version, stat != null ? stat.getVersion() : "null");
- if (dirtyState.contains(collection.getName())) {
- ZkNodeProps updates = stateUpdates.get(collection.getName());
- if (updates != null) {
- String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(collection.getName());
- if (log.isDebugEnabled()) log.debug("write state updates for collection {} {}", collection.getName(), updates);
- dirtyState.remove(collection.getName());
- reader.getZkClient().setData(stateUpdatesPath, Utils.toJSON(updates), -1, true);
- }
+ if (!overseer.isClosed() && stat != null) {
+ trackVersions.put(collection.getName(), stat.getVersion());
}
- } catch (InterruptedException | AlreadyClosedException e) {
- log.info("We have been closed or one of our resources has, bailing {}", e.getClass().getSimpleName() + ":" + e.getMessage());
+ throw bve;
- } catch (Exception e) {
- log.error("Failed processing update=" + collection, e);
}
- }
- });
+ if (dirtyState.contains(collection.getName())) {
+ ZkNodeProps updates = stateUpdates.get(collection.getName());
+ if (updates != null) {
+ String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(collection.getName());
+ if (log.isDebugEnabled()) log.debug("write state updates for collection {} {}", collection.getName(), updates);
+ dirtyState.remove(collection.getName());
+ reader.getZkClient().setData(stateUpdatesPath, Utils.toJSON(updates), -1, true);
+ }
+ }
+ } catch (KeeperException.BadVersionException bve) {
+ badVersionException.set(bve);
+ return;
+ } catch (InterruptedException | AlreadyClosedException e) {
+ log.info("We have been closed or one of our resources has, bailing {}", e.getClass().getSimpleName() + ":" + e.getMessage());
+ } catch (Exception e) {
+ log.error("Failed processing update=" + collection, e);
+ }
+ }
- //log.info("Done with successful cluster write out");
+ });
- } finally {
- ourLock.unlock();
+ if (badVersionException.get() != null) {
+ throw badVersionException.get();
}
-// } finally {
-// writeLock.unlock();
-// }
+
+ //log.info("Done with successful cluster write out");
+
+ } finally {
+ ourLock.unlock();
+ }
+ // } finally {
+ // writeLock.unlock();
+ // }
// nocommit - harden against failures and exceptions
// if (log.isDebugEnabled()) {
// log.debug("writePendingUpdates() - end - New Cluster State is: {}", newClusterState);
// }
-
}
private void waitForStateWePublishedToComeBack() {
diff --git a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
index 28c8028..430ba55 100644
--- a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
+++ b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
@@ -540,7 +540,7 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
// }
cacheValue.refCnt--;
- if (cacheValue.refCnt == 0 && cacheValue.doneWithDir || closed) {
+ if (cacheValue.refCnt == 0 && cacheValue.doneWithDir || closed) {
boolean cl = closeCacheValue(cacheValue);
if (cl) {
removeFromCache(cacheValue);
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 26f6a17..f4e223b 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -25,7 +25,6 @@ import org.apache.http.config.Lookup;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.store.Directory;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
@@ -40,7 +39,6 @@ import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
-import org.apache.solr.common.PerThreadExecService;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.DocCollection;
@@ -55,13 +53,12 @@ import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.ObjectCache;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.OrderedExecutor;
+import org.apache.solr.common.util.SysStats;
import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.backup.repository.BackupRepository;
import org.apache.solr.core.backup.repository.BackupRepositoryFactory;
import org.apache.solr.filestore.PackageStoreAPI;
import org.apache.solr.handler.RequestHandlerBase;
-import org.apache.solr.handler.SnapShooter;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.handler.admin.ConfigSetsHandler;
import org.apache.solr.handler.admin.CoreAdminHandler;
@@ -125,12 +122,10 @@ import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.spec.InvalidKeySpecException;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
-import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -193,9 +188,7 @@ public class CoreContainer implements Closeable {
private volatile UpdateShardHandler updateShardHandler;
- public volatile ExecutorService solrCoreLoadExecutor;
-
- public volatile ExecutorService solrCoreCloseExecutor;
+ public volatile ExecutorService solrCoreExecutor;
private final OrderedExecutor replayUpdatesExecutor;
@@ -404,11 +397,8 @@ public class CoreContainer implements Closeable {
containerProperties.putAll(cfg.getSolrProperties());
- solrCoreLoadExecutor = new PerThreadExecService(ParWork.getRootSharedExecutor(), Math.max(16, Runtime.getRuntime().availableProcessors()),
- false, false);
-
- solrCoreCloseExecutor = new PerThreadExecService(ParWork.getRootSharedExecutor(), Math.max(16, Runtime.getRuntime().availableProcessors()),
- false, false);
+ solrCoreExecutor = ParWork.getParExecutorService("SolrCoreExecutor",
+ 1, SysStats.PROC_COUNT * 2, 3000, new LinkedBlockingQueue<>(1024));
}
@SuppressWarnings({"unchecked"})
@@ -903,7 +893,7 @@ public class CoreContainer implements Closeable {
}
if (cd.isLoadOnStartup()) {
- coreLoadFutures.add(solrCoreLoadExecutor.submit(() -> {
+ coreLoadFutures.add(solrCoreExecutor.submit(() -> {
SolrCore core = null;
MDCLoggingContext.setCoreDescriptor(this, cd);
try {
@@ -1075,10 +1065,6 @@ public class CoreContainer implements Closeable {
replayUpdatesExecutor.shutdownAndAwaitTermination();
});
- if (solrCoreLoadExecutor != null) {
- solrCoreLoadExecutor.shutdown();
- }
-
List<Callable<?>> callables = new ArrayList<>();
if (metricManager != null) {
@@ -1146,20 +1132,15 @@ public class CoreContainer implements Closeable {
closer.collect(callables);
closer.collect(metricsHistoryHandler);
-
- closer.collect(solrCoreLoadExecutor);
-
-
closer.collect("WaitForSolrCores", solrCores);
-
closer.addCollect();
closer.collect(shardHandlerFactory);
closer.collect(updateShardHandler);
- closer.collect(solrCoreCloseExecutor);
+ closer.collect(solrCoreExecutor);
closer.collect(solrClientCache);
closer.collect(loader);
@@ -1415,7 +1396,7 @@ public class CoreContainer implements Closeable {
zkSys.getZkController().throwErrorIfReplicaReplaced(dcore);
}
}
- new ZkController.RegisterCoreAsync(zkSys.zkController, dcore, false).call();
+ ParWork.getRootSharedExecutor().submit(new ZkController.RegisterCoreAsync(zkSys.zkController, dcore, false));
}
} catch (Exception e) {
@@ -1459,11 +1440,11 @@ public class CoreContainer implements Closeable {
if (core != null) {
SolrCore finalCore1 = core;
- solrCoreCloseExecutor.submit(() -> {
+ solrCoreExecutor.submit(() -> {
finalCore1.closeAndWait();
});
SolrCore finalOld = old;
- solrCoreCloseExecutor.submit(() -> {
+ solrCoreExecutor.submit(() -> {
if (finalOld != null) {
finalOld.closeAndWait();
}
@@ -1511,6 +1492,10 @@ public class CoreContainer implements Closeable {
private SolrCore processCoreCreateException(Exception original, CoreDescriptor dcore, ConfigSet coreConfig) {
log.error("Error creating SolrCore", original);
+ if (isShutDown) {
+ return null;
+ }
+
// Traverse full chain since CIE may not be root exception
Throwable cause = original;
if (!(cause instanceof CorruptIndexException)) {
@@ -1544,10 +1529,31 @@ public class CoreContainer implements Closeable {
.getLeader();
if (leader != null && leader.getState() == State.ACTIVE) {
log.info("Found active leader, will attempt to create fresh core and recover.");
- resetIndexDirectory(dcore, coreConfig);
+
+ SolrConfig config = coreConfig.getSolrConfig();
+
+ String registryName = SolrMetricManager.getRegistryName(SolrInfoBean.Group.core, dcore.getName());
+ DirectoryFactory df = DirectoryFactory.loadDirectoryFactory(config, this, registryName);
+ String dataDir = SolrCore.findDataDir(df, null, config, dcore);
+ df.close();
+
+ try {
+ while (new File(dataDir).exists()) {
+ try {
+ Files.walk(new File(dataDir).toPath()).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
+ } catch (NoSuchFileException e) {
+
+ }
+ }
+ } catch (Exception e) {
+ SolrException.log(log, "Failed to delete instance dir for core:" + dcore.getName() + " dir:" + dcore.getInstanceDir());
+ }
+
+ SolrCore core = new SolrCore(this, dcore, coreConfig);
+ core.getUpdateHandler().getUpdateLog().deleteAll();
+
// the index of this core is emptied, its term should be set to 0
getZkController().getShardTerms(desc.getCollectionName(), desc.getShardId()).setTermToZero(dcore.getName());
- return new SolrCore(this, dcore, coreConfig);
}
} catch (Exception se) {
se.addSuppressed(original);
@@ -1581,35 +1587,6 @@ public class CoreContainer implements Closeable {
}
/**
- * Write a new index directory for the a SolrCore, but do so without loading it.
- */
- private void resetIndexDirectory(CoreDescriptor dcore, ConfigSet coreConfig) {
- SolrConfig config = coreConfig.getSolrConfig();
-
- String registryName = SolrMetricManager.getRegistryName(SolrInfoBean.Group.core, dcore.getName());
- DirectoryFactory df = DirectoryFactory.loadDirectoryFactory(config, this, registryName);
- String dataDir = SolrCore.findDataDir(df, null, config, dcore);
-
- String tmpIdxDirName = "index." + new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
- SolrCore.modifyIndexProps(df, dataDir, config, tmpIdxDirName);
-
- // Free the directory object that we had to create for this
- Directory dir = null;
- try {
- dir = df.get(dataDir, DirContext.META_DATA, config.indexConfig.lockType);
- } catch (IOException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- } finally {
- try {
- df.doneWithDirectory(dir);
- df.release(dir);
- } catch (IOException e) {
- SolrException.log(log, e);
- }
- }
- }
-
- /**
* @return a Collection of registered SolrCores
*/
public Collection<SolrCore> getCores() {
@@ -1821,7 +1798,7 @@ public class CoreContainer implements Closeable {
if (!success) {
log.error("Failed reloading core, cleaning up new core");
SolrCore finalNewCore = newCore;
- solrCoreCloseExecutor.submit(() -> {
+ solrCoreExecutor.submit(() -> {
// try {
if (finalNewCore != null) {
log.error("Closing failed new core");
diff --git a/solr/core/src/java/org/apache/solr/core/CorePropertiesLocator.java b/solr/core/src/java/org/apache/solr/core/CorePropertiesLocator.java
index 4e4e2f8..3f9d586 100644
--- a/solr/core/src/java/org/apache/solr/core/CorePropertiesLocator.java
+++ b/solr/core/src/java/org/apache/solr/core/CorePropertiesLocator.java
@@ -171,8 +171,8 @@ public class CorePropertiesLocator implements CoresLocator {
log.info("Found {} core definitions underneath {}", cds.size(), rootDirectory);
}
if (cds.size() > 0) {
- if (log.isInfoEnabled()) {
- log.info("Cores are: {}", cds.stream().map(CoreDescriptor::getName).collect(Collectors.toList()));
+ if (log.isDebugEnabled()) {
+ log.debug("Cores are: {}", cds.stream().map(CoreDescriptor::getName).collect(Collectors.toList()));
}
}
return cds;
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 29bc5bc..6d44700 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -173,6 +173,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -900,8 +901,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
log.debug("{}Solr index directory '{}' doesn't exist. Creating new index...", logid, indexDir);
try (SolrIndexWriter writer = SolrIndexWriter.buildIndexWriter(this, "SolrCore.initIndex", indexDir, getDirectoryFactory(),
- true, getLatestSchema(), solrConfig.indexConfig, solrDelPolicy, codec)) {
- writer.commit();
+ true, getLatestSchema(), solrConfig.indexConfig, solrDelPolicy, codec, true)) {
} catch (Exception e) {
ParWork.propagateInterrupt(e);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
@@ -1228,15 +1228,15 @@ public final class SolrCore implements SolrInfoBean, Closeable {
searcherReadyLatch.countDown();
// nocommit - wait before publish active
-// if (!getSolrConfig().useColdSearcher) {
-// try {
-// initSearcherFuture[0].get();
-// } catch (InterruptedException e) {
-// log.error("", e);
-// } catch (ExecutionException e) {
-// log.error("", e);
-// }
-// }
+ if (!getSolrConfig().useColdSearcher) {
+ try {
+ initSearcherFuture[0].get();
+ } catch (InterruptedException e) {
+ log.error("", e);
+ } catch (ExecutionException e) {
+ log.error("", e);
+ }
+ }
}
@@ -1295,7 +1295,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
}
Future[] waitSearcher = new Future[1];
try {
- getSearcher(false, false, null, true);
+ getSearcher(false, false, waitSearcher, true);
} finally {
newReaderCreator = null;
if (iwRef != null) {
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCores.java b/solr/core/src/java/org/apache/solr/core/SolrCores.java
index 839d464..1c12281 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCores.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCores.java
@@ -115,7 +115,7 @@ class SolrCores implements Closeable {
}
cores.forEach((s, solrCore) -> {
- container.solrCoreCloseExecutor.submit(() -> {
+ container.solrCoreExecutor.submit(() -> {
MDCLoggingContext.setCore(solrCore);
try {
solrCore.closeAndWait();
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 07887b3..ed01ffd 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -18,6 +18,7 @@ package org.apache.solr.handler;
import com.google.common.base.Strings;
import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SegmentInfos;
@@ -520,7 +521,7 @@ public class IndexFetcher {
}
// Create the sync service
- fsyncService = ParWork.getExecutorService(15);
+ fsyncService = ParWork.getExecutorService(4);
// use a synchronized list because the list is read by other threads (to show details)
filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
// if the generation of master is older than that of the slave , it means they are not compatible to be copied
@@ -724,7 +725,7 @@ public class IndexFetcher {
ZkController zkController = solrCore.getCoreContainer().getZkController();
CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
- cd.getCollectionName(), cd.getShardId(), 1500, false);
+ cd.getCollectionName(), cd.getShardId(), 3000, false);
return leaderReplica;
}
@@ -1055,7 +1056,7 @@ public class IndexFetcher {
log.warn("WARNING: clearing disk space ahead of time to avoid running out of space, could cause problems with current SolrCore approxTotalSpaceReqd{}, usableSpace={}", atsr, usableSpace);
deleteFilesInAdvance(indexDir, indexDirPath, totalSpaceRequired, usableSpace);
}
- log.info("Files to download {}", filesToDownload);
+ if (log.isDebugEnabled()) log.debug("Files to download {}", filesToDownload);
try {
// nocommit
try (ParWork parWork = new ParWork(this, true)) {
@@ -1113,7 +1114,7 @@ public class IndexFetcher {
if (stop) {
throw new AlreadyClosedException();
}
- log.info("Downloaded {}", tmpIndexDir, file.get(NAME));
+ if (log.isDebugEnabled()) log.debug("Downloaded {}", tmpIndexDir, file.get(NAME));
filesDownloaded.add(Collections.unmodifiableMap(file));
} else {
if (log.isDebugEnabled()) {
@@ -1231,8 +1232,13 @@ public class IndexFetcher {
try {
indexFileChecksum = CodecUtil.retrieveChecksum(indexInput);
compareResult.checkSummed = true;
+ } catch (CorruptIndexException e) {
+ log.warn("Could not retrieve checksum from file.", e);
+ compareResult.equal = false;
+ return compareResult;
} catch (Exception e) {
log.warn("Could not retrieve checksum from file.", e);
+ compareResult.equal = false;
}
}
@@ -1722,7 +1728,6 @@ public class IndexFetcher {
//if cleanup succeeds . The file is downloaded fully
fsyncServiceFuture = fsyncService.submit(() -> {
try {
- log.info("Close fetched file", file);
file.close();
} catch (Exception e) {
fsyncException = e;
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
index 935ee4a..655a12b 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
@@ -22,6 +22,7 @@ import java.nio.file.Path;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.solr.cloud.CloudDescriptor;
@@ -99,8 +100,9 @@ enum CoreAdminOperation implements CoreAdminOp {
log().warn("Will not create SolrCore, CoreContainer is shutdown");
throw new AlreadyClosedException("Will not create SolrCore, CoreContainer is shutdown");
}
-
+ long start = System.nanoTime();
coreContainer.create(coreName, instancePath, coreParams, newCollection);
+ log().info("SolrCore {} created in {}ms", coreName, TimeUnit.NANOSECONDS.convert(System.nanoTime() - start, TimeUnit.MILLISECONDS));
it.rsp.add("core", coreName);
} finally {
@@ -283,6 +285,8 @@ enum CoreAdminOperation implements CoreAdminOp {
}
});
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
final CoreAdminParams.CoreAdminAction action;
final CoreAdminOp fun;
@@ -291,7 +295,7 @@ enum CoreAdminOperation implements CoreAdminOp {
this.fun = fun;
}
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
static Logger log() {
return log;
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index ca07d0b..8434210 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -69,7 +69,6 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
try {
coreContainer.getZkController().getZkStateReader().waitForState(collection, 5, TimeUnit.SECONDS, (n, c) -> {
if (c == null) {
- log.info("collection not found {}", collection);
return false;
}
@@ -80,8 +79,7 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
if (replica != null) {
isLive = coreContainer.getZkController().getZkStateReader().isNodeLive(replica.getNodeName());
if (replica.getState() == waitForState) {
- // if (log.isDebugEnabled()) log.debug("replica={} state={} waitForState={}", replica, replica.getState(), waitForState);
- log.info("replica={} state={} waitForState={} isLive={}", replica, replica.getState(), waitForState, coreContainer.getZkController().getZkStateReader().isNodeLive(replica.getNodeName()));
+ if (log.isDebugEnabled()) log.debug("replica={} state={} waitForState={} isLive={}", replica, replica.getState(), waitForState, coreContainer.getZkController().getZkStateReader().isNodeLive(replica.getNodeName()));
return true;
}
}
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index 2c2dd9c..11b3e0e 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -356,7 +356,7 @@ public class RealTimeGetComponent extends SearchComponent
if (idStr == null) return;
AtomicLong version = new AtomicLong();
SolrInputDocument doc = getInputDocument(req.getCore(), new BytesRef(idStr), version, null, Resolution.DOC);
- log.info("getInputDocument called for id={}, returning {}", idStr, doc);
+ if (log.isDebugEnabled()) log.debug("getInputDocument called for id={}, returning {}", idStr, doc);
rb.rsp.add("inputDocument", doc);
rb.rsp.add("version", version.get());
}
@@ -970,7 +970,7 @@ public class RealTimeGetComponent extends SearchComponent
// the mappings.
for (int i=0; i<rb.slices.length; i++) {
- log.info("LOOKUP_SLICE:{}={}", rb.slices[i], rb.shards[i]);
+ if (log.isDebugEnabled()) log.debug("LOOKUP_SLICE:{}={}", rb.slices[i], rb.shards[i]);
if (lookup.equals(rb.slices[i]) || slice.equals(rb.slices[i])) {
return new String[]{rb.shards[i]};
}
@@ -1189,7 +1189,7 @@ public class RealTimeGetComponent extends SearchComponent
// TODO: get this from cache instead of rebuilding?
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
- log.info("Get updates versionsRequested={} params={}", versions.size(), params);
+ if (log.isDebugEnabled()) log.debug("Get updates versionsRequested={} params={}", versions.size(), params);
for (Long version : versions) {
try {
Object o = recentUpdates.lookup(version);
diff --git a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
index c942a4a..2fed048 100644
--- a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
+++ b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
@@ -132,6 +132,8 @@ public class ZkIndexSchemaReader implements OnReconnect {
public void close() throws IOException {
try {
schemaReader.zkClient.getSolrZooKeeper().removeWatches(schemaReader.managedSchemaPath, this, WatcherType.Any, true);
+ } catch (KeeperException.NoWatcherException e) {
+
} catch (Exception e) {
if (log.isDebugEnabled()) log.debug("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
}
diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
index f6b6f13..c3f69c9 100644
--- a/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
+++ b/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
@@ -50,7 +50,7 @@ public class SolrQoSFilter extends QoSFilter {
@Override
public void init(FilterConfig filterConfig) {
super.init(filterConfig);
- _origMaxRequests = Integer.getInteger("solr.concurrentRequests.max", 10000);
+ _origMaxRequests = Integer.getInteger("solr.concurrentRequests.max", 1000);
super.setMaxRequests(_origMaxRequests);
super.setSuspendMs(Integer.getInteger("solr.concurrentRequests.suspendms", 30000));
super.setWaitMs(Integer.getInteger("solr.concurrentRequests.waitms", 2000));
@@ -65,49 +65,11 @@ public class SolrQoSFilter extends QoSFilter {
boolean imagePath = req.getPathInfo() != null && req.getPathInfo().startsWith("/img/");
boolean externalRequest = !imagePath && (source == null || !source.equals(QoSParams.INTERNAL));
if (log.isDebugEnabled()) log.debug("SolrQoSFilter {} {} {}", sysStats.getSystemLoad(), sysStats.getTotalUsage(), externalRequest);
+ log.info("SolrQoSFilter {} {} {}", sysStats.getSystemLoad(), sysStats.getTotalUsage(), externalRequest);
if (externalRequest) {
if (log.isDebugEnabled()) log.debug("external request"); //nocommit: remove when testing is done
- double ourLoad = sysStats.getTotalUsage();
- if (log.isDebugEnabled()) log.debug("Our individual load is {}", ourLoad);
- double sLoad = sysStats.getSystemLoad();
- if (ourLoad > SysStats.OUR_LOAD_HIGH) {
-
- int cMax = getMaxRequests();
- if (cMax > 5) {
- int max = Math.max(5, (int) ((double) cMax * 0.30D));
- log.warn("Our individual load is {}", ourLoad);
- updateMaxRequests(max, sLoad, ourLoad);
- }
-
- } else {
- // nocommit - deal with no supported, use this as a fail safe with high and low watermark?
- if (ourLoad < 0.90 && sLoad < 1.6 && _origMaxRequests != getMaxRequests()) {
- if (sLoad < 0.9) {
- if (log.isDebugEnabled()) log.debug("set max concurrent requests to orig value {}", _origMaxRequests);
- updateMaxRequests(_origMaxRequests, sLoad, ourLoad);
- } else {
- updateMaxRequests(Math.min(_origMaxRequests, getMaxRequests() * 3), sLoad, ourLoad);
- }
- } else {
- if (ourLoad > 0.90 && sLoad > 1.5) {
- int cMax = getMaxRequests();
- if (cMax > 5) {
- int max = Math.max(5, (int) ((double) cMax * 0.30D));
- // log.warn("System load is {} and our load is {} procs is {}, set max concurrent requests to {}", sLoad, ourLoad, SysStats.PROC_COUNT, max);
- updateMaxRequests(max, sLoad, ourLoad);
- }
- } else if (ourLoad < 0.90 && sLoad < 2 && _origMaxRequests != getMaxRequests()) {
- if (sLoad < 0.9) {
- if (log.isDebugEnabled()) log.debug("set max concurrent requests to orig value {}", _origMaxRequests);
- updateMaxRequests(_origMaxRequests, sLoad, ourLoad);
- } else {
- updateMaxRequests(Math.min(_origMaxRequests, getMaxRequests() * 3), sLoad, ourLoad);
- }
-
- }
- }
- }
+ checkLoad();
//chain.doFilter(req, response);
super.doFilter(req, response, chain);
@@ -118,11 +80,51 @@ public class SolrQoSFilter extends QoSFilter {
}
}
+ private void checkLoad() {
+ double ourLoad = sysStats.getTotalUsage();
+ int currentMaxRequests = getMaxRequests();
+ if (log.isDebugEnabled()) log.debug("Our individual load is {}", ourLoad);
+ double sLoad = sysStats.getSystemLoad();
+
+
+ if (lowStateLoad(sLoad, currentMaxRequests)) {
+// if (log.isDebugEnabled()) log.debug("set max concurrent requests to orig value {}", _origMaxRequests);
+// updateMaxRequests(_origMaxRequests, sLoad, ourLoad);
+// } else {
+ updateMaxRequests(Math.min(_origMaxRequests, _origMaxRequests), sLoad, ourLoad);
+ } else {
+
+ if (hiLoadState(sLoad, currentMaxRequests)) {
+ int cMax = getMaxRequests();
+ int max = Math.max(0, (int) ((double) cMax * 0.90D));
+ updateMaxRequests(max, sLoad, ourLoad);
+ }
+ }
+ // nocommit - deal with no supported, use this as a fail safe with high and low watermark?
+ }
+
+ private boolean lowStateLoad(double sLoad, int currentMaxRequests) {
+ return currentMaxRequests < _origMaxRequests && sLoad < 1.0d;
+ }
+
+ private boolean hiLoadState(double sLoad, int currentMaxRequests) {
+ return sLoad > 9.0d;
+ }
+
private void updateMaxRequests(int max, double sLoad, double ourLoad) {
- if (System.currentTimeMillis() - lastUpdate > 2000) {
+ int currentMax = getMaxRequests();
+ if (max < currentMax) {
+ if (System.currentTimeMillis() - lastUpdate > 500) {
+ log.warn("Set max request to {} sload={} ourload={}", max, sLoad, ourLoad);
+ lastUpdate = System.currentTimeMillis();
+ setMaxRequests(max);
+ }
+ } else if (max > currentMax) {
+
log.warn("Set max request to {} sload={} ourload={}", max, sLoad, ourLoad);
lastUpdate = System.currentTimeMillis();
setMaxRequests(max);
}
+
}
}
\ No newline at end of file
diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
index 02ecd79..c8a271d 100644
--- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
@@ -272,7 +272,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
SolrIndexWriter iw;
try {
iw = SolrIndexWriter.buildIndexWriter(core, name, core.getNewIndexDir(), core.getDirectoryFactory(), false, core.getLatestSchema(),
- core.getSolrConfig().indexConfig, core.getDeletionPolicy(), core.getCodec());
+ core.getSolrConfig().indexConfig, core.getDeletionPolicy(), core.getCodec(), false);
} catch (Exception e) {
ParWork.propagateInterrupt(e);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
@@ -432,7 +432,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
@Override
public void cancelRecovery(boolean wait, boolean prepForClose) {
- log.info("Cancel recovery");
+ if (log.isDebugEnabled()) log.debug("Cancel recovery");
recoverying = false;
if (prepForClose) {
diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java
index d24f905..a39c8db 100644
--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
@@ -140,6 +140,7 @@ public class PeerSync implements SolrMetricProducer {
}
public static long percentile(List<Long> arr, float frac) {
+ if (arr.size() == 0) return 0;
int elem = (int) (arr.size() * frac);
return Math.abs(arr.get(elem));
}
@@ -206,25 +207,8 @@ public class PeerSync implements SolrMetricProducer {
// we have no versions and hence no frame of reference to tell if we can use a peers
// updates to bring us into sync
- log.info("{} DONE. We have no versions. sync failed.", msg());
-
- for (;;) {
- if (log.isDebugEnabled()) log.debug("looping in check for versions on others");
- ShardResponse srsp = shardHandler.takeCompletedIncludingErrors();
- if (srsp == null) break;
- if (srsp.getException() == null) {
- if (log.isDebugEnabled()) log.debug("checking if others have versions {} {}", srsp.getSolrResponse().getResponse());
- List<Long> otherVersions = (List<Long>)srsp.getSolrResponse().getResponse().get("versions");
- if (otherVersions != null && !otherVersions.isEmpty()) {
- if (syncErrors != null) syncErrors.inc();
- if (log.isDebugEnabled()) log.debug("found another replica with versions");
- return PeerSyncResult.failure(true);
- }
- }
- }
- if (syncErrors != null) syncErrors.inc();
- if (log.isDebugEnabled()) log.debug("found no other replica with versions");
- return PeerSyncResult.failure(false);
+ return failOnNoVersions();
+
}
MissedUpdatesFinder missedUpdatesFinder = new MissedUpdatesFinder(ourUpdates, msg(), nUpdates, ourLowThreshold, ourHighThreshold);
@@ -265,6 +249,27 @@ public class PeerSync implements SolrMetricProducer {
}
}
+ private PeerSyncResult failOnNoVersions() {
+ log.info("{} DONE. We have no versions. sync failed.", msg());
+
+ for (;;) {
+ ShardResponse srsp = shardHandler.takeCompletedIncludingErrors();
+ if (srsp == null) break;
+ if (srsp.getException() == null) {
+ if (log.isDebugEnabled()) log.debug("checking if others have versions {} {}", srsp.getSolrResponse().getResponse());
+ List<Long> otherVersions = (List<Long>)srsp.getSolrResponse().getResponse().get("versions");
+ if (otherVersions != null && !otherVersions.isEmpty()) {
+ if (syncErrors != null) syncErrors.inc();
+ if (log.isDebugEnabled()) log.debug("found another replica with versions");
+ return PeerSyncResult.failure(true);
+ }
+ }
+ }
+ if (syncErrors != null) syncErrors.inc();
+ if (log.isDebugEnabled()) log.debug("found no other replica with versions");
+ return PeerSyncResult.failure(false);
+ }
+
/**
* Check if we are already in sync. Simple fingerprint comparison should do
*/
@@ -406,31 +411,6 @@ public class PeerSync implements SolrMetricProducer {
}
}
- private boolean canHandleVersionRanges(String replica) {
- SyncShardRequest sreq = new SyncShardRequest();
- requests.add(sreq);
-
- // determine if leader can handle version ranges
- sreq.shards = new String[] {replica};
- sreq.actualShards = sreq.shards;
- sreq.params = new ModifiableSolrParams();
- sreq.params.set("qt", "/get");
- sreq.params.set(DISTRIB, false);
- sreq.params.set("checkCanHandleVersionRanges", false);
-
- ShardHandler sh = shardHandlerFactory.getShardHandler();
- sh.submit(sreq, replica, sreq.params);
-
- ShardResponse srsp = sh.takeCompletedIncludingErrors();
- Boolean canHandleVersionRanges = srsp.getSolrResponse().getResponse().getBooleanArg("canHandleVersionRanges");
-
- if (canHandleVersionRanges == null || canHandleVersionRanges.booleanValue() == false) {
- return false;
- }
-
- return true;
- }
-
private boolean handleVersions(ShardResponse srsp, MissedUpdatesFinder missedUpdatesFinder) {
// we retrieved the last N updates from the replica
@SuppressWarnings({"unchecked"})
@@ -440,8 +420,8 @@ public class PeerSync implements SolrMetricProducer {
SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
Object fingerprint = srsp.getSolrResponse().getResponse().get("fingerprint");
- if (log.isInfoEnabled()) {
- log.info("{} Received {} versions from {} {} fingerprint:{}", msg(), otherVersions.size(), otherVersions, sreq.shards[0], fingerprint);
+ if (log.isDebugEnabled()) {
+ log.debug("{} Received {} versions from {} {} fingerprint:{}", msg(), otherVersions.size(), otherVersions, sreq.shards[0], fingerprint);
}
if (fingerprint != null) {
sreq.fingerprint = IndexFingerprint.fromObject(fingerprint);
@@ -454,7 +434,7 @@ public class PeerSync implements SolrMetricProducer {
MissedUpdatesRequest updatesRequest = missedUpdatesFinder.find(
otherVersions, sreq.shards[0],
- () -> core.getSolrConfig().useRangeVersionsForPeerSync && canHandleVersionRanges(sreq.shards[0]));
+ () -> core.getSolrConfig().useRangeVersionsForPeerSync);
if (updatesRequest == MissedUpdatesRequest.ALREADY_IN_SYNC) {
return true;
diff --git a/solr/core/src/java/org/apache/solr/update/PeerSyncWithLeader.java b/solr/core/src/java/org/apache/solr/update/PeerSyncWithLeader.java
index b670372..2a480a6 100644
--- a/solr/core/src/java/org/apache/solr/update/PeerSyncWithLeader.java
+++ b/solr/core/src/java/org/apache/solr/update/PeerSyncWithLeader.java
@@ -171,7 +171,10 @@ public class PeerSyncWithLeader implements SolrMetricProducer {
// now make sure that the starting updates overlap our updates
// there shouldn't be reorders, so any overlap will do.
- long smallestNewUpdate = Math.abs(ourUpdates.get(ourUpdates.size() - 1));
+ long smallestNewUpdate = 0;
+ if (ourUpdates.size() > 0) {
+ smallestNewUpdate = Math.abs(ourUpdates.get(ourUpdates.size() - 1));
+ }
if (Math.abs(startingVersions.get(0)) < smallestNewUpdate) {
log.warn("{} too many updates received since start - startingUpdates no longer overlaps with our currentUpdates", msg());
@@ -298,7 +301,7 @@ public class PeerSyncWithLeader implements SolrMetricProducer {
// only DBI or DBQ in the gap (above) will satisfy this predicate
return version > leaderFingerprint.getMaxVersionEncountered() && (oper == UpdateLog.DELETE || oper == UpdateLog.DELETE_BY_QUERY);
});
- log.info("existDBIOrDBQInTheGap={}", existDBIOrDBQInTheGap);
+ if (log.isDebugEnabled()) log.debug("existDBIOrDBQInTheGap={}", existDBIOrDBQInTheGap);
if (!existDBIOrDBQInTheGap) {
// it is safe to use leaderFingerprint.maxVersionEncountered as cut point now.
updates.removeIf(e -> {
@@ -306,7 +309,7 @@ public class PeerSyncWithLeader implements SolrMetricProducer {
List<Object> u = (List<Object>) e;
long version = (Long) u.get(1);
boolean success = version > leaderFingerprint.getMaxVersionEncountered();
- log.info("existDBIOrDBQInTheGap version={} leaderFingerprint.getMaxVersionEncountered={} success={}", version, leaderFingerprint.getMaxVersionEncountered(), success);
+ if (log.isDebugEnabled()) log.debug("existDBIOrDBQInTheGap version={} leaderFingerprint.getMaxVersionEncountered={} success={}", version, leaderFingerprint.getMaxVersionEncountered(), success);
return success;
});
}
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
index c53fe21..b0885ef 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
@@ -70,7 +70,7 @@ public abstract class SolrCoreState {
boolean close = false;
synchronized (this) {
solrCoreStateRefCnt--;
- log.info("SolrCoreState ref count {}", solrCoreStateRefCnt);
+ if (log.isDebugEnabled()) log.debug("SolrCoreState ref count {}", solrCoreStateRefCnt);
if (solrCoreStateRefCnt == 0) {
closed = true;
@@ -80,7 +80,7 @@ public abstract class SolrCoreState {
if (close) {
try {
- log.debug("Closing SolrCoreState");
+ if (log.isDebugEnabled()) log.debug("Closing SolrCoreState");
close(closer);
} catch (Exception e) {
log.error("Error closing SolrCoreState", e);
diff --git a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
index 3e55dab..d66f777 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
@@ -291,7 +291,7 @@ public class SolrIndexSplitter {
t = timings.sub("createSubIW");
t.resume();
iw = SolrIndexWriter.buildIndexWriter(core, partitionName, path, core.getDirectoryFactory(), true, core.getLatestSchema(),
- core.getSolrConfig().indexConfig, core.getDeletionPolicy(), core.getCodec());
+ core.getSolrConfig().indexConfig, core.getDeletionPolicy(), core.getCodec(), true);
t.pause();
}
}
diff --git a/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java b/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
index 2674487..d710db8 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
@@ -112,12 +112,12 @@ public class SolrIndexWriter extends IndexWriter {
// return w;
// }
- public static SolrIndexWriter buildIndexWriter(SolrCore core, String name, String path, DirectoryFactory directoryFactory, boolean create, IndexSchema schema, SolrIndexConfig config, IndexDeletionPolicy delPolicy, Codec codec) {
+ public static SolrIndexWriter buildIndexWriter(SolrCore core, String name, String path, DirectoryFactory directoryFactory, boolean create, IndexSchema schema, SolrIndexConfig config, IndexDeletionPolicy delPolicy, Codec codec, boolean commitOnClose) {
SolrIndexWriter iw = null;
Directory dir = null;
try {
dir = getDir(directoryFactory, path, config);
- iw = new SolrIndexWriter(core, name, directoryFactory, dir, create, schema, config, delPolicy, codec);
+ iw = new SolrIndexWriter(core, name, directoryFactory, dir, create, schema, config, delPolicy, codec, commitOnClose);
} catch (Throwable e) {
ParWork.propagateInterrupt(e);
SolrException exp = new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -170,11 +170,11 @@ public class SolrIndexWriter extends IndexWriter {
assert ObjectReleaseTracker.track(this);
}
- public SolrIndexWriter(SolrCore core, String name, DirectoryFactory directoryFactory, Directory directory, boolean create, IndexSchema schema, SolrIndexConfig config, IndexDeletionPolicy delPolicy, Codec codec) throws IOException {
+ public SolrIndexWriter(SolrCore core, String name, DirectoryFactory directoryFactory, Directory directory, boolean create, IndexSchema schema, SolrIndexConfig config, IndexDeletionPolicy delPolicy, Codec codec, boolean commitOnClose) throws IOException {
super(directory,
config.toIndexWriterConfig(core).
setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND).
- setIndexDeletionPolicy(delPolicy).setCodec(codec)
+ setIndexDeletionPolicy(delPolicy).setCodec(codec).setCommitOnClose(commitOnClose)
);
try {
if (log.isDebugEnabled()) log.debug("Opened Writer " + name);
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 39fa5de..2718286 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -1596,25 +1596,27 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
if(Math.abs(ptr.version) > Math.abs(maxVersion)) continue;
if (ptr.version != 0) {
ret.add(ptr.version);
+ } else {
+ log.warn("Found version of 0 {} {} {}", ptr.pointer, ptr.previousVersion, ptr.log);
}
if (--n <= 0) return ret;
}
}
- log.info("Return getVersions {} {}", n, ret);
+ if (log.isDebugEnabled()) log.debug("Return getVersions {} {}", n, ret);
return ret;
}
public Object lookup(long version) {
- log.info("lookup {}", version);
+ if (log.isDebugEnabled()) log.debug("lookup {}", version);
Update update = updates.get(version);
if (update == null) return null;
- log.info("found update from updates {} {}", update.version, updates.size());
+ if (log.isDebugEnabled()) log.debug("found update from updates {} {}", update.version, updates.size());
Object object = update.log.lookup(update.pointer);
- log.info("found update from log {} {} ptr={} object={}", update.version, update.log, update.pointer, object);
+ if (log.isDebugEnabled()) log.debug("found update from log {} {} ptr={} object={}", update.version, update.log, update.pointer, object);
return object;
}
@@ -1704,7 +1706,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
numUpdates++;
}
- log.info("Recent updates updates numUpdates={} numUpdatesToKeep={}", numUpdates, numRecordsToKeep);
+ if (log.isDebugEnabled()) log.debug("Recent updates updates numUpdates={} numUpdatesToKeep={}", numUpdates, numRecordsToKeep);
} catch (IOException | AssertionError e) { // catch AssertionError to handle certain test failures correctly
// failure to read a log record isn't fatal
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index ecdf3b6..0f7800e 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -108,7 +108,7 @@ public class UpdateShardHandler implements SolrInfoBean {
.connectionTimeout(cfg.getDistributedConnectionTimeout())
.idleTimeout(cfg.getDistributedSocketTimeout());
}
- updateOnlyClient = updateOnlyClientBuilder.markInternalRequest().strictEventOrdering(true).build();
+ updateOnlyClient = updateOnlyClientBuilder.markInternalRequest().strictEventOrdering(false).build();
updateOnlyClient.enableCloseLock();
// updateOnlyClient.addListenerFactory(updateHttpListenerFactory);
Set<String> queryParams = new HashSet<>(2);
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index a5a8847..3493dac 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -190,7 +190,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT), true);
try {
- leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId(), 1000, false);
+ leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId(), 3000, false);
} catch (Exception e) {
ParWork.propagateInterrupt(e);
throw new SolrException(ErrorCode.SERVER_ERROR,
@@ -645,7 +645,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
+ "failed since we're not in cloud mode.");
}
try {
- return zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId(), 1500, false).getCoreUrl();
+ return zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId(), 3000, false).getCoreUrl();
} catch (InterruptedException | TimeoutException e) {
ParWork.propagateInterrupt(e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception during fetching from leader.", e);
@@ -717,14 +717,14 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
try {
// Not equivalent to getLeaderProps, which retries to find a leader.
// Replica leader = slice.getLeader();
- Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 100, false);
+ Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 3000, false);
isLeader = leaderReplica.getName().equals(desc.getName());
if (log.isTraceEnabled()) log.trace("Are we leader for sending to replicas? {} phase={}", isLeader, phase);
if (!isLeader) {
isSubShardLeader = amISubShardLeader(coll, slice, id, doc);
if (isSubShardLeader) {
shardId = cloudDesc.getShardId();
- leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 1500, false);
+ leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 3000, false);
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/MissingSegmentRecoveryTest.java b/solr/core/src/test/org/apache/solr/cloud/MissingSegmentRecoveryTest.java
index c0907f2..addc405 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MissingSegmentRecoveryTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MissingSegmentRecoveryTest.java
@@ -38,11 +38,13 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slow
+@Ignore // nocommit - this feature needs a little work
public class MissingSegmentRecoveryTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
diff --git a/solr/server/etc/jetty-https.xml b/solr/server/etc/jetty-https.xml
index 331cb3d..c12b4f6 100644
--- a/solr/server/etc/jetty-https.xml
+++ b/solr/server/etc/jetty-https.xml
@@ -79,7 +79,7 @@
<Set name="acceptQueueSize"><Property name="solr.jetty.https.acceptQueueSize" default="4096"/></Set>
<Call name="addLifeCycleListener">
<Arg>
- <New class="org.apache.solr.servlet.SolrConnectorListener"/>
+ <New class="org.apache.solr.servlet.SolrLifcycleListener"/>
</Arg>
</Call>
</New>
diff --git a/solr/server/etc/jetty-https8.xml b/solr/server/etc/jetty-https8.xml
index f937852..9116a36 100644
--- a/solr/server/etc/jetty-https8.xml
+++ b/solr/server/etc/jetty-https8.xml
@@ -62,6 +62,11 @@
<Set name="idleTimeout"><Property name="solr.jetty.https.timeout" default="120000"/></Set>
<Set name="acceptorPriorityDelta"><Property name="solr.jetty.ssl.acceptorPriorityDelta" default="0"/></Set>
<Set name="acceptQueueSize"><Property name="solr.jetty.https.acceptQueueSize" default="0"/></Set>
+ <Call name="addLifeCycleListener">
+ <Arg>
+ <New class="org.apache.solr.servlet.SolrLifcycleListener"/>
+ </Arg>
+ </Call>
</New>
</Arg>
</Call>
diff --git a/solr/server/etc/jetty.xml b/solr/server/etc/jetty.xml
index 438eb36..0c43ba9 100644
--- a/solr/server/etc/jetty.xml
+++ b/solr/server/etc/jetty.xml
@@ -152,10 +152,6 @@
<Set name="handlers">
<Array type="org.eclipse.jetty.server.Handler">
<Item>
- <New id="ShutdownHandler" class="org.apache.solr.servlet.SolrShutdownHandler">
- </New>
- </Item>
- <Item>
<New class="org.eclipse.jetty.server.handler.InetAccessHandler">
<Call name="include">
<Arg>
diff --git a/solr/server/resources/log4j2.xml b/solr/server/resources/log4j2.xml
index 6ac550a..ae030fd 100644
--- a/solr/server/resources/log4j2.xml
+++ b/solr/server/resources/log4j2.xml
@@ -39,9 +39,9 @@
</PatternLayout>
<Policies>
<OnStartupTriggeringPolicy />
- <SizeBasedTriggeringPolicy size="32 MB"/>
+ <SizeBasedTriggeringPolicy size="64 MB"/>
</Policies>
- <DefaultRolloverStrategy max="10"/>
+ <DefaultRolloverStrategy max="20"/>
</RollingRandomAccessFile>
<RollingRandomAccessFile
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 945fb62..e0de06e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -83,6 +83,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
+import java.lang.management.ManagementFactory;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
@@ -116,6 +117,9 @@ import java.util.concurrent.TimeoutException;
* @lucene.experimental
*/
public class Http2SolrClient extends SolrClient {
+
+ public static final int PROC_COUNT = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
+
public static final String REQ_PRINCIPAL_KEY = "solr-req-principal";
private static volatile SSLConfig defaultSSLConfig;
@@ -218,7 +222,7 @@ public class Http2SolrClient extends SolrClient {
ssl = true;
}
// nocommit - look at config again as well
- int minThreads = Integer.getInteger("solr.minHttp2ClientThreads", 6);
+ int minThreads = Integer.getInteger("solr.minHttp2ClientThreads", PROC_COUNT);
httpClientExecutor = new SolrQueuedThreadPool("http2Client", builder.maxThreadPoolSize, minThreads,
this.headers != null && this.headers.containsKey(QoSParams.REQUEST_SOURCE) && this.headers.get(QoSParams.REQUEST_SOURCE).equals(QoSParams.INTERNAL) ? 3000 : 5000,
null, -1, null);
@@ -1101,7 +1105,7 @@ public class Http2SolrClient extends SolrClient {
public static class Builder {
- public int maxThreadPoolSize = Integer.getInteger("solr.maxHttp2ClientThreads", 512);
+ public int maxThreadPoolSize = Integer.getInteger("solr.maxHttp2ClientThreads", PROC_COUNT * 2);
public int maxRequestsQueuedPerDestination = 1600;
private Http2SolrClient http2SolrClient;
private SSLConfig sslConfig = defaultSSLConfig;
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 0627e65..42b3827 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -73,15 +73,15 @@ public class ParWork implements Closeable {
private static volatile ParWorkExecutor EXEC;
- // pretty much don't use it
public static ParWorkExecutor getRootSharedExecutor() {
if (EXEC == null) {
synchronized (ParWork.class) {
if (EXEC == null) {
EXEC = (ParWorkExecutor) getParExecutorService("RootExec",
- Integer.getInteger("solr.rootSharedThreadPoolCoreSize", 15), Integer.MAX_VALUE, 1000,
+ Integer.getInteger("solr.rootSharedThreadPoolCoreSize", 200), Integer.MAX_VALUE, 1000,
new SynchronousQueue());
((ParWorkExecutor)EXEC).enableCloseLock();
+ EXEC.prestartAllCoreThreads();
}
}
}
@@ -89,10 +89,12 @@ public class ParWork implements Closeable {
}
public static void shutdownParWorkExecutor() {
- try {
- shutdownParWorkExecutor(EXEC, true);
- } finally {
- EXEC = null;
+ synchronized (ParWork.class) {
+ try {
+ shutdownParWorkExecutor(EXEC, true);
+ } finally {
+ EXEC = null;
+ }
}
}
@@ -496,7 +498,7 @@ public class ParWork implements Closeable {
Integer minThreads;
Integer maxThreads;
minThreads = 4;
- maxThreads = PROC_COUNT;
+ maxThreads = PROC_COUNT / 2;
exec = getExecutorService(Math.max(minThreads, maxThreads)); // keep alive directly affects how long a worker might
// ((PerThreadExecService)exec).closeLock(true);
// be stuck in poll without an enqueue on shutdown
diff --git a/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java b/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
index f8ccd29..4309cbb 100644
--- a/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
@@ -206,22 +206,20 @@ public class PerThreadExecService extends AbstractExecutorService {
return maxSize;
}
- public boolean checkLoad() {
+ private boolean checkLoad() {
- double ourLoad = ParWork.getSysStats().getTotalUsage();
- if (ourLoad > SysStats.OUR_LOAD_HIGH) {
- if (log.isDebugEnabled()) log.debug("Our cpu usage is too high, run in caller thread {}", ourLoad);
+ double sLoad = sysStats.getSystemLoad();
+
+ if (hiStateLoad(sLoad)) {
return false;
- } else {
- double sLoad = sysStats.getSystemLoad();
- if (sLoad > 1) {
- if (log.isDebugEnabled()) log.debug("System load is too high, run in caller thread {}", sLoad);
- return false;
- }
}
return true;
}
-
+
+ private boolean hiStateLoad(double sLoad) {
+ return sLoad > 0.8d && running.get() > 3;
+ }
+
public void closeLock(boolean lock) {
if (lock) {
closeTracker.enableCloseLock();
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
index 9338679..f3fc018 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
@@ -104,26 +104,26 @@ public class SolrZooKeeper extends ZooKeeper {
@Override
public void close() {
if (closeTracker != null) closeTracker.close();
-// try {
-// try {
-// RequestHeader h = new RequestHeader();
-// h.setType(ZooDefs.OpCode.closeSession);
-//
-// cnxn.submitRequest(h, null, null, null);
-// } catch (InterruptedException e) {
-// // ignore, close the send/event threads
-// } finally {
-// ZooKeeperExposed zk = new ZooKeeperExposed(this, cnxn);
-// zk.closeCnxn();
-// }
-// } catch (Exception e) {
-// log.warn("Exception closing zookeeper client", e);
-// }
try {
- super.close();
- } catch (InterruptedException e) {
- e.printStackTrace();
+ try {
+ RequestHeader h = new RequestHeader();
+ h.setType(ZooDefs.OpCode.closeSession);
+
+ cnxn.submitRequest(h, null, null, null);
+ } catch (InterruptedException e) {
+ // ignore, close the send/event threads
+ } finally {
+ ZooKeeperExposed zk = new ZooKeeperExposed(this, cnxn);
+ zk.closeCnxn();
+ }
+ } catch (Exception e) {
+ log.warn("Exception closing zookeeper client", e);
}
+// try {
+// super.close();
+// } catch (InterruptedException e) {
+// e.printStackTrace();
+// }
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index f4e7dd6..ff463e3 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -968,33 +968,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
* Get shard leader properties, with retry if none exist.
*/
public Replica getLeaderRetry(String collection, String shard, int timeout, boolean mustBeLive) throws InterruptedException, TimeoutException {
-
- DocCollection coll = getClusterState().getCollectionOrNull(collection);
-
- if (coll != null) {
- Slice slice = coll.getSlice(shard);
- if (slice != null) {
- Replica leader = slice.getLeader();
- boolean valid = false;
- try {
- valid = mustBeLive ? leader != null && isNodeLive(leader.getNodeName()) || zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + "/leaders/" + slice.getName() + "/leader") : leader != null && isNodeLive(leader.getNodeName());
- } catch (KeeperException e) {
-
- } catch (InterruptedException e) {
-
- }
- if (leader != null && leader.getState() == Replica.State.ACTIVE && valid) {
- return leader;
- }
- Collection<Replica> replicas = slice.getReplicas();
- for (Replica replica : replicas) {
- if ("true".equals(replica.getProperty(LEADER_PROP)) && replica.getState() == Replica.State.ACTIVE && valid) {
- return replica;
- }
- }
- }
- }
-
AtomicReference<Replica> returnLeader = new AtomicReference<>();
try {
waitForState(collection, timeout, TimeUnit.MILLISECONDS, (n, c) -> {
@@ -1002,26 +975,40 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
return false;
Slice slice = c.getSlice(shard);
if (slice == null) return false;
+ Replica zkLeader = null;
Replica leader = slice.getLeader();
- boolean valid = false;
- try {
- valid = mustBeLive ? (leader != null && isNodeLive(leader.getNodeName())) ||
- zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + "/leaders/" + slice.getName() + "/leader") :
- leader != null && isNodeLive(leader.getNodeName());
- } catch (KeeperException e) {
- return false;
- } catch (InterruptedException e) {
- return false;
- }
- if (leader != null && leader.getState() == Replica.State.ACTIVE && valid) {
- returnLeader.set(leader);
- return true;
+ if (leader != null && leader.getState() == Replica.State.ACTIVE) {
+ if (isNodeLive(leader.getNodeName())) {
+ returnLeader.set(leader);
+ return true;
+ }
+
+ if (!mustBeLive) {
+ if (zkLeader == null) {
+ zkLeader = getLeaderProps(collection, shard);
+ }
+ if (zkLeader != null && zkLeader.getName().equals(leader.getName())) {
+ returnLeader.set(leader);
+ return true;
+ }
+ }
}
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
- if ("true".equals(replica.getProperty(LEADER_PROP)) && replica.getState() == Replica.State.ACTIVE && valid) {
- returnLeader.set(replica);
- return true;
+ if ("true".equals(replica.getProperty(LEADER_PROP)) && replica.getState() == Replica.State.ACTIVE) {
+ if (isNodeLive(replica.getNodeName())) {
+ returnLeader.set(replica);
+ return true;
+ }
+ if (!mustBeLive) {
+ if (zkLeader == null) {
+ zkLeader = getLeaderProps(collection, shard);
+ }
+ if (zkLeader != null && zkLeader.getName().equals(replica.getName())) {
+ returnLeader.set(replica);
+ return true;
+ }
+ }
}
}
@@ -1035,6 +1022,25 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
return returnLeader.get();
}
+ public Replica getLeaderProps(final String collection, final String slice) {
+
+ try {
+ byte[] data = zkClient.getData(ZkStateReader.getShardLeadersPath(collection, slice), null, null);
+ ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(ZkNodeProps.load(data));
+ String name = leaderProps.getNodeProps().getStr(ZkStateReader.CORE_NAME_PROP);
+ leaderProps.getNodeProps().getProperties().remove(ZkStateReader.CORE_NAME_PROP);
+ // nocommit - right key for leader name?
+ return new Replica(name, leaderProps.getNodeProps().getProperties(), collection, slice, this);
+
+ } catch (KeeperException.NoNodeException e) {
+ return null;
+ } catch (Exception e) {
+ SolrZkClient.checkInterrupted(e);
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
+
+ }
+
/**
* Get path where shard leader properties live in zookeeper.
*/
@@ -1594,6 +1600,8 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
work.collect("", () -> {
try {
zk.removeWatches(getCollectionSCNPath(coll), this, WatcherType.Any, true);
+ } catch (KeeperException.NoWatcherException e) {
+
} catch (Exception e) {
log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
}
@@ -1602,6 +1610,8 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
work.collect("", () -> {
try {
zk.removeWatches(getCollectionStateUpdatesPath(coll), watcher, WatcherType.Any, true);
+ } catch (KeeperException.NoWatcherException e) {
+
} catch (Exception e) {
log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SysStats.java b/solr/solrj/src/java/org/apache/solr/common/util/SysStats.java
index 05417d2..75a7a17 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SysStats.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SysStats.java
@@ -19,8 +19,8 @@ public class SysStats extends Thread {
private static final Logger log = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
- public static final double OUR_LOAD_HIGH = 1.0;
- public static final long REFRESH_INTERVAL = TimeUnit.NANOSECONDS.convert(5000, TimeUnit.MILLISECONDS);
+ public static final double OUR_LOAD_HIGH = 3.0;
+ public static final long REFRESH_INTERVAL = TimeUnit.NANOSECONDS.convert(2500, TimeUnit.MILLISECONDS);
public static final int PROC_COUNT = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
private final long refreshIntervalMs;
@@ -89,12 +89,12 @@ public class SysStats extends Thread {
threadTime.setLast(threadBean.getThreadCpuTime(threadTime.getId()));
}
- double load = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
- if (load < 0) {
- log.warn("SystemLoadAverage not supported on this JVM");
- } else {
- sysLoad = load / (double) PROC_COUNT;
- }
+// double load = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
+// if (load < 0) {
+// log.warn("SystemLoadAverage not supported on this JVM");
+// } else {
+// sysLoad = load / (double) PROC_COUNT;
+// }
} else {
double load = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
@@ -162,6 +162,13 @@ public class SysStats extends Thread {
}
public double getSystemLoad() {
+ double sysLoad = -1;
+ double load = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
+ if (load < 0) {
+ log.warn("SystemLoadAverage not supported on this JVM");
+ } else {
+ sysLoad = load / (double) PROC_COUNT;
+ }
return sysLoad;
}
diff --git a/solr/solrj/src/java/org/apache/zookeeper/ZooKeeperExposed.java b/solr/solrj/src/java/org/apache/zookeeper/ZooKeeperExposed.java
index 01d144c..e890259 100644
--- a/solr/solrj/src/java/org/apache/zookeeper/ZooKeeperExposed.java
+++ b/solr/solrj/src/java/org/apache/zookeeper/ZooKeeperExposed.java
@@ -64,7 +64,7 @@ public class ZooKeeperExposed {
clientCnxn.sendThread.close();
try {
- clientCnxn.sendThread.join(20);
+ clientCnxn.sendThread.join(50);
} catch (InterruptedException e) {
}