You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ma...@apache.org on 2022/05/31 19:12:43 UTC
[solr-sandbox] branch crossdc-wip updated: Working out remaining issues via manual testing. (#15)
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/crossdc-wip by this push:
new 05f0ad8 Working out remaining issues via manual testing. (#15)
05f0ad8 is described below
commit 05f0ad8a0b590ef559b6f54a16b22c3671d6668f
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Tue May 31 14:12:38 2022 -0500
Working out remaining issues via manual testing. (#15)
* Tie up
* Fix a variety of issues.
---
cluster-stop.sh | 23 +++
cluster.sh | 95 ++++++++++++
crossdc-commons/build.gradle | 6 +-
.../solr/crossdc/common}/KafkaMirroringSink.java | 18 ++-
.../common/MirroredSolrRequestSerializer.java | 57 +++++--
.../solr/crossdc/common}/MirroringException.java | 2 +-
.../solr/crossdc/common}/RequestMirroringSink.java | 4 +-
.../crossdc/common}/ResubmitBackoffPolicy.java | 2 +-
crossdc-consumer/build.gradle | 32 ++--
.../org/apache/solr/crossdc/consumer/Consumer.java | 86 +++++++----
.../crossdc/messageprocessor/MessageProcessor.java | 3 +-
.../messageprocessor/SolrMessageProcessor.java | 46 +++---
.../org/apache/solr/crossdc/IntegrationTest.java | 6 +-
.../solr/crossdc/SimpleSolrIntegrationTest.java | 9 +-
.../apache/solr/crossdc/TestMessageProcessor.java | 1 +
crossdc-consumer/src/test/resources/log4j2.xml | 27 +++-
crossdc-producer/build.gradle | 34 ++--
.../processor/KafkaRequestMirroringHandler.java | 21 +--
.../MirroringUpdateRequestProcessorFactory.java | 64 +++++++-
.../solr/crossdc/SolrAndKafkaIntegrationTest.java | 171 +++++++++++----------
.../SolrKafkaTestsIgnoredThreadsFilter.java | 0
.../configs/cloud-minimal/conf/schema.xml | 54 +++++++
.../configs/cloud-minimal/conf/solrconfig.xml | 120 +++++++++++++++
.../src/test/resources/log4j2.xml | 27 +++-
gradle/wrapper/gradle-wrapper.properties | 2 +-
.../src/test/resources/log4j2.xml => log4j2.xml | 27 +++-
26 files changed, 722 insertions(+), 215 deletions(-)
diff --git a/cluster-stop.sh b/cluster-stop.sh
new file mode 100644
index 0000000..91a6b21
--- /dev/null
+++ b/cluster-stop.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+# Stops the Kafka broker, ZooKeeper and Solr instances that live under ./cluster.
+kafkaBase="https://archive.apache.org/dist/kafka/2.8.1"
+solrBase="https://dlcdn.apache.org/lucene/solr/8.11.1"
+# NOTE(review): kafkaBase and solrBase are not referenced anywhere in this script.
+kafka="kafka_2.12-2.8.1"
+solr="solr-8.11.1"
+
+# Abort if the cluster directory is missing instead of running the stop scripts from the wrong place.
+cd cluster || exit
+
+(
+ cd "${kafka}" || exit
+ # Stop the broker before ZooKeeper so the broker can still reach ZooKeeper while it shuts down.
+ bin/kafka-server-stop.sh config/server.properties
+ bin/zookeeper-server-stop.sh config/zookeeper.properties
+)
+
+(
+ cd "${solr}" || exit
+
+ bin/solr stop -all
+)
\ No newline at end of file
diff --git a/cluster.sh b/cluster.sh
new file mode 100644
index 0000000..e7439df
--- /dev/null
+++ b/cluster.sh
@@ -0,0 +1,95 @@
+#!/bin/bash
+# Builds a local test cluster under ./cluster: fetches Kafka and Solr, starts them, installs the crossdc jars and runs the consumer.
+kafkaBase="https://archive.apache.org/dist/kafka/2.8.1"
+solrBase="https://dlcdn.apache.org/lucene/solr/8.11.1"
+
+kafka="kafka_2.12-2.8.1"
+solr="solr-8.11.1"
+
+if [ ! -d cluster ]
+then
+ mkdir cluster
+fi
+
+cd cluster || exit
+
+if [ ! -f "${kafka}.tgz" ]
+then
+ wget "${kafkaBase}/${kafka}.tgz"
+fi
+
+if [ ! -d "${kafka}" ]
+then
+ tar -xvzf "${kafka}.tgz"
+fi
+
+if [ ! -f "${solr}.tgz" ]
+then
+ wget "${solrBase}/${solr}.tgz"
+fi
+
+if [ ! -d "${solr}" ]
+then
+ tar -xvzf "${solr}.tgz"
+fi
+
+(
+ cd "${kafka}" || exit
+
+
+bin/zookeeper-server-start.sh config/zookeeper.properties > ../kafka_zk.log &
+
+bin/kafka-server-start.sh config/server.properties > ../kafka_server.log &
+
+# for kafka 2.x zk port of 2181, for 3.x broker of 9093
+
+# bin/kafka-topics.sh --create --topic my-kafka-topic --bootstrap-server localhost:9093 --partitions 3 --replication-factor 2
+
+# bin/kafka-topics.sh --list --bootstrap-server localhost:9093
+
+# bin/kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic my-kafka-topic
+
+# bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-kafka-topic --from-beginning
+
+# bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-kafka-topic --from-beginning --group group2
+)
+
+# The crossdc jars go into the shared lib folder named by -Dsolr.sharedLib=lib below; -p keeps re-runs from failing when it already exists.
+mkdir -p "${solr}/server/solr/lib"
+cp ../crossdc-commons/build/libs/crossdc-commons-*.jar "${solr}"/server/solr/lib
+cp ../crossdc-producer/build/libs/crossdc-producer-*.jar "${solr}"/server/solr/lib
+cp ../crossdc-producer/build/libs/kafka-clients-*.jar "${solr}"/server/solr/lib
+
+(
+ cd "${solr}" || exit
+ # Append the crossdc settings only once so that re-running the script does not keep growing solr.in.sh.
+ grep -q -- '-DtopicName=crossdc' bin/solr.in.sh || echo -e "SOLR_OPTS=\"$SOLR_OPTS -Dsolr.sharedLib=lib -DbootstrapServers=127.0.0.1:2181 -DtopicName=crossdc\"" >> bin/solr.in.sh
+ # NOTE(review): bootstrapServers is set to 2181, which the comment below calls the ZK port - confirm a broker address is not expected.
+ bin/solr start -cloud > ../solr.log
+
+ # for kafka 2.x ZK is on 2181, for Solr ZK is on 9983
+ # for the moment we upload the config set used in crossdc-producer tests
+
+ if [ ! -d "../../crossdc-producer/src/test/resources/configs/cloud-minimal/conf" ]
+ then
+ echo "Could not find configset folder to upload"
+ exit 1
+ fi
+ bin/solr zk upconfig -z 127.0.0.1:9983 -n crossdc -d ../../crossdc-producer/src/test/resources/configs/cloud-minimal/conf
+
+ bin/solr create -c collection1 -n crossdc
+
+ bin/solr status
+)
+
+cp ../crossdc-consumer/build/distributions/crossdc-consumer-*.tar .
+
+tar -xvf crossdc-consumer-*.tar
+rm crossdc-consumer-*.tar
+
+(
+ cd crossdc-consumer* || exit
+ CROSSDC_CONSUMER_OPTS="-Dlog4j2.configurationFile=../log4j2.xml -DbootstrapServers=127.0.0.1:2181 -DzkConnectString=127.0.0.1:2181 -DtopicName=crossdc" bin/crossdc-consumer > ../crossdc_consumer.log
+)
+# NOTE(review): the consumer above has no trailing '&', so this request appears to be sent only after it exits - confirm intended.
+curl -X POST -d '{"add":{"doc":{"id":"1","text":"datalicious"},"commitWithin":10}}' -H "Content-Type: application/json" http://127.0.0.1:8983/solr/collection1/update
\ No newline at end of file
diff --git a/crossdc-commons/build.gradle b/crossdc-commons/build.gradle
index b491b1b..7ac0fd1 100644
--- a/crossdc-commons/build.gradle
+++ b/crossdc-commons/build.gradle
@@ -28,9 +28,9 @@ repositories {
}
dependencies {
- compile group: 'org.apache.solr', name: 'solr-solrj', version: '8.11.1'
- compile 'org.apache.kafka:kafka-clients:2.8.0'
- compile group: 'com.google.guava', name: 'guava', version: '14.0'
+ implementation 'org.apache.solr:solr-solrj:8.11.1'
+ implementation 'org.apache.kafka:kafka-clients:2.8.1'
+ implementation 'com.google.guava:guava:14.0'
}
subprojects {
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
similarity index 88%
rename from crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
rename to crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
index d6dcd50..b062392 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
@@ -14,14 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.solr.crossdc;
+package org.apache.solr.crossdc.common;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.solr.crossdc.common.KafkaCrossDcConf;
-import org.apache.solr.crossdc.common.MirroredSolrRequest;
-import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,7 +89,14 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", MirroredSolrRequestSerializer.class.getName());
- Producer<String, MirroredSolrRequest> producer = new KafkaProducer(props);
+ ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(null);
+ Producer<String, MirroredSolrRequest> producer;
+ try {
+ producer = new KafkaProducer(props);
+ } finally {
+ Thread.currentThread().setContextClassLoader(originalContextClassLoader);
+ }
return producer;
}
@@ -103,6 +107,8 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
}
@Override public void close() throws IOException {
- producer.close();
+ if (producer != null) {
+ producer.close();
+ }
}
}
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
index bb3104e..221f12f 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
@@ -21,6 +21,10 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.params.MapSolrParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.MultiMapSolrParams;
+import org.apache.solr.common.util.JavaBinCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,6 +32,8 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrRequest>, Deserializer<MirroredSolrRequest> {
@@ -48,23 +54,39 @@ public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrReq
@Override
public MirroredSolrRequest deserialize(String topic, byte[] data) {
- UpdateRequest solrRequest;
+ Map solrRequest;
- JavaBinUpdateRequestCodec codec = new JavaBinUpdateRequestCodec();
+ JavaBinCodec codec = new JavaBinCodec();
ByteArrayInputStream bais = new ByteArrayInputStream(data);
try {
- solrRequest = codec.unmarshal(bais,
- (document, req, commitWithin, override) -> {
+ solrRequest = (Map) codec.unmarshal(bais);
- });
- } catch (IOException e) {
+ log.info("Deserialized class={} solrRequest={}", solrRequest.getClass().getName(), solrRequest);
+
+
+ } catch (Exception e) {
+ log.error("Exception unmarshalling JavaBin", e);
throw new RuntimeException(e);
}
- log.info("solrRequest={}, {}", solrRequest.getParams(), solrRequest.getDocuments());
+ UpdateRequest updateRequest = new UpdateRequest();
+ List docs = (List) solrRequest.get("docs");
+ if (docs != null) {
+ updateRequest.add(docs);
+ }
- return new MirroredSolrRequest(solrRequest);
+ List deletes = (List) solrRequest.get("deletes");
+ if (deletes != null) {
+ updateRequest.deleteById(deletes);
+ }
+
+ Map params = (Map) solrRequest.get("params");
+ if (params != null) {
+ updateRequest.setParams(ModifiableSolrParams.of(new MapSolrParams(params)));
+ }
+
+ return new MirroredSolrRequest(updateRequest);
}
/**
@@ -77,12 +99,23 @@ public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrReq
@Override
public byte[] serialize(String topic, MirroredSolrRequest request) {
// TODO: add checks
- SolrRequest solrRequest = request.getSolrRequest();
- UpdateRequest updateRequest = (UpdateRequest)solrRequest;
- JavaBinUpdateRequestCodec codec = new JavaBinUpdateRequestCodec();
+ UpdateRequest solrRequest = (UpdateRequest) request.getSolrRequest();
+
+ log.info("serialize request={} docs={}", solrRequest, solrRequest.getDocuments());
+
+ JavaBinCodec codec = new JavaBinCodec(null);
+
ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
+ Map map = new HashMap();
+ map.put("params", solrRequest.getParams().getMap());
+ map.put("docs", solrRequest.getDocuments());
+
+ // TODO
+ //map.put("deletes", solrRequest.getDeleteByIdMap());
+ map.put("deletes", solrRequest.getDeleteById());
+
try {
- codec.marshal(updateRequest, baos);
+ codec.marshal(map, baos);
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/MirroringException.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroringException.java
similarity index 96%
rename from crossdc-consumer/src/main/java/org/apache/solr/crossdc/MirroringException.java
rename to crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroringException.java
index 2073750..ac633ec 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/MirroringException.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroringException.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.solr.crossdc;
+package org.apache.solr.crossdc.common;
/**
* Exception thrown during cross-dc mirroring
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/RequestMirroringSink.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/RequestMirroringSink.java
similarity index 93%
rename from crossdc-consumer/src/main/java/org/apache/solr/crossdc/RequestMirroringSink.java
rename to crossdc-commons/src/main/java/org/apache/solr/crossdc/common/RequestMirroringSink.java
index 5d09c16..e8b2c69 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/RequestMirroringSink.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/RequestMirroringSink.java
@@ -14,9 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.solr.crossdc;
-
-import org.apache.solr.crossdc.common.MirroredSolrRequest;
+package org.apache.solr.crossdc.common;
public interface RequestMirroringSink {
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ResubmitBackoffPolicy.java
similarity index 96%
rename from crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java
rename to crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ResubmitBackoffPolicy.java
index e531cca..82babbf 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ResubmitBackoffPolicy.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.solr.crossdc;
+package org.apache.solr.crossdc.common;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle
index e173dd3..61de292 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/build.gradle
@@ -32,34 +32,32 @@ application {
}
dependencies {
- compile group: 'org.apache.solr', name: 'solr-solrj', version: '8.11.1'
- compile project(':crossdc-commons')
- implementation 'org.slf4j:slf4j-api'
- compile 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
- compile 'org.eclipse.jetty:jetty-server:9.4.41.v20210516'
- compile 'org.eclipse.jetty:jetty-servlet:9.4.41.v20210516'
- compile 'org.apache.kafka:kafka-clients:2.8.0'
- compile group: 'com.google.guava', name: 'guava', version: '14.0'
+ implementation group: 'org.apache.solr', name: 'solr-solrj', version: '8.11.1'
+ implementation project(':crossdc-commons')
+
+ implementation 'org.slf4j:slf4j-api:1.7.36'
+ implementation 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
+ implementation 'org.eclipse.jetty:jetty-server:9.4.41.v20210516'
+ implementation 'org.eclipse.jetty:jetty-servlet:9.4.41.v20210516'
+ implementation 'org.apache.kafka:kafka-clients:2.8.1'
+ implementation group: 'com.google.guava', name: 'guava', version: '14.0'
+ implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.17.2'
runtimeOnly ('com.google.protobuf:protobuf-java-util:3.19.2')
testImplementation 'org.hamcrest:hamcrest:2.2'
testImplementation 'junit:junit:4.13.2'
testImplementation('org.mockito:mockito-core:4.3.1', {
exclude group: "net.bytebuddy", module: "byte-buddy-agent"
})
+
+ testImplementation project(':crossdc-producer')
+
testImplementation group: 'org.apache.solr', name: 'solr-core', version: '8.11.1'
testImplementation group: 'org.apache.solr', name: 'solr-test-framework', version: '8.11.1'
testImplementation 'org.apache.kafka:kafka_2.13:2.8.1'
testImplementation 'org.apache.kafka:kafka-streams:2.8.1'
-
testImplementation 'org.apache.kafka:kafka_2.13:2.8.1:test'
- testImplementation ('org.apache.kafka:kafka-clients:2.8.1:test') {
- //exclude group: 'org.apache.kafka' // or finer grained, if we like
- }
- testImplementation ('org.apache.kafka:kafka-streams:2.8.1:test') {
- //exclude group: 'org.apache.kafka' // or finer grained, if we like
- }
- // testImplementation 'org.apache.kafka:kafka-streams-test-utils:2.8.1'
-
+ testImplementation 'org.apache.kafka:kafka-clients:2.8.1:test'
+ testImplementation 'org.apache.kafka:kafka-streams:2.8.1:test'
}
subprojects {
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index 62e0a84..5b3d01c 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -18,14 +18,14 @@ package org.apache.solr.crossdc.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.crossdc.KafkaMirroringSink;
-import org.apache.solr.crossdc.MirroringException;
-import org.apache.solr.crossdc.ResubmitBackoffPolicy;
+import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.crossdc.common.KafkaMirroringSink;
+import org.apache.solr.crossdc.common.MirroringException;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
import org.apache.solr.crossdc.common.IQueueHandler;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
@@ -62,6 +62,16 @@ public class Consumer {
private String topicName;
public void start(String bootstrapServers, String zkConnectString, String topicName, boolean enableDataEncryption, int port) {
+ if (bootstrapServers == null) { // each required setting is checked against its own argument
+ throw new IllegalArgumentException("bootstrapServers config was not passed at startup");
+ }
+ if (zkConnectString == null) {
+ throw new IllegalArgumentException("zkConnectString config was not passed at startup");
+ }
+ if (topicName == null) {
+ throw new IllegalArgumentException("topicName config was not passed at startup");
+ }
+
this.topicName = topicName;
server = new Server();
@@ -98,7 +108,9 @@ public class Consumer {
}
public void shutdown() {
- crossDcConsumer.shutdown();
+ if (crossDcConsumer != null) {
+ crossDcConsumer.shutdown();
+ }
}
/**
@@ -124,6 +136,8 @@ public class Consumer {
private final String topicName;
SolrMessageProcessor messageProcessor;
+ CloudSolrClient solrClient;
+
/**
* @param conf The Kafka consumer configuration
*/
@@ -134,12 +148,9 @@ public class Consumer {
kafkaConsumerProp.put("bootstrap.servers", conf.getBootStrapServers());
- kafkaConsumerProp.put("key.deserializer", StringDeserializer.class.getName());
- kafkaConsumerProp.put("value.deserializer", MirroredSolrRequestSerializer.class.getName());
-
- kafkaConsumerProp.put("group.id", "group_1");
+ kafkaConsumerProp.put("group.id", "group_1"); // TODO
- CloudSolrClient solrClient = new CloudSolrClient.Builder(Collections.singletonList(conf.getSolrZkConnectString()), Optional.empty()).build();
+ solrClient = new CloudSolrClient.Builder(Collections.singletonList(conf.getSolrZkConnectString()), Optional.empty()).build();
messageProcessor = new SolrMessageProcessor(solrClient, new ResubmitBackoffPolicy() {
@Override public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
@@ -158,7 +169,7 @@ public class Consumer {
}
private KafkaConsumer<String, MirroredSolrRequest> createConsumer(Properties properties) {
- KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
+ KafkaConsumer kafkaConsumer = new KafkaConsumer(properties, new StringDeserializer(), new MirroredSolrRequestSerializer());
return kafkaConsumer;
}
@@ -173,23 +184,29 @@ public class Consumer {
log.info("About to start Kafka consumer thread...");
log.info("Kafka consumer subscribing to topic topic={}", topicName);
- consumer.subscribe(Collections.singleton(topicName));
- while (pollAndProcessRequests()) {
- //no-op within this loop: everything is done in pollAndProcessRequests method defined above.
- }
-
- log.info("Closed kafka consumer. Exiting now.");
try {
- consumer.close();
- } catch (Exception e) {
- log.warn("Failed to close kafka consumer", e);
- }
- try {
- kafkaMirroringSink.close();
- } catch (Exception e) {
- log.warn("Failed to close kafka mirroring sink", e);
+ consumer.subscribe(Collections.singleton(topicName));
+
+ while (pollAndProcessRequests()) {
+ //no-op within this loop: everything is done in pollAndProcessRequests method defined above.
+ }
+
+ log.info("Closed kafka consumer. Exiting now.");
+ try {
+ consumer.close();
+ } catch (Exception e) {
+ log.warn("Failed to close kafka consumer", e);
+ }
+
+ try {
+ kafkaMirroringSink.close();
+ } catch (Exception e) {
+ log.warn("Failed to close kafka mirroring sink", e);
+ }
+ } finally {
+ IOUtils.closeQuietly(solrClient);
}
}
@@ -240,6 +257,11 @@ public class Consumer {
return false;
} catch (Exception e) {
// If there is any exception returned by handleItem, then reset the offset.
+
+ if (e instanceof ClassCastException || e instanceof ClassNotFoundException || e instanceof SerializationException) {
+ log.error("Non retryable error", e);
+ break;
+ }
log.warn("Exception occurred in Kafka consumer thread, but we will continue.", e);
resetOffsetForPartition(partition, partitionRecords);
break;
@@ -249,6 +271,13 @@ public class Consumer {
log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer");
return false;
} catch (Exception e) {
+
+ e.printStackTrace();
+ if (e instanceof ClassCastException || e instanceof ClassNotFoundException || e instanceof SerializationException) {
+ log.error("Non retryable error", e);
+ return false;
+ }
+
log.error("Exception occurred in Kafka consumer thread, but we will continue.", e);
}
return true;
@@ -287,6 +316,11 @@ public class Consumer {
*/
public void shutdown() {
log.info("Shutdown called on KafkaCrossDcConsumer");
+ try {
+ solrClient.close();
+ } catch (Exception e) {
+ log.warn("Exception closing Solr client on shutdown", e); // pass the exception so the cause is not lost
+ }
consumer.wakeup();
}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java
index efbfef6..0620e69 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java
@@ -16,8 +16,7 @@
*/
package org.apache.solr.crossdc.messageprocessor;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.crossdc.ResubmitBackoffPolicy;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
public abstract class MessageProcessor {
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
index 79ac32b..140d0b9 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
@@ -25,7 +25,7 @@ import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.crossdc.ResubmitBackoffPolicy;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
import org.apache.solr.crossdc.common.CrossDcConstants;
import org.apache.solr.crossdc.common.IQueueHandler;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
@@ -33,6 +33,7 @@ import org.apache.solr.crossdc.common.SolrExceptionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -45,7 +46,7 @@ import java.util.concurrent.TimeUnit;
* 3. Flagging requests for resubmission by the underlying consumer implementation.
*/
public class SolrMessageProcessor extends MessageProcessor implements IQueueHandler<MirroredSolrRequest> {
- private static final Logger logger = LoggerFactory.getLogger(SolrMessageProcessor.class);
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
final CloudSolrClient client;
private static final String VERSION_FIELD = "_version_";
@@ -70,13 +71,14 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
}
private Result<MirroredSolrRequest> handleSolrRequest(MirroredSolrRequest mirroredSolrRequest) {
- logger.debug("Handling Solr request");
+ log.debug("Handling Solr request");
SolrRequest request = mirroredSolrRequest.getSolrRequest();
final SolrParams requestParams = request.getParams();
final String shouldMirror = requestParams.get("shouldMirror");
- if (shouldMirror != null && !Boolean.parseBoolean(shouldMirror)) {
- logger.warn("Skipping mirrored request because shouldMirror is set to false. request={}", request);
+ log.info("shouldMirror={}", shouldMirror);
+ if ("false".equalsIgnoreCase(shouldMirror)) {
+ log.warn("Skipping mirrored request because shouldMirror is set to false. request={}", requestParams);
return new Result<>(ResultStatus.FAILED_NO_RETRY);
}
logFirstAttemptLatency(mirroredSolrRequest);
@@ -85,7 +87,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
try {
prepareIfUpdateRequest(request);
logRequest(request);
- logger.debug("About to submit Solr request {}", request);
+ log.debug("About to submit Solr request {}", request);
result = processMirroredSolrRequest(request);
} catch (Exception e) {
result = handleException(mirroredSolrRequest, e);
@@ -118,7 +120,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
// If backoff policy is not configured (returns "0" by default), then sleep 1 second. If configured, do as it says.
sleepTimeMs = Math.max(1, Long.parseLong(backoffTimeSuggested));
}
- logger.info("Consumer backoff. sleepTimeMs={}", sleepTimeMs);
+ log.info("Consumer backoff. sleepTimeMs={}", sleepTimeMs);
uncheckedSleep(sleepTimeMs);
}
@@ -132,25 +134,25 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
}
}
// Everything other than version conflict exceptions should be retried.
- logger.warn("Unexpected exception, will resubmit the request to the queue", e);
+ log.warn("Unexpected exception, will resubmit the request to the queue", e);
return true;
}
private void logIf4xxException(SolrException solrException) {
// This shouldn't really happen but if it doesn, it most likely requires fixing in the return code from Solr.
if (solrException != null && 400 <= solrException.code() && solrException.code() < 500) {
- logger.error("Exception occurred with 4xx response. {}", solrException.code(), solrException);
+ log.error("Exception occurred with 4xx response. {}", solrException.code(), solrException);
}
}
private void logFailure(MirroredSolrRequest mirroredSolrRequest, Exception e, SolrException solrException, boolean retryable) {
// This shouldn't really happen.
if (solrException != null && 400 <= solrException.code() && solrException.code() < 500) {
- logger.error("Exception occurred with 4xx response. {}", solrException.code(), solrException);
+ log.error("Exception occurred with 4xx response. {}", solrException.code(), solrException);
return;
}
- logger.warn("Resubmitting mirrored solr request after failure errorCode={} retryCount={}", solrException != null ? solrException.code() : -1, mirroredSolrRequest.getAttempt(), e);
+ log.warn("Resubmitting mirrored solr request after failure errorCode={} retryCount={}", solrException != null ? solrException.code() : -1, mirroredSolrRequest.getAttempt(), e);
}
/**
@@ -158,7 +160,9 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
* Process the SolrRequest. If not, this method throws an exception.
*/
private Result<MirroredSolrRequest> processMirroredSolrRequest(SolrRequest request) throws Exception {
+ log.info("Sending request to Solr at {} with params {}", client.getZkHost(), request.getParams());
Result<MirroredSolrRequest> result;
+
SolrResponseBase response = (SolrResponseBase) request.process(client);
int status = response.getStatus();
@@ -183,7 +187,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
if(((UpdateRequest) request).getDeleteQuery() != null) {
rmsg.append(" numDeleteByQuery=").append(((UpdateRequest) request).getDeleteQuery().size());
}
- logger.info(rmsg.toString());
+ log.info(rmsg.toString());
}
}
@@ -211,7 +215,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
*/
private void sanitizeDocument(SolrInputDocument doc) {
SolrInputField field = doc.getField(VERSION_FIELD);
- logger.info("Removing {} value={}", VERSION_FIELD, field == null ? "null" : field.getValue());
+ log.info("Removing {} value={}", VERSION_FIELD, field == null ? "null" : field.getValue());
doc.remove(VERSION_FIELD);
}
@@ -230,7 +234,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
// Only record the latency of the first attempt, essentially measuring the latency from submitting on the
// primary side until the request is eligible to be consumed on the buddy side (or vice versa).
if (mirroredSolrRequest.getAttempt() == 1) {
- logger.debug("First attempt latency = {}",
+ log.debug("First attempt latency = {}",
System.currentTimeMillis() - TimeUnit.NANOSECONDS.toMillis(mirroredSolrRequest.getSubmitTimeNanos()));
}
}
@@ -247,23 +251,23 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
ModifiableSolrParams params = updateRequest.getParams();
String shouldMirror = (params == null ? null : params.get(CrossDcConstants.SHOULD_MIRROR));
if (shouldMirror == null) {
- logger.warn(CrossDcConstants.SHOULD_MIRROR + " param is missing - setting to false. Request={}", mirroredSolrRequest);
+ log.warn(CrossDcConstants.SHOULD_MIRROR + " param is missing - setting to false. Request={}", mirroredSolrRequest);
updateRequest.setParam(CrossDcConstants.SHOULD_MIRROR, "false");
} else if (!"false".equalsIgnoreCase(shouldMirror)) {
- logger.warn(CrossDcConstants.SHOULD_MIRROR + " param equal to " + shouldMirror);
+ log.warn(CrossDcConstants.SHOULD_MIRROR + " param equal to " + shouldMirror);
}
} else {
SolrParams params = mirroredSolrRequest.getSolrRequest().getParams();
String shouldMirror = (params == null ? null : params.get(CrossDcConstants.SHOULD_MIRROR));
if (shouldMirror == null) {
if (params instanceof ModifiableSolrParams) {
- logger.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is missing - setting to false");
+ log.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is missing - setting to false");
((ModifiableSolrParams) params).set(CrossDcConstants.SHOULD_MIRROR, "false");
} else {
- logger.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is missing and params are not modifiable");
+ log.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is missing and params are not modifiable");
}
} else if (!"false".equalsIgnoreCase(shouldMirror)) {
- logger.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is present and set to " + shouldMirror);
+ log.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is present and set to " + shouldMirror);
}
}
}
@@ -276,7 +280,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
client.connect(); // volatile null-check if already connected
connected = true;
} catch (Exception e) {
- logger.error("Unable to connect to solr server. Not consuming.", e);
+ log.error("Unable to connect to solr server. Not consuming.", e);
uncheckedSleep(5000);
}
}
@@ -300,7 +304,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
} catch (final InterruptedException ex) {
// we're about to exit the method anyway, so just log this and return the item. Let the caller
// handle it.
- logger.warn("Thread interrupted while backing off before retry");
+ log.warn("Thread interrupted while backing off before retry");
Thread.currentThread().interrupt();
}
}
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java
index 695ebab..1648f80 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java
@@ -1,20 +1,16 @@
package org.apache.solr.crossdc;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
import org.junit.AfterClass;
-import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.mockito.Mockito;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import static org.mockito.Mockito.spy;
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
index dec570c..46188a8 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
@@ -1,4 +1,3 @@
- }
package org.apache.solr.crossdc;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -19,15 +18,11 @@ import static org.mockito.Mockito.spy;
public class SimpleSolrIntegrationTest extends SolrCloudTestCase {
static final String VERSION_FIELD = "_version_";
- private static final int NUM_BROKERS = 1;
protected static volatile MiniSolrCloudCluster cluster1;
private static SolrMessageProcessor processor;
- private static ResubmitBackoffPolicy backoffPolicy = spy(new TestMessageProcessor.NoOpResubmitBackoffPolicy());
- private static CloudSolrClient cloudClient1;
-
@BeforeClass
public static void setupIntegrationTest() throws Exception {
@@ -37,9 +32,9 @@ public class SimpleSolrIntegrationTest extends SolrCloudTestCase {
.configure();
String collection = "collection1";
- cloudClient1 = cluster1.getSolrClient();
+ CloudSolrClient cloudClient1 = cluster1.getSolrClient();
- processor = new SolrMessageProcessor(cloudClient1, backoffPolicy);
+ processor = new SolrMessageProcessor(cloudClient1, null);
CollectionAdminRequest.Create create =
CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
index 6158fc4..4e68f4f 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
@@ -24,6 +24,7 @@ import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.crossdc.common.IQueueHandler;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
import org.junit.Before;
import org.junit.Test;
diff --git a/crossdc-consumer/src/test/resources/log4j2.xml b/crossdc-consumer/src/test/resources/log4j2.xml
index 96f69f1..c63e736 100644
--- a/crossdc-consumer/src/test/resources/log4j2.xml
+++ b/crossdc-consumer/src/test/resources/log4j2.xml
@@ -26,9 +26,33 @@
</Pattern>
</PatternLayout>
</Console>
+
+ <!-- File appender used in addition to the console appender: the target path is built from the
+ log.dir and log.name system properties (falling back to "logs" and "solr"). A rollover is
+ triggered on startup and whenever the file reaches 128 MB; rolled files get an index suffix,
+ with the rollover strategy capped at 10. -->
+ <RollingRandomAccessFile
+ name="MainLogFile"
+ fileName="${sys:log.dir:-logs}/${sys:log.name:-solr}.log"
+ filePattern="${sys:log.dir:-logs}/${sys:log.name:-solr}.log.%i">
+ <PatternLayout>
+ <Pattern>
+ %maxLen{%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p (%t) [%notEmpty{c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}] %c{1.}
+ %m%notEmpty{ =>%ex{short}}}{10240}%n
+ </Pattern>
+ </PatternLayout>
+ <Policies>
+ <OnStartupTriggeringPolicy/>
+ <SizeBasedTriggeringPolicy size="128 MB"/>
+ </Policies>
+ <DefaultRolloverStrategy max="10"/>
+ </RollingRandomAccessFile>
</Appenders>
<Loggers>
- <!-- Use <AsyncLogger/<AsyncRoot and <Logger/<Root for asynchronous logging or synchonous logging respectively -->
+
+
+ <Logger name="kafka.server.KafkaConfig" level="WARN"/>
+ <Logger name="org.apache.kafka.clients.producer.ProducerConfig" level="WARN"/>
+ <Logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="WARN"/>
+ <Logger name="org.apache.kafka.clients.admin.AdminClientConfig" level="WARN"/>
+
+
<Logger name="org.apache.zookeeper" level="WARN"/>
<Logger name="org.apache.hadoop" level="WARN"/>
<Logger name="org.apache.directory" level="WARN"/>
@@ -36,6 +60,7 @@
<Logger name="org.eclipse.jetty" level="INFO"/>
<Root level="INFO">
+ <AppenderRef ref="MainLogFile"/>
<AppenderRef ref="STDERR"/>
</Root>
</Loggers>
diff --git a/crossdc-producer/build.gradle b/crossdc-producer/build.gradle
index f361cac..728706b 100644
--- a/crossdc-producer/build.gradle
+++ b/crossdc-producer/build.gradle
@@ -28,16 +28,16 @@ repositories {
}
dependencies {
- compile project(':crossdc-consumer')
- compile project(':crossdc-commons')
- compile group: 'org.apache.solr', name: 'solr-solrj', version: '8.11.1'
- compile group: 'org.apache.solr', name: 'solr-core', version: '8.11.1'
+ implementation project(':crossdc-consumer')
+ implementation project(':crossdc-commons')
+ implementation group: 'org.apache.solr', name: 'solr-solrj', version: '8.11.1'
+ implementation group: 'org.apache.solr', name: 'solr-core', version: '8.11.1'
implementation 'org.slf4j:slf4j-api'
- compile 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
- compile 'org.eclipse.jetty:jetty-server:9.4.41.v20210516'
- compile 'org.eclipse.jetty:jetty-servlet:9.4.41.v20210516'
- compile 'org.apache.kafka:kafka-clients:2.8.0'
- compile group: 'com.google.guava', name: 'guava', version: '14.0'
+ implementation 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
+ implementation 'org.eclipse.jetty:jetty-server:9.4.41.v20210516'
+ implementation 'org.eclipse.jetty:jetty-servlet:9.4.41.v20210516'
+ implementation 'org.apache.kafka:kafka-clients:2.8.0'
+ implementation group: 'com.google.guava', name: 'guava', version: '14.0'
runtimeOnly ('com.google.protobuf:protobuf-java-util:3.19.2')
testImplementation 'org.hamcrest:hamcrest:2.2'
testImplementation 'junit:junit:4.13.2'
@@ -50,13 +50,8 @@ dependencies {
testImplementation 'org.apache.kafka:kafka-streams:2.8.1'
testImplementation 'org.apache.kafka:kafka_2.13:2.8.1:test'
- testImplementation ('org.apache.kafka:kafka-clients:2.8.1:test') {
- //exclude group: 'org.apache.kafka' // or finer grained, if we like
- }
- testImplementation ('org.apache.kafka:kafka-streams:2.8.1:test') {
- //exclude group: 'org.apache.kafka' // or finer grained, if we like
- }
- // testImplementation 'org.apache.kafka:kafka-streams-test-utils:2.8.1'
+ testImplementation 'org.apache.kafka:kafka-clients:2.8.1:test'
+ testImplementation 'org.apache.kafka:kafka-streams:2.8.1:test'
}
@@ -64,6 +59,13 @@ subprojects {
group "org.apache.solr"
}
+// Copies everything on the runtime classpath into $buildDir/libs.
+task copyToLib(type: Copy) {
+ into "$buildDir/libs"
+ from configurations.runtimeClasspath
+}
+
+// Run the copy whenever the jar is built.
+jar.dependsOn(copyToLib)
+
test {
jvmArgs '-Djava.security.egd=file:/dev/./urandom'
}
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
index 5c4c23d..bcd620f 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
@@ -17,23 +17,25 @@
package org.apache.solr.update.processor;
import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.crossdc.KafkaMirroringSink;
-import org.apache.solr.crossdc.MirroringException;
+import org.apache.solr.crossdc.common.KafkaMirroringSink;
+import org.apache.solr.crossdc.common.MirroringException;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.lang.invoke.MethodHandles;
import java.util.concurrent.TimeUnit;
public class KafkaRequestMirroringHandler implements RequestMirroringHandler {
- final KafkaCrossDcConf conf;
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
final KafkaMirroringSink sink;
- public KafkaRequestMirroringHandler() {
- // TODO: Setup Kafka properly
- final String topicName = System.getProperty("topicname");
- final String boostrapServers = System.getProperty("bootstrapservers");
- conf = new KafkaCrossDcConf(boostrapServers, topicName, false, null);
- sink = new KafkaMirroringSink(conf);
+ /**
+ * Creates a handler that forwards update requests to the given sink.
+ *
+ * @param sink the Kafka sink that {@code mirror(UpdateRequest)} submits requests to; it is
+ * supplied by the caller rather than constructed here
+ */
+ public KafkaRequestMirroringHandler(KafkaMirroringSink sink) {
+ log.info("create KafkaRequestMirroringHandler");
+ this.sink = sink;
}
/**
@@ -43,6 +45,7 @@ public class KafkaRequestMirroringHandler implements RequestMirroringHandler {
*/
@Override
public void mirror(UpdateRequest request) throws MirroringException {
+ log.info("submit update to sink {}", request.getDocuments());
// TODO: Enforce external version constraint for consistent update replication (cross-cluster)
sink.submit(new MirroredSolrRequest(1, request, TimeUnit.MILLISECONDS.toNanos(
System.currentTimeMillis())));
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 4097c20..cabdda8 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -23,11 +23,15 @@ import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CloseHook;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.common.KafkaMirroringSink;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.RollbackUpdateCommand;
import org.apache.solr.util.plugin.SolrCoreAware;
@@ -35,6 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.*;
@@ -55,23 +60,69 @@ import static org.apache.solr.update.processor.DistributingUpdateProcessorFactor
*/
public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcessorFactory
implements SolrCoreAware, UpdateRequestProcessorFactory.RunAlways {
- private static final Logger log = LoggerFactory.getLogger(MirroringUpdateRequestProcessorFactory.class);
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// Flag for mirroring requests
public static String SERVER_SHOULD_MIRROR = "shouldMirror";
/** This is instantiated in inform(SolrCore) and then shared by all processor instances - visible for testing */
- volatile RequestMirroringHandler mirroringHandler;
+ volatile KafkaRequestMirroringHandler mirroringHandler;
@Override
public void init(final NamedList args) {
+
super.init(args);
}
+ /**
+ * Small holder that closes the {@link KafkaMirroringSink} on request and logs, instead of
+ * propagating, any {@link IOException} raised while doing so.
+ */
+ private class Closer {
+ private final KafkaMirroringSink sink;
+
+ /**
+ * @param sink the sink to close when {@link #close()} is called
+ */
+ public Closer(KafkaMirroringSink sink) {
+ this.sink = sink;
+ }
+
+ /** Closes the sink; an {@link IOException} is logged with its stack trace and swallowed. */
+ public void close() {
+ try {
+ this.sink.close();
+ } catch (IOException e) {
+ // Pass the exception (not the sink) as the trailing argument so SLF4J records the stack trace.
+ log.error("Exception closing sink", e);
+ }
+ }
+
+ }
+
+ /**
+ * Builds the Kafka mirroring sink from the {@code topicName} and {@code bootstrapServers}
+ * system properties, registers a close hook that closes the sink after the core is closed,
+ * and installs the {@link KafkaRequestMirroringHandler} shared by all processor instances.
+ *
+ * @param core the core this factory belongs to; used here only to register the close hook
+ * @throws IllegalArgumentException if either system property is not set
+ */
@Override
public void inform(SolrCore core) {
+ log.info("KafkaRequestMirroringHandler inform");
// load the request mirroring sink class and instantiate.
- mirroringHandler = core.getResourceLoader().newInstance(RequestMirroringHandler.class.getName(), KafkaRequestMirroringHandler.class);
+ // mirroringHandler = core.getResourceLoader().newInstance(RequestMirroringHandler.class.getName(), KafkaRequestMirroringHandler.class);
+
+
+ // TODO: Setup Kafka properly
+ final String topicName = System.getProperty("topicName");
+ if (topicName == null) {
+ throw new IllegalArgumentException("topicName not specified for producer");
+ }
+ final String bootstrapServers = System.getProperty("bootstrapServers");
+ if (bootstrapServers == null) {
+ // The message must name the property exactly as it is read above so it can be set correctly.
+ throw new IllegalArgumentException("bootstrapServers not specified for producer");
+ }
+ KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers, topicName, false, null);
+ KafkaMirroringSink sink = new KafkaMirroringSink(conf);
+
+ // Tie the sink's lifetime to the core: it is closed once the core has finished closing.
+ Closer closer = new Closer(sink);
+ core.addCloseHook(new CloseHook() {
+ @Override public void preClose(SolrCore core) {
+
+ }
+
+ @Override public void postClose(SolrCore core) {
+ closer.close();
+ }
+ });
+
+ mirroringHandler = new KafkaRequestMirroringHandler(sink);
}
@Override
@@ -110,7 +161,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
// prevent circular mirroring
mirroredParams.set(SERVER_SHOULD_MIRROR, Boolean.FALSE.toString());
}
-
+ log.info("Create MirroringUpdateProcessor");
return new MirroringUpdateProcessor(next, doMirroring, mirroredParams,
DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)), doMirroring ? mirroringHandler : null);
}
@@ -205,6 +256,11 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
// TODO: We can't/shouldn't support this ?
}
+ /**
+ * Logs the commit and hands it unchanged to the next processor in the chain, if there is one.
+ *
+ * @param cmd the commit command being processed
+ * @throws IOException if the next processor fails while handling the commit
+ */
+ @Override
+ public void processCommit(CommitUpdateCommand cmd) throws IOException {
+ log.info("process commit cmd={}", cmd);
+ if (next != null) next.processCommit(cmd);
+ }
+
@Override
public void finish() throws IOException {
super.finish();
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
similarity index 53%
rename from crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
rename to crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index 147f2be..7df4438 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -1,8 +1,8 @@
package org.apache.solr.crossdc;
-<<<<<<< refs/remotes/markrmiller/itwip
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -10,14 +10,19 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.lucene.util.QuickPatchThreadsFilter;
import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
import org.apache.solr.crossdc.consumer.Consumer;
-import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
@@ -25,31 +30,13 @@ import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.Properties;
-=======
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.lucene.util.QuickPatchThreadsFilter;
-import org.apache.solr.SolrIgnoredThreadsFilter;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.cloud.MiniSolrCloudCluster;
-import org.apache.solr.cloud.SolrCloudTestCase;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.crossdc.common.MirroredSolrRequest;
-import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import java.util.Map;
->>>>>>> Add back in the EmbeddedKafkaCluster.
import static org.mockito.Mockito.spy;
-@ThreadLeakFilters(
- defaultFilters = true,
- filters = { SolrIgnoredThreadsFilter.class, QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class})
-<<<<<<< refs/remotes/markrmiller/itwip
-@ThreadLeakAction(ThreadLeakAction.Action.INTERRUPT)
-public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
+@ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class,
+ QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
+@ThreadLeakLingering(linger = 5000) public class SolrAndKafkaIntegrationTest extends
+ SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -64,28 +51,14 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
protected static volatile Consumer consumer = new Consumer();
private static String TOPIC = "topic1";
-
- private static String COLLECTION = "collection1";
-=======
-public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
- static final String VERSION_FIELD = "_version_";
-
- private static final int NUM_BROKERS = 1;
- public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
- protected static volatile MiniSolrCloudCluster cluster1;
- protected static volatile MiniSolrCloudCluster cluster2;
- private static SolrMessageProcessor processor;
->>>>>>> Add back in the EmbeddedKafkaCluster.
+ private static String COLLECTION = "collection1";
- private static ResubmitBackoffPolicy backoffPolicy = spy(new TestMessageProcessor.NoOpResubmitBackoffPolicy());
+ @BeforeClass public static void setupIntegrationTest() throws Exception {
- @BeforeClass
- public static void setupIntegrationTest() throws Exception {
-<<<<<<< refs/remotes/markrmiller/itwip
Properties config = new Properties();
- config.put("unclean.leader.election.enable", "true");
- config.put("enable.partition.eof", "false");
+ //config.put("unclean.leader.election.enable", "true");
+ //config.put("enable.partition.eof", "false");
kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
public String bootstrapServers() {
@@ -96,10 +69,11 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
kafkaCluster.createTopic(TOPIC, 1, 1);
- solrCluster1 =
- new Builder(2, createTempDir())
- .addConfig("conf", getFile("src/test/resources/configs/cloud-minimal/conf").toPath())
- .configure();
+ System.setProperty("topicName", TOPIC);
+ System.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
+
+ solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+ getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
CollectionAdminRequest.Create create =
CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
@@ -108,27 +82,26 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);
- String bootstrapServers = kafkaCluster.bootstrapServers();
- log.info("bootstrapServers={}", bootstrapServers);
+ solrCluster2 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+ getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
- consumer.start(bootstrapServers, solrCluster1.getZkServer().getZkAddress(), TOPIC, false, 0);
+ CollectionAdminRequest.Create create2 =
+ CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+ solrCluster2.getSolrClient().request(create2);
+ solrCluster2.waitForActiveCollection(COLLECTION, 1, 1);
-=======
+ solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
- CLUSTER.start();
+ String bootstrapServers = kafkaCluster.bootstrapServers();
+ log.info("bootstrapServers={}", bootstrapServers);
- cluster1 =
- new Builder(2, createTempDir())
- .addConfig("conf", getFile("src/resources/configs/cloud-minimal/conf").toPath())
- .configure();
+ consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, false, 0);
- processor = new SolrMessageProcessor(cluster1.getSolrClient(), backoffPolicy);
->>>>>>> Add back in the EmbeddedKafkaCluster.
}
- @AfterClass
- public static void tearDownIntegrationTest() throws Exception {
-<<<<<<< refs/remotes/markrmiller/itwip
+ @AfterClass public static void tearDownIntegrationTest() throws Exception {
+ ObjectReleaseTracker.clear();
+
consumer.shutdown();
try {
@@ -138,14 +111,55 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
}
if (solrCluster1 != null) {
+ solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
solrCluster1.shutdown();
}
if (solrCluster2 != null) {
+ solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
solrCluster2.shutdown();
}
+ //ObjectReleaseTracker.clear();
+ // if (solrCluster2 != null) {
+ // solrCluster2.shutdown();
+ //}
}
- public void test() throws InterruptedException {
+ /**
+ * Adds one document to cluster 1, then polls cluster 2 (at most 50 rounds, 500 ms apart)
+ * until a match-all query there reports exactly one document, and fails if it never does.
+ */
+ public void testFullCloudToCloud() throws Exception {
+ Thread.sleep(10000); // TODO why?
+
+ CloudSolrClient client = solrCluster1.getSolrClient();
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", String.valueOf(System.currentTimeMillis()));
+ doc.addField("text", "some test");
+
+ client.add(doc);
+
+ client.commit(COLLECTION);
+
+ System.out.println("Sent producer record");
+
+ QueryResponse results = null;
+ boolean foundUpdates = false;
+ for (int i = 0; i < 50; i++) {
+ solrCluster2.getSolrClient().commit(COLLECTION);
+ solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+ results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+ if (results.getResults().getNumFound() == 1) {
+ foundUpdates = true;
+ // Stop polling as soon as the document is visible instead of running all 50 rounds.
+ break;
+ }
+ Thread.sleep(500);
+ }
+
+ //producer.close();
+ System.out.println("Closed producer");
+
+ assertTrue("results=" + results, foundUpdates);
+ System.out.println("Rest: " + results);
+
+ }
+
+ public void testProducerToCloud() throws Exception {
Thread.sleep(10000);
Properties properties = new Properties();
properties.put("bootstrap.servers", kafkaCluster.bootstrapServers());
@@ -159,8 +173,8 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
Producer<String, MirroredSolrRequest> producer = new KafkaProducer(properties);
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.setParam("shouldMirror", "true");
- updateRequest.add("id", String.valueOf(System.currentTimeMillis()));
- updateRequest.add("text", "test");
+ updateRequest.add("id", String.valueOf(System.currentTimeMillis()), "text", "test");
+ updateRequest.add("id", String.valueOf(System.currentTimeMillis() + 22), "text", "test2");
updateRequest.setParam("collection", COLLECTION);
MirroredSolrRequest mirroredSolrRequest = new MirroredSolrRequest(updateRequest);
System.out.println("About to send producer record");
@@ -168,25 +182,26 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
log.info("Producer finished sending metadata={}, exception={}", metadata, exception);
});
producer.flush();
- System.out.println("Sent producer record");
- producer.close();
- System.out.println("Closed producer");
- Thread.sleep(10000);
-=======
-
- CLUSTER.stop();
+ System.out.println("Sent producer record");
- if (cluster1 != null) {
- cluster1.shutdown();
- }
- if (cluster2 != null) {
- cluster2.shutdown();
+ QueryResponse results = null;
+ boolean foundUpdates = false;
+ for (int i = 0; i < 50; i++) {
+ // solrCluster1.getSolrClient().commit(COLLECTION);
+ results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+ if (results.getResults().getNumFound() == 1) {
+ foundUpdates = true;
+ } else {
+ Thread.sleep(500);
+ }
}
- }
- public void test() {
+ System.out.println("Closed producer");
+
+ assertTrue("results=" + results, foundUpdates);
+ System.out.println("Rest: " + results);
->>>>>>> Add back in the EmbeddedKafkaCluster.
+ producer.close();
}
}
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java b/crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
similarity index 100%
rename from crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
rename to crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
diff --git a/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/schema.xml b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/schema.xml
new file mode 100644
index 0000000..bc4676c
--- /dev/null
+++ b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/schema.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+ <!-- Field types: boolean, string, point-based numeric/date types, and one analyzed text type
+ (standard tokenizer followed by lower-casing). -->
+ <fieldType name="boolean" class="solr.BoolField" sortMissingLast="true"/>
+ <fieldType name="string" class="solr.StrField" docValues="true"/>
+ <fieldType name="int" class="org.apache.solr.schema.IntPointField" docValues="true" omitNorms="true"
+ positionIncrementGap="0"/>
+ <fieldType name="long" class="org.apache.solr.schema.LongPointField" docValues="true" omitNorms="true"
+ positionIncrementGap="0"/>
+ <fieldType name="float" class="org.apache.solr.schema.FloatPointField" docValues="true" omitNorms="true"
+ positionIncrementGap="0"/>
+ <fieldType name="double" class="org.apache.solr.schema.DoublePointField" docValues="true" omitNorms="true"
+ positionIncrementGap="0"/>
+ <fieldType name="date" class="org.apache.solr.schema.DatePointField" docValues="true" omitNorms="true"
+ positionIncrementGap="0"/>
+ <fieldType name="text" class="solr.TextField">
+ <analyzer>
+ <tokenizer class="solr.StandardTokenizerFactory"/>
+ <filter class="solr.LowerCaseFilterFactory"/>
+ </analyzer>
+ </fieldType>
+
+ <!-- for versioning -->
+ <field name="_version_" type="long" indexed="true" stored="true"/>
+ <field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
+ <!-- "id" is the unique key (see uniqueKey below); "text" is indexed but not stored. -->
+ <field name="id" type="string" indexed="true" stored="true"/>
+ <field name="text" type="text" indexed="true" stored="false"/>
+
+ <!-- Dynamic fields, selected by name suffix. -->
+ <dynamicField name="*_b" type="boolean" indexed="true" stored="true"/>
+ <dynamicField name="*_s" type="string" indexed="true" stored="false"/>
+ <dynamicField name="*_t" type="text" indexed="true" stored="false"/>
+ <dynamicField name="*_i" type="int" indexed="false" stored="false"/>
+ <dynamicField name="*_l" type="long" indexed="false" stored="false"/>
+ <dynamicField name="*_f" type="float" indexed="false" stored="false"/>
+ <dynamicField name="*_d" type="double" indexed="false" stored="false"/>
+ <dynamicField name="*_dt" type="date" indexed="false" stored="false"/>
+
+ <uniqueKey>id</uniqueKey>
+</schema>
diff --git a/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
new file mode 100644
index 0000000..6e057ab
--- /dev/null
+++ b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+ <dataDir>${solr.data.dir:}</dataDir>
+
+ <directoryFactory name="DirectoryFactory"
+ class="${directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+ <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+ <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+ <indexConfig>
+ <mergePolicyFactory class="${mergePolicyFactory:org.apache.solr.index.TieredMergePolicyFactory}">
+ <int name="maxMergeAtOnce">${maxMergeAtOnce:10}</int>
+ <int name="segmentsPerTier">${segmentsPerTier:10}</int>
+ <double name="noCFSRatio">${noCFSRatio:.1}</double>
+ </mergePolicyFactory>
+
+ <useCompoundFile>${useCompoundFile:true}</useCompoundFile>
+
+ <ramBufferSizeMB>${ramBufferSizeMB:160}</ramBufferSizeMB>
+ <maxBufferedDocs>${maxBufferedDocs:250000}</maxBufferedDocs> <!-- Force the common case to flush by doc count -->
+ <!-- <ramPerThreadHardLimitMB>60</ramPerThreadHardLimitMB> -->
+
+ <!-- <mergeScheduler class="org.apache.lucene.index.ConcurrentMergeScheduler">
+ <int name="maxThreadCount">6</int>
+ <int name="maxMergeCount">8</int>
+ <bool name="ioThrottle">false</bool>
+ </mergeScheduler> -->
+
+ <writeLockTimeout>1000</writeLockTimeout>
+ <commitLockTimeout>10000</commitLockTimeout>
+
+ <!-- this sys property is not set by SolrTestCaseJ4 because almost all tests should
+ use the single process lockType for speed - but tests that explicitly need
+ to vary the lockType can set it as needed.
+ -->
+ <lockType>${lockType:single}</lockType>
+
+ <infoStream>${infostream:false}</infoStream>
+
+ </indexConfig>
+
+ <updateHandler class="solr.DirectUpdateHandler2">
+ <commitWithin>
+ <softCommit>${commitwithin.softcommit:true}</softCommit>
+ </commitWithin>
+ <autoCommit>
+ <maxTime>${autoCommit.maxTime:60000}</maxTime>
+ </autoCommit>
+ <updateLog class="${ulog:solr.UpdateLog}" enable="${enable.update.log:true}"/>
+ </updateHandler>
+
+ <requestHandler name="/select" class="solr.SearchHandler">
+ <lst name="defaults">
+ <str name="echoParams">explicit</str>
+ <str name="indent">true</str>
+ <str name="df">text</str>
+ </lst>
+
+ </requestHandler>
+
+ <query>
+ <queryResultCache
+ enabled="${queryResultCache.enabled:false}"
+ class="${queryResultCache.class:solr.CaffeineCache}"
+ size="${queryResultCache.size:0}"
+ initialSize="${queryResultCache.initialSize:0}"
+ autowarmCount="${queryResultCache.autowarmCount:0}"/>
+ <documentCache
+ enabled="${documentCache.enabled:false}"
+ class="${documentCache.class:solr.CaffeineCache}"
+ size="${documentCache.size:0}"
+ initialSize="${documentCache.initialSize:0}"
+ autowarmCount="${documentCache.autowarmCount:0}"/>
+ <filterCache
+ enabled ="${filterCache.enabled:false}"
+ class="${filterCache.class:solr.CaffeineCache}"
+ size="${filterCache.size:1}"
+ initialSize="${filterCache.initialSize:1}"
+ autowarmCount="${filterCache.autowarmCount:0}"
+ async="${filterCache.async:false}"/>
+ <cache name="myPerSegmentCache"
+ enabled="${myPerSegmentCache.enabled:false}"
+ class="${myPerSegmentCache.class:solr.CaffeineCache}"
+ size="${myPerSegmentCache.size:0}"
+ initialSize="${myPerSegmentCache.initialSize:0}"
+ autowarmCount="${myPerSegmentCache.autowarmCount:0}"/>
+ </query>
+
+ <updateRequestProcessorChain name="mirrorUpdateChain" default="true">
+ <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
+
+ </processor>
+ <processor class="solr.LogUpdateProcessorFactory" />
+ <processor class="solr.RunUpdateProcessorFactory" />
+ </updateRequestProcessorChain>
+
+</config>
+
diff --git a/crossdc-consumer/src/test/resources/log4j2.xml b/crossdc-producer/src/test/resources/log4j2.xml
similarity index 62%
copy from crossdc-consumer/src/test/resources/log4j2.xml
copy to crossdc-producer/src/test/resources/log4j2.xml
index 96f69f1..c63e736 100644
--- a/crossdc-consumer/src/test/resources/log4j2.xml
+++ b/crossdc-producer/src/test/resources/log4j2.xml
@@ -26,9 +26,33 @@
</Pattern>
</PatternLayout>
</Console>
+
+ <RollingRandomAccessFile
+ name="MainLogFile"
+ fileName="${sys:log.dir:-logs}/${sys:log.name:-solr}.log"
+ filePattern="${sys:log.dir:-logs}/${sys:log.name:-solr}.log.%i">
+ <PatternLayout>
+ <Pattern>
+ %maxLen{%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p (%t) [%notEmpty{c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}] %c{1.}
+ %m%notEmpty{ =>%ex{short}}}{10240}%n
+ </Pattern>
+ </PatternLayout>
+ <Policies>
+ <OnStartupTriggeringPolicy/>
+ <SizeBasedTriggeringPolicy size="128 MB"/>
+ </Policies>
+ <DefaultRolloverStrategy max="10"/>
+ </RollingRandomAccessFile>
</Appenders>
<Loggers>
- <!-- Use <AsyncLogger/<AsyncRoot and <Logger/<Root for asynchronous logging or synchonous logging respectively -->
+
+
+ <Logger name="kafka.server.KafkaConfig" level="WARN"/>
+ <Logger name="org.apache.kafka.clients.producer.ProducerConfig" level="WARN"/>
+ <Logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="WARN"/>
+ <Logger name="org.apache.kafka.clients.admin.AdminClientConfig" level="WARN"/>
+
+
<Logger name="org.apache.zookeeper" level="WARN"/>
<Logger name="org.apache.hadoop" level="WARN"/>
<Logger name="org.apache.directory" level="WARN"/>
@@ -36,6 +60,7 @@
<Logger name="org.eclipse.jetty" level="INFO"/>
<Root level="INFO">
+ <AppenderRef ref="MainLogFile"/>
<AppenderRef ref="STDERR"/>
</Root>
</Loggers>
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 8cf6eb5..ffed3a2 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.3-all.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-7.2-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
diff --git a/crossdc-consumer/src/test/resources/log4j2.xml b/log4j2.xml
similarity index 62%
copy from crossdc-consumer/src/test/resources/log4j2.xml
copy to log4j2.xml
index 96f69f1..c63e736 100644
--- a/crossdc-consumer/src/test/resources/log4j2.xml
+++ b/log4j2.xml
@@ -26,9 +26,33 @@
</Pattern>
</PatternLayout>
</Console>
+
+ <RollingRandomAccessFile
+ name="MainLogFile"
+ fileName="${sys:log.dir:-logs}/${sys:log.name:-solr}.log"
+ filePattern="${sys:log.dir:-logs}/${sys:log.name:-solr}.log.%i">
+ <PatternLayout>
+ <Pattern>
+ %maxLen{%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p (%t) [%notEmpty{c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}] %c{1.}
+ %m%notEmpty{ =>%ex{short}}}{10240}%n
+ </Pattern>
+ </PatternLayout>
+ <Policies>
+ <OnStartupTriggeringPolicy/>
+ <SizeBasedTriggeringPolicy size="128 MB"/>
+ </Policies>
+ <DefaultRolloverStrategy max="10"/>
+ </RollingRandomAccessFile>
</Appenders>
<Loggers>
- <!-- Use <AsyncLogger/<AsyncRoot and <Logger/<Root for asynchronous logging or synchonous logging respectively -->
+
+
+ <Logger name="kafka.server.KafkaConfig" level="WARN"/>
+ <Logger name="org.apache.kafka.clients.producer.ProducerConfig" level="WARN"/>
+ <Logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="WARN"/>
+ <Logger name="org.apache.kafka.clients.admin.AdminClientConfig" level="WARN"/>
+
+
<Logger name="org.apache.zookeeper" level="WARN"/>
<Logger name="org.apache.hadoop" level="WARN"/>
<Logger name="org.apache.directory" level="WARN"/>
@@ -36,6 +60,7 @@
<Logger name="org.eclipse.jetty" level="INFO"/>
<Root level="INFO">
+ <AppenderRef ref="MainLogFile"/>
<AppenderRef ref="STDERR"/>
</Root>
</Loggers>