You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by an...@apache.org on 2022/05/12 19:15:23 UTC
[solr-sandbox] branch crossdc-wip updated: WIP for UpdateRequestProcessor and other producer related files (#8)
This is an automated email from the ASF dual-hosted git repository.
anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/crossdc-wip by this push:
new d2ae639 WIP for UpdateRequestProcessor and other producer related files (#8)
d2ae639 is described below
commit d2ae639bd0bb7ac5cb4e20cb706bb1fabed4f326
Author: Anshum Gupta <an...@apache.org>
AuthorDate: Thu May 12 12:15:18 2022 -0700
WIP for UpdateRequestProcessor and other producer related files (#8)
---
.../apache/solr/crossdc/KafkaMirroringSink.java | 1 -
.../org/apache/solr/crossdc/consumer/Consumer.java | 2 +-
.../processor/KafkaRequestMirroringHandler.java | 50 +++++
.../solr/update/processor/MirroringException.java | 38 ++++
.../MirroringUpdateRequestProcessorFactory.java | 223 +++++++++++++++++++++
.../update/processor/RequestMirroringHandler.java | 25 +++
6 files changed, 337 insertions(+), 2 deletions(-)
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
index c5beb49..120241a 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
@@ -51,7 +51,6 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
final long enqueueStartNanos = System.nanoTime();
// Create Producer record
-
try {
lastSuccessfulEnqueueNanos = System.nanoTime();
// Record time since last successful enque as 0
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index f82eebb..8885d8a 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -82,7 +82,7 @@ public class Consumer {
private CrossDcConsumer getCrossDcConsumer(String zkConnectString, String topicName,
boolean enableDataEncryption) {
- KafkaCrossDcConf conf = new KafkaCrossDcConf(topicName, enableDataEncryption, zkConnectString);
+ final KafkaCrossDcConf conf = new KafkaCrossDcConf(topicName, enableDataEncryption, zkConnectString);
return new KafkaCrossDcConsumer(conf);
}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java b/crossdc-consumer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
new file mode 100644
index 0000000..9d27bd9
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update.processor;
+
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.crossdc.KafkaMirroringSink;
+import org.apache.solr.crossdc.MirroringException;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link RequestMirroringHandler} implementation that mirrors update requests by submitting them
+ * to a {@link KafkaMirroringSink}.
+ */
+public class KafkaRequestMirroringHandler implements RequestMirroringHandler {
+  final KafkaCrossDcConf conf;
+  final KafkaMirroringSink sink;
+
+  /**
+   * Builds the Kafka configuration and sink for the topic named by the {@code topicname}
+   * system property.
+   */
+  public KafkaRequestMirroringHandler() {
+    // TODO: Setup Kafka properly
+    // NOTE(review): topicName is null when the system property is unset; how KafkaCrossDcConf
+    // treats a null topic is not visible here - confirm.
+    final String topicName = System.getProperty("topicname");
+    conf = new KafkaCrossDcConf(topicName, false, null);
+    sink = new KafkaMirroringSink(conf);
+  }
+
+  /**
+   * Wraps the request in a {@link MirroredSolrRequest} stamped with the current wall-clock time
+   * (converted from milliseconds to nanoseconds) and submits it to the sink.
+   *
+   * @param request the update request to enqueue for mirroring
+   * @throws MirroringException declared for failures raised while submitting to the sink; the
+   *         exact conditions are defined by the sink
+   */
+  @Override
+  public void mirror(UpdateRequest request) throws MirroringException {
+    // TODO: Enforce external version constraint for consistent update replication (cross-cluster)
+    final long submitTimeNanos = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis());
+    sink.submit(new MirroredSolrRequest(1, request, submitTimeNanos));
+  }
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/update/processor/MirroringException.java b/crossdc-consumer/src/main/java/org/apache/solr/update/processor/MirroringException.java
new file mode 100644
index 0000000..6f00df9
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/update/processor/MirroringException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update.processor;
+
+/**
+ * Checked exception type used for mirroring failures. Every constructor simply delegates to the
+ * matching {@link Exception} constructor.
+ */
+public class MirroringException extends Exception {
+  /** Creates an exception that wraps {@code cause} without a separate message. */
+  public MirroringException(Throwable cause) {
+    super(cause);
+  }
+
+  /** Creates an exception carrying both a detail {@code message} and the underlying {@code cause}. */
+  public MirroringException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  /** Creates an exception carrying only a detail {@code message}. */
+  public MirroringException(String message) {
+    super(message);
+  }
+
+  /** Creates an exception with neither a message nor a cause. */
+  public MirroringException() {
+  }
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-consumer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
new file mode 100644
index 0000000..4097c20
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update.processor;
+
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.RollbackUpdateCommand;
+import org.apache.solr.util.plugin.SolrCoreAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
+import static org.apache.solr.update.processor.DistributedUpdateProcessor.*;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+/**
+ * An update processor factory whose processors mirror update requests by submitting them to a
+ * {@link RequestMirroringHandler}, which is expected to hand them to a queue producer.
+ *
+ * <p>ADDs and DeleteByIDs are mirrored from leader shards and have internal _version_ fields stripped.
+ * DeleteByQueries are mirrored only when the incoming request is in the {@code NONE} distribution phase.
+ *
+ * <p>The handler is meant to be selected through the <b>requestMirroringHandler</b> init arg, naming a
+ * plugin class that implements {@link RequestMirroringHandler}. This version does not read that arg yet
+ * and always instantiates {@link KafkaRequestMirroringHandler}.
+ *
+ * <p>It is recommended to use the {@link DocBasedVersionConstraintsProcessorFactory} upstream of this factory to ensure
+ * doc consistency between this cluster and the mirror(s).
+ */
+public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcessorFactory
+    implements SolrCoreAware, UpdateRequestProcessorFactory.RunAlways {
+  private static final Logger log = LoggerFactory.getLogger(MirroringUpdateRequestProcessorFactory.class);
+
+  /** Request parameter flag for mirroring requests; mirroring is enabled when the parameter is absent. */
+  public static final String SERVER_SHOULD_MIRROR = "shouldMirror";
+
+  /** This is instantiated in inform(SolrCore) and then shared by all processor instances - visible for testing */
+  volatile RequestMirroringHandler mirroringHandler;
+
+  /**
+   * Passes the init args straight through to the superclass; no args are read here.
+   *
+   * @param args the plugin init args
+   */
+  @Override
+  public void init(final NamedList args) {
+    super.init(args);
+  }
+
+  /**
+   * Loads and instantiates the request mirroring handler through the core's resource loader.
+   *
+   * @param core the core this factory belongs to
+   */
+  @Override
+  public void inform(SolrCore core) {
+    // newInstance takes (class name to load, expected type): load the concrete Kafka handler and
+    // require it to be a RequestMirroringHandler.
+    mirroringHandler = core.getResourceLoader()
+        .newInstance(KafkaRequestMirroringHandler.class.getName(), RequestMirroringHandler.class);
+  }
+
+  /**
+   * Creates a processor for one update request. When mirroring is enabled for the request, the
+   * request params are copied, tagged with the collection name, stripped of internal and
+   * distributed-update params, and marked so the receiving side does not mirror them again.
+   *
+   * @param req the incoming update request
+   * @param rsp the response for the request (not used here)
+   * @param next the next processor in the chain
+   * @return the mirroring processor for this request
+   * @throws SolrException if the handler was never initialized, or if mirroring is enabled and
+   *         the core has no collection name
+   */
+  @Override
+  public MirroringUpdateProcessor getInstance(final SolrQueryRequest req, final SolrQueryResponse rsp,
+      final UpdateRequestProcessor next) {
+    // if the class fails to initialize
+    if (mirroringHandler == null) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "mirroringHandler is null");
+    }
+
+    // Check if mirroring is disabled in request params, defaults to true
+    boolean doMirroring = req.getParams().getBool(SERVER_SHOULD_MIRROR, true);
+
+    ModifiableSolrParams mirroredParams = null;
+    if (doMirroring) {
+      // Get the collection name for the core so we can be explicit in the mirrored request
+      CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
+      String collection = coreDesc.getCollectionName();
+      if (collection == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not determine collection name for "
+            + MirroringUpdateProcessor.class.getSimpleName() + ". Solr may not be running in cloud mode.");
+      }
+
+      mirroredParams = new ModifiableSolrParams(req.getParams());
+      mirroredParams.set("collection", collection);
+      // remove internal version parameter
+      mirroredParams.remove(CommonParams.VERSION_FIELD);
+      // remove fields added by distributed update proc
+      mirroredParams.remove(DISTRIB_UPDATE_PARAM);
+      mirroredParams.remove(DISTRIB_FROM_COLLECTION);
+      mirroredParams.remove(DISTRIB_INPLACE_PREVVERSION);
+      mirroredParams.remove(COMMIT_END_POINT);
+      mirroredParams.remove(DISTRIB_FROM_SHARD);
+      mirroredParams.remove(DISTRIB_FROM_PARENT);
+      mirroredParams.remove(DISTRIB_FROM);
+      // prevent circular mirroring
+      mirroredParams.set(SERVER_SHOULD_MIRROR, Boolean.FALSE.toString());
+    }
+
+    return new MirroringUpdateProcessor(next, doMirroring, mirroredParams,
+        DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)), doMirroring ? mirroringHandler : null);
+  }
+
+  /**
+   * Processor that forwards every command down the chain first and, when mirroring is enabled,
+   * accumulates the mirrored commands into a single {@link UpdateRequest} submitted on finish().
+   */
+  public static class MirroringUpdateProcessor extends UpdateRequestProcessor {
+    /** Flag indicating whether this instance creates and submits a mirrored request. This override is
+     * necessary to prevent circular mirroring between coupled cluster running this processor. */
+    private final boolean doMirroring;
+    private final RequestMirroringHandler requestMirroringHandler;
+
+    /** The mirrored request starts as null, gets created and appended to at each process() call,
+     * then submitted on finish(). */
+    private UpdateRequest mirrorRequest;
+    private final SolrParams mirrorParams;
+
+    /** The distributed processor downstream from us so we can establish if we're running on a leader shard */
+    private final DistributedUpdateProcessor distProc;
+
+    /** Distribution phase of the incoming requests */
+    private final DistribPhase distribPhase;
+
+    /**
+     * @param next the next processor in the chain; a {@link DistributedUpdateProcessor} must be
+     *        reachable from it
+     * @param doMirroring whether this instance builds and submits a mirrored request
+     * @param mirroredReqParams params for the mirrored request; null when mirroring is disabled
+     * @param distribPhase distribution phase of the incoming request
+     * @param requestMirroringHandler handler the mirrored request is submitted to; null when
+     *        mirroring is disabled
+     * @throws SolrException if no {@link DistributedUpdateProcessor} follows this processor
+     */
+    public MirroringUpdateProcessor(final UpdateRequestProcessor next, boolean doMirroring,
+        final SolrParams mirroredReqParams, final DistribPhase distribPhase,
+        final RequestMirroringHandler requestMirroringHandler) {
+      super(next);
+      this.doMirroring = doMirroring;
+      this.mirrorParams = mirroredReqParams;
+      this.distribPhase = distribPhase;
+      this.requestMirroringHandler = requestMirroringHandler;
+
+      this.distProc = findDistributedProcessor(next);
+      if (this.distProc == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "DistributedUpdateProcessor must follow "
+            + MirroringUpdateProcessor.class.getSimpleName());
+      }
+    }
+
+    /** Walks the chain from {@code first} and returns the first distributed update processor, or null. */
+    private static DistributedUpdateProcessor findDistributedProcessor(final UpdateRequestProcessor first) {
+      for (UpdateRequestProcessor proc = first; proc != null; proc = proc.next) {
+        if (proc instanceof DistributedUpdateProcessor) {
+          return (DistributedUpdateProcessor) proc;
+        }
+      }
+      return null;
+    }
+
+    /** Returns the pending mirrored request, creating it from a copy of the mirror params on first use. */
+    private UpdateRequest createAndOrGetMirrorRequest() {
+      if (mirrorRequest == null) {
+        mirrorRequest = new UpdateRequest();
+        mirrorRequest.setParams(new ModifiableSolrParams(mirrorParams));
+      }
+      if (log.isDebugEnabled()) log.debug("createOrGetMirrorRequest={}", mirrorRequest);
+      return mirrorRequest;
+    }
+
+    /**
+     * Runs the add down the chain, then, on a leader with mirroring enabled, appends a copy of the
+     * document (without its internal version field) to the mirrored request.
+     */
+    @Override
+    public void processAdd(final AddUpdateCommand cmd) throws IOException {
+      if (log.isDebugEnabled()) log.debug("processAdd isLeader={} cmd={}", distProc.isLeader(), cmd);
+      super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
+
+      // submit only from the leader shards so we mirror each doc once
+      if (doMirroring && distProc.isLeader()) {
+        // deep copy so stripping the version does not touch the document being indexed locally
+        SolrInputDocument doc = cmd.getSolrInputDocument().deepCopy();
+        doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc version
+        createAndOrGetMirrorRequest().add(doc, cmd.commitWithin, cmd.overwrite);
+      }
+    }
+
+    /**
+     * Runs the delete down the chain, then mirrors it: delete-by-id from the leader only,
+     * delete-by-query only when the incoming request is in the {@code NONE} distribution phase.
+     */
+    @Override
+    public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
+      if (log.isDebugEnabled()) log.debug("processDelete doMirroring={} isLeader={} cmd={}", doMirroring, distProc.isLeader(), cmd);
+      super.processDelete(cmd); // let this throw to prevent mirroring invalid requests
+
+      if (doMirroring) {
+        if (cmd.isDeleteById()) {
+          // deleteById requests runs once per leader, so we just submit the request from the leader shard
+          if (distProc.isLeader()) {
+            createAndOrGetMirrorRequest().deleteById(cmd.getId()); // strip versions from deletes
+          }
+        } else {
+          // DBQs are sent to each shard leader, so we mirror from the original node to only mirror once
+          // In general there's no way to guarantee that these run identically on the mirror since there are no
+          // external doc versions.
+          // TODO: Can we actually support this considering DBQs aren't versioned.
+          if (distribPhase == DistribPhase.NONE) {
+            createAndOrGetMirrorRequest().deleteByQuery(cmd.query);
+          }
+        }
+      }
+    }
+
+    /** Passes the rollback down the chain; rollbacks are not mirrored. */
+    @Override
+    public void processRollback(final RollbackUpdateCommand cmd) throws IOException {
+      super.processRollback(cmd);
+      // TODO: We can't/shouldn't support this ?
+    }
+
+    /**
+     * Finishes the downstream chain, then submits the accumulated mirrored request, if any.
+     *
+     * @throws SolrException wrapping any failure raised by the mirroring handler
+     */
+    @Override
+    public void finish() throws IOException {
+      super.finish();
+
+      if (doMirroring && mirrorRequest != null) {
+        try {
+          requestMirroringHandler.mirror(mirrorRequest);
+          mirrorRequest = null; // so we don't accidentally submit it again
+        } catch (Exception e) {
+          log.error("mirror submit failed", e);
+          throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java b/crossdc-consumer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java
new file mode 100644
index 0000000..16ca862
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update.processor;
+
+import org.apache.solr.client.solrj.request.UpdateRequest;
+
+/** Plugin classes must implement this interface to be usable as the handlers for request mirroring */
+public interface RequestMirroringHandler {
+ /**
+ * When called, should handle submitting the request to the replica clusters
+ *
+ * @param request the update request to mirror
+ * @throws Exception implementation-defined; the signature lets an implementation propagate any
+ * checked exception to the caller
+ */
+ void mirror(UpdateRequest request) throws Exception;
+}