You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by an...@apache.org on 2022/05/13 00:29:20 UTC
[solr-sandbox] branch crossdc-wip updated: IntegrationTest WIP (#9)
This is an automated email from the ASF dual-hosted git repository.
anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/crossdc-wip by this push:
new f7bb6ab IntegrationTest WIP (#9)
f7bb6ab is described below
commit f7bb6aba1bb71355d05363cd3bc1046aa51026fd
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Thu May 12 19:29:15 2022 -0500
IntegrationTest WIP (#9)
---
build.gradle | 2 +
crossdc-consumer/build.gradle | 19 +++-
.../apache/solr/crossdc/KafkaMirroringSink.java | 27 ++++--
.../apache/solr/crossdc/ResubmitBackoffPolicy.java | 16 ++++
.../solr/crossdc/common/CrossDcConstants.java | 1 -
.../apache/solr/crossdc/common/IQueueHandler.java | 1 -
.../solr/crossdc/common/KafkaCrossDcConf.java | 8 +-
.../common/MirroredSolrRequestSerializer.java | 30 +++++-
.../org/apache/solr/crossdc/consumer/Consumer.java | 95 ++++++++++++-------
.../solr/crossdc/helpers/SendDummyUpdates.java | 2 +-
.../crossdc/messageprocessor/MessageProcessor.java | 16 ++++
.../solr/crossdc/SimpleSolrIntegrationTest.java | 2 +-
.../solr/crossdc/SolrAndKafkaIntegrationTest.java | 105 +++++++++++++++++----
.../configs/cloud-minimal/conf/schema.xml | 0
.../configs/cloud-minimal/conf/solrconfig.xml | 0
.../src/{ => test}/resources/log4j2.xml | 0
16 files changed, 254 insertions(+), 70 deletions(-)
diff --git a/build.gradle b/build.gradle
index 081a382..79f252e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -22,8 +22,10 @@
* Learn more about Gradle by exploring our samples at https://docs.gradle.org/6.7.1/samples
*/
+
description 'Root for Solr plugins sandbox'
+
subprojects {
group "org.apache.solr.crossdc"
}
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle
index 141b060..ab162f7 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/build.gradle
@@ -15,7 +15,7 @@
* limitations under the License.
*/
plugins {
- id 'java-library'
+ id 'application'
}
description = 'Cross-DC Consumer package'
@@ -27,13 +27,17 @@ repositories {
jcenter()
}
+application {
+ mainClass = 'org.apache.solr.crossdc.consumer.Consumer'
+}
+
dependencies {
implementation group: 'org.apache.solr', name: 'solr-solrj', version: '8.11.1'
implementation 'org.slf4j:slf4j-api'
- api 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
- api 'org.eclipse.jetty:jetty-server:9.4.41.v20210516'
- api 'org.eclipse.jetty:jetty-servlet:9.4.41.v20210516'
- api 'org.apache.kafka:kafka-clients:2.8.0'
+ compile 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
+ compile 'org.eclipse.jetty:jetty-server:9.4.41.v20210516'
+ compile 'org.eclipse.jetty:jetty-servlet:9.4.41.v20210516'
+ compile 'org.apache.kafka:kafka-clients:2.8.0'
compile group: 'com.google.guava', name: 'guava', version: '14.0'
runtimeOnly ('com.google.protobuf:protobuf-java-util:3.19.2')
testImplementation 'org.hamcrest:hamcrest:2.2'
@@ -56,6 +60,11 @@ dependencies {
// testImplementation 'org.apache.kafka:kafka-streams-test-utils:2.8.1'
}
+
subprojects {
group "org.apache.solr"
+}
+
+test {
+ jvmArgs '-Djava.security.egd=file:/dev/./urandom'
}
\ No newline at end of file
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
index 120241a..d6dcd50 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
@@ -18,18 +18,22 @@ package org.apache.solr.crossdc;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
- private static final Logger logger = LoggerFactory.getLogger(KafkaMirroringSink.class);
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private long lastSuccessfulEnqueueNanos;
private KafkaCrossDcConf conf;
@@ -39,13 +43,13 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
// Create Kafka Mirroring Sink
this.conf = conf;
this.producer = initProducer();
- logger.info("KafkaMirroringSink has been created. Producer & Topic have been created successfully! Configurations {}", conf);
+ log.info("KafkaMirroringSink has been created. Producer & Topic have been created successfully! Configurations {}", conf);
}
@Override
public void submit(MirroredSolrRequest request) throws MirroringException {
- if (logger.isDebugEnabled()) {
- logger.debug("About to submit a MirroredSolrRequest");
+ if (log.isDebugEnabled()) {
+ log.debug("About to submit a MirroredSolrRequest");
}
final long enqueueStartNanos = System.nanoTime();
@@ -53,7 +57,7 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
// Create Producer record
try {
lastSuccessfulEnqueueNanos = System.nanoTime();
- // Record time since last successful enque as 0
+ // Record time since last successful enqueue as 0
long elapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueueStartNanos);
// Update elapsed time
@@ -64,7 +68,7 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
// We are intentionally catching all exceptions, the expected exception form this function is {@link MirroringException}
String message = String.format("Unable to enqueue request %s, # of attempts %s", request, conf.getNumOfRetries());
- logger.error(message, e);
+ log.error(message, e);
throw new MirroringException(message, e);
}
@@ -80,13 +84,20 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
private Producer<String, MirroredSolrRequest> initProducer() {
// Initialize and return Kafka producer
Properties props = new Properties();
- logger.info("Creating Kafka producer! Configurations {} ", conf.toString());
+
+ log.info("Creating Kafka producer! Configurations {} ", conf.toString());
+
+ props.put("bootstrap.servers", conf.getBootStrapServers());
+
+ props.put("key.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", MirroredSolrRequestSerializer.class.getName());
+
Producer<String, MirroredSolrRequest> producer = new KafkaProducer(props);
return producer;
}
private void slowSubmitAction(Object request, long elapsedTimeMillis) {
- logger.warn("Enqueuing the request to Kafka took more than {} millis. enqueueElapsedTime={}",
+ log.warn("Enqueuing the request to Kafka took more than {} millis. enqueueElapsedTime={}",
conf.getSlowSubmitThresholdInMillis(),
elapsedTimeMillis);
}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java
index 145e0d0..e531cca 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.solr.crossdc;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConstants.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConstants.java
index 1d6e355..ba0a73d 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConstants.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConstants.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.solr.crossdc.common;
public class CrossDcConstants {
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/IQueueHandler.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/IQueueHandler.java
index 3c0ddff..a242932 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/IQueueHandler.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/IQueueHandler.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.solr.crossdc.common;
public interface IQueueHandler<T> {
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index ee6a4a6..9748f66 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -19,12 +19,14 @@ package org.apache.solr.crossdc.common;
public class KafkaCrossDcConf extends CrossDcConf {
private final String topicName;
private final boolean enableDataEncryption;
+ private final String bootstrapServers;
private long slowSubmitThresholdInMillis;
private int numOfRetries = 5;
private final String solrZkConnectString;
- public KafkaCrossDcConf(String topicName, boolean enableDataEncryption, String solrZkConnectString) {
+ public KafkaCrossDcConf(String bootstrapServers, String topicName, boolean enableDataEncryption, String solrZkConnectString) {
+ this.bootstrapServers = bootstrapServers;
this.topicName = topicName;
this.enableDataEncryption = enableDataEncryption;
this.solrZkConnectString = solrZkConnectString;
@@ -55,4 +57,8 @@ public class KafkaCrossDcConf extends CrossDcConf {
public String getClusterName() {
return null;
}
+
+ public String getBootStrapServers() {
+ return bootstrapServers;
+ }
}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
index 592a36c..bb3104e 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
@@ -16,16 +16,23 @@
*/
package org.apache.solr.crossdc.common;
+import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.util.Map;
-public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrRequest> {
+public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrRequest>, Deserializer<MirroredSolrRequest> {
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private boolean isKey;
/**
@@ -39,6 +46,27 @@ public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrReq
this.isKey = isKey;
}
+ @Override
+ public MirroredSolrRequest deserialize(String topic, byte[] data) {
+ UpdateRequest solrRequest;
+
+ JavaBinUpdateRequestCodec codec = new JavaBinUpdateRequestCodec();
+ ByteArrayInputStream bais = new ByteArrayInputStream(data);
+
+ try {
+ solrRequest = codec.unmarshal(bais,
+ (document, req, commitWithin, override) -> {
+
+ });
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ log.info("solrRequest={}, {}", solrRequest.getParams(), solrRequest.getDocuments());
+
+ return new MirroredSolrRequest(solrRequest);
+ }
+
/**
* Convert {@code data} into a byte array.
*
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index 8885d8a..3373a96 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -14,20 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.solr.crossdc.consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.crossdc.KafkaMirroringSink;
import org.apache.solr.crossdc.MirroringException;
+import org.apache.solr.crossdc.ResubmitBackoffPolicy;
import org.apache.solr.crossdc.common.IQueueHandler;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
@@ -40,6 +42,7 @@ import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -48,27 +51,25 @@ import java.util.concurrent.Executors;
public class Consumer {
private static boolean enabled = true;
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
/**
* ExecutorService to manage the cross-dc consumer threads.
*/
private ExecutorService consumerThreadExecutor;
- private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
-
private Server server;
CrossDcConsumer crossDcConsumer;
+ private String topicName;
- public void start(String[] args) {
-
- boolean enableDataEncryption = Boolean.getBoolean("enableDataEncryption");
- String topicName = System.getProperty("topicName");
- String zkConnectString = System.getProperty("zkConnectString");
+ public void start(String bootstrapServers, String zkConnectString, String topicName, boolean enableDataEncryption, int port) {
+ this.topicName = topicName;
server = new Server();
ServerConnector connector = new ServerConnector(server);
- connector.setPort(8090);
+ connector.setPort(port);
server.setConnectors(new Connector[] {connector});
- crossDcConsumer = getCrossDcConsumer(zkConnectString, topicName, enableDataEncryption);
+ crossDcConsumer = getCrossDcConsumer(bootstrapServers, zkConnectString, topicName, enableDataEncryption);
// Start consumer thread
consumerThreadExecutor = Executors.newSingleThreadExecutor();
@@ -79,16 +80,26 @@ public class Consumer {
Runtime.getRuntime().addShutdownHook(shutdownHook);
}
- private CrossDcConsumer getCrossDcConsumer(String zkConnectString, String topicName,
+ private CrossDcConsumer getCrossDcConsumer(String bootstrapServers, String zkConnectString, String topicName,
boolean enableDataEncryption) {
- final KafkaCrossDcConf conf = new KafkaCrossDcConf(topicName, enableDataEncryption, zkConnectString);
+ KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers, topicName, enableDataEncryption, zkConnectString);
return new KafkaCrossDcConsumer(conf);
}
public static void main(String[] args) {
+ String bootstrapServers = System.getProperty("bootstrapServers");
+ boolean enableDataEncryption = Boolean.getBoolean("enableDataEncryption");
+ String topicName = System.getProperty("topicName");
+ String zkConnectString = System.getProperty("zkConnectString");
+ String port = System.getProperty("port", "8090");
+
Consumer consumer = new Consumer();
- consumer.start(args);
+ consumer.start(bootstrapServers, zkConnectString, topicName, enableDataEncryption, Integer.parseInt(port));
+ }
+
+ public void shutdown() {
+ crossDcConsumer.shutdown();
}
/**
@@ -96,15 +107,7 @@ public class Consumer {
*/
public abstract static class CrossDcConsumer implements Runnable {
SolrMessageProcessor messageProcessor;
-
- }
-
- public static class CrossDcConsumerFactory {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- CrossDcConsumer getCrossDcConsumer(){
- return null;
- }
+ abstract void shutdown();
}
@@ -119,13 +122,32 @@ public class Consumer {
private final KafkaMirroringSink kafkaMirroringSink;
private final int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 100;
+ private final String topicName;
SolrMessageProcessor messageProcessor;
/**
* @param conf The Kafka consumer configuration
*/
public KafkaCrossDcConsumer(KafkaCrossDcConf conf) {
+ this.topicName = conf.getTopicName();
+
final Properties kafkaConsumerProp = new Properties();
+
+ kafkaConsumerProp.put("bootstrap.servers", conf.getBootStrapServers());
+
+ kafkaConsumerProp.put("key.deserializer", StringDeserializer.class.getName());
+ kafkaConsumerProp.put("value.deserializer", MirroredSolrRequestSerializer.class.getName());
+
+ kafkaConsumerProp.put("group.id", "group_1");
+
+ CloudSolrClient solrClient = new CloudSolrClient.Builder(Collections.singletonList(conf.getSolrZkConnectString()), Optional.empty()).build();
+
+ messageProcessor = new SolrMessageProcessor(solrClient, new ResubmitBackoffPolicy() {
+ @Override public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
+ return 0;
+ }
+ });
+
log.info("Creating Kafka consumer with configuration {}", kafkaConsumerProp);
consumer = createConsumer(kafkaConsumerProp);
@@ -150,21 +172,25 @@ public class Consumer {
@Override
public void run() {
log.info("About to start Kafka consumer thread...");
- String topic="topic";
- log.info("Kafka consumer subscribing to topic topic={}", topic);
- consumer.subscribe(Collections.singleton(topic));
+ log.info("Kafka consumer subscribing to topic topic={}", topicName);
+ consumer.subscribe(Collections.singleton(topicName));
while (pollAndProcessRequests()) {
//no-op within this loop: everything is done in pollAndProcessRequests method defined above.
}
log.info("Closed kafka consumer. Exiting now.");
- consumer.close();
+ try {
+ consumer.close();
+ } catch (Exception e) {
+ log.warn("Failed to close kafka consumer", e);
+ }
+
try {
kafkaMirroringSink.close();
- } catch (IOException e) {
- log.error("Failed to close kafka mirroring sink", e);
+ } catch (Exception e) {
+ log.warn("Failed to close kafka mirroring sink", e);
}
}
@@ -180,7 +206,7 @@ public class Consumer {
List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords = records.records(partition);
try {
for (ConsumerRecord<String, MirroredSolrRequest> record : partitionRecords) {
- log.trace("Fetched record from topic={} partition={} key={} value={}",
+ log.info("Fetched record from topic={} partition={} key={} value={}",
record.topic(), record.partition(), record.key(), record.value());
IQueueHandler.Result result = messageProcessor.handleItem(record.value());
switch (result.status()) {
@@ -260,7 +286,8 @@ public class Consumer {
/**
* Shutdown the Kafka consumer by calling wakeup.
*/
- void shutdown() {
+ public void shutdown() {
+ log.info("Shutdown called on KafkaCrossDcConsumer");
consumer.wakeup();
}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/helpers/SendDummyUpdates.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/helpers/SendDummyUpdates.java
index a066741..818369c 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/helpers/SendDummyUpdates.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/helpers/SendDummyUpdates.java
@@ -32,7 +32,7 @@ public class SendDummyUpdates {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("acks", "all");
- properties.put("retries", 0);
+ properties.put("retries", 3);
properties.put("batch.size", 16384);
properties.put("buffer.memory", 33554432);
properties.put("linger.ms", 1);
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java
index 210f4ca..efbfef6 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.solr.crossdc.messageprocessor;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
index 3177469..547fdca 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
@@ -32,7 +32,7 @@ public class SimpleSolrIntegrationTest extends SolrCloudTestCase {
cluster1 =
new SolrCloudTestCase.Builder(2, createTempDir())
- .addConfig("conf", getFile("src/resources/configs/cloud-minimal/conf").toPath())
+ .addConfig("conf", getFile("src/test/resources/configs/cloud-minimal/conf").toPath())
.configure();
String collection = "collection1";
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index 45f5392..b11a087 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -1,64 +1,135 @@
package org.apache.solr.crossdc;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.lucene.util.QuickPatchThreadsFilter;
import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
-import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
+import org.apache.solr.crossdc.consumer.Consumer;
import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.Map;
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
import static org.mockito.Mockito.spy;
@ThreadLeakFilters(
defaultFilters = true,
filters = { SolrIgnoredThreadsFilter.class, QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class})
+@ThreadLeakAction(ThreadLeakAction.Action.INTERRUPT)
public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
static final String VERSION_FIELD = "_version_";
private static final int NUM_BROKERS = 1;
- public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+ public static EmbeddedKafkaCluster kafkaCluster;
+
+ protected static volatile MiniSolrCloudCluster solrCluster1;
+ protected static volatile MiniSolrCloudCluster solrCluster2;
- protected static volatile MiniSolrCloudCluster cluster1;
- protected static volatile MiniSolrCloudCluster cluster2;
- private static SolrMessageProcessor processor;
+ protected static volatile Consumer consumer = new Consumer();
+
+ private static String TOPIC = "topic1";
+
+ private static String COLLECTION = "collection1";
private static ResubmitBackoffPolicy backoffPolicy = spy(new TestMessageProcessor.NoOpResubmitBackoffPolicy());
@BeforeClass
public static void setupIntegrationTest() throws Exception {
+ Properties config = new Properties();
+ config.put("unclean.leader.election.enable", "true");
+ config.put("enable.partition.eof", "false");
+
+ kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
+ public String bootstrapServers() {
+ return super.bootstrapServers().replaceAll("localhost", "127.0.0.1");
+ }
+ };
+ kafkaCluster.start();
- CLUSTER.start();
+ kafkaCluster.createTopic(TOPIC, 1, 1);
- cluster1 =
+ solrCluster1 =
new Builder(2, createTempDir())
- .addConfig("conf", getFile("src/resources/configs/cloud-minimal/conf").toPath())
+ .addConfig("conf", getFile("src/test/resources/configs/cloud-minimal/conf").toPath())
.configure();
- processor = new SolrMessageProcessor(cluster1.getSolrClient(), backoffPolicy);
+ CollectionAdminRequest.Create create =
+ CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+ solrCluster1.getSolrClient().request(create);
+ solrCluster1.waitForActiveCollection(COLLECTION, 1, 1);
+
+ solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);
+
+ String bootstrapServers = kafkaCluster.bootstrapServers();
+ log.info("bootstrapServers={}", bootstrapServers);
+
+ consumer.start(bootstrapServers, solrCluster1.getZkServer().getZkAddress(), TOPIC, false, 0);
+
}
@AfterClass
public static void tearDownIntegrationTest() throws Exception {
+ consumer.shutdown();
- CLUSTER.stop();
+ try {
+ kafkaCluster.stop();
+ } catch (Exception e) {
+ log.error("Exception stopping Kafka cluster", e);
+ }
- if (cluster1 != null) {
- cluster1.shutdown();
+ if (solrCluster1 != null) {
+ solrCluster1.shutdown();
}
- if (cluster2 != null) {
- cluster2.shutdown();
+ if (solrCluster2 != null) {
+ solrCluster2.shutdown();
}
}
- public void test() {
-
+ public void test() throws InterruptedException {
+ Thread.sleep(10000);
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", kafkaCluster.bootstrapServers());
+ properties.put("acks", "all");
+ properties.put("retries", 0);
+ properties.put("batch.size", 1);
+ properties.put("buffer.memory", 33554432);
+ properties.put("linger.ms", 1);
+ properties.put("key.serializer", StringSerializer.class.getName());
+ properties.put("value.serializer", MirroredSolrRequestSerializer.class.getName());
+ Producer<String, MirroredSolrRequest> producer = new KafkaProducer(properties);
+ UpdateRequest updateRequest = new UpdateRequest();
+ updateRequest.setParam("shouldMirror", "true");
+ updateRequest.add("id", String.valueOf(System.currentTimeMillis()));
+ updateRequest.add("text", "test");
+ updateRequest.setParam("collection", COLLECTION);
+ MirroredSolrRequest mirroredSolrRequest = new MirroredSolrRequest(updateRequest);
+ System.out.println("About to send producer record");
+ producer.send(new ProducerRecord(TOPIC, mirroredSolrRequest), (metadata, exception) -> {
+ log.info("Producer finished sending metadata={}, exception={}", metadata, exception);
+ });
+ producer.flush();
+ System.out.println("Sent producer record");
+ producer.close();
+ System.out.println("Closed producer");
+
+ Thread.sleep(10000);
}
}
diff --git a/crossdc-consumer/src/resources/configs/cloud-minimal/conf/schema.xml b/crossdc-consumer/src/test/resources/configs/cloud-minimal/conf/schema.xml
similarity index 100%
rename from crossdc-consumer/src/resources/configs/cloud-minimal/conf/schema.xml
rename to crossdc-consumer/src/test/resources/configs/cloud-minimal/conf/schema.xml
diff --git a/crossdc-consumer/src/resources/configs/cloud-minimal/conf/solrconfig.xml b/crossdc-consumer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
similarity index 100%
rename from crossdc-consumer/src/resources/configs/cloud-minimal/conf/solrconfig.xml
rename to crossdc-consumer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
diff --git a/crossdc-consumer/src/resources/log4j2.xml b/crossdc-consumer/src/test/resources/log4j2.xml
similarity index 100%
rename from crossdc-consumer/src/resources/log4j2.xml
rename to crossdc-consumer/src/test/resources/log4j2.xml