You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ma...@apache.org on 2022/04/14 06:52:34 UTC
[solr-sandbox] branch crossdc-wip updated: WIP for Cross DC consumer (#5)
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/crossdc-wip by this push:
new d75761a WIP for Cross DC consumer (#5)
d75761a is described below
commit d75761a4b92adf4ac3d8aabacaa7455689447d8d
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Thu Apr 14 01:52:30 2022 -0500
WIP for Cross DC consumer (#5)
* CrossDc Consumer wip
* Temp commit
* wip commit
* Draft commit, cleanup code
* WIP commit
* Fix tests, WIP commit
* Refactor MessageProcessor and some cleanup.
Co-authored-by: Anshum Gupta <an...@apache.org>
---
build.gradle | 6 +
crossdc-consumer/build.gradle | 18 +-
.../apache/solr/crossdc/KafkaMirroringSink.java | 92 ++++++
.../apache/solr/crossdc/MirroringException.java} | 26 +-
.../apache/solr/crossdc/RequestMirroringSink.java} | 21 +-
.../apache/solr/crossdc/ResubmitBackoffPolicy.java | 7 +
.../apache/solr/crossdc/common/CrossDcConf.java} | 9 +-
.../solr/crossdc/common/CrossDcConstants.java} | 15 +-
.../apache/solr/crossdc/common/IQueueHandler.java | 75 +++++
.../solr/crossdc/common/KafkaCrossDcConf.java | 58 ++++
.../solr/crossdc/common/MirroredSolrRequest.java | 124 +++++++++
.../common/MirroredSolrRequestSerializer.java | 83 ++++++
.../solr/crossdc/common/SolrExceptionUtil.java} | 23 +-
.../org/apache/solr/crossdc/consumer/Consumer.java | 255 +++++++++++++++++
.../solr/crossdc/helpers/SendDummyUpdates.java | 51 ++++
.../crossdc/messageprocessor/MessageProcessor.java | 17 ++
.../messageprocessor/SolrMessageProcessor.java | 308 +++++++++++++++++++++
.../apache/solr/crossdc/TestMessageProcessor.java | 125 +++++++++
gradle/wrapper/gradle-wrapper.properties | 2 +-
settings.gradle | 4 +-
version.props | 0
21 files changed, 1269 insertions(+), 50 deletions(-)
diff --git a/build.gradle b/build.gradle
index b9644b2..081a382 100644
--- a/build.gradle
+++ b/build.gradle
@@ -21,3 +21,9 @@
* This is a general purpose Gradle build.
* Learn more about Gradle by exploring our samples at https://docs.gradle.org/6.7.1/samples
*/
+
+description 'Root for Solr plugins sandbox'
+
+subprojects {
+ group "org.apache.solr.crossdc"
+}
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle
index 8c44542..58f3dac 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/build.gradle
@@ -15,7 +15,7 @@
* limitations under the License.
*/
plugins {
- id 'java'
+ id 'java-library'
}
description = 'Cross-DC Consumer package'
@@ -24,7 +24,23 @@ version '1.0-SNAPSHOT'
repositories {
mavenCentral()
+ jcenter()
}
dependencies {
+ implementation group: 'org.apache.solr', name: 'solr-solrj', version: '8.11.1'
+ api 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
+ api 'org.eclipse.jetty:jetty-server:9.4.41.v20210516'
+ api 'org.eclipse.jetty:jetty-servlet:9.4.41.v20210516'
+ api 'org.apache.kafka:kafka-clients:2.8.0'
+ compile group: 'com.google.guava', name: 'guava', version: '14.0'
+ runtimeOnly ('com.google.protobuf:protobuf-java-util:3.19.2')
+ testImplementation 'org.hamcrest:hamcrest:2.2'
+ testImplementation 'junit:junit:4.13.2'
+ testImplementation('org.mockito:mockito-core:4.3.1', {
+ exclude group: "net.bytebuddy", module: "byte-buddy-agent"
+ })
}
+subprojects {
+ group "org.apache.solr"
+}
\ No newline at end of file
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
new file mode 100644
index 0000000..1b6109a
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class KafkaMirroringSink implements RequestMirroringSink {
+ private static final Logger logger = LoggerFactory.getLogger(KafkaMirroringSink.class);
+
+ private long lastSuccessfulEnqueueNanos;
+ private KafkaCrossDcConf conf;
+ private final Producer<String, MirroredSolrRequest> producer;
+
+ public KafkaMirroringSink(final KafkaCrossDcConf conf) {
+ // Create Kafka Mirroring Sink
+ this.conf = conf;
+ this.producer = initProducer();
+ logger.info("KafkaMirroringSink has been created. Producer & Topic have been created successfully! Configurations {}", conf);
+ }
+
+ @Override
+ public void submit(MirroredSolrRequest request) throws MirroringException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("About to submit a MirroredSolrRequest");
+ }
+
+ final long enqueueStartNanos = System.nanoTime();
+
+ // Create Producer record
+
+ try {
+ lastSuccessfulEnqueueNanos = System.nanoTime();
+ // Record time since last successful enque as 0
+ long elapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueueStartNanos);
+ // Update elapsed time
+
+ if (elapsedTimeMillis > conf.getSlowSubmitThresholdInMillis()) {
+ slowSubmitAction(request, elapsedTimeMillis);
+ }
+ } catch (Exception e) {
+ // We are intentionally catching all exceptions, the expected exception form this function is {@link MirroringException}
+
+ String message = String.format("Unable to enqueue request %s, # of attempts %s", request, conf.getNumOfRetries());
+ logger.error(message, e);
+
+ throw new MirroringException(message, e);
+ }
+ }
+
+ /**
+ * Create and init the producer using {@link this#conf}
+ * All producer configs are listed here
+ * https://kafka.apache.org/documentation/#producerconfigs
+ *
+ * @return
+ */
+ private Producer<String, MirroredSolrRequest> initProducer() {
+ // Initialize and return Kafka producer
+ Properties props = new Properties();
+ logger.info("Creating Kafka producer! Configurations {} ", conf.toString());
+ Producer<String, MirroredSolrRequest> producer = new KafkaProducer(props);
+ return producer;
+ }
+
+ private void slowSubmitAction(Object request, long elapsedTimeMillis) {
+ logger.warn("Enqueuing the request to Kafka took more than {} millis. enqueueElapsedTime={}",
+ conf.getSlowSubmitThresholdInMillis(),
+ elapsedTimeMillis);
+ }
+}
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/MirroringException.java
similarity index 64%
copy from crossdc-consumer/build.gradle
copy to crossdc-consumer/src/main/java/org/apache/solr/crossdc/MirroringException.java
index 8c44542..2073750 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/MirroringException.java
@@ -14,17 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-plugins {
- id 'java'
-}
+package org.apache.solr.crossdc;
-description = 'Cross-DC Consumer package'
+/**
+ * Exception thrown during cross-dc mirroring
+ */
+public class MirroringException extends Exception {
+ public MirroringException() {
+ super();
+ }
-version '1.0-SNAPSHOT'
+ public MirroringException(String message) {
+ super(message);
+ }
-repositories {
- mavenCentral()
-}
+ public MirroringException(String message, Throwable cause) {
+ super(message, cause);
+ }
-dependencies {
+ public MirroringException(Throwable cause) {
+ super(cause);
+ }
}
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/RequestMirroringSink.java
similarity index 55%
copy from crossdc-consumer/build.gradle
copy to crossdc-consumer/src/main/java/org/apache/solr/crossdc/RequestMirroringSink.java
index 8c44542..5d09c16 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/RequestMirroringSink.java
@@ -14,17 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-plugins {
- id 'java'
-}
-
-description = 'Cross-DC Consumer package'
+package org.apache.solr.crossdc;
-version '1.0-SNAPSHOT'
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
-repositories {
- mavenCentral()
-}
+public interface RequestMirroringSink {
-dependencies {
+ /**
+ * Submits a mirrored solr request to the appropriate end-point such that it is eventually received by solr
+ * A direct sink may simply use CloudSolrServer to process requests directly.
+ * A queueing sink will serialize the request and submit it to a queue for later consumption
+ * @param request the request that is to be mirrored
+ * @throws MirroringException Implementations may throw an exception
+ */
+ void submit(final MirroredSolrRequest request) throws MirroringException;
}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java
new file mode 100644
index 0000000..145e0d0
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java
@@ -0,0 +1,7 @@
+package org.apache.solr.crossdc;
+
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+
+public interface ResubmitBackoffPolicy {
+ long getBackoffTimeMs(MirroredSolrRequest resubmitRequest);
+}
diff --git a/crossdc-consumer/settings.gradle b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
similarity index 84%
rename from crossdc-consumer/settings.gradle
rename to crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
index 8c7c712..b1f3d24 100644
--- a/crossdc-consumer/settings.gradle
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
@@ -14,11 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.solr.crossdc.common;
-rootProject.name = 'crossdc-consumer'
-
-description = 'Module for Apache Solr Cross DC Consumer'
-
-subprojects {
- group "org.apache.solr.crossdc"
+public abstract class CrossDcConf {
+ public abstract String getClusterName();
}
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConstants.java
similarity index 80%
copy from crossdc-consumer/build.gradle
copy to crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConstants.java
index 8c44542..1d6e355 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConstants.java
@@ -14,17 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-plugins {
- id 'java'
-}
-
-description = 'Cross-DC Consumer package'
-version '1.0-SNAPSHOT'
-
-repositories {
- mavenCentral()
-}
+package org.apache.solr.crossdc.common;
-dependencies {
+public class CrossDcConstants {
+ // Requests containing this parameter will not be mirrored.
+ public static final String SHOULD_MIRROR = "shouldMirror";
}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/IQueueHandler.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/IQueueHandler.java
new file mode 100644
index 0000000..3c0ddff
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/IQueueHandler.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.crossdc.common;
+
+public interface IQueueHandler<T> {
+ enum ResultStatus {
+ /** Item was successfully processed */
+ HANDLED,
+
+ /** Item was not processed, and the consumer should shutdown */
+ NOT_HANDLED_SHUTDOWN,
+
+ /** Item processing failed, and the item should be retried immediately */
+ FAILED_RETRY,
+
+ /** Item processing failed, and the item should not be retried (unsuccessfully processed) */
+ FAILED_NO_RETRY,
+
+ /** Item processing failed, and the item should be re-queued */
+ FAILED_RESUBMIT
+ }
+
+ class Result<T> {
+ private final ResultStatus _status;
+ private final Throwable _throwable;
+ private final T _newItem;
+
+ public Result(final ResultStatus status) {
+ _status = status;
+ _throwable = null;
+ _newItem = null;
+ }
+
+ public Result(final ResultStatus status, final Throwable throwable) {
+ _status = status;
+ _throwable = throwable;
+ _newItem = null;
+ }
+
+ public Result(final ResultStatus status, final Throwable throwable, final T newItem) {
+ _status = status;
+ _throwable = throwable;
+ _newItem = newItem;
+ }
+
+ public ResultStatus status() {
+ return _status;
+ }
+
+ public Throwable throwable() {
+ return _throwable;
+ }
+
+ public T newItem() {
+ return _newItem;
+ }
+ }
+
+ Result<T> handleItem(T item);
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
new file mode 100644
index 0000000..ee6a4a6
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.common;
+
+public class KafkaCrossDcConf extends CrossDcConf {
+ private final String topicName;
+ private final boolean enableDataEncryption;
+ private long slowSubmitThresholdInMillis;
+ private int numOfRetries = 5;
+ private final String solrZkConnectString;
+
+
+ public KafkaCrossDcConf(String topicName, boolean enableDataEncryption, String solrZkConnectString) {
+ this.topicName = topicName;
+ this.enableDataEncryption = enableDataEncryption;
+ this.solrZkConnectString = solrZkConnectString;
+ }
+ public String getTopicName() {
+ return topicName;
+ }
+
+ public boolean getEnableDataEncryption() { return enableDataEncryption; }
+
+ public long getSlowSubmitThresholdInMillis() {
+ return slowSubmitThresholdInMillis;
+ }
+
+ public void setSlowSubmitThresholdInMillis(long slowSubmitThresholdInMillis) {
+ this.slowSubmitThresholdInMillis = slowSubmitThresholdInMillis;
+ }
+
+ public int getNumOfRetries() {
+ return numOfRetries;
+ }
+
+ public String getSolrZkConnectString() {
+ return solrZkConnectString;
+ }
+
+ @Override
+ public String getClusterName() {
+ return null;
+ }
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
new file mode 100644
index 0000000..4c96116
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.common;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.common.params.SolrParams;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class to encapsulate a mirrored Solr request.
+ * This adds a timestamp and #attempts to the request for tracking purpopse.
+ */
+public class MirroredSolrRequest {
+ private final SolrRequest solrRequest;
+
+ // Attempts counter for processing the request
+ private int attempt = 1;
+
+ // Timestamp to track when this request was first written. This should be used to track the replication lag.
+ private long submitTimeNanos = 0;
+
+ public MirroredSolrRequest(final SolrRequest solrRequest) {
+ this(1, solrRequest, 0);
+ }
+
+ public MirroredSolrRequest(final int attempt, final SolrRequest solrRequest, final long submitTimeNanos) {
+ if (solrRequest == null) {
+ throw new NullPointerException("solrRequest cannot be null");
+ }
+ this.attempt = attempt;
+ this.solrRequest = solrRequest;
+ this.submitTimeNanos = submitTimeNanos;
+ }
+
+ public MirroredSolrRequest(final int attempt,
+ final long submitTimeNanos) {
+ this.attempt = attempt;
+ this.submitTimeNanos = submitTimeNanos;
+ solrRequest = null;
+ }
+
+ public static MirroredSolrRequest mirroredAdminCollectionRequest(SolrParams params) {
+ Map<String, List<String>> createParams = new HashMap();
+ // don't mirror back
+ createParams.put(CrossDcConstants.SHOULD_MIRROR, Collections.singletonList("false"));
+
+ final Iterator<String> paramNamesIterator = params.getParameterNamesIterator();
+ while (paramNamesIterator.hasNext()) {
+ final String key = paramNamesIterator.next();
+ if (key.equals("createNodeSet") || key.equals("node")) {
+ // don't forward as nodeset most likely makes no sense here.
+ // should we log when we skip this parameter that was part of the original request ?
+ continue;
+ }
+ final String[] values = params.getParams(key);
+ if (values != null) {
+ createParams.put(key, Arrays.asList(values));
+ }
+ }
+
+ return new MirroredSolrRequest(1,
+ TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()));
+ }
+
+ public int getAttempt() {
+ return attempt;
+ }
+
+ public void setAttempt(final int attempt) {
+ this.attempt = attempt;
+ }
+
+ public SolrRequest getSolrRequest() {
+ return solrRequest;
+ }
+
+ public long getSubmitTimeNanos() {
+ return submitTimeNanos;
+ }
+
+ public void setSubmitTimeNanos(final long submitTimeNanos) {
+ this.submitTimeNanos = submitTimeNanos;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (!(o instanceof MirroredSolrRequest)) return false;
+
+ final MirroredSolrRequest that = (MirroredSolrRequest)o;
+
+ return Objects.equals(solrRequest, that.solrRequest);
+ }
+
+ @Override
+ public int hashCode() {
+ return solrRequest.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "MirroredSolrRequest{" +
+ "solrRequest=" + solrRequest +
+ ", attempt=" + attempt +
+ ", submitTimeNanos=" + submitTimeNanos +
+ '}';
+ }
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
new file mode 100644
index 0000000..592a36c
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.common;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrRequest> {
+
+ private boolean isKey;
+ /**
+ * Configure this class.
+ *
+ * @param configs configs in key/value pairs
+ * @param isKey whether is for key or value
+ */
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ this.isKey = isKey;
+ }
+
+ /**
+ * Convert {@code data} into a byte array.
+ *
+ * @param topic topic associated with data
+ * @param request MirroredSolrRequest that needs to be serialized
+ * @return serialized bytes
+ */
+ @Override
+ public byte[] serialize(String topic, MirroredSolrRequest request) {
+ // TODO: add checks
+ SolrRequest solrRequest = request.getSolrRequest();
+ UpdateRequest updateRequest = (UpdateRequest)solrRequest;
+ JavaBinUpdateRequestCodec codec = new JavaBinUpdateRequestCodec();
+ ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
+ try {
+ codec.marshal(updateRequest, baos);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return baos.byteArray();
+ }
+
+ /**
+ * Close this serializer.
+ * <p>
+ * This method must be idempotent as it may be called multiple times.
+ */
+ @Override
+ public void close() {
+ Serializer.super.close();
+ }
+
+ private static final class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
+ ExposedByteArrayOutputStream() {
+ super();
+ }
+
+ byte[] byteArray() {
+ return buf;
+ }
+ }
+}
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/SolrExceptionUtil.java
similarity index 62%
copy from crossdc-consumer/build.gradle
copy to crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/SolrExceptionUtil.java
index 8c44542..47dced2 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/SolrExceptionUtil.java
@@ -14,17 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-plugins {
- id 'java'
-}
-
-description = 'Cross-DC Consumer package'
+package org.apache.solr.crossdc.common;
-version '1.0-SNAPSHOT'
-
-repositories {
- mavenCentral()
-}
+import org.apache.solr.common.SolrException;
-dependencies {
+public class SolrExceptionUtil {
+ public static SolrException asSolrException(final Exception e) {
+ SolrException solrException = null;
+ if (e.getCause() instanceof SolrException) {
+ solrException = (SolrException) e.getCause();
+ } else if (e instanceof SolrException) {
+ solrException = (SolrException) e;
+ }
+ return solrException;
+ }
}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
new file mode 100644
index 0000000..3bb62b3
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.crossdc.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.solr.crossdc.KafkaMirroringSink;
+import org.apache.solr.crossdc.MirroringException;
+import org.apache.solr.crossdc.common.IQueueHandler;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+// Cross-DC Consumer main class
+public class Consumer {
+ private static boolean enabled = true;
+
+ /**
+ * ExecutorService to manage the cross-dc consumer threads.
+ */
+ private ExecutorService consumerThreadExecutor;
+
+ private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
+
+ private Server server;
+ CrossDcConsumer crossDcConsumer;
+
+ public void start(String[] args) {
+
+ // TODO: use args for config
+ server = new Server();
+ ServerConnector connector = new ServerConnector(server);
+ connector.setPort(8090);
+ server.setConnectors(new Connector[] {connector});
+ crossDcConsumer = getCrossDcConsumer();
+
+ // Start consumer thread
+ consumerThreadExecutor = Executors.newSingleThreadExecutor();
+ consumerThreadExecutor.submit(crossDcConsumer);
+
+ // Register shutdown hook
+ Thread shutdownHook = new Thread(() -> System.out.println("Shutting down consumers!"));
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+ }
+
+ private CrossDcConsumer getCrossDcConsumer() {
+ // nocommit - hardcoded conf
+ KafkaCrossDcConf conf = new KafkaCrossDcConf("test-topic", true, "localhost:2181");
+ return new KafkaCrossDcConsumer(conf);
+ }
+
+ public static void main(String[] args) {
+ Consumer consumer = new Consumer();
+ consumer.start(args);
+ }
+
+ /**
+ * Abstract class for defining cross-dc consumer
+ */
+ public abstract static class CrossDcConsumer implements Runnable {
+ SolrMessageProcessor messageProcessor;
+
+ }
+
+ public static class CrossDcConsumerFactory {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ CrossDcConsumer getCrossDcConsumer(){
+ return null;
+ }
+
+ }
+
+ /**
+ * Class to run the consumer thread for Kafka. This also contains the implementation for retries and
+ * resubmitting to the queue in case of temporary failures.
+ */
+ public static class KafkaCrossDcConsumer extends CrossDcConsumer {
+ private static final Logger logger = LoggerFactory.getLogger(KafkaCrossDcConsumer.class);
+
+ private final KafkaConsumer<String, MirroredSolrRequest> consumer;
+ private final KafkaMirroringSink kafkaMirroringSink;
+
+ private final int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 100;
+ SolrMessageProcessor messageProcessor;
+
+ /**
+ * @param conf The Kafka consumer configuration
+ */
+ public KafkaCrossDcConsumer(KafkaCrossDcConf conf) {
+ final Properties kafkaConsumerProp = new Properties();
+ logger.info("Creating Kafka consumer with configuration {}", kafkaConsumerProp);
+ consumer = createConsumer(kafkaConsumerProp);
+
+ // Create producer for resubmitting failed requests
+ logger.info("Creating Kafka resubmit producer");
+ this.kafkaMirroringSink = new KafkaMirroringSink(conf);
+ logger.info("Created Kafka resubmit producer");
+
+ }
+
+ private KafkaConsumer<String, MirroredSolrRequest> createConsumer(Properties properties) {
+ KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
+ return kafkaConsumer;
+ }
+
+ /**
+ * This is where the magic happens.
+ * 1. Polls and gets the packets from the queue
+ * 2. Extract the MirroredSolrRequest objects
+ * 3. Send the request to the MirroredSolrRequestHandler that has the processing, retry, error handling logic.
+ */
+ @Override
+ public void run() {
+ logger.info("About to start Kafka consumer thread ");
+ String topic="topic";
+
+ logger.info("Kafka consumer subscribing to topic topic={}", topic);
+ consumer.subscribe(Collections.singleton(topic));
+
+ while (pollAndProcessRequests()) {
+ //no-op within this loop: everything is done in pollAndProcessRequests method defined above.
+ }
+
+ logger.info("Closed kafka consumer. Exiting now.");
+ consumer.close();
+
+ }
+
+ /**
+ * Polls and processes the requests from Kafka. This method returns false when the consumer needs to be
+ * shutdown i.e. when there's a wakeup exception.
+ */
+ boolean pollAndProcessRequests() {
+ try {
+ ConsumerRecords<String, MirroredSolrRequest> records = consumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));
+ for (TopicPartition partition : records.partitions()) {
+ List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords = records.records(partition);
+ try {
+ for (ConsumerRecord<String, MirroredSolrRequest> record : partitionRecords) {
+ logger.trace("Fetched record from topic={} partition={} key={} value={}",
+ record.topic(), record.partition(), record.key(), record.value());
+ IQueueHandler.Result result = messageProcessor.handleItem(record.value());
+ switch (result.status()) {
+ case FAILED_RESUBMIT:
+ kafkaMirroringSink.submit(record.value());
+ break;
+ case HANDLED:
+ // no-op
+ break;
+ case NOT_HANDLED_SHUTDOWN:
+ case FAILED_RETRY:
+ logger.error("Unexpected response while processing request. We never expect {}.",
+ result.status().toString());
+ break;
+ default:
+ // no-op
+ }
+ }
+ updateOffset(partition, partitionRecords);
+
+ // handleItem sets the thread interrupt, let's exit if there has been an interrupt set
+ if(Thread.currentThread().isInterrupted()) {
+ logger.info("Kafka Consumer thread interrupted, shutting down Kafka consumer.");
+ return false;
+ }
+ } catch (MirroringException e) {
+ // We don't really know what to do here, so it's wiser to just break out.
+ logger.error("Mirroring exception occured while resubmitting to Kafka. We are going to stop the consumer thread now.", e);
+ return false;
+ } catch (WakeupException e) {
+ logger.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer.");
+ return false;
+ } catch (Exception e) {
+ // If there is any exception returned by handleItem, then reset the offset.
+ logger.warn("Exception occurred in Kafka consumer thread, but we will continue.", e);
+ resetOffsetForPartition(partition, partitionRecords);
+ break;
+ }
+ }
+ } catch (WakeupException e) {
+ logger.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer");
+ return false;
+ } catch (Exception e) {
+ logger.error("Exception occurred in Kafka consumer thread, but we will continue.", e);
+ }
+ return true;
+ }
+
+ /**
+ * Reset the local offset so that the consumer reads the records from Kafka again.
+ * @param partition The TopicPartition to reset the offset for
+ * @param partitionRecords PartitionRecords for the specified partition
+ */
+ private void resetOffsetForPartition(TopicPartition partition, List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
+ logger.debug("Resetting offset to: {}", partitionRecords.get(0).offset());
+ long resetOffset = partitionRecords.get(0).offset();
+ consumer.seek(partition, resetOffset);
+ }
+
+ /**
+ * Logs and updates the commit point for the partition that has been processed.
+ * @param partition The TopicPartition to update the offset for
+ * @param partitionRecords PartitionRecords for the specified partition
+ */
+ private void updateOffset(TopicPartition partition, List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
+ long nextOffset = partitionRecords.get(partitionRecords.size() - 1).offset() + 1;
+ consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(nextOffset)));
+
+ logger.trace("Updated offset for topic={} partition={} to offset={}",
+ partition.topic(), partition.partition(), nextOffset);
+ }
+
+ /**
+ * Shutdown the Kafka consumer by calling wakeup.
+ */
+ void shutdown() {
+ consumer.wakeup();
+ }
+
+
+ }
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/helpers/SendDummyUpdates.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/helpers/SendDummyUpdates.java
new file mode 100644
index 0000000..a066741
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/helpers/SendDummyUpdates.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.helpers;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
+
+import java.util.Properties;
+
+public class SendDummyUpdates {
+ public static void main(String[] args) {
+ String TOPIC = "Trial";
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", "localhost:9092");
+ properties.put("acks", "all");
+ properties.put("retries", 0);
+ properties.put("batch.size", 16384);
+ properties.put("buffer.memory", 33554432);
+ properties.put("linger.ms", 1);
+ properties.put("key.serializer", StringSerializer.class.getName());
+ properties.put("value.serializer", MirroredSolrRequestSerializer.class.getName());
+ Producer<String, MirroredSolrRequest> producer = new KafkaProducer(properties);
+ UpdateRequest updateRequest = new UpdateRequest();
+ updateRequest.add("id", String.valueOf(System.currentTimeMillis()));
+ MirroredSolrRequest mirroredSolrRequest = new MirroredSolrRequest(updateRequest);
+ System.out.println("About to send producer record");
+ producer.send(new ProducerRecord(TOPIC, mirroredSolrRequest));
+ System.out.println("Sent producer record");
+ producer.close();
+ System.out.println("Closed producer");
+ }
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java
new file mode 100644
index 0000000..210f4ca
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java
@@ -0,0 +1,17 @@
+package org.apache.solr.crossdc.messageprocessor;
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.crossdc.ResubmitBackoffPolicy;
+
+public abstract class MessageProcessor {
+
+ private final ResubmitBackoffPolicy resubmitBackoffPolicy;
+
+ public MessageProcessor(ResubmitBackoffPolicy resubmitBackoffPolicy) {
+ this.resubmitBackoffPolicy = resubmitBackoffPolicy;
+ }
+
+ public ResubmitBackoffPolicy getResubmitBackoffPolicy() {
+ return resubmitBackoffPolicy;
+ }
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
new file mode 100644
index 0000000..650fc83
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.messageprocessor;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.SolrResponseBase;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.crossdc.ResubmitBackoffPolicy;
+import org.apache.solr.crossdc.common.CrossDcConstants;
+import org.apache.solr.crossdc.common.IQueueHandler;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.SolrExceptionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Message processor implements all the logic to process a MirroredSolrRequest.
+ * It handles:
+ * 1. Sending the update request to Solr
+ * 2. Discarding or retrying failed requests
+ * 3. Flagging requests for resubmission by the underlying consumer implementation.
+ */
+public class SolrMessageProcessor extends MessageProcessor implements IQueueHandler<MirroredSolrRequest> {
+ private static final Logger logger = LoggerFactory.getLogger(SolrMessageProcessor.class);
+ final CloudSolrClient client;
+
+ private static final String VERSION_FIELD = "_version_";
+
+ public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy resubmitBackoffPolicy) {
+ super(resubmitBackoffPolicy);
+ this.client = client;
+ }
+
+ @Override
+ public Result<MirroredSolrRequest> handleItem(MirroredSolrRequest mirroredSolrRequest) {
+ connectToSolrIfNeeded();
+ preventCircularMirroring(mirroredSolrRequest);
+ return processMirroredRequest(mirroredSolrRequest);
+ }
+
+ private Result<MirroredSolrRequest> processMirroredRequest(MirroredSolrRequest request) {
+ final Result<MirroredSolrRequest> result = handleSolrRequest(request);
+ // Back-off before returning
+ backoffIfNeeded(result);
+ return result;
+ }
+
+ private Result<MirroredSolrRequest> handleSolrRequest(MirroredSolrRequest mirroredSolrRequest) {
+ logger.debug("Handling Solr request");
+ SolrRequest request = mirroredSolrRequest.getSolrRequest();
+ final SolrParams requestParams = request.getParams();
+
+ final String shouldMirror = requestParams.get("shouldMirror");
+ if (shouldMirror != null && !Boolean.parseBoolean(shouldMirror)) {
+ logger.warn("Skipping mirrored request because shouldMirror is set to false. request={}", request);
+ return new Result<>(ResultStatus.FAILED_NO_RETRY);
+ }
+ logFirstAttemptLatency(mirroredSolrRequest);
+
+ Result<MirroredSolrRequest> result;
+ try {
+ prepareIfUpdateRequest(request);
+ logRequest(request);
+ logger.debug("About to submit Solr request {}", request);
+ result = processMirroredSolrRequest(request);
+ } catch (Exception e) {
+ result = handleException(mirroredSolrRequest, e);
+ }
+
+ return result;
+ }
+
+ private Result<MirroredSolrRequest> handleException(MirroredSolrRequest mirroredSolrRequest, Exception e) {
+ final SolrException solrException = SolrExceptionUtil.asSolrException(e);
+ logIf4xxException(solrException);
+ if (!isRetryable(e)) {
+ logFailure(mirroredSolrRequest, e, solrException, false);
+ return new Result<>(ResultStatus.FAILED_NO_RETRY, e);
+ } else {
+ logFailure(mirroredSolrRequest, e, solrException, true);
+ mirroredSolrRequest.setAttempt(mirroredSolrRequest.getAttempt() + 1);
+ maybeBackoff(solrException);
+ return new Result<>(ResultStatus.FAILED_RESUBMIT, e, mirroredSolrRequest);
+ }
+ }
+
+ private void maybeBackoff(SolrException solrException) {
+ if (solrException == null) {
+ return;
+ }
+ long sleepTimeMs = 1000;
+ String backoffTimeSuggested = solrException.getMetadata("backoffTime-ms");
+ if (backoffTimeSuggested != null && !"0".equals(backoffTimeSuggested)) {
+ // If backoff policy is not configured (returns "0" by default), then sleep 1 second. If configured, do as it says.
+ sleepTimeMs = Math.max(1, Long.parseLong(backoffTimeSuggested));
+ }
+ logger.info("Consumer backoff. sleepTimeMs={}", sleepTimeMs);
+ uncheckedSleep(sleepTimeMs);
+ }
+
+ private boolean isRetryable(Exception e) {
+ SolrException se = SolrExceptionUtil.asSolrException(e);
+
+ if (se != null) {
+ int code = se.code();
+ if (code == SolrException.ErrorCode.CONFLICT.code) {
+ return false;
+ }
+ }
+ // Everything other than version conflict exceptions should be retried.
+ logger.warn("Unexpected exception, will resubmit the request to the queue", e);
+ return true;
+ }
+
+ private void logIf4xxException(SolrException solrException) {
+ // This shouldn't really happen but if it doesn, it most likely requires fixing in the return code from Solr.
+ if (solrException != null && 400 <= solrException.code() && solrException.code() < 500) {
+ logger.error("Exception occurred with 4xx response. {}", solrException.code(), solrException);
+ }
+ }
+
+ private void logFailure(MirroredSolrRequest mirroredSolrRequest, Exception e, SolrException solrException, boolean retryable) {
+ // This shouldn't really happen.
+ if (solrException != null && 400 <= solrException.code() && solrException.code() < 500) {
+ logger.error("Exception occurred with 4xx response. {}", solrException.code(), solrException);
+ return;
+ }
+
+ logger.warn("Resubmitting mirrored solr request after failure errorCode={} retryCount={}", solrException != null ? solrException.code() : -1, mirroredSolrRequest.getAttempt(), e);
+ }
+
+ /**
+ *
+ * Process the SolrRequest. If not, this method throws an exception.
+ */
+ private Result<MirroredSolrRequest> processMirroredSolrRequest(SolrRequest request) throws Exception {
+ Result<MirroredSolrRequest> result;
+ SolrResponseBase response = (SolrResponseBase) request.process(client);
+
+ int status = response.getStatus();
+ if (status != 0) {
+ throw new SolrException(SolrException.ErrorCode.getErrorCode(status), "response=" + response);
+ }
+
+ result = new Result<>(ResultStatus.HANDLED);
+ return result;
+ }
+
+ private void logRequest(SolrRequest request) {
+ if(request instanceof UpdateRequest) {
+ final StringBuilder rmsg = new StringBuilder(64);
+ rmsg.append("Submitting update request");
+ if(((UpdateRequest) request).getDeleteById() != null) {
+ rmsg.append(" numDeleteByIds=").append(((UpdateRequest) request).getDeleteById().size());
+ }
+ if(((UpdateRequest) request).getDocuments() != null) {
+ rmsg.append(" numUpdates=").append(((UpdateRequest) request).getDocuments().size());
+ }
+ if(((UpdateRequest) request).getDeleteQuery() != null) {
+ rmsg.append(" numDeleteByQuery=").append(((UpdateRequest) request).getDeleteQuery().size());
+ }
+ logger.info(rmsg.toString());
+ }
+ }
+
+ /**
+ * Clean up the Solr request to be submitted locally.
+ * @param request The SolrRequest to be cleaned up for submitting locally.
+ */
+ private void prepareIfUpdateRequest(SolrRequest request) {
+ if (request instanceof UpdateRequest) {
+ // Remove versions from add requests
+ UpdateRequest updateRequest = (UpdateRequest) request;
+
+ List<SolrInputDocument> documents = updateRequest.getDocuments();
+ if (documents != null) {
+ for (SolrInputDocument doc : documents) {
+ sanitizeDocument(doc);
+ }
+ }
+ removeVersionFromDeleteByIds(updateRequest);
+ }
+ }
+
+ /**
+ * Strips fields that are problematic for replication.
+ */
+ private void sanitizeDocument(SolrInputDocument doc) {
+ logger.info("Removing {}", VERSION_FIELD + " : " + doc.getField(VERSION_FIELD).getValue());
+ doc.remove(VERSION_FIELD);
+ }
+
+ private void removeVersionFromDeleteByIds(UpdateRequest updateRequest) {
+ Map<String, Map<String, Object>> deleteIds = updateRequest.getDeleteByIdMap();
+ if (deleteIds != null) {
+ for (Map<String, Object> idParams : deleteIds.values()) {
+ if (idParams != null) {
+ idParams.put(UpdateRequest.VER, null);
+ }
+ }
+ }
+ }
+
+ private void logFirstAttemptLatency(MirroredSolrRequest mirroredSolrRequest) {
+ // Only record the latency of the first attempt, essentially measuring the latency from submitting on the
+ // primary side until the request is eligible to be consumed on the buddy side (or vice versa).
+ if (mirroredSolrRequest.getAttempt() == 1) {
+ logger.debug("First attempt latency = {}",
+ System.currentTimeMillis() - TimeUnit.NANOSECONDS.toMillis(mirroredSolrRequest.getSubmitTimeNanos()));
+ }
+ }
+
+ /**
+ * Adds {@link CrossDcConstants#SHOULD_MIRROR}=false to the params if it's not already specified.
+ * Logs a warning if it is specified and NOT set to false. (i.e. circular mirror may occur)
+ *
+ * @param mirroredSolrRequest MirroredSolrRequest object that is being processed.
+ */
+ private void preventCircularMirroring(MirroredSolrRequest mirroredSolrRequest) {
+ if (mirroredSolrRequest.getSolrRequest() instanceof UpdateRequest) {
+ UpdateRequest updateRequest = (UpdateRequest) mirroredSolrRequest.getSolrRequest();
+ ModifiableSolrParams params = updateRequest.getParams();
+ String shouldMirror = (params == null ? null : params.get(CrossDcConstants.SHOULD_MIRROR));
+ if (shouldMirror == null) {
+ logger.warn(CrossDcConstants.SHOULD_MIRROR + " param is missing - setting to false. Request={}", mirroredSolrRequest);
+ updateRequest.setParam(CrossDcConstants.SHOULD_MIRROR, "false");
+ } else if (!"false".equalsIgnoreCase(shouldMirror)) {
+ logger.warn(CrossDcConstants.SHOULD_MIRROR + " param equal to " + shouldMirror);
+ }
+ } else {
+ SolrParams params = mirroredSolrRequest.getSolrRequest().getParams();
+ String shouldMirror = (params == null ? null : params.get(CrossDcConstants.SHOULD_MIRROR));
+ if (shouldMirror == null) {
+ if (params instanceof ModifiableSolrParams) {
+ logger.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is missing - setting to false");
+ ((ModifiableSolrParams) params).set(CrossDcConstants.SHOULD_MIRROR, "false");
+ } else {
+ logger.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is missing and params are not modifiable");
+ }
+ } else if (!"false".equalsIgnoreCase(shouldMirror)) {
+ logger.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is present and set to " + shouldMirror);
+ }
+ }
+ }
+
+ private void connectToSolrIfNeeded() {
+ // Don't try to consume anything if we can't connect to the solr server
+ boolean connected = false;
+ while (!connected) {
+ try {
+ client.connect(); // volatile null-check if already connected
+ connected = true;
+ } catch (Exception e) {
+ logger.error("Unable to connect to solr server. Not consuming.", e);
+ uncheckedSleep(5000);
+ }
+ }
+ }
+
+ public void uncheckedSleep(long millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void backoffIfNeeded(Result<MirroredSolrRequest> result) {
+ if (result.status().equals(ResultStatus.FAILED_RESUBMIT)) {
+ final long backoffMs = getResubmitBackoffPolicy().getBackoffTimeMs(result.newItem());
+ if (backoffMs > 0L) {
+ try {
+ Thread.sleep(backoffMs);
+ } catch (final InterruptedException ex) {
+ // we're about to exit the method anyway, so just log this and return the item. Let the caller
+ // handle it.
+ logger.warn("Thread interrupted while backing off before retry");
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
+}
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
new file mode 100644
index 0000000..34f44f9
--- /dev/null
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.crossdc;
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.crossdc.common.IQueueHandler;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.*;
+
+public class TestMessageProcessor {
+ static final String VERSION_FIELD = "_version_";
+
+ private static class NoOpResubmitBackoffPolicy implements ResubmitBackoffPolicy {
+ @Override
+ public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
+ return 0;
+ }
+ }
+
+ @Mock
+ private CloudSolrClient solrClient;
+ private SolrMessageProcessor processor;
+
+ private ResubmitBackoffPolicy backoffPolicy = spy(new NoOpResubmitBackoffPolicy());
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ processor = Mockito.spy(new SolrMessageProcessor(solrClient,
+ backoffPolicy));
+ Mockito.doNothing().when(processor).uncheckedSleep(anyLong());
+ }
+
+ @Test
+ public void testDocumentSanitization() {
+ UpdateRequest request = spy(new UpdateRequest());
+
+ // Add docs with and without version
+ request.add(new SolrInputDocument() {
+ {
+ setField("id", 1);
+ setField(VERSION_FIELD, 1);
+ }
+ });
+ request.add(new SolrInputDocument() {
+ {
+ setField("id", 2);
+ }
+ });
+
+ // Delete by id with and without version
+ request.deleteById("1");
+ request.deleteById("2", 10L);
+
+ request.setParam("shouldMirror", "true");
+ // The response is irrelevant, but it will fail because mocked server returns null when processing
+ processor.handleItem(new MirroredSolrRequest(request));
+
+ // After processing, check that all version fields are stripped
+ for (SolrInputDocument doc : request.getDocuments()) {
+ assertNull("Doc still has version", doc.getField(VERSION_FIELD));
+ }
+
+ // Check versions in delete by id
+ for (Map<String, Object> idParams : request.getDeleteByIdMap().values()) {
+ if (idParams != null) {
+ idParams.put(UpdateRequest.VER, null);
+ assertNull("Delete still has version", idParams.get(UpdateRequest.VER));
+ }
+ }
+ }
+
+ @Test
+ public void testSuccessNoBackoff() throws Exception {
+ final UpdateRequest request = spy(new UpdateRequest());
+ when(solrClient.request(eq(request), anyString())).thenReturn(new NamedList<>());
+
+ processor.handleItem(new MirroredSolrRequest(request));
+
+ verify(backoffPolicy, times(0)).getBackoffTimeMs(any());
+ }
+
+ @Test
+ public void testClientErrorNoRetries() throws Exception {
+ final UpdateRequest request = new UpdateRequest();
+ request.setParam("shouldMirror", "true");
+ when(solrClient.request(eq(request), anyString())).thenThrow(
+ new SolrException(
+ SolrException.ErrorCode.BAD_REQUEST, "err msg"));
+
+ IQueueHandler.Result<MirroredSolrRequest> result = processor.handleItem(new MirroredSolrRequest(request));
+ assertEquals(IQueueHandler.ResultStatus.FAILED_RESUBMIT, result.status());
+ }
+}
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 4d9ca16..8cf6eb5 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-6.7.1-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.3-all.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
diff --git a/settings.gradle b/settings.gradle
index 825c91f..3e6558d 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -7,4 +7,6 @@
* in the user manual at https://docs.gradle.org/6.7.1/userguide/multi_project_builds.html
*/
-rootProject.name = 'lucene-solr-sandbox'
+rootProject.name = 'solr-sandbox'
+
+include 'crossdc-consumer'
\ No newline at end of file
diff --git a/version.props b/version.props
new file mode 100644
index 0000000..e69de29