You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by an...@apache.org on 2022/05/12 19:15:23 UTC

[solr-sandbox] branch crossdc-wip updated: WIP for UpdateRequestProcessor and other producer related files (#8)

This is an automated email from the ASF dual-hosted git repository.

anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new d2ae639  WIP for UpdateRequestProcessor and other producer related files (#8)
d2ae639 is described below

commit d2ae639bd0bb7ac5cb4e20cb706bb1fabed4f326
Author: Anshum Gupta <an...@apache.org>
AuthorDate: Thu May 12 12:15:18 2022 -0700

    WIP for UpdateRequestProcessor and other producer related files (#8)
---
 .../apache/solr/crossdc/KafkaMirroringSink.java    |   1 -
 .../org/apache/solr/crossdc/consumer/Consumer.java |   2 +-
 .../processor/KafkaRequestMirroringHandler.java    |  50 +++++
 .../solr/update/processor/MirroringException.java  |  38 ++++
 .../MirroringUpdateRequestProcessorFactory.java    | 223 +++++++++++++++++++++
 .../update/processor/RequestMirroringHandler.java  |  25 +++
 6 files changed, 337 insertions(+), 2 deletions(-)

diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
index c5beb49..120241a 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
@@ -51,7 +51,6 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
         final long enqueueStartNanos = System.nanoTime();
 
         // Create Producer record
-
         try {
             lastSuccessfulEnqueueNanos = System.nanoTime();
             // Record time since last successful enque as 0
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index f82eebb..8885d8a 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -82,7 +82,7 @@ public class Consumer {
     private CrossDcConsumer getCrossDcConsumer(String zkConnectString, String topicName,
         boolean enableDataEncryption) {
 
-        KafkaCrossDcConf conf = new KafkaCrossDcConf(topicName, enableDataEncryption, zkConnectString);
+        final KafkaCrossDcConf conf = new KafkaCrossDcConf(topicName, enableDataEncryption, zkConnectString);
         return new KafkaCrossDcConsumer(conf);
     }
 
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java b/crossdc-consumer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
new file mode 100644
index 0000000..9d27bd9
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update.processor;
+
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.crossdc.KafkaMirroringSink;
+import org.apache.solr.crossdc.MirroringException;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link RequestMirroringHandler} that forwards update requests to a {@link KafkaMirroringSink}.
+ */
+public class KafkaRequestMirroringHandler implements RequestMirroringHandler {
+    final KafkaCrossDcConf conf;
+    final KafkaMirroringSink sink;
+
+    /**
+     * Creates the handler and its Kafka sink.
+     *
+     * <p>The topic name is read from the {@code topicname} system property.
+     * NOTE(review): if that property is unset, a null topic is passed to {@link KafkaCrossDcConf};
+     * confirm how the sink behaves in that case.
+     */
+    public KafkaRequestMirroringHandler() {
+        // TODO: Setup Kafka properly
+        final String topicName = System.getProperty("topicname");
+        conf = new KafkaCrossDcConf(topicName, false, null);
+        sink = new KafkaMirroringSink(conf);
+    }
+
+    /**
+     * Wraps the request in a {@link MirroredSolrRequest} and submits it to the Kafka sink.
+     *
+     * @param request the update request to mirror
+     * @throws MirroringException if the request cannot be submitted to the sink
+     */
+    @Override
+    public void mirror(UpdateRequest request) throws MirroringException {
+        // TODO: Enforce external version constraint for consistent update replication (cross-cluster)
+        // The timestamp is wall-clock millis converted to nanoseconds.
+        // NOTE(review): the leading 1 is presumably an initial attempt count - confirm against MirroredSolrRequest.
+        sink.submit(new MirroredSolrRequest(1, request, TimeUnit.MILLISECONDS.toNanos(
+                System.currentTimeMillis())));
+    }
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/update/processor/MirroringException.java b/crossdc-consumer/src/main/java/org/apache/solr/update/processor/MirroringException.java
new file mode 100644
index 0000000..6f00df9
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/update/processor/MirroringException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update.processor;
+
+/**
+ * Checked exception signalling that an update request could not be mirrored.
+ * Wraps the underlying cause when one is available.
+ */
+public class MirroringException extends Exception {
+    // Exception is Serializable; pin the version so serialized forms stay compatible across builds.
+    private static final long serialVersionUID = 1L;
+
+    /** Creates an exception with neither a detail message nor a cause. */
+    public MirroringException() {
+        super();
+    }
+
+    /**
+     * Creates an exception with the given detail message.
+     *
+     * @param message description of the mirroring failure
+     */
+    public MirroringException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception with a detail message and the underlying cause.
+     *
+     * @param message description of the mirroring failure
+     * @param cause   the exception that triggered the failure
+     */
+    public MirroringException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates an exception wrapping the underlying cause.
+     *
+     * @param cause the exception that triggered the failure
+     */
+    public MirroringException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-consumer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
new file mode 100644
index 0000000..4097c20
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update.processor;
+
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.RollbackUpdateCommand;
+import org.apache.solr.util.plugin.SolrCoreAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
+import static org.apache.solr.update.processor.DistributedUpdateProcessor.*;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+/**
+ * An update processor factory that mirrors update requests by submitting them to a sink that implements a
+ * queue producer.
+ *
+ * <p>ADDs and delete-by-IDs are mirrored from leader shards and have internal _version_ fields stripped.
+ * Delete-by-queries are mirrored only from the node that originally received the request (distrib phase NONE),
+ * so each one is mirrored once.
+ *
+ * <p>The optional init arg <b>requestMirroringHandler</b> specifies the plugin class used for mirroring
+ * requests. This class must implement {@link RequestMirroringHandler}. When the arg is absent,
+ * {@link KafkaRequestMirroringHandler} is used.
+ *
+ * <p>It is recommended to use the {@link DocBasedVersionConstraintsProcessorFactory} upstream of this factory to
+ * ensure doc consistency between this cluster and the mirror(s).
+ */
+public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcessorFactory
+        implements SolrCoreAware, UpdateRequestProcessorFactory.RunAlways {
+    private static final Logger log = LoggerFactory.getLogger(MirroringUpdateRequestProcessorFactory.class);
+
+    /** Request param controlling whether a request is mirrored; treated as true when absent. */
+    public static final String SERVER_SHOULD_MIRROR = "shouldMirror";
+
+    /** Init arg naming the {@link RequestMirroringHandler} implementation to load. */
+    public static final String REQUEST_MIRRORING_HANDLER_ARG = "requestMirroringHandler";
+
+    /** Class name of the handler instantiated in {@link #inform(SolrCore)}; overridable via init args. */
+    private String handlerClassName = KafkaRequestMirroringHandler.class.getName();
+
+    /** This is instantiated in inform(SolrCore) and then shared by all processor instances - visible for testing */
+    volatile RequestMirroringHandler mirroringHandler;
+
+    /**
+     * Reads the optional {@value #REQUEST_MIRRORING_HANDLER_ARG} init arg.
+     *
+     * @param args factory init args from the update chain configuration
+     */
+    @Override
+    public void init(final NamedList args) {
+        super.init(args);
+        final Object configured = args == null ? null : args.get(REQUEST_MIRRORING_HANDLER_ARG);
+        if (configured != null && !configured.toString().trim().isEmpty()) {
+            handlerClassName = configured.toString().trim();
+        }
+    }
+
+    /**
+     * Instantiates the configured handler class through the core's resource loader.
+     * The previous code passed the interface's own name as the class to load, which can never be instantiated.
+     */
+    @Override
+    public void inform(SolrCore core) {
+        mirroringHandler = core.getResourceLoader().newInstance(handlerClassName, RequestMirroringHandler.class);
+    }
+
+    /**
+     * Creates a processor for one request.
+     *
+     * @throws SolrException if the handler was not initialized, or if mirroring is enabled and the core has
+     *                       no collection name (e.g. not running in cloud mode)
+     */
+    @Override
+    public MirroringUpdateProcessor getInstance(final SolrQueryRequest req, final SolrQueryResponse rsp,
+                                                final UpdateRequestProcessor next) {
+        // if the class fails to initialize
+        if (mirroringHandler == null) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "mirroringHandler is null");
+        }
+
+        // Check if mirroring is disabled in request params, defaults to true
+        boolean doMirroring = req.getParams().getBool(SERVER_SHOULD_MIRROR, true);
+
+        ModifiableSolrParams mirroredParams = null;
+        if (doMirroring) {
+            // Get the collection name for the core so we can be explicit in the mirrored request
+            CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
+            String collection = coreDesc.getCollectionName();
+            if (collection == null) {
+                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not determine collection name for "
+                        + MirroringUpdateProcessor.class.getSimpleName() + ". Solr may not be running in cloud mode.");
+            }
+
+            mirroredParams = new ModifiableSolrParams(req.getParams());
+            mirroredParams.set("collection", collection);
+            // remove internal version parameter
+            mirroredParams.remove(CommonParams.VERSION_FIELD);
+            // remove fields added by distributed update proc
+            mirroredParams.remove(DISTRIB_UPDATE_PARAM);
+            mirroredParams.remove(DISTRIB_FROM_COLLECTION);
+            mirroredParams.remove(DISTRIB_INPLACE_PREVVERSION);
+            mirroredParams.remove(COMMIT_END_POINT);
+            mirroredParams.remove(DISTRIB_FROM_SHARD);
+            mirroredParams.remove(DISTRIB_FROM_PARENT);
+            mirroredParams.remove(DISTRIB_FROM);
+            // prevent circular mirroring
+            mirroredParams.set(SERVER_SHOULD_MIRROR, Boolean.FALSE.toString());
+        }
+
+        return new MirroringUpdateProcessor(next, doMirroring, mirroredParams,
+                DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)), doMirroring ? mirroringHandler : null);
+    }
+
+    /** Per-request processor that accumulates mirrorable commands and submits them on {@link #finish()}. */
+    public static class MirroringUpdateProcessor extends UpdateRequestProcessor {
+        /** Flag indicating whether this instance creates and submits a mirrored request. This override is
+         * necessary to prevent circular mirroring between coupled clusters running this processor. */
+        private final boolean doMirroring;
+        private final RequestMirroringHandler requestMirroringHandler;
+
+        /** The mirrored request starts as null, gets created and appended to at each process() call,
+         * then submitted on finish(). */
+        private UpdateRequest mirrorRequest;
+        private final SolrParams mirrorParams;
+
+        /** The distributed processor downstream from us so we can establish if we're running on a leader shard */
+        private final DistributedUpdateProcessor distProc;
+
+        /** Distribution phase of the incoming requests */
+        private final DistribPhase distribPhase;
+
+        /**
+         * @throws SolrException if no {@link DistributedUpdateProcessor} follows this processor in the chain
+         */
+        public MirroringUpdateProcessor(final UpdateRequestProcessor next, boolean doMirroring,
+                                        final SolrParams mirroredReqParams, final DistribPhase distribPhase,
+                                        final RequestMirroringHandler requestMirroringHandler) {
+            super(next);
+            this.doMirroring = doMirroring;
+            this.mirrorParams = mirroredReqParams;
+            this.distribPhase = distribPhase;
+            this.requestMirroringHandler = requestMirroringHandler;
+
+            // Find the downstream distributed update processor
+            DistributedUpdateProcessor found = null;
+            for (UpdateRequestProcessor proc = next; proc != null; proc = proc.next) {
+                if (proc instanceof DistributedUpdateProcessor) {
+                    found = (DistributedUpdateProcessor) proc;
+                    break;
+                }
+            }
+            if (found == null) {
+                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "DistributedUpdateProcessor must follow "
+                        + MirroringUpdateProcessor.class.getSimpleName());
+            }
+            this.distProc = found;
+        }
+
+        /** Lazily creates the mirrored request, seeded with a copy of the mirrored params. */
+        private UpdateRequest createAndOrGetMirrorRequest() {
+            if (mirrorRequest == null) {
+                mirrorRequest = new UpdateRequest();
+                mirrorRequest.setParams(new ModifiableSolrParams(mirrorParams));
+            }
+            if (log.isDebugEnabled()) log.debug("createOrGetMirrorRequest={}", mirrorRequest);
+            return mirrorRequest;
+        }
+
+        @Override
+        public void processAdd(final AddUpdateCommand cmd) throws IOException {
+            if (log.isDebugEnabled()) log.debug("processAdd isLeader={} cmd={}", distProc.isLeader(), cmd);
+            super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
+
+            // submit only from the leader shards so we mirror each doc once
+            if (doMirroring && distProc.isLeader()) {
+                SolrInputDocument doc = cmd.getSolrInputDocument().deepCopy();
+                doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc version
+                createAndOrGetMirrorRequest().add(doc, cmd.commitWithin, cmd.overwrite);
+            }
+        }
+
+        @Override
+        public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
+            if (log.isDebugEnabled()) log.debug("processDelete doMirroring={} isLeader={} cmd={}", doMirroring, distProc.isLeader(), cmd);
+            super.processDelete(cmd); // let this throw to prevent mirroring invalid requests
+
+            if (doMirroring) {
+                if (cmd.isDeleteById()) {
+                    // deleteById requests runs once per leader, so we just submit the request from the leader shard
+                    if (distProc.isLeader()) {
+                        createAndOrGetMirrorRequest().deleteById(cmd.getId()); // strip versions from deletes
+                    }
+                } else {
+                    // DBQs are sent to each shard leader, so we mirror from the original node to only mirror once
+                    // In general there's no way to guarantee that these run identically on the mirror since there are no
+                    // external doc versions.
+                    // TODO: Can we actually support this considering DBQs aren't versioned.
+                    if (distribPhase == DistribPhase.NONE) {
+                        createAndOrGetMirrorRequest().deleteByQuery(cmd.query);
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void processRollback(final RollbackUpdateCommand cmd) throws IOException {
+            super.processRollback(cmd);
+            // TODO: We can't/shouldn't support this ?
+        }
+
+        /**
+         * Finishes the downstream chain, then submits the accumulated mirrored request (if any).
+         *
+         * @throws SolrException if the mirroring handler fails to submit the request
+         */
+        @Override
+        public void finish() throws IOException {
+            super.finish();
+
+            if (doMirroring && mirrorRequest != null) {
+                try {
+                    requestMirroringHandler.mirror(mirrorRequest);
+                    mirrorRequest = null; // so we don't accidentally submit it again
+                } catch (Exception e) {
+                    log.error("mirror submit failed", e);
+                    throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java b/crossdc-consumer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java
new file mode 100644
index 0000000..16ca862
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update.processor;
+
+import org.apache.solr.client.solrj.request.UpdateRequest;
+
+/**
+ * Contract for plugins that forward update requests to replica clusters.
+ * Any class used as a request mirroring handler must implement it.
+ */
+public interface RequestMirroringHandler {
+    /**
+     * Hands the given update request off so it can be applied on the replica clusters.
+     *
+     * @param updateRequest the update request to forward
+     * @throws Exception if the request could not be handed off
+     */
+    void mirror(UpdateRequest updateRequest) throws Exception;
+}