You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/02/10 01:43:47 UTC
[lucene-solr] 03/09: @1330 Finishing up dist updates.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit ac03d338c5e9a5d1b9d64c8c961944b638ccab65
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Feb 4 01:41:19 2021 -0600
@1330 Finishing up dist updates.
# Conflicts:
# solr/core/src/java/org/apache/solr/core/CoreContainer.java
# solr/core/src/test/org/apache/solr/cloud/TestTolerantUpdateProcessorRandomCloud.java
---
.../extraction/ExtractingDocumentLoader.java | 2 +-
.../src/java/org/apache/solr/cloud/Overseer.java | 9 +-
.../org/apache/solr/cloud/OverseerTaskQueue.java | 3 +-
.../cloud/api/collections/DeleteCollectionCmd.java | 12 +-
.../apache/solr/cloud/overseer/ZkStateWriter.java | 4 +-
.../java/org/apache/solr/core/CoreContainer.java | 19 +-
.../src/java/org/apache/solr/core/SolrCore.java | 6 +-
.../apache/solr/handler/loader/CSVLoaderBase.java | 2 +-
.../apache/solr/handler/loader/JavabinLoader.java | 8 +-
.../org/apache/solr/request/SolrRequestInfo.java | 3 +
.../org/apache/solr/update/AddUpdateCommand.java | 8 +-
.../org/apache/solr/update/DocumentBuilder.java | 1 +
.../src/java/org/apache/solr/update/PeerSync.java | 21 +-
.../org/apache/solr/update/SolrCmdDistributor.java | 1 -
.../org/apache/solr/update/TimedVersionBucket.java | 3 +-
.../src/java/org/apache/solr/update/UpdateLog.java | 145 +++++---
.../java/org/apache/solr/update/VersionBucket.java | 41 ++-
.../processor/DistributedUpdateProcessor.java | 409 +++++++++++----------
.../processor/DistributedZkUpdateProcessor.java | 28 +-
.../processor/RunUpdateProcessorFactory.java | 2 +-
.../apache/solr/cloud/CollectionsAPISolrJTest.java | 2 +
.../solr/update/TestExceedMaxTermLength.java | 2 +-
.../processor/DistributedUpdateProcessorTest.java | 13 +-
.../src/java/org/apache/solr/common/ParWork.java | 2 +-
.../apache/solr/common/ToleratedUpdateError.java | 2 +-
.../apache/solr/common/cloud/ZkStateReader.java | 2 +-
.../solr/common/util/SolrQueuedThreadPool.java | 1 -
solr/solrj/src/test-files/log4j2.xml | 1 +
.../src/java/org/apache/solr/SolrTestCase.java | 3 +-
29 files changed, 452 insertions(+), 303 deletions(-)
diff --git a/solr/contrib/extraction/src/java/org/apache/solr/handler/extraction/ExtractingDocumentLoader.java b/solr/contrib/extraction/src/java/org/apache/solr/handler/extraction/ExtractingDocumentLoader.java
index 2f73d2b..585d9bd 100644
--- a/solr/contrib/extraction/src/java/org/apache/solr/handler/extraction/ExtractingDocumentLoader.java
+++ b/solr/contrib/extraction/src/java/org/apache/solr/handler/extraction/ExtractingDocumentLoader.java
@@ -127,13 +127,13 @@ public class ExtractingDocumentLoader extends ContentStreamLoader {
*/
void doAdd(SolrContentHandler handler, AddUpdateCommand template)
throws IOException {
- template.solrDoc = handler.newDocument();
processor.processAdd(template);
}
void addDoc(SolrContentHandler handler) throws IOException {
templateAdd.clear();
templateAdd.setReq(req);
+ templateAdd.solrDoc = handler.newDocument();
templateAdd.overwrite = overwrite;
templateAdd.commitWithin = commitWithin;
doAdd(handler, templateAdd);
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 23a91cd..b1a4b61 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -56,6 +56,7 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.eclipse.jetty.util.BlockingArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -294,15 +295,15 @@ public class Overseer implements SolrCloseable {
// stateManagmentExecutor = ParWork.getParExecutorService("stateManagmentExecutor",
// 1, 1, 3000, new SynchronousQueue());
taskExecutor = (ParWorkExecutor) ParWork.getParExecutorService("overseerTaskExecutor",
- 4, SysStats.PROC_COUNT, 1000, new LinkedBlockingQueue<>(1024));
+ 4, SysStats.PROC_COUNT, 1000, new BlockingArrayQueue<>(32, 64));
for (int i = 0; i < 4; i++) {
- taskExecutor.submit(() -> {});
+ taskExecutor.prestartCoreThread();
}
zkWriterExecutor = (ParWorkExecutor) ParWork.getParExecutorService("overseerZkWriterExecutor",
- 4, SysStats.PROC_COUNT, 1000, new LinkedBlockingQueue<>(1024));
+ 4, SysStats.PROC_COUNT, 1000, new BlockingArrayQueue<>(64, 128));
for (int i = 0; i < 4; i++) {
- zkWriterExecutor.submit(() -> {});
+ zkWriterExecutor.prestartCoreThread();
}
if (overseerOnlyClient == null && !closeAndDone && !initedHttpClient) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index dc7d389..ebe1d30 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -264,7 +264,7 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
public QueueEvent offer(byte[] data, long timeout) throws KeeperException,
InterruptedException {
if (log.isDebugEnabled()) log.debug("offer operation to the Overseeer queue {}", Utils.fromJSON(data));
- log.info("offer operation to the Overseeer queue {}", Utils.fromJSON(data));
+
if (shuttingDown.get()) {
throw new SolrException(SolrException.ErrorCode.CONFLICT,"Solr is shutting down, no more overseer tasks may be offered");
}
@@ -276,7 +276,6 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
String watchID = createResponseNode();
if (log.isDebugEnabled()) log.debug("watchId for response node {}, setting a watch ... ", watchID);
- log.info("watchId for response node {}, setting a watch ... ", watchID);
watcher = new LatchWatcher(Watcher.Event.EventType.NodeDataChanged, watchID, zookeeper);
Stat stat = zookeeper.exists(watchID, watcher, true);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index b3fefb7..af19a82 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -192,12 +192,20 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
// make sure it's gone again after cores have been removed
try {
ocmh.overseer.getCoreContainer().getZkController().removeCollectionTerms(collection);
- zkStateReader.getZkClient().clean(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection);
-
+ } catch (Exception e) {
+ log.error("Exception while trying to remove collection terms", e);
+ }
+ try {
+ // was there a race? let's get after it
+ while (zkStateReader.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection)) {
+ zkStateReader.getZkClient().clean(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection);
+ }
} catch (Exception e) {
log.error("Exception while trying to remove collection zknode", e);
}
+
+
AddReplicaCmd.Response response = new AddReplicaCmd.Response();
return response;
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index efc6e98..e23a8c1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -190,10 +190,10 @@ public class ZkStateWriter {
for (Map.Entry<String,Object> entry : message.getProperties().entrySet()) {
if (OverseerAction.DOWNNODE.equals(OverseerAction.get(entry.getKey()))) {
- log.info("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(entry.getKey()));
+ if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(entry.getKey()));
nodeOperation(entry, Replica.State.getShortState(Replica.State.DOWN));
} else if (OverseerAction.RECOVERYNODE.equals(OverseerAction.get(entry.getKey()))) {
- log.info("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(entry.getKey()));
+ if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(entry.getKey()));
nodeOperation(entry, Replica.State.getShortState(Replica.State.RECOVERING));
}
}
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 6aa6fdb..8d3a844 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -135,14 +135,13 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
@@ -192,10 +191,16 @@ public class CoreContainer implements Closeable {
private volatile UpdateShardHandler updateShardHandler;
- public volatile ExecutorService solrCoreExecutor;
+ public final ThreadPoolExecutor solrCoreExecutor = (ThreadPoolExecutor) ParWork.getParExecutorService("Core",
+ 4, Math.max(6, SysStats.PROC_COUNT * 2), 1000, new BlockingArrayQueue<>(256, 256));
- public final ExecutorService coreContainerExecutor = ParWork.getParExecutorService("Core",
- 2, Math.max(6, SysStats.PROC_COUNT), 1000, new ArrayBlockingQueue(256, false));
+ public final ThreadPoolExecutor coreContainerExecutor = (ThreadPoolExecutor) ParWork.getParExecutorService("Core",
+ 8, SysStats.PROC_COUNT, 1000, new BlockingArrayQueue<>(256, 256));
+
+ {
+ solrCoreExecutor.prestartAllCoreThreads();
+ coreContainerExecutor.prestartAllCoreThreads();
+ }
private final OrderedExecutor replayUpdatesExecutor;
@@ -371,7 +376,7 @@ public class CoreContainer implements Closeable {
this.replayUpdatesExecutor = new OrderedExecutor(cfg.getReplayUpdatesThreads(),
ParWork.getParExecutorService("replayUpdatesExecutor", cfg.getReplayUpdatesThreads(), cfg.getReplayUpdatesThreads(),
- 3000, new LinkedBlockingQueue<>(cfg.getReplayUpdatesThreads())));
+ 1000, new LinkedBlockingQueue<>(cfg.getReplayUpdatesThreads())));
metricManager = new SolrMetricManager(loader, cfg.getMetricsConfig());
String registryName = SolrMetricManager.getRegistryName(SolrInfoBean.Group.node);
@@ -410,8 +415,6 @@ public class CoreContainer implements Closeable {
containerProperties.putAll(cfg.getSolrProperties());
- solrCoreExecutor = ParWork.getParExecutorService("Core",
- 4, Math.max(6, SysStats.PROC_COUNT * 2), 1000, new BlockingArrayQueue(64, 16));
}
@SuppressWarnings({"unchecked"})
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 7529e0a..55ec855 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -1031,18 +1031,16 @@ public final class SolrCore implements SolrInfoBean, Closeable {
updateHandler.getSolrCoreState().increfSolrCoreState();
}
- IndexSchema schema = configSet.getIndexSchema();
-
CoreDescriptor cd = Objects.requireNonNull(coreDescriptor, "coreDescriptor cannot be null");
setName(name);
this.solrConfig = configSet.getSolrConfig();
+ IndexSchema schema = configSet.getIndexSchema();
+ setLatestSchema(schema);
this.resourceLoader = configSet.getSolrConfig().getResourceLoader();
this.configSetProperties = configSet.getProperties();
- setLatestSchema(schema);
-
// Initialize the RestManager
StopWatch initRestManager = new StopWatch(this + "-initRestManager");
restManager = initRestManager(cd);
diff --git a/solr/core/src/java/org/apache/solr/handler/loader/CSVLoaderBase.java b/solr/core/src/java/org/apache/solr/handler/loader/CSVLoaderBase.java
index 3824917..97adeb5 100644
--- a/solr/core/src/java/org/apache/solr/handler/loader/CSVLoaderBase.java
+++ b/solr/core/src/java/org/apache/solr/handler/loader/CSVLoaderBase.java
@@ -378,7 +378,7 @@ abstract class CSVLoaderBase extends ContentStreamLoader {
}
try {
executor.shutdown();
- executor.awaitTermination(365, TimeUnit.DAYS);
+ executor.awaitTermination(365, TimeUnit.DAYS); // MRM TODO:
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
diff --git a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
index d80e065..762e2b7 100644
--- a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
+++ b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
@@ -89,9 +89,9 @@ public class JavabinLoader extends ContentStreamLoader {
if (document == null) {
return;
}
-
- AddUpdateCommand addCmd = getAddCommand(req, updateRequest.getParams());
-
+
+ AddUpdateCommand addCmd = getAddCommand(req, updateRequest.getParams());
+
addCmd.solrDoc = document;
if (commitWithin != null) {
addCmd.commitWithin = commitWithin;
@@ -133,8 +133,6 @@ public class JavabinLoader extends ContentStreamLoader {
SolrParams old = req.getParams();
try (JavaBinCodec jbc = new JavaBinCodec() {
SolrParams params;
- AddUpdateCommand addCmd = null;
-
@Override
public List<Object> readIterator(DataInputInputStream fis) throws IOException {
while (true) {
diff --git a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
index 5d6a73b..bfa352b 100644
--- a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
+++ b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
@@ -32,6 +32,7 @@ import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.handler.component.ResponseBuilder;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.servlet.SolrDispatchFilter;
+import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.util.TimeZoneUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,6 +82,8 @@ public class SolrRequestInfo {
}
} finally {
threadLocal.remove();
+ AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand.get().clearAll();
+ AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand_TLOG.get().clearAll();
}
}
diff --git a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
index 9bd2a31..70623f2 100644
--- a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
+++ b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
@@ -102,11 +102,15 @@ public class AddUpdateCommand extends UpdateCommand {
isLastDocInBatch = false;
version = 0;
prevVersion = -1;
- req = null;
overwrite = true;
commitWithin = -1;
}
+ public void clearAll() {
+ clear();
+ req = null;
+ }
+
public SolrInputDocument getSolrInputDocument() {
return solrDoc;
}
@@ -128,7 +132,7 @@ public class AddUpdateCommand extends UpdateCommand {
/** Returns the indexed ID for this document. The returned BytesRef is retained across multiple calls, and should not be modified. */
public BytesRef getIndexedId() {
- if (indexedId == null) {
+ if (indexedId == null && req != null) {
IndexSchema schema = req.getSchema();
SchemaField sf = schema.getUniqueKeyField();
if (sf != null) {
diff --git a/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java b/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java
index 03c83f0..ee2debf 100644
--- a/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java
+++ b/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java
@@ -123,6 +123,7 @@ public class DocumentBuilder {
* @return Built Lucene document
*/
public static Document toDocument(SolrInputDocument doc, IndexSchema schema, boolean forInPlaceUpdate, boolean ignoreNestedDocs) {
+ if (doc == null) throw new IllegalArgumentException("SolrInputDocument cannot be null");
if (!ignoreNestedDocs && doc.hasChildDocuments()) {
throw unexpectedNestedDocException(schema, forInPlaceUpdate);
}
diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java
index a83ae95..535d955 100644
--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
@@ -634,10 +634,14 @@ public class PeerSync implements SolrMetricProducer {
{
// byte[] idBytes = (byte[]) entry.get(2);
SolrInputDocument sdoc = (SolrInputDocument)entry.get(entry.size()-1);
- AddUpdateCommand cmd = new AddUpdateCommand(req);
+ AddUpdateCommand cmd = AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand_TLOG.get();
+ cmd.clear();
// cmd.setIndexedId(new BytesRef(idBytes));
cmd.solrDoc = sdoc;
+ cmd.setReq(req);
cmd.setVersion(version);
+ BytesRef indexedId = UpdateLog.getIndexedId(sdoc, req);
+ cmd.setIndexedId(indexedId);
cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
if (debug) {
log.debug("{} add {} id {}", logPrefix, cmd, sdoc.getField(ID));
@@ -676,12 +680,19 @@ public class PeerSync implements SolrMetricProducer {
}
case UpdateLog.UPDATE_INPLACE:
{
- AddUpdateCommand cmd = UpdateLog.convertTlogEntryToAddUpdateCommand(req, entry, oper, version, null);
- cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
+
+ SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
+ Long prevVersion = (Long) entry.get(UpdateLog.PREV_VERSION_IDX);
+ AddUpdateCommand ucmd = AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand_TLOG.get();
+ ucmd.clear();
+ ucmd.setReq(req);
+ ucmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
+ UpdateLog.convertTlogEntryToAddUpdateCommand(req, sdoc, oper, prevVersion, version, ucmd);
+
if (debug) {
- log.debug("{} inplace update {} prevVersion={} doc={}", logPrefix, cmd, cmd.prevVersion, cmd.solrDoc);
+ log.debug("{} inplace update {} prevVersion={} doc={}", logPrefix, ucmd, ucmd.prevVersion, ucmd.solrDoc);
}
- proc.processAdd(cmd);
+ proc.processAdd(ucmd);
break;
}
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index c4e64aa..207b4cc 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -290,7 +290,6 @@ public class SolrCmdDistributor implements Closeable {
log.error("Exception sending dist update {} {}", code, t);
cancels.remove(cancelIndex);
-
// nocommit - we want to prevent any more from this request
// to go just to this node rather than stop the whole request
if (code == 404) {
diff --git a/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java b/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java
index abeca02..f050330 100644
--- a/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java
+++ b/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
@@ -40,7 +41,7 @@ public class TimedVersionBucket extends VersionBucket {
* <code>lockTimeoutMs</code>.
*/
@Override
- public <T,R> R runWithLock(int lockTimeoutMs, CheckedFunction<T,R> function) throws IOException {
+ public <T,R> R runWithLock(int lockTimeoutMs, CheckedFunction<T,R> function, BytesRef idBytes) throws IOException {
boolean success = false;
try {
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 0eff8ea..4405429 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -52,12 +52,14 @@ import com.codahale.metrics.Meter;
import org.apache.commons.lang3.concurrent.ConcurrentUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefBuilder;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrDocumentBase;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ObjectReleaseTracker;
@@ -71,6 +73,8 @@ import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessor;
@@ -125,6 +129,30 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
}
}
+ public static BytesRef getIndexedId(SolrInputDocument solrDoc, SolrQueryRequest req) {
+ BytesRef indexedId = null;
+ if (req != null) {
+ IndexSchema schema = req.getSchema();
+ SchemaField sf = schema.getUniqueKeyField();
+ if (sf != null) {
+ if (solrDoc != null) {
+ SolrInputField field = solrDoc.getField(sf.getName());
+ int count = field == null ? 0 : field.getValueCount();
+ if (count == 0) {
+
+ } else if (count > 1) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Document contains multiple values for uniqueKey field: " + field);
+ } else {
+ BytesRefBuilder b = new BytesRefBuilder();
+ sf.getType().readableToIndexed(field.getFirstValue().toString(), b);
+ indexedId = b.get();
+ }
+ }
+ }
+ }
+ return indexedId;
+ }
+
// NOTE: when adding new states make sure to keep existing numbers, because external metrics
// monitoring may depend on these values being stable.
public enum State { REPLAYING(0), BUFFERING(1), APPLYING_BUFFERED(2), ACTIVE(3);
@@ -1378,7 +1406,16 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
switch (oper) {
case UpdateLog.UPDATE_INPLACE:
case UpdateLog.ADD: {
- AddUpdateCommand cmd = convertTlogEntryToAddUpdateCommand(req, entry, oper, version, null);
+
+ SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size()-1);
+ Long prevVersion = null;
+ if (oper == UPDATE_INPLACE) {
+ prevVersion = (Long) entry.get(UpdateLog.PREV_VERSION_IDX);
+ }
+ AddUpdateCommand cmd = AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand_TLOG.get();
+ cmd.clear();
+ cmd.setReq(req);
+ convertTlogEntryToAddUpdateCommand(req, sdoc, oper, prevVersion, version, cmd);
cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT);
add(cmd);
break;
@@ -1475,7 +1512,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
if (writeCommit) {
// record a commit
if (log.isDebugEnabled()) log.debug("Recording current closed for {} log={}", uhandler.core, theLog);
- CommitUpdateCommand cmd = new CommitUpdateCommand(new LocalSolrQueryRequest(uhandler.core, new ModifiableSolrParams()), false);
+ CommitUpdateCommand cmd = new CommitUpdateCommand(new LocalSolrQueryRequest(uhandler.core, new ModifiableSolrParams((SolrParams)null)), false);
theLog.writeCommit(cmd);
}
@@ -1942,7 +1979,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
SolrException.log(log, e);
} finally {
// change the state while updates are still blocked to prevent races
- AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand_TLOG.get().clear();
state = State.ACTIVE;
if (finishing) {
@@ -1969,7 +2005,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
SolrRequestInfo.clearRequestInfo();
}
-
public void doReplay(TransactionLog translog) {
UpdateRequestProcessor proc = null;
try {
@@ -2006,9 +2041,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
long cpos = tlogReader.currentPos();
long csize = tlogReader.currentSize();
if (log.isInfoEnabled()) {
- loglog.info(
- "log replay status {} active={} starting pos={} current pos={} current size={} % read={}",
- translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
+ loglog.info("log replay status {} active={} starting pos={} current pos={} current size={} % read={}", translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
Math.floor(cpos / (double) csize * 100.));
}
@@ -2054,8 +2087,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
try {
// should currently be a List<Oper,Ver,Doc/Id>
- @SuppressWarnings({"rawtypes"})
- List entry = (List) o;
+ @SuppressWarnings({"rawtypes"}) List entry = (List) o;
operationAndFlags = (Integer) entry.get(UpdateLog.FLAGS_IDX);
int oper = operationAndFlags & OPERATION_MASK;
long version = (Long) entry.get(UpdateLog.VERSION_IDX);
@@ -2064,15 +2096,17 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
case UpdateLog.UPDATE_INPLACE: // fall through to ADD
case UpdateLog.ADD: {
recoveryInfo.adds++;
- AddUpdateCommand cmd = AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand_TLOG.get();
- cmd.clear();
- cmd = convertTlogEntryToAddUpdateCommand(req, entry, oper, version, cmd);
- if (cmd.solrDoc == null) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "No SolrInputDocument could be read");
+
+ Long prevVersion = null;
+ if (oper == UPDATE_INPLACE) {
+ prevVersion = (Long) entry.get(UpdateLog.PREV_VERSION_IDX);
}
- cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
- if (debug) log.debug("{} {}", oper == ADD ? "add" : "update", cmd);
- execute(cmd, executor, pendingTasks, proc, exceptionOnExecuteUpdate);
+
+ if (debug) log.debug("{}", oper == ADD ? "add" : "update");
+
+ AddUpdateCommand cmd = AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand_TLOG.get();
+ cmd.clear();
+ execute(cmd, executor, pendingTasks, proc, exceptionOnExecuteUpdate, req, (SolrInputDocument) entry.get(entry.size() - 1), oper, version, prevVersion);
break;
}
case UpdateLog.DELETE: {
@@ -2083,7 +2117,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
cmd.setVersion(version);
cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
if (debug) log.debug("delete {}", cmd);
- execute(cmd, executor, pendingTasks, proc, exceptionOnExecuteUpdate);
+ execute(cmd, executor, pendingTasks, proc, exceptionOnExecuteUpdate, req, null, oper, version, null);
break;
}
@@ -2097,7 +2131,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
if (debug) log.debug("deleteByQuery {}", cmd);
waitForAllUpdatesGetExecuted(executor, pendingTasks);
// DBQ will be executed in the same thread
- execute(cmd, null, pendingTasks, proc, exceptionOnExecuteUpdate);
+ execute(cmd, null, pendingTasks, proc, exceptionOnExecuteUpdate, req, null, oper, version, null);
break;
}
case UpdateLog.COMMIT: {
@@ -2177,6 +2211,13 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
}
}
+ private Integer getBucketHash(BytesRef idBytes) {
+
+ if (idBytes == null) return null;
+ return DistributedUpdateProcessor.bucketHash(idBytes);
+
+ }
+
private Integer getBucketHash(UpdateCommand cmd) {
if (cmd instanceof AddUpdateCommand) {
BytesRef idBytes = ((AddUpdateCommand)cmd).getIndexedId();
@@ -2193,21 +2234,39 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
return null;
}
- private Future execute(UpdateCommand cmd, OrderedExecutor executor,
- LongAdder pendingTasks, UpdateRequestProcessor proc,
- AtomicReference<SolrException> exceptionHolder) {
- assert cmd instanceof AddUpdateCommand || cmd instanceof DeleteUpdateCommand;
+ private Future execute(UpdateCommand ucmd, OrderedExecutor executor, LongAdder pendingTasks, UpdateRequestProcessor proc, AtomicReference<SolrException> exceptionHolder, SolrQueryRequest req,
+ SolrInputDocument doc, int operation, long version, Long prevVersion) {
+ assert ucmd instanceof AddUpdateCommand || ucmd instanceof DeleteUpdateCommand || ucmd == null;
+
+ BytesRef indexedId = null;
+ Integer hash;
+ if (ucmd instanceof AddUpdateCommand) {
+ indexedId = getIndexedId(doc, req);
+ hash = getBucketHash(indexedId);
+ } else {
+ hash = getBucketHash(ucmd);
+ }
+
+
+ BytesRef finalIndexedId = indexedId;
if (executor != null) {
// by using the same hash as DUP, independent updates can avoid waiting for same bucket
- return executor.submit(getBucketHash(cmd), () -> {
+
+ return executor.submit(hash, () -> {
try {
// fail fast
if (exceptionHolder.get() != null) return;
- if (cmd instanceof AddUpdateCommand) {
- proc.processAdd((AddUpdateCommand) cmd);
+ if (ucmd instanceof AddUpdateCommand) {
+
+ convertTlogEntryToAddUpdateCommand(req, doc, operation, prevVersion, version, (AddUpdateCommand) ucmd);
+ ((AddUpdateCommand) ucmd).setIndexedId(finalIndexedId);
+ ucmd.setReq(req);
+ ucmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+
+ proc.processAdd((AddUpdateCommand) ucmd);
} else {
- proc.processDelete((DeleteUpdateCommand) cmd);
+ proc.processDelete((DeleteUpdateCommand) ucmd);
}
} catch (IOException e) {
recoveryInfo.errors++;
@@ -2227,10 +2286,15 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
});
} else {
try {
- if (cmd instanceof AddUpdateCommand) {
- proc.processAdd((AddUpdateCommand) cmd);
+ if (ucmd instanceof AddUpdateCommand) {
+ convertTlogEntryToAddUpdateCommand(req, doc, operation, prevVersion, version, (AddUpdateCommand) ucmd);
+ ((AddUpdateCommand) ucmd).setIndexedId(finalIndexedId);
+ ucmd.setReq(req);
+ ucmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+
+ proc.processAdd((AddUpdateCommand) ucmd);
} else {
- proc.processDelete((DeleteUpdateCommand) cmd);
+ proc.processDelete((DeleteUpdateCommand) ucmd);
}
} catch (IOException e) {
recoveryInfo.errors++;
@@ -2246,8 +2310,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
}
return ConcurrentUtils.constantFuture(null);
}
-
-
}
/**
@@ -2255,28 +2317,25 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
* can be applied to ADD the document or do an UPDATE_INPLACE.
*
* @param req The request to use as the owner of the new AddUpdateCommand
- * @param entry Entry from the transaction log that contains the document to be added
+ * @param sdoc The document to be added
* @param operation The value of the operation flag; this must be either ADD or UPDATE_INPLACE --
* if it is UPDATE_INPLACE the caller passes the previous version separately as prevVersion (applied whenever it is non-null)
* @param version Version already obtained from the entry.
*/
- public static AddUpdateCommand convertTlogEntryToAddUpdateCommand(SolrQueryRequest req,
- @SuppressWarnings({"rawtypes"})List entry,
- int operation, long version, AddUpdateCommand cmd) {
- assert operation == UpdateLog.ADD || operation == UpdateLog.UPDATE_INPLACE;
- SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size()-1);
+ public static AddUpdateCommand convertTlogEntryToAddUpdateCommand(SolrQueryRequest req, SolrInputDocument sdoc, int operation,
+ Long prevVersion, long version, AddUpdateCommand cmd) {
+ if (!(operation == UpdateLog.ADD || operation == UpdateLog.UPDATE_INPLACE)) {
+ throw new IllegalArgumentException(String.valueOf(cmd));
+ }
if (cmd == null) {
cmd = new AddUpdateCommand(req);
- } else {
- cmd.setReq(req);
}
cmd.solrDoc = sdoc;
cmd.setVersion(version);
-
- if (operation == UPDATE_INPLACE) {
- long prevVersion = (Long) entry.get(UpdateLog.PREV_VERSION_IDX);
+
+ if (prevVersion != null) {
cmd.prevVersion = prevVersion;
}
return cmd;
diff --git a/solr/core/src/java/org/apache/solr/update/VersionBucket.java b/solr/core/src/java/org/apache/solr/update/VersionBucket.java
index 2adb34f..efa442e 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionBucket.java
+++ b/solr/core/src/java/org/apache/solr/update/VersionBucket.java
@@ -17,9 +17,15 @@
package org.apache.solr.update;
import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.ParWork;
// TODO: make inner?
@@ -37,6 +43,8 @@ public class VersionBucket {
private final ReentrantLock lock = new ReentrantLock(true);
private final Condition lockCondition = lock.newCondition();
+ private Map<BytesRef,LongAdder> blockedIds = new ConcurrentHashMap<>();
+
public void updateHighest(long val) {
if (highest != 0) {
highest = Math.max(highest, Math.abs(val));
@@ -48,17 +56,44 @@ public class VersionBucket {
R apply() throws IOException;
}
- public <T, R> R runWithLock(int lockTimeoutMs, CheckedFunction<T, R> function) throws IOException {
+ public <T, R> R runWithLock(int lockTimeoutMs, CheckedFunction<T,R> function, BytesRef idBytes) throws IOException {
lock.lock();
try {
+ if (!blockedIds.keySet().contains(idBytes)) {
+ LongAdder adder = new LongAdder();
+ adder.increment();
+ blockedIds.put(idBytes, adder);
+ lock.unlock();
+ } else {
+ LongAdder adder = blockedIds.get(idBytes);
+ adder.increment();
+ }
return function.apply();
} finally {
- lock.unlock();
+ try {
+ if (!lock.isHeldByCurrentThread()) {
+ lock.lock();
+ LongAdder adder = blockedIds.get(idBytes);
+ adder.decrement();
+ if (adder.longValue() == 0L) {
+ blockedIds.remove(idBytes);
+ }
+ }
+ } finally {
+ if (lock.isHeldByCurrentThread()) lock.unlock();
+ }
}
}
public void signalAll() {
- lockCondition.signalAll();
+ if (!lock.isHeldByCurrentThread()) {
+ lock.lock();
+ try {
+ lockCondition.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
}
public void awaitNanos(long nanosTimeout) {
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 673ecbe..b97381b 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -228,104 +228,25 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// the request right here but for now I think it is better to just return the status
// to the client that the minRf wasn't reached and let them handle it
- boolean dropCmd = false;
if (!forwardToLeader) {
- dropCmd = versionAdd(cmd);
- }
-
- if (vinfo == null) {
- return;
- }
-
- if (dropCmd) {
- // TODO: do we need to add anything to the response?
- if (log.isDebugEnabled()) log.debug("Dropping update {}", cmd.getPrintableId());
- return;
- }
-
- Future<?> distFuture = null;
- AddUpdateCommand cloneCmd = null;
-
- boolean nodist = noDistrib();
- if (!nodist) {
- if (!forwardToLeader) {
- // SolrInputDocument clonedDoc = cmd.solrDoc.deepCopy();
- cloneCmd = (AddUpdateCommand) cmd.clone();
- }
- AddUpdateCommand finalCloneCmd;
- if (cloneCmd != null) {
- finalCloneCmd = cloneCmd;
- } else {
- finalCloneCmd = cmd;
- }
- Callable distCall = () -> {
- if (log.isTraceEnabled()) log.trace("Run distrib add collection");
-
+ Future distFuture = versionAdd(cmd);
+ if (distFuture != null) {
try {
- doDistribAdd(finalCloneCmd);
- if (log.isTraceEnabled()) log.trace("after distrib add collection");
- } catch (Throwable e) {
- return e;
- }
- return null;
- };
-
- if (!forwardToLeader) {
- distFuture = ParWork.getRootSharedExecutor().submit(distCall);
- } else {
- try {
- distCall.call();
- } catch (Exception e) {
- if (e instanceof RuntimeException) {
- throw (RuntimeException) e;
- }
+ distFuture.get();
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e, true);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ } catch (ExecutionException e) {
+ Throwable de = e.getCause();
+ if (de instanceof RuntimeException) {
+ throw (RuntimeException) de;
+ } else {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
}
}
- }
-
- // TODO: possibly set checkDeleteByQueries as a flag on the command?
- // if the update updates a doc that is part of a nested structure,
- // force open a realTimeSearcher to trigger a ulog cache refresh.
- // This refresh makes RTG handler aware of this update.q
-
- if (!forwardToLeader) {
- // TODO: possibly set checkDeleteByQueries as a flag on the command?
- if (log.isTraceEnabled()) log.trace("Local add cmd {}", cmd.solrDoc);
- try {
- doLocalAdd(cmd);
- } catch (Exception e) {
- Throwable t;
- if (e instanceof ExecutionException) {
- t = e.getCause();
- } else {
- t = e;
- }
- if (distFuture != null && isLeader && !forwardToLeader) {
- distFuture.cancel(false);
- cancelCmds.add(cloneCmd);
- }
- if (t instanceof RuntimeException) {
- throw (RuntimeException) t;
- }
- throw new SolrException(ErrorCode.SERVER_ERROR, t);
- }
- // if the update updates a doc that is part of a nested structure,
- // force open a realTimeSearcher to trigger a ulog cache refresh.
- // This refresh makes RTG handler aware of this update.q
- if (ulog != null) {
- if (req.getSchema().isUsableForChildDocs() && shouldRefreshUlogCaches(cmd)) {
- ulog.openRealtimeSearcher();
- }
- }
- }
-
- if (distFuture != null) {
- try {
- Throwable e = (Throwable) distFuture.get();
- } catch (Exception e) {
- log.error("dist of add failed", e);
- }
+ } else {
+ doDistribAdd(cmd);
}
// TODO: what to do when no idField?
@@ -371,12 +292,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
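* @return the Future of the asynchronous distributed add, or null when nothing was distributed (for example because the command was dropped or buffered)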
* @throws IOException If there is a low-level I/O error.
*/
- protected boolean versionAdd(AddUpdateCommand cmd) throws IOException {
+ protected Future versionAdd(AddUpdateCommand cmd) throws IOException {
BytesRef idBytes = cmd.getIndexedId();
if (idBytes == null) {
super.processAdd(cmd);
- return true;
+ return null;
}
if (vinfo == null) {
@@ -385,7 +306,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
"Atomic document updates are not supported unless <updateLog/> is configured");
} else {
super.processAdd(cmd);
- return true;
+ return null;
}
}
@@ -425,16 +346,21 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
dependentVersionFound = waitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync, bucket);
if (dependentVersionFound == -1) {
// it means the document has been deleted by now at the leader. drop this update
- return true;
+ return null;
}
}
- long finalVersionOnUpdate = versionOnUpdate;
- return bucket.runWithLock(vinfo.getVersionBucketLockTimeoutMs(), () -> doVersionAdd(cmd, finalVersionOnUpdate, isReplayOrPeersync, leaderLogic, forwardedFromCollection, bucket));
-
+ vinfo.lockForUpdate();
+ try {
+ long finalVersionOnUpdate = versionOnUpdate;
+ return bucket.runWithLock(vinfo.getVersionBucketLockTimeoutMs(), () -> doVersionAdd(cmd, finalVersionOnUpdate, isReplayOrPeersync,
+ leaderLogic, forwardedFromCollection, bucket), idBytes);
+ } finally {
+ vinfo.unlockForUpdate();
+ }
}
- private boolean doVersionAdd(AddUpdateCommand cmd, long versionOnUpdate, boolean isReplayOrPeersync,
+ private Future doVersionAdd(AddUpdateCommand cmd, long versionOnUpdate, boolean isReplayOrPeersync,
boolean leaderLogic, boolean forwardedFromCollection, VersionBucket bucket) throws IOException {
BytesRef idBytes = cmd.getIndexedId();
@@ -448,7 +374,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// realtime-get to work reliably.
// TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
// there may be other reasons in the future for a version on the commands
-
+ boolean nodist = noDistrib();
+ AddUpdateCommand cloneCmd = null;
if (versionsStored) {
long bucketVersion = bucket.highest;
@@ -467,10 +394,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
getUpdatedDocument(cmd, versionOnUpdate);
- if (isSubShardLeader) {
- log.info("subshardleader");
- }
-
// leaders can also be in buffering state during "migrate" API call, see SOLR-5308
if (ulog.getState() != UpdateLog.State.ACTIVE && isReplayOrPeersync == false) {
// we're not in an active state, and this update isn't from a replay, so buffer it.
@@ -479,7 +402,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
ulog.add(cmd);
- return true;
+ return null;
}
if (versionOnUpdate != 0) {
@@ -490,7 +413,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// specified it must exist (versionOnUpdate==1) and it does.
} else {
if (cmd.getReq().getParams().getBool(CommonParams.FAIL_ON_VERSION_CONFLICTS, true) == false) {
- return true;
+ return null;
}
throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId() + " expected=" + versionOnUpdate +
" actual=" + foundVersion + " params=" + req.getParams());
@@ -500,6 +423,18 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
long version = vinfo.getNewClock();
cmd.setVersion(version);
cmd.getSolrInputDocument().setField(CommonParams.VERSION_FIELD, version);
+
+ if (!nodist) {
+
+ SolrInputDocument clonedDoc = null;
+ if (shouldCloneCmdDoc()) {
+ // SolrInputDocument clonedDoc = cmd.solrDoc.deepCopy();
+ cloneCmd = (AddUpdateCommand) cmd.clone();
+ clonedDoc = shouldCloneCmdDoc() ? cmd.solrDoc.deepCopy() : null;
+ cloneCmd.solrDoc = clonedDoc;
+ }
+ }
+
bucket.updateHighest(version);
} else {
// The leader forwarded us this update.
@@ -509,7 +444,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// we're not in an active state, and this update isn't from a replay, so buffer it.
cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
ulog.add(cmd);
- return true;
+ return null;
}
if (cmd.isInPlaceUpdate()) {
@@ -528,7 +463,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
log.info("In-place update of {} failed to find valid lastVersion to apply to, and the document was deleted at the leader subsequently.", idBytes.utf8ToString());
}
versionDelete((DeleteUpdateCommand) fetchedFromLeader);
- return true;
+ return null;
} else {
assert fetchedFromLeader instanceof AddUpdateCommand;
// Newer document was fetched from the leader. Apply that document instead of this current in-place
@@ -548,7 +483,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// this means we got a newer full doc update and in that case it makes no sense to apply the older
// inplace update. Drop this update
log.info("Update was applied on version: {}, but last version I have is: {}. Dropping current update", prev, lastVersion);
- return true;
+ return null;
} else {
// We're good, we should apply this update. First, update the bucket's highest.
if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
@@ -569,17 +504,80 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
// This update is a repeat, or was reordered. We need to drop this update.
if (log.isDebugEnabled()) log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
- return true;
+ return null;
}
}
}
+
if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
}
}
}
- return false;
+ Future<?> distFuture = null;
+
+
+ AddUpdateCommand finalCloneCmd;
+ if (!nodist) {
+
+ if (cloneCmd != null) {
+ finalCloneCmd = cloneCmd;
+ } else {
+ finalCloneCmd = cmd;
+ }
+
+ Callable distCall = () -> {
+ if (log.isTraceEnabled()) log.trace("Run distrib add collection");
+ try {
+ doDistribAdd(finalCloneCmd);
+ if (log.isTraceEnabled()) log.trace("after distrib add collection");
+ } catch (Throwable e) {
+ return e;
+ }
+ return null;
+ };
+
+ distFuture = ParWork.getRootSharedExecutor().submit(distCall);
+ }
+
+ // TODO: possibly set checkDeleteByQueries as a flag on the command?
+ // if the update updates a doc that is part of a nested structure,
+ // force open a realTimeSearcher to trigger a ulog cache refresh.
+ // This refresh makes RTG handler aware of this update.
+
+ try {
+ doLocalAdd(cmd);
+ } catch (Exception e) {
+ Throwable t;
+ if (e instanceof ExecutionException) {
+ t = e.getCause();
+ } else {
+ t = e;
+ }
+ if (distFuture != null && isLeader && !forwardToLeader) {
+ distFuture.cancel(false);
+ if (cloneCmd != null) {
+ cancelCmds.add(cloneCmd);
+ } else {
+ cancelCmds.add(cmd);
+ }
+ }
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ }
+ throw new SolrException(ErrorCode.SERVER_ERROR, t);
+ }
+ // if the update updates a doc that is part of a nested structure,
+ // force open a realTimeSearcher to trigger a ulog cache refresh.
+ // This refresh makes RTG handler aware of this update.
+ if (ulog != null) {
+ if (req.getSchema().isUsableForChildDocs() && shouldRefreshUlogCaches(cmd)) {
+ ulog.openRealtimeSearcher();
+ }
+ }
+
+ return distFuture;
}
/**
@@ -618,7 +616,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
vinfo.lockForUpdate();
try {
- lastFoundVersion = bucket.runWithLock(vinfo.getVersionBucketLockTimeoutMs(), () -> doWaitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync, bucket, waitTimeout));
+ lastFoundVersion = bucket.runWithLock(vinfo.getVersionBucketLockTimeoutMs(), () ->
+ doWaitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync, bucket, waitTimeout), cmd.getIndexedId());
} finally {
vinfo.unlockForUpdate();
}
@@ -820,27 +819,41 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// we have to spoof the replicationTracker and set the achieved rf to the number of active replicas.
//
protected void doDeleteById(DeleteUpdateCommand cmd) throws IOException {
- // TODO: parallel
setupRequest(cmd);
- log.info("deletebyid {}", cmd.id);
- boolean dropCmd = false;
- if (!forwardToLeader) {
- dropCmd = versionDelete(cmd);
- }
+ if (log.isDebugEnabled()) log.debug("deletebyid {}", cmd.id);
- if (dropCmd) {
- // TODO: do we need to add anything to the response?
- return;
- }
- doDistribDeleteById(cmd);
+ if (!forwardToLeader) {
+ Future future = versionDelete(cmd);
+ if (future != null) {
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e, true);
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ } catch (ExecutionException e) {
+ Throwable t;
+ if (e instanceof ExecutionException) {
+ t = e.getCause();
+ } else {
+ t = e;
+ }
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ }
+ throw new SolrException(ErrorCode.SERVER_ERROR, t);
+ }
+ }
+ } else {
+ doDistribDeleteById(cmd);
+ }
// cmd.getIndexedId() == null when delete by query
// TODO: what to do when no idField?
if (returnVersions && rsp != null && cmd.getIndexedId() != null && idField != null) {
if (deleteResponse == null) {
deleteResponse = new NamedList<>(1);
- rsp.add("deletes",deleteResponse);
+ rsp.add("deletes", deleteResponse);
}
if (scratch == null) scratch = new CharsRefBuilder();
idField.getType().indexedToReadable(cmd.getIndexedId(), scratch);
@@ -906,69 +919,28 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// at this point, there is an update we need to try and apply.
// we may or may not be the leader.
- boolean drop = false;
- if (!forwardToLeader) {
- drop = versionDeleteByQuery(cmd);
- }
+ Future future = versionDeleteByQuery(cmd, replicas, coll);
- if (drop) {
- return;
- }
-
- Future<?> distFuture = null;
- Callable distCall = () -> {
+ if (future != null) {
try {
- doDistribDeleteByQuery(cmd, replicas, coll);
- } catch (IOException e) {
- // MRM TODO: fail this harder...
+ future.get();
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e, true);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
- return null;
- };
-
- if (!forwardToLeader) {
- distFuture = ParWork.getRootSharedExecutor().submit(distCall);
- } else {
- try {
- distCall.call();
- } catch (Exception e) {
- if (e instanceof RuntimeException) {
- throw (RuntimeException) e;
- }
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
- }
-
- if (!forwardToLeader) {
- try {
- doLocalDelete(cmd);
- } catch (Exception e) {
- log.error("Exception on local deleteByQuery", e);
+ } catch (ExecutionException e) {
Throwable t;
if (e instanceof ExecutionException) {
t = e.getCause();
} else {
t = e;
}
- if (distFuture != null && isLeader && !forwardToLeader) {
- distFuture.cancel(false);
- cancelCmds.add(cmd);
- }
- if (t instanceof SolrException) {
- throw (SolrException) t;
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
}
throw new SolrException(ErrorCode.SERVER_ERROR, t);
}
}
- if (distFuture != null) {
- try {
- distFuture.get();
- } catch (Exception e) {
- log.error("dist of add failed", e);
- }
- }
-
if (returnVersions && rsp != null) {
if (deleteByQueryResponse == null) {
deleteByQueryResponse = new NamedList<>(1);
@@ -989,7 +961,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// no-op for derived classes to implement
}
- protected boolean versionDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
+ protected Future versionDeleteByQuery(DeleteUpdateCommand cmd, List<Node> replicas, DocCollection coll) throws IOException {
// Find the version
long versionOnUpdate = findVersionOnUpdate(cmd);
@@ -1003,7 +975,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
vinfo.blockUpdates();
try {
- return doLocalDeleteByQuery(cmd, versionOnUpdate, isReplayOrPeersync);
+ return doLocalDeleteByQuery(cmd, versionOnUpdate, isReplayOrPeersync, replicas, coll);
// since we don't know which documents were deleted, the easiest thing to do is to invalidate
// all real-time caches (i.e. UpdateLog) which involves also getting a new version of the IndexReader
@@ -1024,13 +996,26 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return versionOnUpdate;
}
- private boolean doLocalDeleteByQuery(DeleteUpdateCommand cmd, long versionOnUpdate, boolean isReplayOrPeersync) throws IOException {
+ private Future doLocalDeleteByQuery(DeleteUpdateCommand cmd, long versionOnUpdate, boolean isReplayOrPeersync, List<Node> replicas, DocCollection coll) throws IOException {
+ Future<?> future = null;
if (versionsStored) {
final boolean leaderLogic = isLeader & !isReplayOrPeersync;
if (leaderLogic) {
long version = vinfo.getNewClock();
cmd.setVersion(-version);
// TODO update versions in all buckets
+
+ future = ParWork.getRootSharedExecutor().submit(() -> {
+ try {
+ doDistribDeleteByQuery(cmd, replicas, coll);
+ } catch (IOException e) {
+ log.error("", e); // MRM TODO:
+ }
+ });
+
+
+ doLocalDelete(cmd);
+
} else {
cmd.setVersion(-versionOnUpdate);
@@ -1038,16 +1023,27 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// we're not in an active state, and this update isn't from a replay, so buffer it.
cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
ulog.deleteByQuery(cmd);
- return true;
+ return null; // nocommit
}
+ future = ParWork.getRootSharedExecutor().submit(() -> {
+ try {
+ DeleteUpdateCommand clonedCmd = (DeleteUpdateCommand) cmd.clone();
+ doDistribDeleteByQuery(clonedCmd, replicas, coll);
+ } catch (IOException e) {
+ log.error("", e); // MRM TODO:
+ }
+ });
+
if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
// TLOG replica not leader, don't write the DBQ to IW
cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
}
+ doLocalDelete(cmd);
}
}
- return false;
+
+ return future;
}
// internal helper method to setup request by processors who use this class.
@@ -1074,13 +1070,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return req.getParams().get(DISTRIB_FROM);
}
- protected boolean versionDelete(DeleteUpdateCommand cmd) throws IOException {
+ protected Future versionDelete(DeleteUpdateCommand cmd) throws IOException {
BytesRef idBytes = cmd.getIndexedId();
if (vinfo == null || idBytes == null) {
super.processDelete(cmd);
- return false;
+ return null;
}
// This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash
@@ -1113,17 +1109,18 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
try {
long finalVersionOnUpdate = versionOnUpdate;
return bucket.runWithLock(vinfo.getVersionBucketLockTimeoutMs(), () -> doVersionDelete(cmd, finalVersionOnUpdate, signedVersionOnUpdate, isReplayOrPeersync, leaderLogic,
- forwardedFromCollection, bucket));
+ forwardedFromCollection, bucket), idBytes);
} finally {
vinfo.unlockForUpdate();
}
}
- private boolean doVersionDelete(DeleteUpdateCommand cmd, long versionOnUpdate, long signedVersionOnUpdate,
+ private Future doVersionDelete(DeleteUpdateCommand cmd, long versionOnUpdate, long signedVersionOnUpdate,
boolean isReplayOrPeersync, boolean leaderLogic, boolean forwardedFromCollection, VersionBucket bucket)
throws IOException {
BytesRef idBytes = cmd.getIndexedId();
+ Future<?> distFuture = null;
if (versionsStored) {
long bucketVersion = bucket.highest;
@@ -1146,7 +1143,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
ulog.delete(cmd);
- return true;
+ return null;
}
if (signedVersionOnUpdate != 0) {
@@ -1156,13 +1153,21 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
// specified it must exist (versionOnUpdate==1) and it does.
} else {
- throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getId() + " expected=" + signedVersionOnUpdate +
- " actual=" + foundVersion + " params=" + req.getParams());
+ throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getId() + " expected=" + signedVersionOnUpdate + " actual=" + foundVersion + " params=" + req.getParams());
}
}
long version = vinfo.getNewClock();
cmd.setVersion(-version);
+
+ distFuture = ParWork.getRootSharedExecutor().submit(() -> {
+ try {
+ doDistribDeleteById(cmd);
+ } catch (IOException e) {
+ log.error("", e); // MRM TODO
+ }
+ });
+
bucket.updateHighest(version);
} else {
cmd.setVersion(-versionOnUpdate);
@@ -1171,7 +1176,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// we're not in an active state, and this update isn't from a replay, so buffer it.
cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
ulog.delete(cmd);
- return true;
+ return null;
}
// if we aren't the leader, then we need to check that updates were not re-ordered
@@ -1188,7 +1193,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (log.isDebugEnabled()) {
log.debug("Dropping delete update due to version {}", idBytes.utf8ToString());
}
- return true;
+ return null;
}
}
@@ -1198,9 +1203,29 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
- doLocalDelete(cmd);
- return false;
-
+ try {
+ doLocalDelete(cmd);
+ } catch (Exception e) {
+ Throwable t;
+ if (e instanceof ExecutionException) {
+ t = e.getCause();
+ } else {
+ t = e;
+ }
+ if (distFuture != null && isLeader && !forwardToLeader) {
+ distFuture.cancel(false);
+ if (cmd != null) {
+ cancelCmds.add(cmd);
+ } else {
+ cancelCmds.add(cmd);
+ }
+ }
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ }
+ throw new SolrException(ErrorCode.SERVER_ERROR, t);
+ }
+ return distFuture;
}
@Override
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index d4f316e..68ce680 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -99,6 +99,11 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
// 3) in general, not controlling carefully enough exactly when our view of clusterState is updated
protected volatile ClusterState clusterState;
+ // should we clone the document before sending it to replicas?
+ // this is set to true in the constructor if the next processors in the chain
+ // are custom and may modify the SolrInputDocument racing with its serialization for replication
+ private final boolean cloneRequiredOnLeader;
+
//used for keeping track of replicas that have processed an add/update from the leader
private volatile RollupRequestReplicationTracker rollupReplicationTracker = null;
private volatile LeaderRequestReplicationTracker leaderReplicationTracker = null;
@@ -114,6 +119,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
zkController = cc.getZkController();
cmdDistrib = new SolrCmdDistributor(zkController.getZkStateReader(), cc.getUpdateShardHandler(), new IsCCClosed(req));
try {
+ cloneRequiredOnLeader = isCloneRequiredOnLeader(next);
collection = cloudDesc.getCollectionName();
clusterState = zkController.getClusterState();
DocCollection coll = clusterState.getCollectionOrNull(collection, true);
@@ -484,7 +490,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
if (isLeader) {
// don't forward to ourself
- leaderForAnyShard = true;
} else {
leaders.add(new SolrCmdDistributor.ForwardNode(zkController.getZkStateReader(), leader, collection, sliceName));
}
@@ -499,10 +504,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
cmdDistrib.distribDelete(cmd, leaders, outParams, false, rollupReplicationTracker, null);
- if (!leaderForAnyShard) {
- return;
- }
-
// change the phase to TOLEADER so we look up and forward to our own replicas (if any)
phase = DistribPhase.TOLEADER;
}
@@ -511,7 +512,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
if (DistribPhase.TOLEADER == phase) {
// This core should be a leader
isLeader = true;
- replicas = setupRequestForDBQ();
+ replicas = setupRequestForDBQ(desc.getName());
} else if (DistribPhase.FROMLEADER == phase) {
isLeader = false;
}
@@ -581,19 +582,17 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
// used for deleteByQuery to get the list of nodes this leader should forward to
- private List<SolrCmdDistributor.Node> setupRequestForDBQ() {
+ private List<SolrCmdDistributor.Node> setupRequestForDBQ(String name) {
List<SolrCmdDistributor.Node> nodes = null;
String shardId = cloudDesc.getShardId();
try {
- Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId);
- isLeader = leaderReplica.getName().equals(desc.getName());
// TODO: what if we are no longer the leader?
forwardToLeader = false;
List<Replica> replicaProps = zkController.getZkStateReader()
- .getReplicaProps(collection, shardId, leaderReplica.getName(), Replica.State.BUFFERING, Replica.State.ACTIVE, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
+ .getReplicaProps(collection, shardId, name, Replica.State.BUFFERING, Replica.State.ACTIVE, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
if (replicaProps != null) {
nodes = new ArrayList<>(replicaProps.size());
for (Replica props : replicaProps) {
@@ -802,7 +801,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
protected boolean shouldCloneCmdDoc() {
- return true;
+ boolean willDistrib = isLeader && nodes != null && nodes.size() > 0;
+ return willDistrib & cloneRequiredOnLeader;
}
// helper method, processAdd was getting a bit large.
@@ -969,7 +969,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
// slice leader can be null because node/shard is created zk before leader election
if (sliceLeader != null && zkController.getZkStateReader().isNodeLive(sliceLeader.getNodeName())) {
if (nodes == null) nodes = new ArrayList<>();
- nodes.add(new SolrCmdDistributor.StdNode(zkController.getZkStateReader(), sliceLeader, coll.getName(), aslice.getName()));
+ nodes.add(new SolrCmdDistributor.ForwardNode(zkController.getZkStateReader(), sliceLeader, coll.getName(), aslice.getName()));
}
}
}
@@ -1252,8 +1252,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
} else {
// not the leader anymore maybe or the error'd node is not my replica?
if (!foundErrorNodeInReplicaList) {
- log.warn("Core {} belonging to {} {}, does not have error'd node {} as a replica. No request recovery command will be sent! replicas={}", desc.getName(), collection, cloudDesc.getShardId(),
- stdNode.getNodeProps().getCoreUrl(), myReplicas);
+ log.warn("Core {} belonging to {} {}, does not have error'd node {} as a replica. No request recovery command will be sent! replicas={} node={}", desc.getName(), collection, cloudDesc.getShardId(),
+ stdNode.getNodeProps().getCoreUrl(), myReplicas, error.req.node.getClass().getSimpleName());
if (!shardId.equals(cloudDesc.getShardId())) {
// some replicas on other shard did not receive the updates (ex: during splitshard),
// exception must be notified to clients
diff --git a/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java
index ecf17f2..70963f8 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java
@@ -126,7 +126,7 @@ public class RunUpdateProcessorFactory extends UpdateRequestProcessorFactory {
try {
super.doClose();
} finally {
- AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand.get().clear();
+ // AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand.get().clear();
}
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
index ce759d2..4c3e278 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
@@ -221,6 +221,8 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
response = CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
assertEquals(0, response.getStatus());
+
+ assertFalse(zkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName));
// nocommit what happened to success?
// assertTrue(response.toString(), response.isSuccess());
// Map<String,NamedList<Integer>> nodesStatus = response.getCollectionNodesStatus();
diff --git a/solr/core/src/test/org/apache/solr/update/TestExceedMaxTermLength.java b/solr/core/src/test/org/apache/solr/update/TestExceedMaxTermLength.java
index 9bf502b..8e7ac49 100644
--- a/solr/core/src/test/org/apache/solr/update/TestExceedMaxTermLength.java
+++ b/solr/core/src/test/org/apache/solr/update/TestExceedMaxTermLength.java
@@ -85,7 +85,7 @@ public class TestExceedMaxTermLength extends SolrTestCaseJ4 {
}
} catch (Exception e) {
//expected
- String msg= e.getCause().getMessage();
+ String msg= e.getCause() == null ? e.getMessage() : e.getCause().getMessage();
assertTrue(msg.contains("one immense term in field=\"cat\""));
}
diff --git a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
index 7fa942d..56a9abb 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import org.apache.lucene.util.BytesRef;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
@@ -93,7 +94,7 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
public void testVersionAdd() throws IOException {
SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), new ModifiableSolrParams(), true);
int threads = 5;
- Function<DistributedUpdateProcessor,Boolean> versionAddFunc = (DistributedUpdateProcessor process) -> {
+ Function<DistributedUpdateProcessor,Future> versionAddFunc = (DistributedUpdateProcessor process) -> {
try {
AddUpdateCommand cmd = new AddUpdateCommand(req);
cmd.solrDoc = new SolrInputDocument();
@@ -118,7 +119,7 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), new ModifiableSolrParams(), true);
int threads = TEST_NIGHTLY ? 5 : 2;
- Function<DistributedUpdateProcessor,Boolean> versionDeleteFunc = (DistributedUpdateProcessor process) -> {
+ Function<DistributedUpdateProcessor,Future> versionDeleteFunc = (DistributedUpdateProcessor process) -> {
try {
DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
cmd.id = "1";
@@ -142,7 +143,7 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
* @return how many requests succeeded
*/
private int runCommands(int threads, int versionBucketLockTimeoutMs, SolrQueryRequest req,
- Function<DistributedUpdateProcessor,Boolean> function)
+ Function<DistributedUpdateProcessor,Future> function)
throws IOException {
try (DistributedUpdateProcessor processor = new DistributedUpdateProcessor(
req, null, null, null)) {
@@ -156,7 +157,7 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
* simulate the case: it takes 5 seconds to add the doc
*/
@Override
- public <T, R> R runWithLock(int lockTimeoutMs, CheckedFunction<T,R> function) throws IOException {
+ public <T, R> R runWithLock(int lockTimeoutMs, CheckedFunction<T,R> function, BytesRef idBytes) throws IOException {
boolean locked = false;
try {
locked = lock.tryLock(versionBucketLockTimeoutMs, TimeUnit.MILLISECONDS);
@@ -180,7 +181,7 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
}).when(vinfo).bucket(anyInt());
}
CountDownLatch latch = new CountDownLatch(1);
- Collection<Future<Boolean>> futures = new ArrayList<>();
+ Collection<Future<Future>> futures = new ArrayList<>();
for (int t = 0; t < threads; ++t) {
futures.add(executor.submit(() -> {
latch.await();
@@ -190,7 +191,7 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
latch.countDown();
int succeeded = 0;
- for (Future<Boolean> f : futures) {
+ for (Future<Future> f : futures) {
try {
f.get();
succeeded++;
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 0586d70..acc1754 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -84,7 +84,7 @@ public class ParWork implements Closeable {
new SynchronousQueue());
((ParWorkExecutor)EXEC).enableCloseLock();
for (int i = 0; i < Math.min(coreSize, 12); i++) {
- EXEC.submit(() -> {});
+ EXEC.prestartCoreThread();
}
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/ToleratedUpdateError.java b/solr/solrj/src/java/org/apache/solr/common/ToleratedUpdateError.java
index 329573e..b49e0a9 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ToleratedUpdateError.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ToleratedUpdateError.java
@@ -146,7 +146,7 @@ public final class ToleratedUpdateError {
* @see #parseMetadataIfToleratedUpdateError
*/
public String getMetadataValue() {
- return message.toString();
+ return message;
}
/**
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 3b2cad9..7b63f14 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -757,7 +757,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
ParWork.propagateInterrupt(e);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
- if (exists != null && exists.getVersion() == cachedDocCollection.getZNodeVersion()) {
+ if (exists != null && exists.getVersion() == cached.getZNodeVersion()) {
shouldFetch = false;
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
index 3ce7fbf..b66e593 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
@@ -16,7 +16,6 @@
*/
package org.apache.solr.common.util;
-import org.apache.commons.math3.analysis.function.Add;
import org.apache.solr.common.ParWork;
import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.BlockingArrayQueue;
diff --git a/solr/solrj/src/test-files/log4j2.xml b/solr/solrj/src/test-files/log4j2.xml
index 73e4654..0ac5dfd 100644
--- a/solr/solrj/src/test-files/log4j2.xml
+++ b/solr/solrj/src/test-files/log4j2.xml
@@ -28,6 +28,7 @@
</Console>
</Appenders>
<Loggers>
+ <AsyncLogger name="org.apache.solr.update.LoggingInfoStream" level="off"/>
<AsyncLogger name="org.apache.zookeeper" level="WARN"/>
<AsyncLogger name="org.apache.hadoop" level="WARN"/>
<AsyncLogger name="org.apache.directory" level="WARN"/>
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index 284298b..967eafc 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -878,7 +878,8 @@ public class SolrTestCase extends LuceneTestCase {
if (testExecutor != null) {
return testExecutor;
}
- testExecutor = (ParWorkExecutor) ParWork.getParExecutorService("testExecutor", 10, 30, 1000, new BlockingArrayQueue(30, 16));
+ testExecutor = (ParWorkExecutor) ParWork.getParExecutorService(
+ "testExecutor", 10, 30, 500, new BlockingArrayQueue(32, 16));
testExecutor.prestartAllCoreThreads();
((ParWorkExecutor) testExecutor).enableCloseLock();
return testExecutor;